Created
August 9, 2019 05:31
-
-
Save kmuenkel/ce7af20c4fed74dae626e99b5153e8ec to your computer and use it in GitHub Desktop.
Fan-out the handling of items in an array using asynchronous queue jobs.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Jobs; | |
use Carbon\Carbon; | |
use Illuminate\Bus\Queueable; | |
use App\Models\PayrollRequest; | |
use Illuminate\Support\Collection; | |
use Illuminate\Queue\SerializesModels; | |
use Illuminate\Queue\InteractsWithQueue; | |
use Illuminate\Contracts\Queue\ShouldQueue; | |
/** | |
* Class CacheHydrationJob | |
* @package App\Jobs | |
*/ | |
class InstanceHandlerJob implements ShouldQueue | |
{ | |
use InteractsWithQueue, Queueable, SerializesModels; | |
/** | |
* @var string | |
*/ | |
protected $instanceKey; | |
/** | |
* @var array | |
*/ | |
protected $item; | |
/** | |
* @var \Illuminate\Support\Carbon | |
*/ | |
protected $expiration; | |
/** | |
* CacheHydrationJob constructor. | |
* @param string $instanceKey | |
* @param array $item | |
* @param PayrollRequest $requestLog | |
* @param Carbon $expiration | |
*/ | |
public function __construct( | |
string $instanceKey, | |
array $item, | |
PayrollRequest $requestLog, | |
Carbon $expiration | |
) { | |
$this->queue = 'instance'; | |
$this->instanceKey = $instanceKey; | |
$this->item = $item; | |
$this->expiration = $expiration; | |
} | |
/** | |
* @throws \Exception | |
*/ | |
public function handle() | |
{ | |
try { | |
//TODO: Handle each instance of the Overseer's data | |
} catch (\Exception $error) { | |
// | |
} | |
$promiseKey = $this->instanceKey.'_'.$this->item['id']; | |
cache([$promiseKey => $this->item['id']], $this->expiration); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Jobs; | |
ini_set('memory_limit', '1024M'); | |
exec('ulimit -S -n 2048'); | |
use ErrorTracker; | |
use Carbon\Carbon; | |
use Ramsey\Uuid\Uuid; | |
use Illuminate\Bus\Queueable; | |
use Illuminate\Queue\SerializesModels; | |
use Illuminate\Queue\InteractsWithQueue; | |
use Illuminate\Contracts\Queue\ShouldQueue; | |
/** | |
* The purpose of this class is to ping the Dayforce API for Profile data asynchronously, so that it's available in | |
* cache for later use much faster than pulling them one at a time on-demand | |
* | |
* Class OverseerJob | |
* @package App\Jobs | |
*/ | |
class OverseerJob implements ShouldQueue | |
{ | |
use InteractsWithQueue, Queueable, SerializesModels; | |
/** | |
* @var string | |
*/ | |
protected $instanceKey; | |
/** | |
* @var int | |
*/ | |
protected $timeout = 120; | |
/** | |
* @var int | |
*/ | |
protected $interval = 3; | |
/** | |
* @var array | |
*/ | |
protected $data; | |
/** | |
* @var bool | |
*/ | |
protected $shouldInitiate = false; | |
/** | |
* @var \Illuminate\Support\Carbon | |
*/ | |
protected $expiration; | |
/** | |
* @var array | |
*/ | |
protected $promises = []; | |
/** | |
* @var int | |
*/ | |
protected $completed = 0; | |
/** | |
* @var string | |
*/ | |
protected $provider; | |
/** | |
* CacheHydrationOverseerJob constructor. | |
* @param ResponseCollection $data | |
* @param string|null $instanceKey | |
* @param Carbon|null $expiration | |
* @param array $promises | |
* @param int $completed | |
* @throws \Exception | |
*/ | |
public function __construct( | |
ResponseCollection $data, | |
string $instanceKey = null, | |
$expiration = null, | |
array $promises = [], | |
int $completed = 0 | |
) { | |
$this->queue = 'overseer'; | |
$this->data = $data; | |
$this->expiration = $expiration ?: now()->addSeconds($this->timeout); | |
$this->promises = $promises; | |
$this->completed = $completed; | |
$this->instanceKey = $instanceKey; | |
if (!$this->instanceKey) { | |
$this->instanceKey = (string)Uuid::uuid1(); | |
$this->shouldInitiate = true; | |
} | |
} | |
/** | |
* @param null|string $provider | |
* @return $this | |
*/ | |
public function setProvider(?string $provider) | |
{ | |
$this->provider = $provider; | |
return $this; | |
} | |
/** | |
* @throws \Exception | |
*/ | |
public function handle() | |
{ | |
$progress = []; | |
if ($this->shouldInitiate) { | |
/** @var array $item */ | |
foreach ($this->data as $item) { | |
$promiseKey = $this->instanceKey.'_'.$item['id']; | |
$this->promises[] = $promiseKey; | |
cache([$promiseKey => false], now()->addMinutes(120)); | |
$job = app(InstanceHandlerJob::class, [ | |
'instanceKey' => $this->instanceKey, | |
'item' => $item, | |
'expiration' => now()->addMinutes(120) | |
]); | |
dispatch($job); | |
} | |
} else { | |
foreach ($this->promises as $promiseKey) { | |
$progress[$promiseKey] = cache($promiseKey); | |
} | |
$progress = array_filter($progress); | |
} | |
$completed = count($progress); | |
if (config('app.debug')) { | |
el($completed.' of '.count($this->promises)); | |
} | |
if ($completed == count($this->promises)) { | |
//TODO: Proceed with the next step in the process here | |
} else { | |
if ($completed > $this->completed) { | |
$this->expiration = now()->addSeconds($this->timeout); | |
} elseif (!$this->checkTime()) { | |
return; | |
} | |
$this->completed = $completed; | |
$this->reQueue(); | |
} | |
} | |
/** | |
* @return bool | |
* @throws \Exception | |
*/ | |
protected function checkTime() | |
{ | |
if (now()->greaterThanOrEqualTo($this->expiration)) { | |
$ids = []; | |
foreach ($this->promises as $index => $promiseKey) { | |
if (!cache($promiseKey)) { | |
$ids[] = last(explode('_', $promiseKey, 2)); | |
} | |
} | |
foreach ($ids as $id) { | |
//TODO: Throw error 'Transmissions are taking too long. Remaining records: $ids' | |
} | |
if (empty($ids)) { | |
//TODO: Throw error 'Transmissions are taking too long.' | |
} | |
event(self::class.' error', app(OverseerCompleteEvent::class, [ | |
'status' => 'timeout', | |
'message' => class_basename($this).': Transmissions are taking too long.' | |
])); | |
return false; | |
} | |
return true; | |
} | |
/** | |
* @void | |
*/ | |
protected function reQueue() | |
{ | |
/** @var self $job */ | |
$job = app(get_class($this), [ | |
'data' => $this->data, | |
'requestLog' => $this->requestLog, | |
'isDryRun' => $this->isDryRun, | |
'instanceKey' => $this->instanceKey, | |
'expiration' => $this->expiration, | |
'promises' => $this->promises, | |
'completed' => $this->completed | |
]); | |
$job = $job->setProvider($this->provider)->onQueue($this->queue); | |
dispatch($job)->delay(now()->addSeconds($this->interval)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment