Skip to content

Instantly share code, notes, and snippets.

@kmuenkel
Created August 9, 2019 05:31
Show Gist options
  • Save kmuenkel/ce7af20c4fed74dae626e99b5153e8ec to your computer and use it in GitHub Desktop.
Save kmuenkel/ce7af20c4fed74dae626e99b5153e8ec to your computer and use it in GitHub Desktop.
Fan out the handling of items in an array using asynchronous queue jobs.
<?php
namespace App\Jobs;
use Carbon\Carbon;
use Illuminate\Bus\Queueable;
use App\Models\PayrollRequest;
use Illuminate\Support\Collection;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
/**
 * Queue job that handles a single item of a fan-out run and then records, in the cache, that the item is done.
 *
 * Class InstanceHandlerJob
 * @package App\Jobs
 */
class InstanceHandlerJob implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Identifier of the fan-out run this item belongs to; used as the prefix of the cache key.
     *
     * @var string
     */
    protected $instanceKey;

    /**
     * The item to handle. Its 'id' entry is read when the completion marker is written.
     *
     * @var array
     */
    protected $item;

    /**
     * Expiration handed to the cache when the completion marker is stored.
     *
     * @var \Illuminate\Support\Carbon
     */
    protected $expiration;

    /**
     * InstanceHandlerJob constructor. The job is always placed on the 'instance' queue.
     *
     * @param string $instanceKey
     * @param array $item
     * @param PayrollRequest $requestLog Accepted for signature compatibility; it is not stored or used here.
     * @param Carbon $expiration
     */
    public function __construct(
        string $instanceKey,
        array $item,
        PayrollRequest $requestLog,
        Carbon $expiration
    ) {
        $this->instanceKey = $instanceKey;
        $this->item = $item;
        $this->expiration = $expiration;
        $this->queue = 'instance';
    }

    /**
     * Handle the item, then store its id in the cache under "<instanceKey>_<id>" until the expiration.
     *
     * @throws \Exception
     */
    public function handle()
    {
        try {
            //TODO: Handle each instance of the Overseer's data
        } catch (\Exception $exception) {
            // Intentionally left empty: the completion marker below is written either way.
        }

        $itemId = $this->item['id'];
        $markerKey = "{$this->instanceKey}_{$itemId}";

        cache([$markerKey => $itemId], $this->expiration);
    }
}
<?php
namespace App\Jobs;
// Raises PHP's memory limit to 1024M. This runs when the file is loaded, not when a job is handled.
ini_set('memory_limit', '1024M');
// NOTE(review): exec() runs the command in a child shell, so this soft open-file limit presumably does not
// carry over to the PHP process itself — confirm whether this line has any effect.
exec('ulimit -S -n 2048');
use ErrorTracker;
use Carbon\Carbon;
use Ramsey\Uuid\Uuid;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
/**
 * Fans the items of a collection out to one InstanceHandlerJob each, then keeps re-queueing itself to poll the
 * cache until every item has been marked as handled, or until no progress is made before the expiration.
 *
 * NOTE(review): the previous header described pinging the Dayforce API for Profile data; nothing in this class is
 * specific to that — confirm whether that description still applies to the intended use.
 *
 * Class OverseerJob
 * @package App\Jobs
 */
class OverseerJob implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Identifier shared by every promise key of one fan-out run.
     *
     * @var string
     */
    protected $instanceKey;

    /**
     * Seconds without progress after which the run is treated as timed out.
     *
     * @var int
     */
    protected $timeout = 120;

    /**
     * Seconds to wait before a re-queued overseer runs again.
     *
     * @var int
     */
    protected $interval = 3;

    /**
     * Items to fan out; each item is read as an array with an 'id' entry.
     *
     * @var ResponseCollection
     */
    protected $data;

    /**
     * True only for the first run (no instance key supplied), which dispatches the instance jobs.
     *
     * @var bool
     */
    protected $shouldInitiate = false;

    /**
     * Moment after which a poll without progress counts as a timeout.
     *
     * @var \Illuminate\Support\Carbon
     */
    protected $expiration;

    /**
     * Cache keys ("<instanceKey>_<id>"), one per dispatched item.
     *
     * @var array
     */
    protected $promises = [];

    /**
     * Number of fulfilled promises seen by the previous poll.
     *
     * @var int
     */
    protected $completed = 0;

    /**
     * Value set through setProvider() and forwarded to every re-queued overseer.
     *
     * @var string|null
     */
    protected $provider;

    /**
     * OverseerJob constructor. The job is always placed on the 'overseer' queue.
     *
     * @param ResponseCollection $data
     * @param string|null $instanceKey Omit on the first run; a UUID is generated and the fan-out is initiated.
     * @param Carbon|null $expiration Defaults to now plus $timeout seconds.
     * @param array $promises
     * @param int $completed
     * @throws \Exception
     */
    public function __construct(
        ResponseCollection $data,
        ?string $instanceKey = null,
        $expiration = null,
        array $promises = [],
        int $completed = 0
    ) {
        $this->queue = 'overseer';
        $this->data = $data;
        $this->expiration = $expiration ?: now()->addSeconds($this->timeout);
        $this->promises = $promises;
        $this->completed = $completed;
        $this->instanceKey = $instanceKey;

        if (!$this->instanceKey) {
            $this->instanceKey = (string)Uuid::uuid1();
            $this->shouldInitiate = true;
        }
    }

    /**
     * @param null|string $provider
     * @return $this
     */
    public function setProvider(?string $provider)
    {
        $this->provider = $provider;

        return $this;
    }

    /**
     * First run: register one promise per item and dispatch its InstanceHandlerJob.
     * Later runs: count the fulfilled promises, then finish, time out, or re-queue.
     *
     * @throws \Exception
     */
    public function handle()
    {
        $progress = [];

        if ($this->shouldInitiate) {
            /** @var array $item */
            foreach ($this->data as $item) {
                $promiseKey = $this->instanceKey.'_'.$item['id'];
                $this->promises[] = $promiseKey;

                // A stored false marks the promise as pending until the instance job overwrites it.
                cache([$promiseKey => false], now()->addMinutes(120));

                // Resolved through the container so constructor arguments not listed here get injected.
                $job = app(InstanceHandlerJob::class, [
                    'instanceKey' => $this->instanceKey,
                    'item' => $item,
                    'expiration' => now()->addMinutes(120)
                ]);
                dispatch($job);
            }
        } else {
            foreach ($this->promises as $promiseKey) {
                $value = cache($promiseKey);
                if ($this->isFulfilled($value)) {
                    $progress[$promiseKey] = $value;
                }
            }
        }

        $completed = count($progress);
        if (config('app.debug')) {
            el($completed.' of '.count($this->promises));
        }

        if ($completed === count($this->promises)) {
            //TODO: Proceed with the next step in the process here
            return;
        }

        if ($completed > $this->completed) {
            // Progress was made since the last poll, so the timeout window starts over.
            $this->expiration = now()->addSeconds($this->timeout);
        } elseif (!$this->checkTime()) {
            return;
        }

        $this->completed = $completed;
        $this->reQueue();
    }

    /**
     * Report whether the run may keep polling. Once the expiration has passed, a timeout event is fired and
     * false is returned.
     *
     * @return bool
     * @throws \Exception
     */
    protected function checkTime()
    {
        if (now()->greaterThanOrEqualTo($this->expiration)) {
            // Ids of the items whose promise is still pending (the part of the key after the first underscore).
            $ids = [];
            foreach ($this->promises as $promiseKey) {
                if (!$this->isFulfilled(cache($promiseKey))) {
                    $ids[] = last(explode('_', $promiseKey, 2));
                }
            }

            //TODO: Throw error 'Transmissions are taking too long. Remaining records: $ids' when $ids is not empty
            //TODO: Throw error 'Transmissions are taking too long.' when $ids is empty

            event(self::class.' error', app(OverseerCompleteEvent::class, [
                'status' => 'timeout',
                'message' => class_basename($this).': Transmissions are taking too long.'
            ]));

            return false;
        }

        return true;
    }

    /**
     * Dispatch a fresh copy of this job, carrying the current state, after $interval seconds.
     *
     * Only parameters the constructor actually declares are passed; the properties previously read here
     * (requestLog, isDryRun) do not exist on this class.
     *
     * @return void
     */
    protected function reQueue()
    {
        /** @var self $job */
        $job = app(get_class($this), [
            'data' => $this->data,
            'instanceKey' => $this->instanceKey,
            'expiration' => $this->expiration,
            'promises' => $this->promises,
            'completed' => $this->completed
        ]);

        $job = $job->setProvider($this->provider)->onQueue($this->queue);
        dispatch($job)->delay(now()->addSeconds($this->interval));
    }

    /**
     * Whether a cached promise value means "done". Pending promises hold false and missing or expired ones
     * read as null; any other value counts as fulfilled, including falsy ids such as 0 or "0".
     *
     * @param mixed $value
     * @return bool
     */
    private function isFulfilled($value)
    {
        return $value !== false && $value !== null;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment