<?php
namespace Platform\QueueBundle\Service;
use Platform\QueueBundle\Model\AsyncMessage;
use Stomp\Client;
use Stomp\Network\Connection;
use Stomp\StatefulStomp;
use Stomp\Transport\Frame;
use Stomp\Transport\Message;
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
/**
 * Publishes and consumes asynchronous worker messages over STOMP.
 *
 * The service lazily builds a {@see StatefulStomp} client from the
 * `app.mq.default.*` container parameters and keeps track of two pieces of
 * state on top of it:
 *
 *  - the subscription ids returned by {@see subscribe()}, and
 *  - the frames handed out by {@see listen()} that have not yet been passed
 *    to {@see ack()} or {@see nack()}.
 *
 * @package Platform\QueueBundle\Service
 */
final class AsyncQueueService
{
    private const DEFAULT_QUEUE = '/queue/campussuite/worker';
    private const READ_TIMEOUT = 60;
    private const SUBSCRIPTION_LIMIT = 1;
    private const CONNECT_MAX_RETRIES = 10;

    /**
     * Connection settings with the keys `broker`, `user` and `password`.
     *
     * @var array
     */
    private array $config;

    /**
     * Created on the first call to {@see connect()}; uninitialized before that.
     *
     * @var StatefulStomp
     */
    private StatefulStomp $stomp;

    /**
     * Ids of the currently active subscriptions, as a zero-based list.
     *
     * @var array|int[]
     */
    private array $subscriptions = [];

    /**
     * Frames returned by listen() that still await ack()/nack(), keyed by message id.
     *
     * @var array|Frame[]
     */
    private array $frames = [];

    /**
     * Builds the connection settings from the container parameters.
     *
     * The broker parameter is a comma separated list; every entry has its
     * `stomp+` marker removed and the result is wrapped in a `failover://(...)`
     * URI. Surrounding whitespace and empty entries are discarded.
     *
     * @param ParameterBagInterface $params
     */
    public function __construct(ParameterBagInterface $params)
    {
        $brokers = [];
        foreach (explode(',', $params->get('app.mq.default.broker') ?? '') as $broker) {
            $broker = trim(str_replace('stomp+', '', $broker));
            if ($broker !== '') {
                $brokers[] = $broker;
            }
        }

        $this->config = [
            'broker' => sprintf('failover://(%s)', implode(',', $brokers)),
            'user' => $params->get('app.mq.default.user'),
            'password' => $params->get('app.mq.default.password'),
        ];
    }

    /**
     * Builds a queue name below the default worker queue.
     *
     * Leading and trailing slashes of the suffix are ignored. A suffix that is
     * null, empty or made up of slashes only yields the default queue itself.
     * The comparison is done on the trimmed string, so a suffix such as "0" is
     * a valid queue segment.
     *
     * @param string|null $suffix
     * @return string
     */
    public static function qname(?string $suffix = null): string
    {
        $suffix = trim((string) $suffix, '/');

        if ($suffix === '') {
            return self::DEFAULT_QUEUE;
        }

        return sprintf('%s/%s', self::DEFAULT_QUEUE, $suffix);
    }

    /**
     * Makes sure a connected client exists, creating and connecting it if needed.
     *
     * At most CONNECT_MAX_RETRIES attempts are made, with a one second pause
     * between attempts.
     *
     * @return $this
     * @throws \Exception the last connection error, or a generic exception when
     *                    no attempt raised one but the client is still not connected
     */
    public function connect(): self
    {
        // if we don't have a stomp object yet, we need to make one
        if (!isset($this->stomp)) {
            // second argument is the connection timeout (see stomp-php Connection)
            $connection = new Connection($this->config['broker'], 30);
            $connection->setReadTimeout(120);

            $newClient = new Client($connection);
            $newClient->setLogin($this->config['user'], $this->config['password']);

            // make the client stateful
            $this->stomp = new StatefulStomp($newClient);
        }

        $client = $this->stomp->getClient();
        if ($client->isConnected()) {
            return $this;
        }

        // try to connect multiple times
        // aws mq tends to "sleep" on us when not used for a while
        $exception = null;
        for ($attempt = 1; $attempt <= self::CONNECT_MAX_RETRIES; $attempt++) {
            try {
                $client->connect();
            } catch (\Exception $e) {
                // track the last connection exception
                $exception = $e;
            }

            if ($client->isConnected()) {
                // TODO: we may be reconnecting, if so, resubscribe to the queues if we were subscribed already?
                return $this;
            }

            // every attempt counts, whether it threw or not, so the loop is always bounded;
            // wait for the broker to come online, but not after the final attempt
            if ($attempt < self::CONNECT_MAX_RETRIES) {
                sleep(1);
            }
        }

        throw $exception ?? new \Exception('Could not connect to MQ server.');
    }

    /**
     * Unsubscribes from every tracked subscription and disconnects the client.
     *
     * Does nothing when no client has been created yet.
     *
     * @return $this
     * @throws \RuntimeException when frames are still waiting for ack()/nack(),
     *                           or when subscriptions remain after unsubscribing
     */
    public function close(): self
    {
        // outstanding frames are never nacked implicitly, the caller has to settle them
        if (!empty($this->frames)) {
            // TODO: allow auto-nack option for outstanding frames?
            throw new \RuntimeException('Cannot close the connection, there are outstanding frames.');
        }

        // the client is only built by connect(); without it there is nothing to tear down
        if (!isset($this->stomp)) {
            return $this;
        }

        // if we are subscribed, properly unsubscribe first
        // (foreach walks a copy, unsubscribe() replaces the property while we iterate)
        foreach ($this->subscriptions as $subscription) {
            $this->unsubscribe($subscription);
        }
        if (!empty($this->subscriptions)) {
            // TODO: allow auto-unsubscribe option?
            throw new \RuntimeException('Cannot close the connection, there are still active subscriptions.');
        }

        // disconnect the client
        $this->stomp->getClient()->disconnect(true);

        return $this;
    }

    /**
     * Reads the next frame from the subscribed queue(s); this may block.
     *
     * A returned frame is tracked until it is passed to ack() or nack().
     *
     * @param int|null $timeout read timeout, interpreted as milliseconds; null,
     *                          zero or a negative value selects the default of
     *                          READ_TIMEOUT seconds
     * @return bool|Frame the frame that was read, or the non-frame result of the
     *                    underlying read when nothing was received
     * @throws \RuntimeException when no subscription has been made
     * @throws \Exception when the connection cannot be established
     */
    public function listen(?int $timeout = null)
    {
        // check if we are even subscribed
        if (empty($this->subscriptions)) {
            throw new \RuntimeException('Cannot listen for messages, no subscriptions have been made.');
        }

        // ensure connection
        $this->connect();

        $connection = $this->stomp->getClient()->getConnection();
        if ($timeout === null || $timeout <= 0) {
            $connection->setReadTimeout(self::READ_TIMEOUT);
        } else {
            // split into whole seconds plus remaining microseconds so the
            // microsecond part never reaches a full second
            $connection->setReadTimeout(intdiv($timeout, 1000), ($timeout % 1000) * 1000);
        }

        // read from the queue; this may block for some time...
        $frame = $this->stomp->read();

        // only real frames are tracked
        if ($frame instanceof Frame) {
            $this->frames[$frame->getMessageId()] = $frame;
        }

        // send back the result, may be a frame or a boolean
        return $frame;
    }

    /**
     * Acknowledges a frame previously returned by listen().
     *
     * @param Frame $frame
     * @return $this
     * @throws \RuntimeException when the frame is not (or no longer) tracked
     * @throws \Exception when the connection cannot be established
     */
    public function ack(Frame $frame): self
    {
        $this->assertTracked($frame);

        // ensure connection
        $this->connect();

        $this->stomp->ack($frame);

        // clear the frame from tracking
        unset($this->frames[$frame->getMessageId()]);

        return $this;
    }

    /**
     * Rejects a frame previously returned by listen().
     *
     * @param Frame $frame
     * @return $this
     * @throws \RuntimeException when the frame is not (or no longer) tracked
     * @throws \Exception when the connection cannot be established
     */
    public function nack(Frame $frame): self
    {
        $this->assertTracked($frame);

        // ensure connection
        $this->connect();

        $this->stomp->nack($frame);

        // clear the frame from tracking
        unset($this->frames[$frame->getMessageId()]);

        return $this;
    }

    /**
     * Subscribes to a queue using the `client-individual` ack mode.
     *
     * @param string|null $queue queue name; null or an empty string selects the default queue
     * @return int the subscription id, to be passed to unsubscribe()
     * @throws \RuntimeException when the queue name is malformed or when the
     *                           subscription would exceed SUBSCRIPTION_LIMIT
     * @throws \Exception when the connection cannot be established
     */
    public function subscribe(?string $queue): int
    {
        $queue = self::resolveQueue($queue);

        // refuse as soon as the limit is reached; one more would exceed it
        if (count($this->subscriptions) >= self::SUBSCRIPTION_LIMIT) {
            throw new \RuntimeException(sprintf(
                'Cannot subscribe to queue "%s", doing so will exceed max subscription limit of %s.',
                $queue,
                self::SUBSCRIPTION_LIMIT
            ));
        }

        // ensure connection
        $this->connect();

        // make the subscription and save the id of the subscription
        return $this->subscriptions[] = $this->stomp->subscribe(
            $queue,
            null,
            'client-individual',
        );
    }

    /**
     * Cancels a subscription created by subscribe().
     *
     * @param int $id
     * @return $this
     * @throws \RuntimeException when the id is not a tracked subscription
     * @throws \Exception when the connection cannot be established
     */
    public function unsubscribe(int $id): self
    {
        // make sure this subscription is legit
        $index = array_search($id, $this->subscriptions, true);
        if ($index === false) {
            throw new \RuntimeException(sprintf(
                'Subscription #%s has not been setup.',
                $id
            ));
        }

        // ensure connection
        $this->connect();

        // do the unsub
        $this->stomp->unsubscribe($id);

        // drop the tracked id and keep the list zero-based
        unset($this->subscriptions[$index]);
        $this->subscriptions = array_values($this->subscriptions);

        return $this;
    }

    /**
     * Sends a batch of messages to a queue, by default inside one transaction.
     *
     * When anything fails while a transaction is open, the transaction is
     * aborted and the original error is rethrown.
     *
     * @param string|null $queue queue name; null or an empty string selects the default queue
     * @param array|AsyncMessage[] $messages
     * @param bool $transactional
     * @return array|Message[] the STOMP messages that were handed to the client, in order
     * @throws \RuntimeException when the queue name is malformed or a send is reported as failed
     * @throws \Throwable any error raised while connecting or sending
     */
    public function queue(?string $queue, array $messages, bool $transactional = true): array
    {
        $queue = self::resolveQueue($queue);

        // quit if no messages to send
        if (empty($messages)) {
            return [];
        }

        // ensure connection
        $this->connect();

        $msgs = [];
        // only roll back a transaction that was actually started
        $inTransaction = false;

        try {
            if ($transactional) {
                $this->stomp->begin();
                $inTransaction = true;
            }

            foreach ($messages as $message) {
                $result = $this->stomp->send(
                    $queue,
                    $msgs[] = $message->stomp()
                );
                if (!$result) {
                    throw new \RuntimeException(sprintf(
                        $transactional
                            ? 'Sending of a single message to queue "%s" in a transaction failed.'
                            : 'Sending of a single message to queue "%s" failed.',
                        $queue
                    ));
                }
            }

            if ($transactional) {
                $this->stomp->commit();
                $inTransaction = false;
            }
        } catch (\Throwable $e) {
            // \Throwable so that engine errors (e.g. a non-message element) also roll back
            if ($inTransaction) {
                try {
                    $this->stomp->abort();
                } catch (\Throwable $abortError) {
                    // deliberately ignored: the error that caused the rollback
                    // is the one the caller needs to see
                }
            }

            // rethrow the error
            throw $e;
        }

        return $msgs;
    }

    /**
     * Sends a single message to a queue, outside of any transaction handling.
     *
     * @param string|null $queue queue name; null or an empty string selects the default queue
     * @param AsyncMessage $message
     * @return Message the STOMP message that was sent
     * @throws \RuntimeException when the queue name is malformed or the send is reported as failed
     * @throws \Exception when the connection cannot be established
     */
    public function send(?string $queue, AsyncMessage $message): Message
    {
        $queue = self::resolveQueue($queue);

        // ensure connection
        $this->connect();

        // send the stomp formatted message
        $msg = $message->stomp();
        if (!$this->stomp->send($queue, $msg)) {
            throw new \RuntimeException(sprintf(
                'Sending of a single message to queue "%s" failed.',
                $queue
            ));
        }

        return $msg;
    }

    /**
     * Applies the default queue and checks that the name starts with "/queue".
     *
     * @param string|null $queue
     * @return string
     * @throws \RuntimeException when the name does not start with "/queue"
     */
    private static function resolveQueue(?string $queue): string
    {
        // default the queue in case none is given
        if ($queue === null || $queue === '') {
            $queue = self::qname();
        }

        // make sure we have proper queue
        if (!str_starts_with($queue, '/queue')) {
            throw new \RuntimeException('Queue name is not properly formatted; must being with "/queue".');
        }

        return $queue;
    }

    /**
     * Ensures the frame was handed out by listen() and has not been settled yet.
     *
     * @param Frame $frame
     * @throws \RuntimeException when the frame is not tracked
     */
    private function assertTracked(Frame $frame): void
    {
        if (!isset($this->frames[$frame->getMessageId()])) {
            throw new \RuntimeException(sprintf(
                'Frame #%s has already been handled.',
                $frame->getMessageId()
            ));
        }
    }
}