src/Cms/CoreBundle/EventSubscriber/OneRoster/OneRosterFixSubscriber.php line 198

Open in your IDE?
  1. <?php
  2. namespace Cms\CoreBundle\EventSubscriber\OneRoster;
  3. use Cms\CoreBundle\Doctrine\Hydrators\SingleColumnHydrator;
  4. use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
  5. use Cms\CoreBundle\Entity\OneRosterJob;
  6. use Cms\CoreBundle\Events\OneRosterEvents;
  7. use Cms\CoreBundle\Events\OneRosterFixEvent;
  8. use Platform\QueueBundle\Event\AsyncEvent;
  9. use Platform\QueueBundle\Model\AsyncMessage;
  10. /**
  11.  * Class OneRosterFixSubscriber
  12.  * @package Cms\CoreBundle\EventSubscriber\OneRoster
  13.  */
  14. final class OneRosterFixSubscriber extends AbstractOneRosterSubscriber
  15. {
  16.     /**
  17.      * {@inheritDoc}
  18.      */
  19.     public static function getSubscribedEvents(): array
  20.     {
  21.         return [
  22.             OneRosterEvents::EVENT__FIX => [
  23.                 ['discardableDisable'self::PRIORITY__FIRST],
  24.                 ['phaseStart'self::PRIORITY__FIRST],
  25.                 ['fix'0],
  26.                 ['phaseFin'self::PRIORITY__LAST],
  27.                 ['phaseTrigger'self::PRIORITY__LAST],
  28.                 ['discardableEnable'self::PRIORITY__LAST],
  29.             ],
  30.             OneRosterEvents::EVENT__FIX__TYPE => [
  31.                 ['discardableDisable'self::PRIORITY__FIRST],
  32.                 ['fixType'0],
  33.                 ['phaseFin'self::PRIORITY__LAST],
  34.                 ['phaseTrigger'self::PRIORITY__LAST],
  35.                 ['discardableEnable'self::PRIORITY__LAST],
  36.             ],
  37.             OneRosterEvents::EVENT__FIX__OBJECT => [
  38.                 ['discardableDisable'self::PRIORITY__FIRST],
  39.                 ['fixObject'0],
  40.                 ['phaseFin'self::PRIORITY__LAST],
  41.                 ['phaseTrigger'self::PRIORITY__LAST],
  42.                 ['discardableEnable'self::PRIORITY__LAST],
  43.             ],
  44.         ];
  45.     }
  46.     /**
  47.      * {@inheritDoc}
  48.      */
  49.     public function phase(): int
  50.     {
  51.         return OneRosterJob::PHASES__FIX;
  52.     }
  53.     /**
  54.      * {@inheritDoc}
  55.      */
  56.     public function next(): ?int
  57.     {
  58.         return OneRosterJob::PHASES__PREPARE;
  59.     }
  60.     /**
  61.      * @param AsyncEvent $event
  62.      */
  63.     public function fix(AsyncEvent $event): void
  64.     {
  65.         // data should be an array with an id of a sync
  66.         $job $this->loadJob($event);
  67.         // holder for data
  68.         /** @var array|AsyncMessage[] $messages */
  69.         $messages = [];
  70.         // if we are not from our data tool, we might have some data fixing to do...
  71.         if ( ! $job->getSync()->isOptimized()) {
  72.             // setup calls to fix each type
  73.             foreach (array_keys(AbstractOneRosterEntity::ONEROSTER_TYPES) as $type) {
  74.                 $messages[] = new AsyncMessage(
  75.                     $job,
  76.                     OneRosterEvents::EVENT__FIX__TYPE,
  77.                     [
  78.                         'job' => $job->getId(),
  79.                         'type' => $type,
  80.                     ],
  81.                     self::MQ_PRIORITY,
  82.                 );
  83.             }
  84.         }
  85.         // update tracking props on the sync
  86.         $this->oneroster->orchestratePhaseChange($jobcount($messages));
  87.         // queue them up
  88.         try {
  89.             $this->async->queue(
  90.                 null,
  91.                 $messages
  92.             );
  93.         } catch (\Exception $e) {
  94.             $this->oneroster->orchestratePhaseRevert($jobcount($messages));
  95.             throw $e;
  96.         }
  97.         // DEBUGGING
  98.         foreach ($messages as $message) {
  99.             $event->getOutput()->writeln(sprintf(
  100.                 'FIX__TYPE of "%s" triggered for sync #%s',
  101.                 $message->getPayload()['type'],
  102.                 $job->getIdentifier()
  103.             ));
  104.         }
  105.     }
  106.     /**
  107.      * Handles fixing triggers of individual types of objects for a sync.
  108.      *
  109.      * @param AsyncEvent $event
  110.      */
  111.     public function fixType(AsyncEvent $event): void
  112.     {
  113.         // data should be an array with an id of a sync
  114.         $job $this->loadJob($event);
  115.         // get data from the payload
  116.         [$type$class] = $this->parseForType($event);
  117.         // get the objects
  118.         $objects $this->em->getRepository($class)->createQueryBuilder('objects')
  119.             ->select('objects.sourcedId')
  120.             ->orderBy('objects.createdAt''ASC')
  121.             ->getQuery()
  122.             ->getResult(SingleColumnHydrator::HYDRATOR);
  123.         // loop over the object to create the messages to process each fully
  124.         $messages = [];
  125.         foreach ($objects as $index => $object) {
  126.             if ($index AsyncMessage::MAX_BATCH === 0) {
  127.                 $messages[] = [];
  128.             }
  129.             $messages[count($messages) - 1][] = new AsyncMessage(
  130.                 $job,
  131.                 OneRosterEvents::EVENT__FIX__OBJECT,
  132.                 [
  133.                     'job' => $job->getId(),
  134.                     'type' => $type,
  135.                     'id' => $object,
  136.                 ],
  137.                 self::MQ_PRIORITY,
  138.             );
  139.         }
  140.         // do only if we have messages
  141.         foreach ($messages as $batch) {
  142.             // tracking
  143.             $this->oneroster->orchestratePhaseChange($jobcount($batch));
  144.             // queue them up
  145.             try {
  146.                 $this->async->queue(
  147.                     null,
  148.                     $batch
  149.                 );
  150.             } catch (\Exception $e) {
  151.                 $this->oneroster->orchestratePhaseRevert($jobcount($batch));
  152.                 throw $e;
  153.             }
  154.             // DEBUGGING
  155.             $event->getOutput()->writeln(sprintf(
  156.                 'FIX__OBJECT batch: %s',
  157.                 count($batch)
  158.             ));
  159.         }
  160.         // DEBUGGING
  161.         $event->getOutput()->writeln(sprintf(
  162.             'FIX__OBJECT for %s "%s" object(s) triggered for sync #%s',
  163.             array_sum(array_map('count'$messages)),
  164.             $type,
  165.             $job->getIdentifier()
  166.         ));
  167.     }
  168.     /**
  169.      * Handle fixing of a particular object for a sync.
  170.      * This triggers a "sub" event that allows things in the system to act upon the data.
  171.      *
  172.      * @param AsyncEvent $event
  173.      */
  174.     public function fixObject(AsyncEvent $event): void
  175.     {
  176.         // data should be an array with an id of a sync
  177.         $job $this->loadJob($event);
  178.         // get data from the payload
  179.         /** @var AbstractOneRosterEntity $entity */
  180.         [$class$entity] = $this->parseForObject($event);
  181.         // TODO: use dirty checks to see if we need to process this entity at all? may not be needed in linking step...
  182.         // need to create an event for processing of this specific type
  183.         $subevent = new OneRosterFixEvent(
  184.             $entity,
  185.             $job,
  186.             $event->getOutput()
  187.         );
  188.         // capture errors
  189.         try {
  190.             // dispatch the event
  191.             $this->dispatcher->dispatch(
  192.                 $subevent,
  193.                 OneRosterFixEvent::EVENTS[$class],
  194.             );
  195.             // if propagation was stopped, it means there was a problem
  196.             if ($subevent->isPropagationStopped()) {
  197.                 throw new \Exception(
  198.                     'Propagation was stopped while handling event.',
  199.                 );
  200.             }
  201.         } catch (\Exception $e) {
  202.             $this->oneroster->logIssue(
  203.                 $job,
  204.                 $this->phase(),
  205.                 OneRosterFixEvent::EVENTS[$class],
  206.                 $entity::ONEROSTER_TYPE,
  207.                 $entity,
  208.                 $e
  209.             );
  210.         }
  211.     }
  212. }