Skip to content

Instantly share code, notes, and snippets.

@freekmurze
Forked from barryvdh/HorizonDatabaseQueue.php
Created April 24, 2019 14:15
Show Gist options
  • Save freekmurze/73eb1c03997873f290a36e004880a01d to your computer and use it in GitHub Desktop.
Save freekmurze/73eb1c03997873f290a36e004880a01d to your computer and use it in GitHub Desktop.
<?php
namespace App\Libraries\Queue;
use Illuminate\Events\Dispatcher;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Support\Str;
use Laravel\Horizon\Events\JobDeleted;
use Laravel\Horizon\Events\JobPushed;
use Laravel\Horizon\Events\JobReleased;
use Laravel\Horizon\Events\JobReserved;
use Laravel\Horizon\JobPayload;
use Laravel\Horizon\RedisQueue;
/**
 * Implements the Horizon methods/events for the Database driver.
 *
 * Wraps the stock database queue so that Horizon's JobPushed, JobReserved,
 * JobReleased and JobDeleted events are dispatched around the corresponding
 * queue operations, mirroring what Horizon's RedisQueue does for Redis.
 *
 * @see RedisQueue
 */
class HorizonDatabaseQueue extends DatabaseQueue
{
    /**
     * The job that was last pushed to the queue via "push" or "later".
     *
     * Stays unset until one of those methods has been called on this instance.
     *
     * @var object|string|null
     */
    protected $lastPushed;

    /**
     * The job that was last popped from the queue via the "pop" method.
     *
     * Stays unset until this instance has successfully popped a job.
     *
     * @var \Illuminate\Queue\Jobs\Job|null
     */
    protected $lastPopped;

    /**
     * Push a new job onto the queue.
     *
     * Remembers the job so pushToDatabase() can prepare the Horizon payload
     * from it.
     *
     * @param object|string $job
     * @param mixed $data
     * @param string|null $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        $this->lastPushed = $job;

        return parent::push($job, $data, $queue);
    }

    /**
     * Store the payload in the database and dispatch Horizon's JobPushed event.
     *
     * The payload is first prepared by Horizon's JobPayload using the job
     * remembered in $lastPushed; after the insert, the returned id is written
     * into the payload carried by the event.
     *
     * NOTE(review): any parent method that funnels into pushToDatabase()
     * without going through push()/later() (presumably pushRaw() and
     * release()) will prepare the payload with whatever $lastPushed holds at
     * that moment - confirm this is acceptable for the Laravel version in use.
     *
     * @param string|null $queue
     * @param string $payload
     * @param \DateTimeInterface|\DateInterval|int $delay
     * @param int $attempts
     * @return mixed
     */
    protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
    {
        $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value;

        return tap(
            parent::pushToDatabase($queue, $payload, $delay, $attempts),
            function ($id) use ($queue, $payload) {
                $payload = (new JobPayload($payload))->set(['id' => $id])->value;

                $this->event($this->getQueue($queue), new JobPushed($payload));
            }
        );
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param \DateTimeInterface|\DateInterval|int $delay
     * @param object|string $job
     * @param mixed $data
     * @param string|null $queue
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $this->lastPushed = $job;

        return parent::later($delay, $job, $data, $queue);
    }

    /**
     * Pop the next job off of the queue.
     *
     * When a job is returned, a JobReserved event is dispatched and the job is
     * remembered so later delete/release calls can build their events from it.
     *
     * @param string|null $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        return tap(parent::pop($queue), function ($result) use ($queue) {
            /** @var \Illuminate\Queue\Jobs\DatabaseJob|null $result */
            if (! $result) {
                return;
            }

            $this->event(
                $this->getQueue($queue),
                new JobReserved($this->payloadWithJobId($result))
            );

            $this->lastPopped = $result;
        });
    }

    /**
     * Delete a reserved job from the queue.
     *
     * A JobDeleted event is dispatched only when the deleted id belongs to the
     * job this instance popped last; otherwise the payload is not available
     * here and no event is fired.
     *
     * @param string $queue
     * @param string $id
     * @return void
     * @throws \Exception|\Throwable
     */
    public function deleteReserved($queue, $id)
    {
        parent::deleteReserved($queue, $id);

        $reservedJob = $this->lastPoppedWithId($id);

        if ($reservedJob === null) {
            return;
        }

        $this->event(
            $this->getQueue($queue),
            new JobDeleted($reservedJob, $this->payloadWithJobId($reservedJob))
        );
    }

    /**
     * Release a reserved job back onto the queue.
     *
     * A JobReleased event is dispatched only when the released record is the
     * job this instance popped last.
     *
     * @param string $queue
     * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
     * @param int $delay
     * @return mixed
     */
    public function release($queue, $job, $delay)
    {
        $result = parent::release($queue, $job, $delay);

        $reservedJob = $this->lastPoppedWithId($job->id);

        if ($reservedJob !== null) {
            $this->event(
                $this->getQueue($queue),
                new JobReleased($this->payloadWithJobId($reservedJob))
            );
        }

        return $result;
    }

    /**
     * Get the number of queue jobs that are ready to process.
     *
     * Counts rows on the given queue that are either available or reserved
     * but expired. This is a read-only aggregate, so no row lock is taken:
     * a locking clause is unnecessary for a count and some databases refuse
     * to combine one with an aggregate.
     *
     * @param string|null $queue
     * @return int
     */
    public function readyNow($queue = null)
    {
        return $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->where(function ($query) {
                $this->isAvailable($query);
                $this->isReservedButExpired($query);
            })
            ->count();
    }

    /**
     * Fire the given event if a dispatcher is bound.
     *
     * The event is tagged with this queue's connection name and the queue
     * name (with a leading "queues:" prefix stripped) before being dispatched.
     *
     * @param string $queue
     * @param mixed $event
     * @return void
     */
    protected function event($queue, $event)
    {
        if ($this->container && $this->container->bound(Dispatcher::class)) {
            $queue = Str::replaceFirst('queues:', '', $queue);

            $this->container->make(Dispatcher::class)->dispatch(
                $event->connection($this->getConnectionName())->queue($queue)
            );
        }
    }

    /**
     * Return the last popped job if it carries the given id, or null.
     *
     * Guards against no job having been popped by this instance yet, which
     * would otherwise cause a call on null.
     *
     * @param mixed $id
     * @return \Illuminate\Queue\Jobs\Job|null
     */
    protected function lastPoppedWithId($id)
    {
        $job = $this->lastPopped;

        // Compare as strings so an int id and its numeric-string form are
        // treated as the same job.
        if ($job === null || (string) $job->getJobId() !== (string) $id) {
            return null;
        }

        return $job;
    }

    /**
     * Build the raw Horizon payload for a job with the job's id written into it.
     *
     * @param \Illuminate\Queue\Jobs\Job $job
     * @return string
     */
    protected function payloadWithJobId($job)
    {
        return (new JobPayload($job->getRawBody()))
            ->set(['id' => $job->getJobId()])
            ->value;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment