src/Products/NotificationsBundle/Subscriber/BroadcastSubscriber.php line 50

Open in your IDE?
  1. <?php
  2. namespace Products\NotificationsBundle\Subscriber;
  3. use Cms\CoreBundle\Util\Doctrine\EntityManager;
  4. use Platform\QueueBundle\Event\AsyncEvent;
  5. use Products\NotificationsBundle\Entity\AbstractNotification;
  6. use Products\NotificationsBundle\Entity\Job;
  7. use Products\NotificationsBundle\Entity\Notifications\Channels\ChannelsInterface;
  8. use Products\NotificationsBundle\Service\Switchboard;
  9. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  10. final class BroadcastSubscriber implements EventSubscriberInterface
  11. {
  12.     /**
  13.      * @var EntityManager
  14.      */
  15.     protected EntityManager $em;
  16.     /**
  17.      * @var Switchboard
  18.      */
  19.     protected Switchboard $switchboard;
  20.     /**
  21.      * @param EntityManager $em
  22.      * @param Switchboard $switchboard
  23.      */
  24.     public function __construct(EntityManager $emSwitchboard $switchboard)
  25.     {
  26.         $this->em $em;
  27.         $this->switchboard $switchboard;
  28.     }
  29.     /**
  30.      * {@inheritDoc}
  31.      */
  32.     public static function getSubscribedEvents(): array
  33.     {
  34.         return [
  35.             Switchboard::EVENTS__BROADCAST_JOB => ['onBroadcastJob'0],
  36.             Switchboard::EVENTS__BROADCAST_CHANNEL => ['onBroadcastChannel'0],
  37.             Switchboard::EVENTS__BROADCAST_ITEM => ['onBroadcastItem'0],
  38.         ];
  39.     }
  40.     /**
  41.      * @param AsyncEvent $event
  42.      */
  43.     public function onBroadcastJob(AsyncEvent $event)
  44.     {
  45.         $job $this->em->getRepository(Job::class)->find(
  46.             $event->getBody()->get('job')
  47.         );
  48.         if ( ! $job instanceof Job) {
  49.             throw new \Exception();
  50.         }
  51.         $result $this->switchboard->broadcastJob($job);
  52.         $event->getOutput()->writeln(sprintf(
  53.             'Broadcasting to %s channels.',
  54.             $result,
  55.         ));
  56.     }
  57.     /**
  58.      * @param AsyncEvent $event
  59.      */
  60.     public function onBroadcastChannel(AsyncEvent $event)
  61.     {
  62.         $job $this->em->getRepository(Job::class)->find(
  63.             $event->getBody()->get('job')
  64.         );
  65.         if ( ! $job instanceof Job) {
  66.             throw new \Exception();
  67.         }
  68.         $result $this->switchboard->broadcastChannel(
  69.             $job,
  70.             $channel $event->getBody()->get('channel')
  71.         );
  72.         $event->getOutput()->writeln(sprintf(
  73.             'Broadcasting %s items to channel %s.',
  74.             $result,
  75.             array_search($channelChannelsInterface::CHANNELS),
  76.         ));
  77.     }
  78.     /**
  79.      * @param AsyncEvent $event
  80.      */
  81.     public function onBroadcastItem(AsyncEvent $event)
  82.     {
  83.         $job $this->em->getRepository(Job::class)->find(
  84.             $event->getBody()->get('job')
  85.         );
  86.         if ( ! $job instanceof Job) {
  87.             throw new \Exception();
  88.         }
  89.         $this->switchboard->broadcastItem(
  90.             $job,
  91.             $event->getBody()->get('channel'),
  92.             $event->getBody()->get('item')// this will still be a scalar value, the handler for the channel should know how to load it
  93.         );
  94.     }
  95. }