src/Cms/CoreBundle/EventSubscriber/OneRoster/OneRosterPrepareSubscriber.php line 48

Open in your IDE?
  1. <?php
  2. namespace Cms\CoreBundle\EventSubscriber\OneRoster;
  3. use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
  4. use Cms\CoreBundle\Entity\OneRosterJob;
  5. use Cms\CoreBundle\Events\OneRosterAdhocEvent;
  6. use Cms\CoreBundle\Events\OneRosterEvents;
  7. use Platform\QueueBundle\Event\AsyncEvent;
  8. /**
  9.  * Class OneRosterPrepareSubscriber
  10.  * @package Cms\CoreBundle\EventSubscriber\OneRoster
  11.  */
  12. final class OneRosterPrepareSubscriber extends AbstractOneRosterSubscriber
  13. {
  14.     /**
  15.      * {@inheritDoc}
  16.      */
  17.     public static function getSubscribedEvents(): array
  18.     {
  19.         return [
  20.             OneRosterEvents::EVENT__PREPARE => [
  21.                 ['discardableDisable'self::PRIORITY__FIRST],
  22.                 ['phaseStart'self::PRIORITY__FIRST],
  23.                 ['markStashed'1],
  24.                 ['cleanStashed'1],
  25.                 ['phaseFin'self::PRIORITY__LAST],
  26.                 ['phaseTrigger'self::PRIORITY__LAST],
  27.                 ['discardableEnable'self::PRIORITY__LAST],
  28.             ],
  29.             OneRosterEvents::EVENT__PREPARE__ADHOC => [
  30.                 ['discardableDisable'self::PRIORITY__FIRST],
  31.                 ['adhoc'0],
  32.                 ['phaseFin'self::PRIORITY__LAST],
  33.                 ['phaseTrigger'self::PRIORITY__LAST],
  34.                 ['discardableEnable'self::PRIORITY__LAST],
  35.             ],
  36.         ];
  37.     }
  38.     /**
  39.      * In case we skip the stash phase, will update all the stash entries to be tied to this current job.
  40.      *
  41.      * @param AsyncEvent $event
  42.      * @return void
  43.      */
  44.     public function markStashed(AsyncEvent $event): void
  45.     {
  46.         // data should be an array with an id of a sync
  47.         $job $this->loadJob($event);
  48.         // we only want to mark entities if we skipped the stash phase
  49.         // TODO: do we need to worry about signed/unsigned here at some point?
  50.         if ($job->getStartPhase() <= OneRosterJob::PHASES__STASH) {
  51.             return;
  52.         }
  53.         // run bulk query
  54.         $marks $this->em->createQueryBuilder()
  55.             ->update(AbstractOneRosterEntity::class, 'entities')
  56.             ->set('entities.job'':job')
  57.             ->setParameter('job'$job->getId())
  58.             ->andWhere('entities.tenant = :tenant')
  59.             ->setParameter('tenant'$job->getTenant()->getId())
  60.             ->andWhere('entities.sync = :sync')
  61.             ->setParameter('sync'$job->getSync()->getId())
  62.             ->getQuery()
  63.             ->execute();
  64.         // DEBUGGING
  65.         $event->getOutput()->writeln(sprintf(
  66.             'Marked %s stashed objects (stash phase was skipped) for sync #%s',
  67.             $marks,
  68.             $job->getIdentifier()
  69.         ));
  70.     }
  71.     /**
  72.      * Removes "old" data from the stashed entities that are no longer valid based on this current sync.
  73.      *
  74.      * @param AsyncEvent $event
  75.      * @return void
  76.      */
  77.     public function cleanStashed(AsyncEvent $event): void
  78.     {
  79.         // data should be an array with an id of a sync
  80.         $job $this->loadJob($event);
  81.         // run bulk query
  82.         $removals $this->em->createQueryBuilder()
  83.             ->delete(AbstractOneRosterEntity::class, 'entities')
  84.             ->andWhere('entities.tenant = :tenant')
  85.             ->setParameter('tenant'$job->getTenant()->getId())
  86.             ->andWhere('entities.sync = :sync')
  87.             ->setParameter('sync'$job->getSync()->getId())
  88.             ->andWhere('entities.job != :job')
  89.             ->setParameter('job'$job->getId())
  90.             ->getQuery()
  91.             ->execute();
  92.         // DEBUGGING
  93.         $event->getOutput()->writeln(sprintf(
  94.             'Removed %s stashed objects for sync #%s',
  95.             $removals,
  96.             $job->getIdentifier()
  97.         ));
  98.     }
  99.     /**
  100.      * In case we skip the stash phase, will update all the stash entries to be tied to this current job.
  101.      *
  102.      * @param AsyncEvent $event
  103.      * @return void
  104.      */
  105.     public function adhoc(AsyncEvent $event): void
  106.     {
  107.         // data should be an array with an id of a sync
  108.         $job $this->loadJob($event);
  109.         // trigger the event to do further processing
  110.         $this->dispatcher->dispatch(
  111.             new OneRosterAdhocEvent(
  112.                 $event->getParam('payload'),
  113.                 $job,
  114.                 $event->getOutput()
  115.             ),
  116.             $event->getParam('event'),
  117.         );
  118.         // DEBUGGING
  119.         $event->getOutput()->writeln(sprintf(
  120.             'Dispatched adhoc event "%s" for sync #%s',
  121.             $event->getParam('event'),
  122.             $job->getIdentifier()
  123.         ));
  124.     }
  125.     /**
  126.      * {@inheritDoc}
  127.      */
  128.     public function phase(): int
  129.     {
  130.         return OneRosterJob::PHASES__PREPARE;
  131.     }
  132.     /**
  133.      * {@inheritDoc}
  134.      */
  135.     public function next(): ?int
  136.     {
  137.         return OneRosterJob::PHASES__PROCESS;
  138.     }
  139. }