src/Cms/CoreBundle/EventSubscriber/OneRoster/OneRosterStashSubscriber.php line 133

Open in your IDE?
  1. <?php
  2. namespace Cms\CoreBundle\EventSubscriber\OneRoster;
  3. use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
  4. use Cms\CoreBundle\Entity\OneRoster\OneRosterUser;
  5. use Cms\CoreBundle\Entity\OneRosterJob;
  6. use Cms\CoreBundle\Entity\OneRosterSync;
  7. use Cms\CoreBundle\Events\OneRosterEvents;
  8. use Cms\CoreBundle\Model\OneRosterStateBitwise;
  9. use DateTimeInterface;
  10. use Platform\QueueBundle\Event\AsyncEvent;
  11. use Platform\QueueBundle\Model\AsyncMessage;
  12. /**
  13.  * Class OneRosterStashSubscriber
  14.  * @package Cms\CoreBundle\EventSubscriber\OneRoster
  15.  */
  16. final class OneRosterStashSubscriber extends AbstractOneRosterSubscriber
  17. {
  18.     /**
  19.      * {@inheritDoc}
  20.      */
  21.     public static function getSubscribedEvents(): array
  22.     {
  23.         return [
  24.             OneRosterEvents::EVENT__STASH => [
  25.                 ['discardableDisable'self::PRIORITY__FIRST],
  26.                 ['phaseStart'self::PRIORITY__FIRST],
  27.                 ['stash'0],
  28.                 ['phaseFin'self::PRIORITY__LAST],
  29.                 ['phaseTrigger'self::PRIORITY__LAST],
  30.                 ['discardableEnable'self::PRIORITY__LAST],
  31.             ],
  32.             OneRosterEvents::EVENT__STASH__TYPE => [
  33.                 ['discardableDisable'self::PRIORITY__FIRST],
  34.                 ['stashType'0],
  35.                 ['phaseFin'self::PRIORITY__LAST],
  36.                 ['phaseTrigger'self::PRIORITY__LAST],
  37.                 ['discardableEnable'self::PRIORITY__LAST],
  38.             ],
  39.             OneRosterEvents::EVENT__STASH__OBJECT => [
  40.                 ['discardableDisable'self::PRIORITY__FIRST],
  41.                 ['stashObject'0],
  42.                 ['phaseFin'self::PRIORITY__LAST],
  43.                 ['phaseTrigger'self::PRIORITY__LAST],
  44.                 ['discardableEnable'self::PRIORITY__LAST],
  45.             ],
  46.         ];
  47.     }
  48.     /**
  49.      * {@inheritDoc}
  50.      */
  51.     public function phase(): int
  52.     {
  53.         return OneRosterJob::PHASES__STASH;
  54.     }
  55.     /**
  56.      * {@inheritDoc}
  57.      */
  58.     public function next(): ?int
  59.     {
  60.         return OneRosterJob::PHASES__FIX;
  61.     }
  62.     /**
  63.      * Handles the basic event of stashing a particular sync that has been triggered.
  64.      *
  65.      * @param AsyncEvent $event
  66.      */
  67.     public function stash(AsyncEvent $event): void
  68.     {
  69.         // data should be an array with an id of a sync
  70.         $job $this->loadJob($event);
  71.         // determine the types we need to pull
  72.         $types array_fill_keys(array_keys(AbstractOneRosterEntity::ONEROSTER_TYPES), false);
  73.         foreach ($job->getSync()->getStrategiesAsArray() as $strategy) {
  74.             $stypes array_keys(OneRosterSync::ONEROSTER_TYPES_MAPPINGS[$strategy]);
  75.             foreach ($stypes as $stype) {
  76.                 $types[$stype] = true;
  77.             }
  78.         }
  79.         $types array_keys(array_filter($types));
  80.         // setup calls to process each type
  81.         /** @var array|AsyncMessage[] $messages */
  82.         $messages = [];
  83.         foreach ($types as $type) {
  84.             $messages[] = new AsyncMessage(
  85.                 $job,
  86.                 OneRosterEvents::EVENT__STASH__TYPE,
  87.                 [
  88.                     'job' => $job->getId(),
  89.                     'type' => $type,
  90.                     'page' => 0,
  91.                 ],
  92.                 self::MQ_PRIORITY,
  93.             );
  94.         }
  95.         // tracking
  96.         $this->oneroster->orchestratePhaseChange($jobcount($messages));
  97.         // queue them up
  98.         try {
  99.             $this->async->queue(
  100.                 null,
  101.                 $messages
  102.             );
  103.         } catch (\Exception $e) {
  104.             $this->oneroster->orchestratePhaseRevert($jobcount($messages));
  105.             throw $e;
  106.         }
  107.         // DEBUGGING
  108.         foreach ($messages as $message) {
  109.             $event->getOutput()->writeln(sprintf(
  110.                 'STASH__TYPE of "%s" triggered for sync #%s',
  111.                 $message->getPayload()['type'],
  112.                 $job->getIdentifier()
  113.             ));
  114.         }
  115.     }
  116.     /**
  117.      * Handles a stashing event for a particular type of data for a sync.
  118.      *
  119.      * @param AsyncEvent $event
  120.      */
  121.     public function stashType(AsyncEvent $event): void
  122.     {
  123.         // data should be an array with an id of a sync
  124.         $job $this->loadJob($event);
  125.         // should also tell us what type we are pulling
  126.         // this type is a string oneroster type, not a fqcn
  127.         $type $event->getParam('type');
  128.         $page $event->getParam('page');
  129.         // DEBUGGING
  130.         $event->getOutput()->writeln(sprintf(
  131.             'Obtaining page %s of data',
  132.             $page
  133.         ));
  134.         // will be working with the oneroster api
  135.         $api $this->oneroster->api($job);
  136.         // make api calls to get the data of these types and to make messages out of them
  137.         $messages = [];
  138.         // get the objects
  139.         $objects $api->getObjects($type$page);
  140.         // check pagination here so other workers can go ahead and be chewing on stuff while we are still processing
  141.         // if we have hit the limit, we need to page
  142.         if ($api->paginate($objects)) {
  143.             // increment the page
  144.             $page++;
  145.             // DEBUGGING
  146.             $event->getOutput()->writeln(sprintf(
  147.                 'Need to page, adding message for page %s of data',
  148.                 $page
  149.             ));
  150.             // tracking
  151.             $this->oneroster->orchestratePhaseChange($job1);
  152.             // queue the next page
  153.             try {
  154.                 $this->async->send(
  155.                     null,
  156.                     new AsyncMessage(
  157.                         $job,
  158.                         OneRosterEvents::EVENT__STASH__TYPE,
  159.                         [
  160.                             'job' => $job->getId(),
  161.                             'type' => $type,
  162.                             'page' => $page,
  163.                         ],
  164.                         self::MQ_PRIORITY,
  165.                     )
  166.                 );
  167.             } catch (\Exception $e) {
  168.                 $this->oneroster->orchestratePhaseRevert($job1);
  169.                 throw $e;
  170.             }
  171.         }
  172.         // determine what roles we are interested in
  173.         $roles = [];
  174.         if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__SSO)) {
  175.             $roles array_merge($roles, [
  176.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__ADMINISTRATOR,
  177.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__AIDE,
  178.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__PROCTOR,
  179.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__TEACHER,
  180.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STAFF,
  181.             ]);
  182.         }
  183.         if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__STAFF)) {
  184.             $roles array_merge($roles, [
  185.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__ADMINISTRATOR,
  186.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__AIDE,
  187.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__PROCTOR,
  188.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__TEACHER,
  189.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STAFF,
  190.             ]);
  191.         }
  192.         if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__FAMILY)) {
  193.             $roles array_merge($roles, [
  194.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__GUARDIAN,
  195.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__PARENT,
  196.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__RELATIVE,
  197.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STUDENT,
  198.             ]);
  199.         }
  200.         if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__STUDENTS)) {
  201.             $roles array_merge($roles, [
  202.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__STUDENT,
  203.             ]);
  204.         }
  205.         if ($job->getSync()->hasStrategy(OneRosterSync::STRATEGIES__NOTIFICATIONS__COMMUNITY)) {
  206.             $roles array_merge($roles, [
  207.                 AbstractOneRosterEntity::ENUMS__ROLE_TYPE__COMMUNITY,
  208.             ]);
  209.         }
  210.         $roles array_unique($roles);
  211.         // loop over the object to create the messages to process each fully
  212.         foreach ($objects as $object) {
  213.             // attempt to pick off critical errors with incoming data
  214.             try {
  215.                 // check for no role on the data
  216.                 if ($type === OneRosterUser::ONEROSTER_TYPE && ! array_key_exists('role'$object)) {
  217.                     throw new \Exception('Role not given for user.');
  218.                 }
  219.             } catch (\Exception $e) {
  220.                 $this->oneroster->logIssue(
  221.                     $job,
  222.                     $this->phase(),
  223.                     OneRosterEvents::EVENT__STASH__TYPE,
  224.                     OneRosterUser::ONEROSTER_TYPE,
  225.                     $object['sourcedId'],
  226.                     $e
  227.                 );
  228.                 continue;
  229.             }
  230.             // handle special use cases that we want to skip
  231.             // TODO: this needs done better; cases are confusing/complex and makes the file messy...
  232.             $skip false;
  233.             switch (true) {
  234.                 // TODO: do we need student data when processing the other types?
  235.                 // if we are dealing with users, skip if:
  236.                 // no role in data set
  237.                 // role in data set but is empty
  238.                 // role is there but not in the array of roles that we care about
  239.                 case $type == OneRosterUser::ONEROSTER_TYPE && ( ! array_key_exists('role'$object) || empty($object['role']) || ! in_array($object['role'], $roles)):
  240.                     $skip true;
  241.                     break;
  242.                 // handle powerschool emergency contacts hack...
  243.                 // if the sync has the flag set, check and see if the ps id is given
  244.                 // if it is, if the ps id has a space in it, skip it (likely it is an emergency contact record)
  245.                 case $job->getSync()->hasFlag(OneRosterSync::FLAGS__POWERSCHOOL__SKIP_EMERGENCY_CONTACTS) && $type == OneRosterUser::ONEROSTER_TYPE && (array_key_exists('userIds'$object)):
  246.                     foreach ($object['userIds'] as $userId) {
  247.                         if (array_key_exists('type'$userId) && $userId['type'] === 'ps_id') {
  248.                             // TODO: multibyte?
  249.                             if (array_key_exists('identifier'$userId) && preg_match('/^[^ ]+ of /'$userId['identifier']) === 1) {
  250.                                 $skip true;
  251.                                 break;
  252.                             }
  253.                         }
  254.                     }
  255.                     break;
  256.             }
  257.             // if the data type is useful to us, make mq message for it
  258.             if ( ! $skip) {
  259.                 $messages[] = new AsyncMessage(
  260.                     $job,
  261.                     OneRosterEvents::EVENT__STASH__OBJECT,
  262.                     [
  263.                         'job' => $job->getId(),
  264.                         'type' => $type,
  265.                         'data' => $object,
  266.                     ],
  267.                     self::MQ_PRIORITY,
  268.                 );
  269.             }
  270.         }
  271.         // do only if we have anything to add to the system
  272.         if ($messages) {
  273.             // tracking
  274.             $this->oneroster->orchestratePhaseChange($jobcount($messages));
  275.             // queue them up
  276.             try {
  277.                 $this->async->queue(
  278.                     null,
  279.                     $messages
  280.                 );
  281.             } catch (\Exception $e) {
  282.                 $this->oneroster->orchestratePhaseRevert($jobcount($messages));
  283.                 throw $e;
  284.             }
  285.         }
  286.         // DEBUGGING
  287.         $event->getOutput()->writeln(sprintf(
  288.             'STASH__OBJECT for %s "%s" object(s) triggered for sync #%s',
  289.             count($messages),
  290.             $type,
  291.             $job->getIdentifier()
  292.         ));
  293.     }
  294.     /**
  295.      * Handles an event for stashing a particular object of a certain type for a sync.
  296.      *
  297.      * @param AsyncEvent $event
  298.      */
  299.     public function stashObject(AsyncEvent $event): void
  300.     {
  301.         // data should be an array with an id of a sync
  302.         $job $this->loadJob($event);
  303.         // should also tell us what type we are pulling
  304.         // oneroster type, not fqcn
  305.         $type $event->getParam('type');
  306.         // determine the class based on the type
  307.         $class AbstractOneRosterEntity::ONEROSTER_TYPES[$type];
  308.         // also grab the raw data from the api
  309.         $data $event->getParam('data');
  310.         // see if we already have an object of this type of this id
  311.         /** @var AbstractOneRosterEntity $entity */
  312.         $entity $this->em->getRepository($class)->findOneBy([
  313.             'sourcedId' => $data['sourcedId'],
  314.         ]);
  315.         if (empty($entity)) {
  316.             // no entity, so make a new one
  317.             $entity = new $class();
  318.             // DEBUGGING
  319.             $event->getOutput()->writeln(sprintf(
  320.                 'Entity "%s" created for #%s',
  321.                 $class,
  322.                 $data['sourcedId']
  323.             ));
  324.             // since we have made a new entity, it means that we are inherently dirty
  325.             $entity->getState()->addFlag(OneRosterStateBitwise::DIRTY_VIA_NEW);
  326.             // DEBUGGING
  327.             $event->getOutput()->writeln(
  328.                 'DIRTY_VIA_NEW flagged'
  329.             );
  330.             // no previous data, need to clear it all
  331.             $status null;
  332.             $dateLastModified null;
  333.             $checksum null;
  334.         } else {
  335.             // make sure the object is not marked as clean
  336.             $entity->getState()->removeFlag(OneRosterStateBitwise::CLEAN);
  337.             // save off the previous data that we need to check against for dirty checks
  338.             $status $entity->getStatus();
  339.             $dateLastModified $entity->getDateLastModified();
  340.             $checksum $entity->getChecksum();
  341.             // DEBUGGING
  342.             $event->getOutput()->writeln(sprintf(
  343.                 'Entity "%s" found for #%s',
  344.                 $class,
  345.                 $entity->getSourcedId()
  346.             ));
  347.         }
  348.         // go ahead and attach the job and sync to this entity
  349.         $entity
  350.             ->setSync($this->em->getReference(OneRosterSync::class, $job->getSync()->getId()))
  351.             ->setJob($this->em->getReference(OneRosterJob::class, $job->getId()));
  352.         // merge in the data from the api
  353.         $entity->merge($data);
  354.         // compare status dirty check
  355.         if ($status != $entity->getStatus()) {
  356.             // status changed, object is dirty
  357.             $entity->getState()->addFlag(OneRosterStateBitwise::DIRTY_VIA_STATUS);
  358.             // DEBUGGING
  359.             $event->getOutput()->writeln(sprintf(
  360.                 'DIRTY_VIA_STATUS flagged (%s != %s)',
  361.                 (empty($status)) ? 'null' $status,
  362.                 $entity->getStatus()
  363.             ));
  364.         }
  365.         // compare date last modified dirty check
  366.         if ($dateLastModified != $entity->getDateLastModified()) {
  367.             // date last modified changed, object is dirty
  368.             $entity->getState()->addFlag(OneRosterStateBitwise::DIRTY_VIA_DATE_MODIFIED);
  369.             // DEBUGGING
  370.             $event->getOutput()->writeln(sprintf(
  371.                 'DIRTY_VIA_DATE_MODIFIED flagged (%s != %s)',
  372.                 (empty($dateLastModified)) ? 'null' $dateLastModified->format(DateTimeInterface::RFC3339_EXTENDED),
  373.                 $entity->getDateLastModified()->format(DateTimeInterface::RFC3339_EXTENDED)
  374.             ));
  375.         }
  376.         // compare crc dirty check
  377.         // calc new crc first
  378.         $entity->setChecksum(crc32(json_encode($entity->data())));
  379.         if ($checksum != $entity->getChecksum()) {
  380.             // checksum changed, object is dirty
  381.             $entity->getState()->addFlag(OneRosterStateBitwise::DIRTY_VIA_CHECKSUM);
  382.             // DEBUGGING
  383.             $event->getOutput()->writeln(sprintf(
  384.                 'DIRTY_VIA_CHECKSUM flagged (%s != %s)',
  385.                 (empty($checksum)) ? 'null' $checksum,
  386.                 $entity->getChecksum()
  387.             ));
  388.         }
  389.         // TODO: check object conditionals for dirty data
  390.         // see if we are dirty
  391.         if ($entity->getState()->isDirty()) {
  392.             // DEBUGGING
  393.             $event->getOutput()->writeln(sprintf(
  394.                 'Record is DIRTY (%s)',
  395.                 implode(', '$entity->getState()->getFlagNames(nulltrue))
  396.             ));
  397.         } else {
  398.             // all checks are pointing to the object being clean, mark it as such
  399.             $entity->getState()->addFlag(OneRosterStateBitwise::CLEAN);
  400.             // DEBUGGING
  401.             $event->getOutput()->writeln(
  402.                 'Record is CLEAN'
  403.             );
  404.         }
  405.         // save it
  406.         $this->em->save($entity);
  407.     }
  408. }