Skip to content

Instantly share code, notes, and snippets.

@santaklouse
Last active March 26, 2020 23:39
Show Gist options
  • Select an option

  • Save santaklouse/8f8dc8270b784745cfbf9b788d39d5ba to your computer and use it in GitHub Desktop.

Select an option

Save santaklouse/8f8dc8270b784745cfbf9b788d39d5ba to your computer and use it in GitHub Desktop.
Symfony 3: example of Scheduled emails workflow
<?php
use Symfony\Component\HttpKernel\Kernel;
use Symfony\Component\Config\Loader\LoaderInterface;
class AppKernel extends Kernel
{
public function registerBundles()
{
$bundles = array(
...
new Symfony\Bundle\SwiftmailerBundle\SwiftmailerBundle(),
new Rewieer\TaskSchedulerBundle\RewieerTaskSchedulerBundle()
);
return $bundles;
}
...
}
swiftmailer:
disable_delivery: '%app.notifications.disabled%'
transport: '%app.notifications.mailer_transport%'
host: '%app.notifications.mailer_host%'
username: '%app.notifications.mailer_user%'
password: '%app.notifications.mailer_password%'
spool:
type: file
path: '%kernel.cache_dir%/swiftmailer/spool'
app.notifications.disabled: false
app.notifications.mailer_transport: smtp
app.notifications.mailer_host: 127.0.0.1
app.notifications.mailer_user: ~
app.notifications.mailer_password: ~
# in minutes
app.email_scheduler.check_timeout: 30
app.email_scheduler.mailer_spool_flush_timeout: 5
app.email_scheduler.default_schedule_period: 7 days
{
...
"repositories": [
...
{ "type": "git", "url": "https://github.com/rewieer/TaskSchedulerBundle.git" },
{ "type": "git", "url": "https://github.com/dragonmantank/cron-expression.git" }
],
"require": {
"php": ">=7.1",
...
"symfony/swiftmailer-bundle": "~2.6",
"rewieer/taskschedulerbundle": "^0.3.0",
"symfony/process": "^3.4"
},
...
}
...
#setup task scheduler cron job
RUN (crontab -l ; echo "* * * * * $(which php) /srv/www/app/console ts:run >> /var/log/cron.log 2>&1") | crontab \
&& touch /var/log/cron.log
CMD ["cron", "-f"]
<?php
namespace App\Command;
use DateTime;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use App\Helper\Arr;
use App\Repository\DirectoryRepository;
use App\Service\ScheduledEmail;
class TestCommand extends ContainerAwareCommand
{
protected static $defaultName = 'test:dump-queue';
/**
* @var ScheduledEmail
*/
private $scheduledEmailService;
/**
* @var DirectoryRepository
*/
private $directoryRepository;
/**
* @param string|null $name The name of the command; passing null means it must be set in configure()
*
* @throws LogicException When the command name is empty
*/
public function __construct($name = null)
{
parent::__construct($name);
}
/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setName('test:dump-queue')
->setDescription('Dump scheduled and spooled emails')
->addArgument('type', InputArgument::OPTIONAL, 'Type of queue to dump (can be: all, mailer, scheduler)', 'all')
->addOption('addTask', null, InputOption::VALUE_OPTIONAL, 'add task to scheduler')
->addOption('addEmail', null,InputOption::VALUE_NONE, 'add email to mailer spool')
->addOption('flushSpool', null,InputOption::VALUE_NONE, 'send spooled emails')
->addOption('checkSchedule', null,InputOption::VALUE_NONE, 'update schedule and run tasks if needed')
->addOption('clear', 'c',InputOption::VALUE_NONE, 'clear scheduler storage')
->setHidden(true)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->scheduledEmailService = $this->getContainer()->get('app.service.scheduled_email');
$this->directoryRepository = $this->getContainer()->get('app.repository.directory');
if ($input->getOption('clear') !== FALSE) {
$this->scheduledEmailService->getRepository()->clearStorage();
}
if ($input->getOption('addTask') !== NULL) {
$output->writeln('..adding 1 task to schedule...');
$shastaEmail = $this->directoryRepository->getShastaGroupEmail();
list($paEmails, $pmEmails, $qaEmails) = [['[email protected]'],['[email protected]'],['[email protected]']];
$domainTeams = array_merge($paEmails, $pmEmails, $qaEmails);
$message = $this->createMessage($shastaEmail,
$domainTeams,
'Request Reviewed',
'reviewed-request',
[
'request_id' => 1,
'domain_team_name' => 'Team Name'
]);
$period = $input->getOption('addTask') ?: '1 hour';
$this->scheduledEmailService->addToSchedule($message, $period);
}
if ($input->getOption('addEmail') !== FALSE) {
$output->writeln('..adding 1 email to spool...');
$shastaEmail = $this->directoryRepository->getShastaGroupEmail();
list($paEmails, $pmEmails, $qaEmails) = [['[email protected]'],['[email protected]'],['[email protected]']];
$domainTeams = array_merge($paEmails, $pmEmails, $qaEmails);
$message = $this->createMessage($shastaEmail,
$domainTeams,
'Request Reviewed2',
'reviewed-request',
[
'request_id' => 2,
'domain_team_name' => 'Team Name2'
]);
$this->scheduledEmailService->addToMailerSpoolQueue($message);
}
if ($input->getOption('flushSpool') !== FALSE) {
$this->scheduledEmailService->forceFlushSpool();
}
if ($input->getOption('checkSchedule') !== FALSE) {
$this->scheduledEmailService->checkScheduleAndSendIfNeeded();
}
switch ($input->getArgument('type')) {
case 'mailer':
$this->mailer($output);
break;
case 'scheduler':
$this->scheduler($output);
break;
case 'all':
default:
$this->mailer($output);
$this->scheduler($output);
break;
}
return 0;
}
private function scheduler(OutputInterface $output)
{
$tasks = $this->getScheduledTasks();
$output->writeln('');
$output->writeln('------------------------------------');
$output->writeln(count($tasks) . ' scheduled emails: ');
$output->writeln('------------------------------------');
$output->writeln(array_map(function($task){
$email = Arr::get($task, 'message');
$to = '';
if (is_array($email->getTo())) {
$to .= implode(';', array_keys($email->getTo()));
} else {
$to .= $email->getTo();
}
if (is_array($email->getCc())) {
$to .= implode(';', array_keys($email->getCc()));
} else {
$to .= $email->getCc();
}
return implode(
[
'ID: ' . $email->getId(),
'Status: ' . Arr::get($task, 'status'),
'Subject: ' . $email->getSubject(),
'To: ' . $to,
'Start date: ' . Arr::get($task, 'scheduled_on')->format(DateTime::ATOM)
],
' | '
);
}, $tasks));
$output->writeln('------------------------------------');
}
/**
* @param OutputInterface $output
*/
private function mailer(OutputInterface $output)
{
$emails = $this->scheduledEmailService->getMailerSpooledEmails();
$output->writeln('');
$output->writeln('------------------------------------');
$output->writeln(count($emails) . ' emails in mailer spool: ');
$output->writeln('------------------------------------');
$output->writeln(array_map(function($email){
$to = '';
if (is_array($email->getTo())) {
$to .= implode(';', array_keys($email->getTo()));
} else {
$to .= $email->getTo();
}
if (is_array($email->getCc())) {
$to .= implode(';', array_keys($email->getCc()));
} else {
$to .= $email->getCc();
}
return implode(
[
'ID: '.$email->getId(),
'Subject: ' . $email->getSubject(),
'To: ' . $to
],
' | '
);
}, $emails));
$output->writeln('------------------------------------');
}
private function getScheduledTasks()
{
return $this->scheduledEmailService->getRepository()->getAll();
}
private function createMessage($to, $cc, $subject, $emailTemplate, array $emailParameters = [], $from = null)
{
$message = new \Swift_Message($subject);
$body = $this->getContainer()->get('twig')->render(
sprintf('@UpworkSuitEventsDictionary/emails/%s.html.twig', $emailTemplate),
$emailParameters
);
$message
->setFrom($from ?? '[email protected]')
->setBody($body, 'text/html');
$filterEmpty = function($emails) {
return array_filter((array)$emails, function($email){ return !empty($email); });
};
if ($to) {
$message->setTo($filterEmpty($to));
}
if ($cc) {
$message->setCc($filterEmpty($cc));
}
return $message;
}
}
<?php
namespace App\EventListener;
use App\Service\ScheduledEmail;
class EmailSentListener implements \Swift_Events_SendListener
{
private $service;
/**
* @param ScheduledEmail $scheduledEmail
*/
public function __construct(ScheduledEmail $scheduledEmail)
{
$this->service = $scheduledEmail;
}
/**
* Invoked immediately before the Message is sent.
*
* @param \Swift_Events_SendEvent $evt
*/
public function beforeSendPerformed(\Swift_Events_SendEvent $evt)
{
}
/**
* Invoked immediately after the Message is sent.
*
* @param \Swift_Events_SendEvent $evt
*/
public function sendPerformed(\Swift_Events_SendEvent $evt)
{
$repository = $this->service->getRepository();
$id = str_replace('.', '', $evt->getMessage()->getId());
$task = $repository->findById($id, false);
if (!$repository->renew($task)) {
$repository->markAsDone($task['id']);
}
}
}
<?php
namespace App\Helper;
use function is_array;
use Symfony\Component\Console\Helper\Helper;
use Symfony\Component\Console\Helper\HelperSet;
/**
* Mix of illuminate/Support/Arr and Kohana's Arr helper
*
* Class ArrHelper
* @package App\Helper
*/
class Arr extends Helper
{
private static $delimiter = '.';
/**
* Returns the canonical name of this helper.
*
* @return string The canonical name
*/
public function getName()
{
return 'arr';
}
/**
* Return the default value of the given value.
*
* @param mixed $value
* @return mixed
*/
protected static function value($value)
{
return $value instanceof \Closure ? $value() : $value;
}
/**
* Determine whether the given value is array accessible.
*
* @param mixed $value
* @return bool
*/
public static function accessible($value)
{
return self::isArray($value);
}
/**
* Add an element to an array using "dot" notation if it doesn't exist.
*
* @param array $array
* @param string $key
* @param mixed $value
* @return array
*/
public static function add($array, $key, $value)
{
if (is_null(self::get($array, $key))) {
self::set($array, $key, $value);
}
return $array;
}
/**
* Collapse an array of arrays into a single array.
*
* @param iterable $array
* @return array
*/
public static function collapse($array)
{
$results = [];
foreach ($array as $values) {
if (!is_array($values)) {
continue;
}
$results[] = $values;
}
return array_merge([], ...$results);
}
/**
* Cross join the given arrays, returning all possible permutations.
*
* @param iterable ...$arrays
* @return array
*/
public static function crossJoin(...$arrays)
{
$results = [[]];
foreach ($arrays as $index => $array) {
$append = [];
foreach ($results as $product) {
foreach ($array as $item) {
$product[$index] = $item;
$append[] = $product;
}
}
$results = $append;
}
return $results;
}
/**
* Divide an array into two arrays. One with keys and the other with values.
*
* @param array $array
* @return array
*/
public static function divide($array)
{
return [array_keys($array), array_values($array)];
}
/**
* Flatten a multi-dimensional associative array with dots.
*
* @param iterable $array
* @param string $prepend
* @return array
*/
public static function dot($array, $prepend = '')
{
$results = [];
foreach ($array as $key => $value) {
if (is_array($value) && !empty($value)) {
$results = array_merge($results, self::dot($value, $prepend . $key . '.'));
} else {
$results[$prepend . $key] = $value;
}
}
return $results;
}
/**
* Get all of the given array except for a specified array of keys.
*
* @param array $array
* @param array|string $keys
* @return array
*/
public static function except($array, $keys)
{
self::forget($array, $keys);
return $array;
}
/**
* Determine if the given key exists in the provided array.
*
* @param array $array
* @param string|int $key
* @return bool
*/
public static function exists($array, $key)
{
return array_key_exists($key, $array);
}
/**
* @param mixed $array
* @param string|int|null $key
* @param string|int|null $compareTo
* @return bool
*/
public static function existsAndEqual($array, $key, $compareTo)
{
if (!self::isArray($array)) {
return FALSE;
}
return self::get($array, $key) == $compareTo;
}
/**
* Return the first element in an array passing a given truth test.
*
* @param iterable $array
* @param callable|null $callback
* @param mixed $default
* @return mixed
*/
public static function first($array, callable $callback = null, $default = null)
{
if (is_null($callback)) {
if (empty($array)) {
return self::value($default);
}
foreach ($array as $item) {
return $item;
}
}
foreach ($array as $key => $value) {
if ($callback($value, $key)) {
return $value;
}
}
return self::value($default);
}
/**
* Return the last element in an array passing a given truth test.
*
* @param array $array
* @param callable|null $callback
* @param mixed $default
* @return mixed
*/
public static function last($array, callable $callback = null, $default = null)
{
if (is_null($callback)) {
return empty($array) ? self::value($default) : end($array);
}
return self::first(array_reverse($array, true), $callback, $default);
}
/**
* Flatten a multi-dimensional array into a single level.
*
* @param iterable $array
* @param int $depth
* @return array
*/
public static function flatten($array, $depth = INF)
{
$result = [];
foreach ($array as $item) {
$item = $item instanceof Collection ? $item->all() : $item;
if (!is_array($item)) {
$result[] = $item;
} else {
$values = $depth === 1
? array_values($item)
: self::flatten($item, $depth - 1);
foreach ($values as $value) {
$result[] = $value;
}
}
}
return $result;
}
/**
* Remove one or many array items from a given array using "dot" notation.
*
* @param array $array
* @param array|string $keys
* @return void
*/
public static function forget(&$array, $keys)
{
$original = &$array;
if (!is_array($keys)) {
$keys = (array)$keys;
}
if (count($keys) === 0) {
return;
}
foreach ($keys as $key) {
// if the exact key exists in the top-level, remove it
if (self::exists($array, $key)) {
unset($array[$key]);
continue;
}
$parts = explode('.', $key);
// clean up before each pass
$array = &$original;
while (count($parts) > 1) {
$part = array_shift($parts);
if (isset($array[$part]) && is_array($array[$part])) {
$array = &$array[$part];
} else {
continue 2;
}
}
unset($array[array_shift($parts)]);
}
}
/**
* Get an item from an array using "dot" notation.
*
* @param array $array
* @param string|int|null $key
* @param mixed $default
* @return mixed
*/
public static function get($array, $key, $default = null)
{
if (!self::accessible($array)) {
return self::value($default);
}
if (is_null($key)) {
return $array;
}
if (self::exists($array, $key)) {
return $array[$key];
}
if (strpos($key, '.') === false) {
return $array[$key] ?? self::value($default);
}
foreach (explode('.', $key) as $segment) {
if (self::accessible($array) && self::exists($array, $segment)) {
$array = $array[$segment];
} else {
return self::value($default);
}
}
return $array;
}
/**
* Check if an item or items exist in an array using "dot" notation.
*
* @param array $array
* @param string|array $keys
* @return bool
*/
public static function has($array, $keys)
{
$keys = (array)$keys;
if (!$array || $keys === []) {
return false;
}
foreach ($keys as $key) {
$subKeyArray = $array;
if (self::exists($array, $key)) {
continue;
}
foreach (explode('.', $key) as $segment) {
if (self::accessible($subKeyArray) && self::exists($subKeyArray, $segment)) {
$subKeyArray = $subKeyArray[$segment];
} else {
return false;
}
}
}
return true;
}
/**
* Determines if an array is associative.
*
* An array is "associative" if it doesn't have sequential numerical keys beginning with zero.
*
* @param array $array
* @return bool
*/
public static function isAssoc(array $array)
{
$keys = array_keys($array);
return array_keys($keys) !== $keys;
}
/**
* Get a subset of the items from the given array.
*
* @param array $array
* @param array|string $keys
* @return array
*/
public static function only($array, $keys)
{
return array_intersect_key($array, array_flip((array)$keys));
}
/**
* Pluck an array of values from an array.
*
* @param iterable $array
* @param string|array $value
* @param string|array|null $key
* @return array
*/
public static function pluck($array, $value, $key = null)
{
$results = [];
[$value, $key] = self::explodePluckParameters($value, $key);
foreach ($array as $item) {
$itemValue = self::path($item, $value);
// If the key is "null", we will just append the value to the array and keep
// looping. Otherwise we will key the array using the value of the key we
// received from the developer. Then we'll return the final array form.
if (is_null($key)) {
$results[] = $itemValue;
} else {
$itemKey = self::path($item, $key);
if (is_object($itemKey) && method_exists($itemKey, '__toString')) {
$itemKey = (string)$itemKey;
}
$results[$itemKey] = $itemValue;
}
}
return $results;
}
/**
* Explode the "value" and "key" arguments passed to "pluck".
*
* @param string|array $value
* @param string|array|null $key
* @return array
*/
protected static function explodePluckParameters($value, $key)
{
$value = is_string($value) ? explode('.', $value) : $value;
$key = is_null($key) || is_array($key) ? $key : explode('.', $key);
return [$value, $key];
}
/**
* Push an item onto the beginning of an array.
*
* @param array $array
* @param mixed $value
* @param mixed $key
* @return array
*/
public static function prepend($array, $value, $key = null)
{
if (is_null($key)) {
array_unshift($array, $value);
} else {
$array = [$key => $value] + $array;
}
return $array;
}
/**
* Get a value from the array, and remove it.
*
* @param array $array
* @param string $key
* @param mixed $default
* @return mixed
*/
public static function pull(&$array, $key, $default = null)
{
$value = self::get($array, $key, $default);
self::forget($array, $key);
return $value;
}
/**
* Get one or a specified number of random values from an array.
*
* @param array $array
* @param int|null $number
* @return mixed
*
* @throws \InvalidArgumentException
*/
public static function random($array, $number = null)
{
$requested = is_null($number) ? 1 : $number;
$count = count($array);
if ($requested > $count) {
throw new \InvalidArgumentException(
"You requested {$requested} items, but there are only {$count} items available."
);
}
if (is_null($number)) {
return $array[array_rand($array)];
}
if ((int)$number === 0) {
return [];
}
$keys = array_rand($array, $number);
$results = [];
foreach ((array)$keys as $key) {
$results[] = $array[$key];
}
return $results;
}
/**
* Set an array item to a given value using "dot" notation.
*
* If no key is given to the method, the entire array will be replaced.
*
* @param array $array
* @param string $key
* @param mixed $value
* @return array
*/
public static function set(&$array, $key, $value)
{
if (is_null($key)) {
return $array = $value;
}
$keys = explode('.', $key);
while (count($keys) > 1) {
$key = array_shift($keys);
// If the key doesn't exist at this depth, we will just create an empty array
// to hold the next value, allowing us to create the arrays to hold final
// values at the correct depth. Then we'll keep digging into the array.
if (!isset($array[$key]) || !is_array($array[$key])) {
$array[$key] = [];
}
$array = &$array[$key];
}
$array[array_shift($keys)] = $value;
return $array;
}
/**
* Shuffle the given array and return the result.
*
* @param array $array
* @param int|null $seed
* @return array
*/
public static function shuffle($array, $seed = null)
{
if (is_null($seed)) {
shuffle($array);
} else {
mt_srand($seed);
shuffle($array);
mt_srand();
}
return $array;
}
/**
* Recursively sort an array by keys and values.
*
* @param array $array
* @return array
*/
public static function sortRecursive($array)
{
foreach ($array as &$value) {
if (is_array($value)) {
$value = self::sortRecursive($value);
}
}
if (self::isAssoc($array)) {
ksort($array);
} else {
sort($array);
}
return $array;
}
/**
* Convert the array into a query string.
*
* @param array $array
* @return string
*/
public static function query($array)
{
return http_build_query($array, null, '&', PHP_QUERY_RFC3986);
}
/**
* Filter the array using the given criteria array
*
* @param array $array
* @param array $filterItems
* @return array
*/
public static function where(array $array, array $filterItems)
{
$result = [];
foreach($array as $element)
{
foreach ($filterItems as $key => $value)
{
if (key_exists($key, $element) && $element[$key] === $value) {
$result[] = $element;
}
}
}
return $result;
}
/**
* If the given value is not an array and not null, wrap it in one.
*
* @param mixed $value
* @return array
*/
public static function wrap($value)
{
if (is_null($value)) {
return [];
}
return is_array($value) ? $value : [$value];
}
/**
* Determine whether the given value is array
*
* @param mixed $value
* @return bool
*/
public static function isArray($value)
{
if (is_array($value)) {
// Definitely an array
return TRUE;
} else {
// Possibly a Traversable object, functionally the same as an array
return (is_object($value) AND $value instanceof \Traversable);
}
}
/**
* Determine whether the given value is associative array
*
* @param array $array
* @return bool
*/
public static function is_assoc(array $array)
{
// Keys of the array
$keys = array_keys($array);
// If the array keys of the keys match the keys, then the array must
// not be associative (e.g. the keys array looked like {0:0, 1:1...}).
return array_keys($keys) !== $keys;
}
/**
* Gets a value from an array using a dot separated path.
*
* @param array $array required - Array to search
* @param mixed $path required - Key path string (delimiter separated) or array of keys
* @param mixed $default = NULL - Default value if the path is not set
* @param string $delimiter = NULL - Key path delimiter
* @return array|null
*/
public static function path($array, $path, $default = NULL, $delimiter = NULL)
{
if (!self::isArray($array)) {
// This is not an array!
return $default;
}
if (is_array($path)) {
// The path has already been separated into keys
$keys = $path;
} else {
if (array_key_exists($path, $array)) {
// No need to do extra processing
return $array[$path];
}
if ($delimiter === NULL) {
// Use the default delimiter
$delimiter = self::$delimiter;
}
// Remove starting delimiters and spaces
$path = ltrim($path, "{$delimiter} ");
// Remove ending delimiters, spaces, and wildcards
$path = rtrim($path, "{$delimiter} *");
// Split the keys by delimiter
$keys = explode($delimiter, $path);
}
do {
$key = array_shift($keys);
if (ctype_digit($key)) {
// Make the key an integer
$key = (int)$key;
}
if (isset($array[$key])) {
if (!$keys) {
// Found the path requested
return $array[$key];
}
if (!self::isArray($array[$key])) {
// Unable to dig deeper
break;
}
// Dig down into the next part of the path
$array = $array[$key];
} elseif ($key === '*') {
// Handle wildcards
$values = array();
foreach ($array as $arr) {
if ($value = self::path($arr, implode('.', $keys))) {
$values[] = $value;
}
}
if (!$values) {
// Unable to dig deeper
break;
}
// Found the values requested
return $values;
} else {
// Unable to dig deeper
break;
}
} while ($keys);
// Unable to find the value requested
return $default;
}
}
<?php
namespace app\Repository;
use function array_filter;
use function array_keys;
use function array_walk;
use DateTimeZone;
use function gzinflate;
use function is_array;
use function str_replace;
use function touch;
use function unserialize;
use app\Helper\Arr;
class ScheduledEmailsRepository
{
private $queue = [];
CONST CACHE_DIR = '/scheduled_emails/storage/';
CONST CACHE_FILENAME = 'tasks.tmp';
CONST STATUS_PENDING = 0;
CONST STATUS_READY = 1;
CONST STATUS_ON_THE_GO = 2;
CONST STATUS_DONE = 3;
private $path;
/**
* @param \PDO $connection
* @param $appCacheDir -
*/
public function __construct($appCacheDir)
{
$this->path = self::createStorageIfNeeded($appCacheDir);
$this->queue = $this->getAll();
}
/**
* @param \Swift_Message $message
* @param string $period
* @param bool $renewable
* @return string
* @example ->add($message, '+7 days')
*/
public function add(\Swift_Message $message, $period, $renewable = true)
{
if ($period[0] !== '+') {
$period = '+' . $period;
}
$id = str_replace('.', '', $message->generateId());
Arr::set($this->queue, $id, [
'id' => $id,
'status' => self::STATUS_PENDING,
'scheduled_on' => (new \DateTime('now', new \DateTimeZone('UTC')))->modify($period),
'period' => $period,
'message' => $message,
'renewable' => $renewable
]);
$this->syncStorage();
return $id;
}
/**
* @param $id
* @param $item
*/
public function update($id, $item)
{
Arr::set($this->queue, $id, $item);
$this->syncStorage();
}
public function renew($item)
{
if (!Arr::get($item, 'renewable')) {
return FALSE;
}
$period = Arr::get($item, 'period');
Arr::set($item, 'status', self::STATUS_PENDING);
Arr::set($item, 'scheduled_on', (new \DateTime('now', new \DateTimeZone('UTC')))->modify($period));
$this->update(Arr::get($item, 'id'), $item);
return TRUE;
}
/**
* @param $id
*/
public function markAsInWork($id)
{
$this->markAs($id, self::STATUS_ON_THE_GO);
$this->syncStorage();
}
/**
* @param $id
*/
public function markAsDone($id)
{
$this->markAs($id, self::STATUS_DONE);
$this->syncStorage();
}
/**
* Force remove all tasks
*
*/
public function clearStorage()
{
$this->queue = [];
return $this->syncStorage();
}
/**
* @param $keys
* @param $status
* @return ScheduledEmailsRepository
*/
public function updateStatuses($keys = [], $status = self::STATUS_READY)
{
if (!count($keys)) {
$this->getAllReadyToSend();
return $this;
}
foreach($this->queue as $key => &$item) {
if (in_array($key, $keys)) {
Arr::set($item, 'status', $status);
}
}
$this->syncStorage();
return $this;
}
/**
* @return array
*/
public function getAllReadyToSend()
{
$now = new \DateTime('now', new DateTimeZone('UTC'));
$items = array_filter($this->getPending(), function($item) use ($now) {
dump($item['scheduled_on']);
dump($now);
return $item['scheduled_on'] <= $now;
});
$keys = Arr::pluck($items, 'id');
if (count($keys)) {
$this->updateStatuses($keys, self::STATUS_READY);
}
$items = Arr::where($this->queue, ['status' => self::STATUS_READY]);
return $items;
}
public function getPending()
{
return Arr::where($this->queue, ['status' => self::STATUS_PENDING]);
}
public function getDone()
{
return Arr::where($this->queue, ['status' => self::STATUS_DONE]);
}
/**
* @param array $val
* @return array
*/
public function findBy(array $val)
{
return Arr::where($this->queue, $val);
}
/**
* @param $id
* @param bool $onlyActive
* @return mixed
*/
public function findById($id, $onlyActive = true)
{
$needle = ['id' => $id];
if ($onlyActive) {
$needle += ['status' => self::STATUS_PENDING];
}
$result = Arr::where($this->queue, $needle);
return reset($result);
}
/**
* @param $id
* @return false|int|string
*/
public function findKeyById($id)
{
$tmp = array_combine(
array_keys($this->queue),
array_column($this->queue, 'id')
);
return array_search($id, $tmp);
}
/**
* @return mixed
*/
public function getAll()
{
return $this->decodeStorage();
}
/**
* @param string|array $id
* @param int $status
*/
private function markAs($id, $status)
{
$self = $this;
$setStatusDone = function($id) use ($self, $status) {
Arr::set($self->queue, $id . '.status', $status);
};
if (is_array($id)) {
array_walk($id, $setStatusDone);
} else {
$setStatusDone($id);
}
}
/**
* @param $appCacheDir
* @return string
*/
private static function createStorageIfNeeded($appCacheDir)
{
$cachePath = $appCacheDir . self::CACHE_DIR;
if (!file_exists($cachePath)) {
mkdir($cachePath, 0755, TRUE);
}
$filePath = $cachePath . self::CACHE_FILENAME;
if (!file_exists($filePath)) {
touch($filePath);
}
return $filePath;
}
/**
* @return mixed
*/
private function readStorageFile()
{
$content = file_get_contents($this->path);
if (!$content) {
return [];
}
return unserialize(gzinflate($content));
}
/**
* @return mixed
*/
private function decodeStorage()
{
$list = $this->readStorageFile();
foreach ($list as &$message) {
$message = unserialize(gzinflate($message));
}
return $list;
}
/**
* @return string
*/
private function prepareToSave()
{
$list = $this->queue;
foreach ($list as &$message) {
$message = gzdeflate(serialize($message), 9);
}
return gzdeflate(serialize($list), 9);
}
/**
* @return int
*/
private function syncStorage()
{
$bytes = file_put_contents($this->path, $this->prepareToSave());
if (!$bytes) {
throw Exception('Error write queues to file storage. Zero bytes written');
}
return $bytes;
}
}
app.repository.scheduled_emails:
class: app\Repository\ScheduledEmailsRepository
arguments:
- '%kernel.cache_dir%'
app.service.mailer:
class: App\Service\Mailer
arguments:
- '@mailer'
- '@service_container'
app.service.spool_emails_sender:
class: App\Tasks\SpoolEmailsSender
arguments:
- '%kernel.project_dir%'
- '@app.service.scheduled_email'
- '%app.email_scheduler.mailer_spool_flush_timeout%'
tags:
- { name: ts.task }
app.service.scheduled_emails_processor:
class: App\Tasks\ScheduledEmailsProcessor
arguments:
- '@app.service.scheduled_email'
- '%app.email_scheduler.check_timeout%'
tags:
- { name: ts.task }
app.service.scheduled_email:
class: App\Service\ScheduledEmail
arguments:
- '@app.repository.scheduled_emails'
- '@app.service.mailer'
- '%kernel.project_dir%'
app.listener.email_sent:
class: App\EventListener\EmailSentListener
arguments:
- '@app.service.scheduled_email'
tags:
- { name: swiftmailer.default.plugin, event: \Swift_Events_SendEvent, method: sendPerformed }
- { name: kernel.event_listener, event: \Swift_Events_SendEvent, method: sendPerformed }
<?php
namespace App\Service;
use DirectoryIterator;
use Swift_Mailer;
use Swift_Message;
use Swift_Spool;
use Symfony\Component\DependencyInjection\Container;
class Mailer
{
const DEFAULT_MESSAGE_LIMIT = 20;
const DEFAULT_TIME_LIMIT = 10;
const DEFAULT_RECOVER_TIMEOUT = 90;
private $mailer;
private $container;
public function __construct(Swift_Mailer $mailer, Container $container)
{
$this->mailer = $mailer;
$this->container = $container;
}
/**
* @return Container
*/
private function getContainer()
{
return $this->container;
}
/**
* @return bool|\Swift_Transport
*/
private function getTransport()
{
$transport = $this->mailer->getTransport();
if (!($transport instanceof \Swift_Transport_SpoolTransport)) {
return FALSE;
}
return $transport;
}
/**
* @return Swift_Spool
*/
private function getSpool()
{
$spool = $this->getTransport()->getSpool();
if (!($spool instanceof \Swift_ConfigurableSpool)) {
return FALSE;
}
return $spool;
}
/**
* Adds message to spool queue
*
* @param Swift_Message $message
* @return int
*/
public function send(Swift_Message $message)
{
return $this->mailer->send($message);
}
public function immediatelySend(Swift_Message $message)
{
$this->send($message);
return $this->flushSpoolQueue();
}
/**
* Adds message to spool queue
*
* @param Swift_Message $message
* @return bool
*/
public function queueMessage(Swift_Message $message)
{
return $this
->getSpool()
->queueMessage($message);
}
/**
* @return int
*/
public function flushSpoolQueue()
{
$spool = $this->getSpool();
$spool->setMessageLimit(self::DEFAULT_MESSAGE_LIMIT);
$spool->setTimeLimit(self::DEFAULT_TIME_LIMIT);
if ($spool instanceof \Swift_FileSpool) {
$spool->recover(self::DEFAULT_RECOVER_TIMEOUT);
}
$transport = $this->getTransport();
return $spool->flushQueue($transport);
}
/**
* @return mixed
*/
private function getSpoolPath()
{
$container = $this->getContainer();
return $container->getParameter(
sprintf('swiftmailer.spool.%s.file.path', $container->getParameter('swiftmailer.default_mailer'))
);
}
/**
* @return Swift_Message[]
*/
public function getSpooledEmails()
{
$path = $this->getSpoolPath();
$directoryIterator = new DirectoryIterator($path);
$result = [];
foreach ($directoryIterator as $file) {
$file = $file->getRealPath();
if (substr($file, -8) != '.message') {
continue;
}
$message = unserialize(file_get_contents($file));
$result[] = $message;
}
return $result;
}
/**
* @return int
*/
public function getSpooledEmailsCount()
{
$count = 0;
$directoryIterator = new DirectoryIterator($this->getSpoolPath());
foreach ($directoryIterator as $file) {
if (substr($file->getRealPath(), -8) != '.message') {
continue;
}
$count++;
}
return $count;
}
}
<?php
namespace App\Service;
use Swift_Message;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use App\Helper\Arr;
use App\Repository\ScheduledEmailsRepository;
class ScheduledEmail
{
/**
* @var ScheduledEmailsRepository
*/
private $repository;
/**
* @var Mailer
*/
private $mailer;
private $projectRoot;
/**
* @param ScheduledEmailsRepository $scheduledEmailsRepository
* @param Mailer $mailer
* @param $projectRoot
*/
public function __construct(ScheduledEmailsRepository $scheduledEmailsRepository, Mailer $mailer, $projectRoot)
{
$this->repository = $scheduledEmailsRepository;
$this->mailer = $mailer;
$this->projectRoot = $projectRoot;
}
public function getMailer()
{
return $this->mailer;
}
/**
* @return ScheduledEmailsRepository
*/
public function getRepository()
{
return $this->repository;
}
public function checkScheduleAndSendIfNeeded()
{
$items = $this->repository
->updateStatuses()
->getAllReadyToSend();
$this
->addToMailerSpoolQueue(Arr::pluck($items, 'message'))
->repository->markAsInWork(Arr::pluck($items, 'id'));
return count($items);
}
public function addToMailerSpoolQueue($message)
{
$mailer = $this->getMailer();
$queueMessage = function($message) use($mailer) {
$mailer->queueMessage($message);
};
if (is_array($message)) {
array_walk($message, $queueMessage);
} else {
$queueMessage($message);
}
return $this;
}
/**
* @return int - count of mails that were sent
*/
public function flushMailerSpool()
{
return $this->mailer->flushSpoolQueue();
}
/**
* Send all emails from mailer spool
*/
public function forceFlushSpool()
{
$phpBinaryPath = (new PhpExecutableFinder())->find();
$process = new Process([
$phpBinaryPath,
$this->projectRoot . '/app/console',
'swiftmailer:spool:send'
]);
try {
$process->mustRun();
echo $process->getOutput();
} catch (ProcessFailedException $exception) {
echo $exception->getMessage();
}
}
/**
* Add message to queue and send after $period of time
*
* @param Swift_Message $message
* @param $period - something like: 7 days
* @return string - id of new queue item
*/
public function addToSchedule(Swift_Message $message, $period)
{
return $this->repository->add($message, $period);
}
public function immediatelySend(Swift_Message $message)
{
return $this->mailer->immediatelySend($message);
}
/**
* @return int
*/
public function emailsInMailerSpoolCount()
{
return $this->mailer->getSpooledEmailsCount();
}
/**
* @return \Swift_Message[]
*/
public function getMailerSpooledEmails()
{
return $this->mailer->getSpooledEmails();
}
}
<?php
namespace App\Tasks;
use Rewieer\TaskSchedulerBundle\Task\AbstractScheduledTask;
use Rewieer\TaskSchedulerBundle\Task\Schedule;
use App\ScheduledEmail;
class ScheduledEmailsProcessor extends AbstractScheduledTask
{
private $service;
private $timeout;
public function __construct(ScheduledEmail $scheduledEmailService, $timeout)
{
$this->service = $scheduledEmailService;
$this->timeout = (int)$timeout;
parent::__construct();
}
protected function initialize(Schedule $schedule)
{
$schedule
->everyMinutes($this->timeout);
}
public function run()
{
$this->service->checkScheduleAndSendIfNeeded();
}
}
<?php
namespace App\Tasks;
use Rewieer\TaskSchedulerBundle\Task\AbstractScheduledTask;
use Rewieer\TaskSchedulerBundle\Task\Schedule;
use App\Service\ScheduledEmail;
class SpoolEmailsSender extends AbstractScheduledTask
{
private $projectRoot;
private $scheduledEmailService;
private $timeout;
public function __construct($projectRoot, ScheduledEmail $scheduledEmail, $timeout)
{
$this->projectRoot = $projectRoot;
$this->scheduledEmailService = $scheduledEmail;
$this->timeout = (int)$timeout;
parent::__construct();
}
protected function initialize(Schedule $schedule)
{
$schedule
->everyMinutes($this->timeout);
}
public function run()
{
if ($this->scheduledEmailService->emailsInMailerSpoolCount()) {
$this->scheduledEmailService->forceFlushSpool();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment