src/Cms/CoreBundle/EventSubscriber/OneRoster/OneRosterTweakSubscriber.php line 69

Open in your IDE?
  1. <?php
  2. namespace Cms\CoreBundle\EventSubscriber\OneRoster;
  3. use Cms\CoreBundle\Doctrine\Hydrators\SingleColumnHydrator;
  4. use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
  5. use Cms\CoreBundle\Entity\OneRosterJob;
  6. use Cms\CoreBundle\Events\OneRosterEvents;
  7. use Cms\CoreBundle\Events\OneRosterTweakEvent;
  8. use Platform\QueueBundle\Event\AsyncEvent;
  9. use Platform\QueueBundle\Model\AsyncMessage;
  10. /**
  11.  * Class OneRosterTweakSubscriber
  12.  * @package Cms\CoreBundle\EventSubscriber\OneRoster
  13.  */
  14. final class OneRosterTweakSubscriber extends AbstractOneRosterSubscriber
  15. {
  16.     /**
  17.      * {@inheritDoc}
  18.      */
  19.     public static function getSubscribedEvents(): array
  20.     {
  21.         return [
  22.             OneRosterEvents::EVENT__TWEAK => [
  23.                 ['discardableDisable'self::PRIORITY__FIRST],
  24.                 ['phaseStart'self::PRIORITY__FIRST],
  25.                 ['tweak'0],
  26.                 ['phaseFin'self::PRIORITY__LAST],
  27.                 ['phaseTrigger'self::PRIORITY__LAST],
  28.                 ['discardableEnable'self::PRIORITY__LAST],
  29.             ],
  30.             OneRosterEvents::EVENT__TWEAK__TYPE => [
  31.                 ['discardableDisable'self::PRIORITY__FIRST],
  32.                 ['tweakType'0],
  33.                 ['phaseFin'self::PRIORITY__LAST],
  34.                 ['phaseTrigger'self::PRIORITY__LAST],
  35.                 ['discardableEnable'self::PRIORITY__LAST],
  36.             ],
  37.             OneRosterEvents::EVENT__TWEAK__OBJECT => [
  38.                 ['discardableDisable'self::PRIORITY__FIRST],
  39.                 ['tweakObject'0],
  40.                 ['phaseFin'self::PRIORITY__LAST],
  41.                 ['phaseTrigger'self::PRIORITY__LAST],
  42.                 ['discardableEnable'self::PRIORITY__LAST],
  43.             ],
  44.         ];
  45.     }
  46.     /**
  47.      * {@inheritDoc}
  48.      */
  49.     public function phase(): int
  50.     {
  51.         return OneRosterJob::PHASES__TWEAK;
  52.     }
  53.     /**
  54.      * {@inheritDoc}
  55.      */
  56.     public function next(): ?int
  57.     {
  58.         return OneRosterJob::PHASES__TIDY;
  59.     }
  60.     /**
  61.      * @param AsyncEvent $event
  62.      */
  63.     public function tweak(AsyncEvent $event): void
  64.     {
  65.         // data should be an array with an id of a sync
  66.         $job $this->loadJob($event);
  67.         // setup calls to tweak each type
  68.         /** @var array|AsyncMessage[] $messages */
  69.         $messages = [];
  70.         foreach (array_keys(AbstractOneRosterEntity::ONEROSTER_TYPES) as $type) {
  71.             $messages[] = new AsyncMessage(
  72.                 $job,
  73.                 OneRosterEvents::EVENT__TWEAK__TYPE,
  74.                 [
  75.                     'job' => $job->getId(),
  76.                     'type' => $type,
  77.                 ],
  78.                 self::MQ_PRIORITY,
  79.             );
  80.         }
  81.         // update tracking props on the sync
  82.         $this->oneroster->orchestratePhaseChange($jobcount($messages));
  83.         // queue them up
  84.         try {
  85.             $this->async->queue(
  86.                 null,
  87.                 $messages
  88.             );
  89.         } catch (\Exception $e) {
  90.             $this->oneroster->orchestratePhaseRevert($jobcount($messages));
  91.             throw $e;
  92.         }
  93.         // DEBUGGING
  94.         foreach ($messages as $message) {
  95.             $event->getOutput()->writeln(sprintf(
  96.                 'TWEAK__TYPE of "%s" triggered for sync #%s',
  97.                 $message->getPayload()['type'],
  98.                 $job->getIdentifier()
  99.             ));
  100.         }
  101.     }
  102.     /**
  103.      * Handles tweaking triggers of individual types of objects for a sync.
  104.      *
  105.      * @param AsyncEvent $event
  106.      */
  107.     public function tweakType(AsyncEvent $event): void
  108.     {
  109.         // data should be an array with an id of a sync
  110.         $job $this->loadJob($event);
  111.         // get data from the payload
  112.         [$type$class] = $this->parseForType($event);
  113.         // get the objects
  114.         $objects $this->em->getRepository($class)->createQueryBuilder('objects')
  115.             ->select('objects.sourcedId')
  116.             ->orderBy('objects.createdAt''ASC')
  117.             ->getQuery()
  118.             ->getResult(SingleColumnHydrator::HYDRATOR);
  119.         // loop over the object to create the messages to process each fully
  120.         $messages = [];
  121.         foreach ($objects as $index => $object) {
  122.             if ($index AsyncMessage::MAX_BATCH === 0) {
  123.                 $messages[] = [];
  124.             }
  125.             $messages[count($messages) - 1][] = new AsyncMessage(
  126.                 $job,
  127.                 OneRosterEvents::EVENT__TWEAK__OBJECT,
  128.                 [
  129.                     'job' => $job->getId(),
  130.                     'type' => $type,
  131.                     'id' => $object,
  132.                 ],
  133.                 self::MQ_PRIORITY,
  134.             );
  135.         }
  136.         // do only if we have messages
  137.         foreach ($messages as $batch) {
  138.             // tracking
  139.             $this->oneroster->orchestratePhaseChange($jobcount($batch));
  140.             // queue them up
  141.             try {
  142.                 $this->async->queue(
  143.                     null,
  144.                     $batch
  145.                 );
  146.             } catch (\Exception $e) {
  147.                 $this->oneroster->orchestratePhaseRevert($jobcount($batch));
  148.                 throw $e;
  149.             }
  150.             // DEBUGGING
  151.             $event->getOutput()->writeln(sprintf(
  152.                 'TWEAK__OBJECT batch: %s',
  153.                 count($batch)
  154.             ));
  155.         }
  156.         // DEBUGGING
  157.         $event->getOutput()->writeln(sprintf(
  158.             'TWEAK__OBJECT for %s "%s" object(s) triggered for sync #%s',
  159.             array_sum(array_map('count'$messages)),
  160.             $type,
  161.             $job->getIdentifier()
  162.         ));
  163.     }
  164.     /**
  165.      * Handle tweaking of a particular object for a sync.
  166.      * This triggers a "sub" event that allows things in the system to act upon the data.
  167.      *
  168.      * @param AsyncEvent $event
  169.      */
  170.     public function tweakObject(AsyncEvent $event): void
  171.     {
  172.         // data should be an array with an id of a sync
  173.         $job $this->loadJob($event);
  174.         // get data from the payload
  175.         /** @var AbstractOneRosterEntity $entity */
  176.         [$class$entity] = $this->parseForObject($event);
  177.         // TODO: use dirty checks to see if we need to process this entity at all? may not be needed in linking step...
  178.         // need to create an event for processing of this specific type
  179.         $subevent = new OneRosterTweakEvent(
  180.             $entity,
  181.             $job,
  182.             $event->getOutput()
  183.         );
  184.         // capture errors
  185.         try {
  186.             // dispatch the event
  187.             $this->dispatcher->dispatch(
  188.                 $subevent,
  189.                 OneRosterTweakEvent::EVENTS[$class],
  190.             );
  191.             // if propagation was stopped, it means there was a problem
  192.             if ($subevent->isPropagationStopped()) {
  193.                 throw new \Exception(
  194.                     'Propagation was stopped while handling event.',
  195.                 );
  196.             }
  197.         } catch (\Exception $e) {
  198.             $this->oneroster->logIssue(
  199.                 $job,
  200.                 $this->phase(),
  201.                 OneRosterTweakEvent::EVENTS[$class],
  202.                 $entity::ONEROSTER_TYPE,
  203.                 $entity,
  204.                 $e
  205.             );
  206.         }
  207.     }
  208. }