Skip to content

Instantly share code, notes, and snippets.

@Insolita
Created May 20, 2017 02:36
Show Gist options
  • Save Insolita/414f0111670e3d5b11b22d94ae1830f1 to your computer and use it in GitHub Desktop.
Save Insolita/414f0111670e3d5b11b22d94ae1830f1 to your computer and use it in GitHub Desktop.
...
<?php
namespace console\components\queue;
use yii\base\Object;
use zhuravljov\yii\queue\Job;
/**
 * Base class for queue jobs that report their own lifecycle.
 *
 * It only declares data and hooks: the public properties below travel with the
 * job, and each concrete job implements the abstract callbacks to record its
 * activation, progress, status and final summary.
 *
 * NOTE(review): yii\base\Object is deprecated since Yii 2.0.13 (replaced by
 * yii\base\BaseObject) because `object` is a reserved word as of PHP 7.2 —
 * confirm the PHP/Yii versions this project targets.
 *
 * @package console\components
 */
abstract class ComplexJob extends Object implements Job
{
    use SignalAbortTrait;

    // NOTE: the declaration order of the properties is kept as-is on purpose;
    // JobManager::initialize() serializes the job, so order may be observable.

    /**
     * @var mixed Identifier of the job (JobManager stores it in the `id` column).
     */
    public $id;

    /**
     * @var mixed Name of the task this job performs.
     */
    public $taskName;

    /**
     * @var string Who launched the job; defaults to 'auto'.
     */
    public $runBy = 'auto';

    /**
     * @var bool True when the job is being pushed again rather than for the first time.
     */
    public $isRetry = false;

    /**
     * @var mixed Parent task reference, or null when there is none
     *            (JobManager stores it in the `parentTask` column).
     */
    public $parent;

    /**
     * Hook for marking the job as started.
     *
     * @return mixed
     */
    abstract public function activate();

    /**
     * Hook for reporting how far the job has advanced.
     *
     * @param mixed $percent completion value
     * @param mixed|null $stage optional stage label
     */
    abstract public function updateProgress($percent, $stage = null);

    /**
     * Hook for switching the job to another status.
     *
     * @param mixed $newStatus status to set (presumably one of the JobStatus::STATUS_* values — confirm)
     */
    abstract public function updateStatus($newStatus);

    /**
     * Hook for storing the job's summary together with its state.
     *
     * @param mixed $summary summary payload
     * @param mixed $state state to store alongside the summary
     */
    abstract public function saveSummary($summary, $state);
}
<?php
/**
* Created by solly [09.04.17 9:12]
*/
namespace console\components\queue;
use Carbon\Carbon;
use console\Jobs\JobAbortException;
use yii\db\Connection;
use yii\di\Instance;
use yii\helpers\Json;
use yii\db\Expression;
use zhuravljov\yii\queue\serializers\Serializer;
/**
 * Stores and updates the tracking row of each queue job in a database table.
 *
 * One row per job is inserted by initialize(); every other method updates
 * columns of the row whose `id` matches the given job id.
 *
 * @package console\components\queue
 */
class JobManager implements IJobManagerInterface
{
    /**
     * @var Connection Database connection used for every statement.
     */
    private $db;

    /**
     * @var mixed Name of the table that holds the job rows.
     */
    private $tableName;

    /**
     * @var Serializer Serializer used to store the job object in the `job` column.
     */
    private $serializer;

    /**
     * @param mixed $db anything Instance::ensure() can resolve to a Connection
     * @param mixed $tableName table that holds the job rows
     * @param Serializer $serializer serializer for the job payload
     */
    public function __construct($db, $tableName, Serializer $serializer)
    {
        $this->db = Instance::ensure($db, Connection::class);
        $this->tableName = $tableName;
        $this->serializer = $serializer;
    }

    /**
     * Inserts the tracking row for a job that is about to be queued.
     *
     * The row starts in STATUS_WAIT with zero percent, no pid and an empty report.
     * `createdAt` is written as a Unix timestamp (time()).
     *
     * NOTE(review): `Job` is not imported in this file, so the type hint resolves to
     * \console\components\queue\Job — confirm that class/interface exists and
     * provides getTaskName().
     *
     * @param \console\components\queue\Job $job job exposing id, parent, runBy and getTaskName()
     * @param mixed $channel value stored in the `channel` column
     *
     * @return void
     */
    public function initialize(Job $job, $channel)
    {
        $row = [
            'id'            => $job->id,
            'taskName'      => $job->getTaskName(),
            'parentTask'    => $job->parent,
            'channel'       => $channel,
            'job'           => $this->serializer->serialize($job),
            'state'         => JobStatus::STATUS_WAIT,
            'stage'         => '',
            'percent'       => 0,
            'retry'         => 0,
            'report'        => '',
            'pid'           => null,
            'runBy'         => $job->runBy,
            '[[createdAt]]' => time(),
            'timeout'       => 0,
        ];

        $this->db->createCommand()->insert($this->tableName, $row)->execute();
    }

    /**
     * Marks a job as running: sets the start time, STATUS_ACTIVE, the stage and the pid.
     *
     * @param mixed $jobId id of the job row
     * @param mixed $pid process id stored in the `pid` column
     * @param string $stage stage label, 'init' by default
     *
     * @return void
     */
    public function activate($jobId, $pid, $stage = 'init')
    {
        $columns = [
            '[[startAt]]' => Carbon::now()->toDateTimeString(),
            'state'       => JobStatus::STATUS_ACTIVE,
            'stage'       => $stage,
            'pid'         => $pid,
        ];

        $this->applyUpdate($columns, 'id=:id', [':id' => $jobId]);
    }

    /**
     * Closes a job row: sets the finish time, 100 percent and clears the pid.
     *
     * The `state` column is not touched here.
     *
     * @param mixed $jobId id of the job row
     *
     * @return void
     */
    public function finalize($jobId)
    {
        $columns = [
            '[[finishAt]]' => Carbon::now()->toDateTimeString(),
            'percent'      => 100,
            'pid'          => null,
        ];

        $this->applyUpdate($columns, 'id=:id', [':id' => $jobId]);
    }

    /**
     * Marks a job as failed and, when an error is given, stores it as the report.
     *
     * The report is a JSON list: [code, message, "file:line"].
     *
     * @param mixed $jobId id of the job row
     * @param \Throwable|null $error error that caused the failure, if known
     *
     * @return void
     */
    public function markFail($jobId, \Throwable $error = null)
    {
        $columns = [
            '[[finishAt]]' => Carbon::now()->toDateTimeString(),
            'state'        => JobStatus::STATUS_FAIL,
            'pid'          => null,
        ];

        if (null !== $error) {
            $details = [
                $error->getCode(),
                $error->getMessage(),
                $error->getFile() . ':' . $error->getLine(),
            ];
            $columns['report'] = Json::encode($details);
        }

        $this->applyUpdate($columns, 'id=:id', [':id' => $jobId]);
    }

    /**
     * Marks a job as aborted and stores the exception message as the report.
     *
     * @param mixed $jobId id of the job row
     * @param \console\Jobs\JobAbortException $error abort exception; its message becomes the report
     *
     * @return void
     */
    public function markAbort($jobId, JobAbortException $error)
    {
        $this->applyUpdate(
            [
                '[[finishAt]]' => Carbon::now()->toDateTimeString(),
                'state'        => JobStatus::STATUS_ABORT,
                'report'       => $error->getMessage(),
                'pid'          => null,
            ],
            'id=:id',
            [':id' => $jobId]
        );
    }

    /**
     * Stores the completion percentage and, when a truthy stage is given, the stage.
     *
     * @param mixed $id id of the job row
     * @param mixed $percent completion value to store
     * @param mixed|null $stage stage label; skipped when empty
     *
     * @return void
     */
    public function updateProgress($id, $percent, $stage = null)
    {
        $columns = ['percent' => $percent];
        if ($stage) {
            $columns['stage'] = $stage;
        }

        $this->applyUpdate($columns, ['id' => $id]);
    }

    /**
     * Overwrites the state of a job row.
     *
     * @param mixed $id id of the job row
     * @param mixed $state new state value
     *
     * @return void
     */
    public function updateStatus($id, $state)
    {
        $this->applyUpdate(['state' => $state], ['id' => $id]);
    }

    /**
     * Stores the state together with the JSON-encoded summary as the report.
     *
     * A string summary is wrapped into a one-element list before encoding.
     *
     * @param mixed $id id of the job row
     * @param mixed $summary summary string, or a value encoded as-is
     * @param mixed $state state value to store
     *
     * @return void
     */
    public function saveSummary($id, $summary, $state)
    {
        if (is_string($summary)) {
            $summary = [$summary];
        }

        $this->applyUpdate(
            [
                'state'  => $state,
                'report' => Json::encode($summary),
            ],
            'id=:id',
            [':id' => $id]
        );
    }

    /**
     * Appends the JSON-encoded summary to the existing report using SQL `||`.
     *
     * A string summary is wrapped into a one-element list before encoding.
     *
     * NOTE(review): this concatenates JSON documents as text, so after a second
     * call the report is presumably no longer a single valid JSON value, and `||`
     * is not string concatenation on every DBMS — confirm this is intended.
     *
     * @param mixed $id id of the job row
     * @param mixed $summary summary string, or a value encoded as-is
     *
     * @return void
     */
    public function appendSummary($id, $summary)
    {
        if (is_string($summary)) {
            $summary = [$summary];
        }
        $appended = new Expression('report||:s', [':s' => Json::encode($summary)]);

        $this->applyUpdate(['report' => $appended], 'id=:id', [':id' => $id]);
    }

    /**
     * Runs a single UPDATE against the job table.
     *
     * @param array $columns column => value pairs to write
     * @param string|array $condition WHERE condition in any form Command::update() accepts
     * @param array $params parameters bound to a string condition
     *
     * @return void
     */
    private function applyUpdate(array $columns, $condition, array $params = [])
    {
        $this->db->createCommand()
            ->update($this->tableName, $columns, $condition, $params)
            ->execute();
    }
}
<?php
/**
* Created by solly [09.04.17 7:48]
*/
namespace console\components\queue;
use yii\helpers\ArrayHelper;
/**
 * Value object wrapping a job status code, with display helpers and predicates.
 *
 * All predicates compare strictly (===): with loose comparison a non-string
 * value such as `true` (or `0` on PHP 7) would match every status constant.
 */
class JobStatus
{
    /**
     * Waiting to be started
     */
    const STATUS_WAIT = 'wait';
    /**
     * Waiting to be started again (retry)
     */
    const STATUS_WAIT_RETRY = 'waitRetry';
    /**
     * In progress
     */
    const STATUS_ACTIVE = 'active';
    /**
     * Completed successfully
     */
    const STATUS_SUCCESS = 'success';
    /**
     * Aborted manually
     */
    const STATUS_ABORT = 'abort';
    /**
     * Finished with an error
     */
    const STATUS_FAIL = 'fail';

    /**
     * @var array Human-readable label for each status code.
     */
    public static $variants
        = [
            self::STATUS_SUCCESS    => 'Выполнено',
            self::STATUS_FAIL       => 'Ошибка',
            self::STATUS_ABORT      => 'Прервано',
            self::STATUS_ACTIVE     => 'В процессе',
            self::STATUS_WAIT       => 'Ожидание',
            self::STATUS_WAIT_RETRY => 'Перезапуск',
        ];

    /**
     * @var array HTML label markup for each status code.
     */
    public static $presents
        = [
            self::STATUS_SUCCESS    => '<div class="label label-success">Выполнено</div>',
            self::STATUS_FAIL       => '<div class="label label-danger">Ошибка</div>',
            self::STATUS_ABORT      => '<div class="label bg-black">Прервано</div>',
            self::STATUS_ACTIVE     => '<div class="label label-primary">В процессе</div>',
            self::STATUS_WAIT       => '<div class="label bg-purple">Ожидание</div>',
            self::STATUS_WAIT_RETRY => '<div class="label bg-gray">Перезапуск</div>',
        ];

    /**
     * @var mixed Wrapped status code.
     */
    private $value;

    /**
     * JobStatus constructor.
     *
     * @param mixed $value status code, normally one of the STATUS_* constants
     */
    public function __construct($value)
    {
        $this->value = $value;
    }

    /**
     * @return mixed the label from $variants for the wrapped code, or null when unknown
     */
    public function name()
    {
        return ArrayHelper::getValue(static::$variants, $this->value);
    }

    /**
     * @return mixed the HTML markup from $presents for the wrapped code, or null when unknown
     */
    public function present()
    {
        return ArrayHelper::getValue(static::$presents, $this->value);
    }

    /**
     * @return bool whether the status is STATUS_WAIT
     */
    public function isWait()
    {
        return $this->value === self::STATUS_WAIT;
    }

    /**
     * @return bool whether the status is STATUS_WAIT_RETRY
     */
    public function isWaitRetry()
    {
        return $this->value === self::STATUS_WAIT_RETRY;
    }

    /**
     * @return bool whether the status is STATUS_ACTIVE
     */
    public function isActive()
    {
        return $this->value === self::STATUS_ACTIVE;
    }

    /**
     * @return bool whether the job reached a terminal status (abort, success or fail)
     */
    public function isDone()
    {
        return in_array(
            $this->value,
            [self::STATUS_ABORT, self::STATUS_SUCCESS, self::STATUS_FAIL],
            true
        );
    }

    /**
     * @return bool whether the status is STATUS_SUCCESS
     */
    public function isSuccess()
    {
        return $this->value === self::STATUS_SUCCESS;
    }

    /**
     * @return bool whether the status is STATUS_FAIL
     */
    public function isFail()
    {
        return $this->value === self::STATUS_FAIL;
    }

    /**
     * @return bool whether the status is STATUS_ABORT
     */
    public function isAbort()
    {
        return $this->value === self::STATUS_ABORT;
    }
}
<?php
namespace console\components\queue;
use console\components\queue\JobAbortException;
use yii\base\Behavior;
use zhuravljov\yii\queue\Queue;
/**
 * Queue behavior that mirrors queue events into the job tracking table.
 *
 * Each queue event is forwarded to the JobManager so the job's row reflects
 * whether it is waiting, running, finished, failed or aborted.
 *
 * NOTE(review): JobEvent and JobErrorEvent are not imported in this file, so the
 * type hints resolve inside console\components\queue — confirm those classes
 * exist there. Also, the JobAbortException checked in onError() comes from
 * console\components\queue while JobManager::markAbort() type-hints
 * console\Jobs\JobAbortException — confirm both refer to the same class.
 */
class MixedQueueBehavior extends Behavior
{
    /**
     * @var \console\components\queue\JobManager manager that writes the job rows
     */
    public $jobManager;

    /**
     * @var \zhuravljov\yii\queue\Queue queue this behavior is attached to
     */
    public $owner;

    /**
     * Maps queue events to the handler methods of this behavior.
     *
     * @return array event name => handler method name
     */
    public function events()
    {
        return [
            Queue::EVENT_BEFORE_PUSH      => 'beforePush',
            Queue::EVENT_BEFORE_EXEC      => 'beforeWork',
            Queue::EVENT_AFTER_EXEC       => 'afterWork',
            Queue::EVENT_AFTER_EXEC_ERROR => 'onError',
        ];
    }

    /**
     * Records a failed execution: an abort exception marks the job aborted,
     * any other error marks it failed.
     *
     * @param JobErrorEvent $event event carrying the job and the error
     *
     * @return void
     */
    public function onError(JobErrorEvent $event)
    {
        if ($event->error instanceof JobAbortException) {
            $this->jobManager->markAbort($event->job->id, $event->error);
        } else {
            $this->jobManager->markFail($event->job->id, $event->error);
        }
    }

    /**
     * Creates the tracking row for a new job, or resets a retried job to STATUS_WAIT.
     *
     * @param JobEvent $event event carrying the job being pushed
     *
     * @return void
     */
    public function beforePush(JobEvent $event)
    {
        if ($event->job->isRetry === false) {
            $this->jobManager->initialize($event->job, $this->owner->channel);
        } else {
            $this->jobManager->updateStatus($event->job->id, JobStatus::STATUS_WAIT);
        }
    }

    /**
     * Marks the job as active, recording the pid of the current process.
     *
     * events() registers this handler for Queue::EVENT_BEFORE_EXEC; without it
     * the event would be dispatched to a method that does not exist.
     *
     * NOTE(review): assumes activation belongs here; if jobs also activate
     * themselves, the start time is simply written again — confirm.
     *
     * @param JobEvent $event event carrying the job about to run
     *
     * @return void
     */
    public function beforeWork(JobEvent $event)
    {
        $this->jobManager->activate($event->job->id, getmypid());
    }

    /**
     * Flushes the application logger and closes the job's tracking row.
     *
     * @param JobEvent $event event carrying the job that just ran
     *
     * @return void
     */
    public function afterWork(JobEvent $event)
    {
        \Yii::$app->getLog()->getLogger()->flush();
        // JobManager::finalize() binds its argument as the row id, so pass the id, not the job object.
        $this->jobManager->finalize($event->job->id);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment