-
-
Save elfeffe/4c5553e7a95bddfb07cef628785b5bce to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Libraries\Queue; | |
use Illuminate\Events\Dispatcher; | |
use Illuminate\Queue\DatabaseQueue; | |
use Illuminate\Support\Str; | |
use Laravel\Horizon\Events\JobDeleted; | |
use Laravel\Horizon\Events\JobPushed; | |
use Laravel\Horizon\Events\JobReleased; | |
use Laravel\Horizon\Events\JobReserved; | |
use Laravel\Horizon\JobPayload; | |
use Laravel\Horizon\RedisQueue; | |
/**
 * Implements the Horizon methods/events for the Database driver.
 *
 * Wraps the stock database queue so that pushing, reserving, releasing and
 * deleting a job each dispatch the matching Horizon event (JobPushed,
 * JobReserved, JobReleased, JobDeleted) through the container's dispatcher.
 *
 * @see RedisQueue
 */
class HorizonDatabaseQueue extends DatabaseQueue
{
    /**
     * The job that was last handed to "push" or "later".
     *
     * Null until one of those methods has been called on this instance.
     *
     * @var object|string|null
     */
    protected $lastPushed;

    /**
     * The job that was last returned by "pop".
     *
     * Null until "pop" has returned a job on this instance.
     *
     * @var \Illuminate\Queue\Jobs\Job|null
     */
    protected $lastPopped;

    /**
     * Push a new job onto the queue.
     *
     * Remembers the job so that pushToDatabase() can hand it to
     * JobPayload::prepare() when the row is written.
     *
     * @param  object|string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        $this->lastPushed = $job;

        return parent::push($job, $data, $queue);
    }

    /**
     * Write a payload to the jobs table and dispatch a JobPushed event.
     *
     * The payload is prepared against the last pushed job before it is
     * stored; the value returned by the parent is then set as the payload
     * "id" for the event only (the stored payload is not rewritten).
     *
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  int  $attempts
     * @return mixed
     */
    protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
    {
        $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value;

        return tap(
            parent::pushToDatabase($queue, $payload, $delay, $attempts),
            function ($id) use ($queue, $payload) {
                $payload = (new JobPayload($payload))->set(['id' => $id])->value;

                $this->event($this->getQueue($queue), new JobPushed($payload));
            }
        );
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  object|string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $this->lastPushed = $job;

        return parent::later($delay, $job, $data, $queue);
    }

    /**
     * Pop the next job off of the queue.
     *
     * When a job is returned, a JobReserved event is dispatched and the job
     * is remembered so deleteReserved()/release() can report on it later.
     *
     * @param  string|null  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        return tap(parent::pop($queue), function ($result) use ($queue) {
            if (! $result) {
                return;
            }

            $payload = (new JobPayload($result->getRawBody()))
                ->set(['id' => $result->getJobId()])
                ->value;

            $this->event($this->getQueue($queue), new JobReserved($payload));

            $this->lastPopped = $result;
        });
    }

    /**
     * Delete a reserved job from the queue.
     *
     * A JobDeleted event is dispatched only when the deleted id belongs to
     * the job most recently popped by this instance.
     *
     * @param  string  $queue
     * @param  string  $id
     * @return void
     *
     * @throws \Exception|\Throwable
     */
    public function deleteReserved($queue, $id)
    {
        parent::deleteReserved($queue, $id);

        $payload = $this->reservedPayloadFor($id);

        if ($payload === null) {
            return;
        }

        $this->event($this->getQueue($queue), new JobDeleted($this->lastPopped, $payload));
    }

    /**
     * Release a reserved job back onto the queue.
     *
     * A JobReleased event is dispatched only when the released record
     * belongs to the job most recently popped by this instance.
     *
     * @param  string  $queue
     * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
     * @param  int  $delay
     * @return mixed
     */
    public function release($queue, $job, $delay)
    {
        $result = parent::release($queue, $job, $delay);

        $payload = $this->reservedPayloadFor($job->id);

        if ($payload !== null) {
            $this->event($this->getQueue($queue), new JobReleased($payload));
        }

        return $result;
    }

    /**
     * Get the number of queue jobs that are ready to process.
     *
     * Counts rows on the given queue that are either available or reserved
     * but expired. This is a read-only count, so no row lock is requested:
     * a locking clause serves no purpose here and some databases (e.g.
     * PostgreSQL) reject FOR UPDATE combined with an aggregate.
     *
     * @param  string|null  $queue
     * @return int
     */
    public function readyNow($queue = null)
    {
        return $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->where(function ($query) {
                $this->isAvailable($query);
                $this->isReservedButExpired($query);
            })
            ->count();
    }

    /**
     * Build the event payload for the last popped job if it has the given id.
     *
     * Returns null when nothing has been popped yet on this instance, or when
     * the last popped job is a different job, so callers can skip the event
     * instead of dereferencing a missing job.
     *
     * @param  mixed  $id
     * @return string|null
     */
    protected function reservedPayloadFor($id)
    {
        $reservedJob = $this->lastPopped;

        if ($reservedJob === null || $reservedJob->getJobId() !== $id) {
            return null;
        }

        return (new JobPayload($reservedJob->getRawBody()))
            ->set(['id' => $reservedJob->getJobId()])
            ->value;
    }

    /**
     * Fire the given event if a dispatcher is bound.
     *
     * The first "queues:" occurrence is stripped from the queue name before
     * it is attached to the event, together with this connection's name.
     *
     * @param  string  $queue
     * @param  mixed  $event
     * @return void
     */
    protected function event($queue, $event)
    {
        if ($this->container && $this->container->bound(Dispatcher::class)) {
            $queue = Str::replaceFirst('queues:', '', $queue);

            $this->container->make(Dispatcher::class)->dispatch(
                $event->connection($this->getConnectionName())->queue($queue)
            );
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment