<?php
namespace Products\NotificationsBundle\Service;
use App\Util\Json;
use Cms\CoreBundle\Util\DateTimeUtils;
use Cms\CoreBundle\Util\Doctrine\EntityManager;
use DateTimeInterface;
use Doctrine\ORM\Mapping\ClassMetadata;
use Doctrine\ORM\QueryBuilder;
use PDO;
use Platform\QueueBundle\Model\AsyncMessage;
use Platform\QueueBundle\Service\AsyncQueueService;
use Products\NotificationsBundle\Entity\AbstractRecipient;
use Products\NotificationsBundle\Entity\ContactAttempts\AppContactAttempt;
use Products\NotificationsBundle\Entity\ContactAttempts\EmailContactAttempt;
use Products\NotificationsBundle\Entity\ContactAttempts\SmsContactAttempt;
use Products\NotificationsBundle\Entity\ContactAttempts\VoiceContactAttempt;
use Products\NotificationsBundle\Entity\Job;
use Products\NotificationsBundle\Entity\Notifications\Channels\ChannelsInterface;
use Products\NotificationsBundle\Entity\Notifications\Channels\TransactionalChannelsInterface;
use Products\NotificationsBundle\Entity\Profile;
use Products\NotificationsBundle\Entity\ProfileContact;
use Products\NotificationsBundle\Entity\ProfileRelationship;
use Products\NotificationsBundle\Entity\Student;
use Products\NotificationsBundle\Util\Reachability;
use Symfony\Component\HttpFoundation\Request;
/**
* Class ContactMonitor
* @package Products\NotificationsBundle\Service
*/
final class ContactMonitor
{
/**
 * Identifier handed to the async queue when an email webhook is queued.
 */
public const EVENTS__WEBHOOKS__EMAIL = 'app.notifications.webhooks.email';

/**
 * Identifier handed to the async queue when an SMS webhook is queued.
 */
public const EVENTS__WEBHOOKS__SMS = 'app.notifications.webhooks.sms';

/**
 * Identifier handed to the async queue when a voice webhook is queued.
 */
public const EVENTS__WEBHOOKS__VOICE = 'app.notifications.webhooks.voice';

/**
 * Mapping of notifications channel bits to their webhook counterparts.
 *
 * Only email, SMS and voice have an entry; queueWebhook() looks channels up here.
 */
public const CHANNEL_WEBHOOKS = [
    ChannelsInterface::CHANNELS__EMAIL => self::EVENTS__WEBHOOKS__EMAIL,
    ChannelsInterface::CHANNELS__SMS => self::EVENTS__WEBHOOKS__SMS,
    ChannelsInterface::CHANNELS__VOICE => self::EVENTS__WEBHOOKS__VOICE,
];
/**
 * Both collaborators are declared as promoted constructor properties.
 *
 * @param EntityManager $em Entity manager used for every attempt, job and reachability query in this service.
 * @param AsyncQueueService $async Queue service that receives webhooks deferred by queueWebhook().
 */
public function __construct(
    protected EntityManager $em,
    protected AsyncQueueService $async,
) {
}
/**
 * Flattens a request into a plain array so it can be queued and later handed
 * to handleWebhook() in place of the Request object.
 *
 * @param Request $request
 * @return array Keys: host, path, method, content, headers, query, attributes, request, server.
 */
public function serializeRequest(Request $request): array
{
    $serialized = [];

    // scalar pieces of the request
    $serialized['host'] = $request->getHost();
    $serialized['path'] = $request->getPathInfo();
    $serialized['method'] = $request->getRealMethod();
    $serialized['content'] = $request->getContent();

    // parameter bags, each dumped in full
    $serialized['headers'] = $request->headers->all();
    $serialized['query'] = $request->query->all();
    $serialized['attributes'] = $request->attributes->all();
    $serialized['request'] = $request->request->all();
    $serialized['server'] = $request->server->all();

    return $serialized;
}
/**
 * Tries to defer the webhook to the async queue and, if queueing raises an
 * exception, processes it immediately instead.
 *
 * @param int $channel
 * @param Request $request
 */
public function queueOrHandleWebhook(int $channel, Request $request): void
{
    try {
        $this->queueWebhook($channel, $request);

        return;
    } catch (\Exception) {
        // queueing failed; fall through to synchronous handling below
    }

    $this->handleWebhook($channel, $request);
}
/**
 * Dispatches a webhook to the handler for its channel.
 *
 * Accepts either a live Request (normalized through serializeRequest()) or an
 * array that already has that serialized shape, e.g. one pulled off the queue.
 *
 * @param int $channel One of the ChannelsInterface::CHANNELS__* values handled below.
 * @param Request|array $request
 * @throws \LogicException When the channel has no handler.
 */
public function handleWebhook(int $channel, Request|array $request): void
{
    // normalize a request object into the serialized array form the handlers expect
    $payload = ($request instanceof Request) ? $this->serializeRequest($request) : $request;

    // branch on channel type; only an unknown channel is an error
    switch ($channel) {
        case ChannelsInterface::CHANNELS__EMAIL:
            $this->handleEmail($payload);
            return;
        case ChannelsInterface::CHANNELS__SMS:
            $this->handleSms($payload);
            return;
        case ChannelsInterface::CHANNELS__VOICE:
            $this->handleVoice($payload);
            return;
        case ChannelsInterface::CHANNELS__APP:
            $this->handleApp($payload);
            return;
        default:
            throw new \LogicException(sprintf('Unsupported webhook channel "%d".', $channel));
    }
}
/**
 * Serializes the request and sends it to the async queue at lower priority.
 *
 * Only channels listed in CHANNEL_WEBHOOKS can be queued. An unmapped channel
 * raises an exception instead of building a message from an undefined array
 * key; queueOrHandleWebhook() catches it and handles the webhook synchronously.
 *
 * @param int $channel
 * @param Request $request
 * @throws \LogicException When the channel has no entry in CHANNEL_WEBHOOKS.
 */
public function queueWebhook(int $channel, Request $request): void
{
    if ( ! array_key_exists($channel, self::CHANNEL_WEBHOOKS)) {
        throw new \LogicException(sprintf('Channel "%d" has no webhook event to queue.', $channel));
    }

    $this->async->send(
        null,
        new AsyncMessage(
            null,
            self::CHANNEL_WEBHOOKS[$channel],
            $this->serializeRequest($request),
            AsyncMessage::PRIORITY__LOWER,
        )
    );
}
/**
 * Looks up the single contact attempt of the given entity class that carries
 * the given external id, returning its own id plus its job and recipient ids.
 *
 * @param string $type Entity class name of the contact attempt to query.
 * @param string $externalId Provider-side identifier stored in attempt.externalId.
 * @return array{id: int, job: int, recipient: int}
 * @throws \RuntimeException When there is not exactly one match, or any of the three values is falsy.
 */
protected function loadAttempt(string $type, string $externalId): array
{
    $query = $this->em->createQueryBuilder()
        ->select('attempt.id AS id, IDENTITY(attempt.job) AS job, IDENTITY(attempt.recipient) AS recipient')
        ->from($type, 'attempt')
        ->andWhere('attempt.externalId = :id')
        ->setParameter('id', $externalId)
        ->getQuery();

    $rows = $query->getScalarResult();

    // anything other than exactly one match is unusable
    if (count($rows) !== 1) {
        throw new \RuntimeException();
    }

    $row = $rows[0];

    // every column must hold a usable value
    foreach (['id', 'job', 'recipient'] as $column) {
        if ( ! $row[$column]) {
            throw new \RuntimeException();
        }
    }

    return $row;
}
/**
 * Applies an app-channel status callback to its contact attempt and, once the
 * status is final, updates the job delivery stats and recipient reachability.
 *
 * Reads 'name' (external id of the attempt) and 'status' from the posted
 * parameters, and REQUEST_TIME from the server parameters.
 *
 * @param array $request Serialized request in the shape produced by serializeRequest().
 * @throws \RuntimeException When the name is missing or the attempt cannot be resolved.
 */
public function handleApp(array $request): void
{
    // just double check that we have an id
    if (empty($request['request']['name'])) {
        throw new \RuntimeException();
    }

    // determine some basic attempt stuff
    $attempt = $this->loadAttempt(AppContactAttempt::class, $request['request']['name']);

    // event name as stored on the attempt and listed in the status constants
    $event = sprintf('attempt.%s', $request['request']['status']);

    // run the query
    $this->em->createQueryBuilder()
        ->update(AppContactAttempt::class, 'attempt')
        // set the event name for the status
        ->set('attempt.event', ':event')
        ->setParameter('event', $event)
        // handle the timestamp
        // no timestamp is read from the callback payload, so the server's request time is used
        ->set('attempt.triggeredAt', 'FROM_UNIXTIME(:timestamp)')
        ->setParameter('timestamp', $request['server']['REQUEST_TIME'])
        // filter by the id of the attempt
        ->andWhere('attempt.id = :id')
        ->setParameter('id', $attempt['id'])
        // make sure the sequence is after what has already come before:
        // only statuses listed ahead of the incoming one may be overwritten
        ->andWhere('(attempt.event IS NULL OR attempt.event IN (:statuses))')
        ->setParameter('statuses', array_slice(
            AppContactAttempt::STATUSES,
            0,
            array_search($event, AppContactAttempt::STATUSES, true)
        ))
        ->getQuery()
        ->execute();

    // do only if our status is not a pending status
    $final = [...AppContactAttempt::SUCCESSFUL_STATUSES, ...AppContactAttempt::FAILED_STATUSES];
    if ( ! in_array($event, $final, true)) {
        return;
    }

    // success is judged on the app event itself; this previously used the SMS
    // field and prefix ('sms.' + MessageStatus) for reachability, which never matched
    $successful = in_array($event, AppContactAttempt::SUCCESSFUL_STATUSES, true);

    // attach stats update query
    $this->trackItemDelivery($attempt['job'], ChannelsInterface::CHANNELS__APP, $successful);

    // update reachability
    $this->trackReachability($attempt['recipient'], $successful);
}
/**
 * Applies an SMS status callback to its contact attempt and, once the status
 * is final, updates the job delivery stats and recipient reachability.
 *
 * Reads MessageSid, MessageStatus and (optionally) ErrorCode from the posted
 * parameters, and REQUEST_TIME from the server parameters.
 *
 * @param array $request Serialized request in the shape produced by serializeRequest().
 * @throws \RuntimeException When MessageSid is missing or the attempt cannot be resolved.
 */
public function handleSms(array $request): void
{
    // an external id is mandatory
    if (empty($request['request']['MessageSid'])) {
        throw new \RuntimeException();
    }

    // resolve the attempt behind that external id
    $attempt = $this->loadAttempt(SmsContactAttempt::class, $request['request']['MessageSid']);

    // event name as stored on the attempt
    $event = sprintf('sms.%s', $request['request']['MessageStatus']);

    $update = $this->em->createQueryBuilder();
    $update->update(SmsContactAttempt::class, 'attempt');

    // record the new status
    $update->set('attempt.event', ':event');
    $update->setParameter('event', $event);

    // the server's request time stands in for the event time, as the callback does not provide an exact one
    $update->set('attempt.triggeredAt', 'FROM_UNIXTIME(:timestamp)');
    $update->setParameter('timestamp', $request['server']['REQUEST_TIME']);

    // target exactly this attempt
    $update->andWhere('attempt.id = :id');
    $update->setParameter('id', $attempt['id']);

    // only move forward: the stored status must be unset or one listed ahead of the incoming one
    $update->andWhere('(attempt.event IS NULL OR attempt.event IN (:statuses))');
    $update->setParameter('statuses', array_slice(
        SmsContactAttempt::STATUSES,
        0,
        array_search($event, SmsContactAttempt::STATUSES)
    ));

    // persist the provider error code when one was sent
    if ( ! empty($request['request']['ErrorCode'])) {
        $update->set('attempt.code', ':errorCode');
        $update->setParameter('errorCode', $request['request']['ErrorCode']);
    }

    $update->getQuery()->execute();

    // pending statuses stop here; only final ones feed the stats
    $final = [...SmsContactAttempt::SUCCESSFUL_STATUSES, ...SmsContactAttempt::FAILED_STATUSES];
    if ( ! in_array($event, $final)) {
        return;
    }

    $delivered = in_array($event, SmsContactAttempt::SUCCESSFUL_STATUSES);

    // job counters
    $this->trackItemDelivery($attempt['job'], ChannelsInterface::CHANNELS__SMS, $delivered);

    // recipient reachability
    $this->trackReachability($attempt['recipient'], $delivered);
}
/**
 * Applies a voice call status callback to its contact attempt and, once the
 * status is final, updates the job delivery stats and recipient reachability.
 *
 * Reads CallSid, CallStatus, Timestamp, SequenceNumber and the optional call
 * detail fields from the posted parameters.
 *
 * @param array $request Serialized request in the shape produced by serializeRequest().
 * @throws \RuntimeException When CallSid is missing or the attempt cannot be resolved.
 */
public function handleVoice(array $request): void
{
    // entity property => callback field it is filled from
    $columns = [
        'sequenceNumber' => 'SequenceNumber',
        'duration' => 'CallDuration',
        'answer' => 'AnsweredBy',
        'country' => 'ToCountry',
        'state' => 'ToState',
        'city' => 'ToCity',
        'zip' => 'ToZip',
    ];

    // an external id is mandatory
    if (empty($request['request']['CallSid'])) {
        throw new \RuntimeException();
    }

    // resolve the attempt behind that external id
    $attempt = $this->loadAttempt(VoiceContactAttempt::class, $request['request']['CallSid']);

    // event name as stored on the attempt
    $event = sprintf('voice.%s', $request['request']['CallStatus']);

    $update = $this->em->createQueryBuilder();
    $update->update(VoiceContactAttempt::class, 'attempt');

    // record the new status
    $update->set('attempt.event', ':event');
    $update->setParameter('event', $event);

    // the callback carries its own timestamp, parsed as RFC 2822
    $update->set('attempt.triggeredAt', ':timestamp');
    $update->setParameter('timestamp', DateTimeUtils::make(
        $request['request']['Timestamp'],
        DateTimeInterface::RFC2822
    ));

    // target exactly this attempt
    $update->andWhere('attempt.id = :id');
    $update->setParameter('id', $attempt['id']);

    // only move forward: skip callbacks whose sequence number is not newer than the stored one
    $update->andWhere('(attempt.sequenceNumber IS NULL OR attempt.sequenceNumber < :sequence)');
    $update->setParameter('sequence', $request['request']['SequenceNumber']);

    // copy over every mapped field that arrived with a non-empty value
    foreach ($columns as $property => $source) {
        if (empty($request['request'][$source])) {
            continue;
        }

        $update->set(sprintf('attempt.%s', $property), sprintf(':%s', $property));
        $update->setParameter($property, $request['request'][$source]);
    }

    $update->getQuery()->execute();

    // pending statuses stop here; only final ones feed the stats
    $final = [...VoiceContactAttempt::SUCCESSFUL_STATUSES, ...VoiceContactAttempt::FAILED_STATUSES];
    if ( ! in_array($event, $final)) {
        return;
    }

    $delivered = in_array($event, VoiceContactAttempt::SUCCESSFUL_STATUSES);
    $answeredByHuman = isset($request['request']['AnsweredBy']) && $request['request']['AnsweredBy'] === 'human';

    // job counters, plus the answered counter when a human picked up
    $this->trackItemDelivery(
        $attempt['job'],
        ChannelsInterface::CHANNELS__VOICE,
        $delivered,
        static function (QueryBuilder $jobUpdate) use ($answeredByHuman) {
            if ($answeredByHuman) {
                $jobUpdate->set('job.voiceAnswered', 'job.voiceAnswered + 1');
            }
        }
    );

    // recipient reachability
    $this->trackReachability($attempt['recipient'], $delivered);
}
/**
* Processes a batch of email webhook events carried in the serialized request body.
*
* The body is JSON-decoded into a list of event hashes. Each event is reduced to a
* fixed set of fields, matched to an EmailContactAttempt through the first segment
* of its sg_message_id, and then applied:
*  - 'open' and 'spamreport' stamp openedAt / spammedAt once and bump the job counter,
*  - any other event is treated as a sending status update on the attempt and, when
*    the status is final, feeds the job delivery stats and recipient reachability.
*
* @param array $request Serialized request (see serializeRequest()); only the 'content' entry is read here.
* @throws \RuntimeException Raised by loadAttempt() when an event cannot be matched to exactly one attempt.
*/
public function handleEmail(array $request): void
{
// webhook payload is in the content, this is an array of arrays
$hooks = Json::decode($request['content'], true);
// need to loop over each, the webhook may have multiple events to handle
$data = [];
foreach ($hooks as $hook) {
// obtain the id being passed to us, will need to match this up in the db later
// only the part before the first '.' is kept, as that is what gets matched against the stored external id
// @see https://sendgrid.com/docs/glossary/message-id/
$id = strtok($hook['sg_message_id'], '.');
// whitelist of event fields that are carried forward; anything not present in the hook stays null
$params = [
//'sg_event_id' => null,
'event' => null,
'reason' => null,
'type' => null,
'code' => null,
'response' => null,
'attempt' => null,
'timestamp' => null,
];
foreach (array_keys($params) as $param) {
if (array_key_exists($param, $hook)) {
$params[$param] = $hook[$param];
}
}
// internal key holding the external id to look the attempt up with
$params['_id'] = $id;
// save it onto the set of data
// NOTE: array_unshift prepends, so the events end up processed in the reverse of their payload order
array_unshift($data, $params);
}
// loop over all the filtered events data
foreach ($data as $params) {
// determine some basic attempt stuff
$attempt = $this->loadAttempt(EmailContactAttempt::class, $params['_id']);
// need to save off the event, as it is unset from $params in the default branch below
$event = $params['event'];
// branch on the event type
switch ($event) {
// open event is handled specially as it is an async event not tied to sending status
case 'open':
// handle updating info on the attempt first
$result = $this->em->createQueryBuilder()
->update(EmailContactAttempt::class, 'attempt')
// set the opened at timestamp, this is in unix time
// do only if the opened timestamp has not been set yet
->set('attempt.openedAt', 'CASE WHEN attempt.openedAt IS NULL THEN FROM_UNIXTIME(:timestamp) ELSE attempt.openedAt END')
->setParameter('timestamp', $params['timestamp'])
// be sure to filter by the id of the attempt resolved above
->andWhere('attempt.id = :id')
->setParameter('id', $attempt['id'])
->getQuery()
->execute();
// update stats tracking on the job
// only do if the last query reports a non-zero result
// NOTE(review): this relies on the update reporting 0 when openedAt was already set
// (i.e. changed rows rather than matched rows) - confirm against the database driver settings
if ($result) {
$this->em->createQueryBuilder()
->update(Job::class, 'job')
// inc open tracking
->set('job.emailOpened', '(job.emailOpened + 1)')
// IMPORTANT: must always filter by job id since we are doing a bulk update query!
->andWhere('job.id = :job')
->setParameter('job', $attempt['job'])
->getQuery()
->execute();
}
break;
// handle spam report
case 'spamreport':
// handle updating info on the attempt first
$result = $this->em->createQueryBuilder()
->update(EmailContactAttempt::class, 'attempt')
// set the spammed at timestamp, this is in unix time
// do only if the spammed timestamp has not been set yet
->set('attempt.spammedAt', 'CASE WHEN attempt.spammedAt IS NULL THEN FROM_UNIXTIME(:timestamp) ELSE attempt.spammedAt END')
->setParameter('timestamp', $params['timestamp'])
// be sure to filter by the id of the attempt resolved above
->andWhere('attempt.id = :id')
->setParameter('id', $attempt['id'])
->getQuery()
->execute();
// update stats tracking on the job
// only do if the last query reports a non-zero result
// NOTE(review): same reliance on changed-row counting as in the 'open' branch - confirm
if ($result) {
$this->em->createQueryBuilder()
->update(Job::class, 'job')
// inc spam tracking
->set('job.emailSpammed', '(job.emailSpammed + 1)')
// IMPORTANT: must always filter by job id since we are doing a bulk update query!
->andWhere('job.id = :job')
->setParameter('job', $attempt['job'])
->getQuery()
->execute();
}
break;
// should be a sending status update
default:
// start the query builder and handle core things
$qb = $this->em->createQueryBuilder()
->update(EmailContactAttempt::class, 'attempt')
// set the event name for the status
->set('attempt.event', ':event')
->setParameter('event', sprintf('email.%s', $params['event']))
// handle the timestamp
->set('attempt.triggeredAt', 'FROM_UNIXTIME(:timestamp)')
->setParameter('timestamp', $params['timestamp'])
// be sure to filter by the id of the attempt resolved above
->andWhere('attempt.id = :id')
->setParameter('id', $attempt['id'])
// make sure the sequence is after what has already come before:
// the stored event must be unset or one of the statuses listed ahead of the incoming one
->andWhere('(attempt.event IS NULL OR attempt.event IN (:statuses))')
->setParameter('statuses', array_slice(
EmailContactAttempt::STATUSES,
0,
array_search(
sprintf('email.%s', $params['event']),
EmailContactAttempt::STATUSES
)
))
;
// need to remove ones we've already handled, so the loop below only sees the optional detail fields
unset(
$params['_id'],
$params['event'],
$params['timestamp'],
);
// pull all applicable vars and set them in the query
// each remaining key (reason, type, code, response, attempt) is written to the attempt field of the same name
foreach ($params as $key => $value) {
if ( ! empty($value)) {
$qb
->set(sprintf('attempt.%s', $key), sprintf(':%s', $key))
->setParameter($key, $value);
}
}
// run the update
$qb->getQuery()->execute();
// do only if our status is not a pending status
if (in_array(sprintf('email.%s', $event), [...EmailContactAttempt::SUCCESSFUL_STATUSES, ...EmailContactAttempt::FAILED_STATUSES])) {
// update the delivery counters on the job
$this->trackItemDelivery(
$attempt['job'],
ChannelsInterface::CHANNELS__EMAIL,
in_array(sprintf('email.%s', $event), EmailContactAttempt::SUCCESSFUL_STATUSES)
);
// update reachability
$this->trackReachability(
$attempt['recipient'],
in_array(sprintf('email.%s', $event), EmailContactAttempt::SUCCESSFUL_STATUSES)
);
}
}
}
}
/**
 * Increments the delivered or undelivered counters on a job, both the overall
 * one and the one for the given channel, and stamps the last delivery time.
 *
 * @param int $job Id of the job to update.
 * @param int $channel Channel the result belongs to; must be listed in TRANSACTIONAL_CHANNELS.
 * @param bool $result True counts as delivered, false as undelivered.
 * @param callable|null $callback Optional hook that receives the QueryBuilder before it runs, to add further changes.
 * @return int Whatever executing the update query returns.
 * @throws \LogicException When the channel is not a transactional channel.
 */
protected function trackItemDelivery(int $job, int $channel, bool $result, ?callable $callback = null): int
{
    if ( ! in_array($channel, TransactionalChannelsInterface::TRANSACTIONAL_CHANNELS)) {
        throw new \LogicException();
    }

    // counter suffix shared by the overall and the per-channel field
    $suffix = $result ? 'Delivered' : 'Undelivered';

    // overall counter, e.g. job.messagesDelivered
    $overall = sprintf('job.messages%s', $suffix);

    // per-channel counter, prefixed with the channel's key in USABLE_CHANNELS
    $perChannel = sprintf(
        'job.%s%s',
        array_search($channel, ChannelsInterface::USABLE_CHANNELS),
        $suffix
    );

    $update = $this->em->createQueryBuilder()
        ->update(Job::class, 'job')
        ->set($overall, sprintf('(%s + 1)', $overall))
        ->set($perChannel, sprintf('(%s + 1)', $perChannel))
        // remember when the most recent delivery result came in
        ->set('job.lastDeliveryAt', 'NOW()')
        // IMPORTANT: always restrict to the one job, this is a bulk update query
        ->andWhere('job.id = :job')
        ->setParameter('job', $job);

    // let the caller piggyback extra changes onto the same query
    if ($callback !== null) {
        $callback($update);
    }

    return $update->getQuery()->execute();
}
/**
* Records a delivery outcome on a recipient and propagates the resulting standing
* upwards to the profiles that use the recipient and to the students of those profiles.
*
* Steps:
*  1. a DQL update sets standing, contactability and reachability on the recipient itself;
*  2. if that update reports zero affected rows, nothing else is done;
*  3. a native statement recomputes every profile linked to the recipient from the
*     combined (BIT_OR) standings of all of the profile's contacts;
*  4. a second native statement does the same for students, combining the standings
*     of all profiles related to each affected student.
*
* NOTE(review): any exception raised by either native statement is discarded silently,
* so a failed propagation leaves the recipient updated but profiles/students stale.
*
* @param int $recipient Id of the recipient whose delivery outcome is being recorded.
* @param bool $success True maps to Reachability::STANDINGS__GOOD, false to STANDINGS__BAD.
* @return void
*/
protected function trackReachability(int $recipient, bool $success): void
{
// obtain metadata we are going to need
// only used further down to resolve the table names for the native statements
/** @var array|ClassMetadata[] $metadatas */
$metadatas = [
Profile::class => $this->em->getClassMetadata(Profile::class),
ProfileContact::class => $this->em->getClassMetadata(ProfileContact::class),
AbstractRecipient::class => $this->em->getClassMetadata(AbstractRecipient::class),
Student::class => $this->em->getClassMetadata(Student::class),
ProfileRelationship::class => $this->em->getClassMetadata(ProfileRelationship::class),
];
// calculate the reachability information
// each value is derived from the previous one: standing -> contactability -> reachability
$standing = ($success) ? Reachability::STANDINGS__GOOD : Reachability::STANDINGS__BAD;
$contactability = Reachability::standingsToContactability($standing);
$reachability = Reachability::contactabilitiesToReachability($contactability);
// update the reachability for the specific contact
$updated = $this->em->createQueryBuilder()
->update(AbstractRecipient::class, 'recipient')
// set the standing
->set('recipient.standing', ':standing')
->setParameter('standing', $standing)
// set the contactability
->set('recipient.contactability', ':contactability')
->setParameter('contactability', $contactability)
// set the reachability
->set('recipient.reachability', ':reachability')
->setParameter('reachability', $reachability)
// IMPORTANT: must always filter by id since we are doing a bulk update query!
->andWhere('recipient.id = :recipient')
->setParameter('recipient', $recipient)
->getQuery()
->execute();
// when the above query results in no changes, that means that any profiles or students (in being related to profiles)
// tied to the current recipient would also not change.
// in that case, the native SQL queries that follow the Doctrine query don't need to be run.
// NOTE(review): this assumes the reported count reflects changed rows, not merely matched rows - confirm
if ($updated === 0) {
return;
}
// then, force an update of the profiles attached to the contact
// need to pull all contacts for the profiles and mash their standings together
// do a native query for this, as it is too complex for dql...
try {
// the statement text is assembled with sprintf (table names and the two derived SET expressions),
// then trimmed and collapsed to single spaces; the recipient id is bound as the only positional parameter
$this->em->getConnection()->executeStatement(
preg_replace('/\s+/', ' ', trim(sprintf(
'
UPDATE
%s profiles
INNER JOIN
(
SELECT
contacts.profile AS profile,
BIT_OR(recipients.standing) AS standing
FROM
%s contacts
LEFT JOIN
%s recipients ON recipients.id = contacts.recipient
WHERE
contacts.profile IN (
SELECT
contact_identities.profile
FROM
%s contact_identities
WHERE
contact_identities.recipient = ?
)
GROUP BY
contacts.profile
) tbl ON tbl.profile = profiles.id
SET
profiles.standing = tbl.standing,
profiles.contactability = %s,
profiles.reachability = %s
',
$metadatas[Profile::class]->getTableName(),
$metadatas[ProfileContact::class]->getTableName(),
$metadatas[AbstractRecipient::class]->getTableName(),
$metadatas[ProfileContact::class]->getTableName(),
// NOTE: the order of the fields in the set clause matters, as the later ones use the now-modified values of the ones before!
// presumably these helpers return SQL expressions over the given column, since their result is spliced into the SET clause - verify
Reachability::databaseStandingsToContactability(sprintf(
'profiles.%s',
'standing',
)),
Reachability::databaseContactabilityToReachability(sprintf(
'profiles.%s',
'contactability',
)),
))),
[$recipient],
[PDO::PARAM_INT]
);
} catch (\Exception) {
// NOOP
// deliberate best-effort: a failure here is ignored and the student update below is still attempted
}
// then, force an update of all students attached to the profiles
// need to pull all the profiles for a student and mash the standings together
// do a native query for this, as it is too complex for dql...
try {
// NOTE: this query attempts to (in order of deepest nested clause to outermost clause)
// 1. use the recipient information (passed into this method) to determine which profiles are affected (by way of the profile contacts)
// 2. use the affected profiles to find the affected students (by way of the profile relationships)
// 3. use the affected students to ensure that the outermost relationships query only relates to the affected students
// 4. use the outermost relationships query to pull the student id and to compress all related profile standings into one usable value
// 5. finally perform the updates on the given fields using the matched rows from the join
$this->em->getConnection()->executeStatement(
preg_replace('/\s+/', ' ', trim(sprintf(
'
UPDATE
%s students
INNER JOIN
(
SELECT
relationships.student AS student,
BIT_OR(profiles.standing) AS standing
FROM
%s relationships
LEFT JOIN
%s profiles ON profiles.id = relationships.profile
WHERE
relationships.student IN (
SELECT
relationship_identities.student
FROM
%s relationship_identities
WHERE
relationship_identities.profile IN (
SELECT
contacts.profile
FROM
%s contacts
WHERE
contacts.recipient = ?
)
)
GROUP BY
relationships.student
) tbl ON tbl.student = students.id
SET
students.standing = tbl.standing,
students.contactability = %s,
students.reachability = %s
',
$metadatas[Student::class]->getTableName(),
$metadatas[ProfileRelationship::class]->getTableName(),
$metadatas[Profile::class]->getTableName(),
$metadatas[ProfileRelationship::class]->getTableName(),
$metadatas[ProfileContact::class]->getTableName(),
// NOTE: the order of the fields in the set clause matters, as the later ones use the now-modified values of the ones before!
Reachability::databaseStandingsToContactability(sprintf(
'students.%s',
'standing',
)),
Reachability::databaseContactabilityToReachability(sprintf(
'students.%s',
'contactability',
)),
))),
[$recipient],
[PDO::PARAM_INT]
);
} catch (\Exception) {
// NOOP
// deliberate best-effort: a failure here is ignored
}
}
}