Created
May 20, 2017 02:36
-
-
Save Insolita/414f0111670e3d5b11b22d94ae1830f1 to your computer and use it in GitHub Desktop.
...
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace console\components\queue; | |
use yii\base\Object; | |
use zhuravljov\yii\queue\Job; | |
/**
 * Base class for queue jobs that report on their own lifecycle.
 *
 * It only declares the public properties shared by such jobs and the abstract
 * reporting contract (activation, progress, status, summary); concrete jobs
 * provide the implementations.
 *
 * NOTE(review): yii\base\Object was deprecated in Yii 2.0.13 in favour of
 * yii\base\BaseObject because `object` became a reserved word in PHP 7.2 —
 * switch the parent class when the framework/PHP version is upgraded.
 *
 * @package console\components\queue
 */
abstract class ComplexJob extends Object implements Job
{
    use SignalAbortTrait;

    /**
     * @var mixed Job identifier.
     *            Presumably the same value JobManager stores in its `id` column — confirm.
     */
    public $id;

    /**
     * @var mixed Name of the task this job performs (type not declared in the original).
     */
    public $taskName;

    /**
     * @var string Origin of the run; defaults to 'auto'.
     */
    public $runBy = 'auto';

    /**
     * @var bool False for a first run. MixedQueueBehavior::beforePush() registers a new
     *           tracking row only when this is strictly false; otherwise it resets the
     *           existing row's status to "wait".
     */
    public $isRetry = false;

    /**
     * @var mixed Parent reference, null when there is none.
     *            Presumably persisted by JobManager::initialize() as `parentTask` — confirm.
     */
    public $parent = null;

    /**
     * Marks the job as started. The concrete behaviour is defined by subclasses.
     *
     * @return mixed
     */
    abstract public function activate();

    /**
     * Reports the current progress of the job.
     *
     * @param mixed $percent completion value (presumably 0-100 — confirm)
     * @param null  $stage   optional stage label
     */
    abstract public function updateProgress($percent, $stage = null);

    /**
     * Reports a status change of the job.
     *
     * @param mixed $newStatus new status value (presumably a JobStatus::STATUS_* constant — confirm)
     */
    abstract public function updateStatus($newStatus);

    /**
     * Stores the job summary together with a state.
     *
     * @param mixed $summary summary payload
     * @param mixed $state   state to record alongside the summary
     */
    abstract public function saveSummary($summary, $state);
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* Created by solly [09.04.17 9:12] | |
*/ | |
namespace console\components\queue; | |
use Carbon\Carbon; | |
use console\Jobs\JobAbortException; | |
use yii\db\Connection; | |
use yii\di\Instance; | |
use yii\helpers\Json; | |
use yii\db\Expression; | |
use zhuravljov\yii\queue\serializers\Serializer; | |
/**
 * Persists queue job lifecycle data (state, stage, progress, report, pid, timestamps)
 * in a single database table.
 *
 * Every public method issues exactly one INSERT or UPDATE against $tableName.
 *
 * @package console\components\queue
 */
class JobManager implements IJobManagerInterface
{
    /**
     * @var Connection database connection used for all statements
     */
    private $db;

    /**
     * @var mixed name of the job-tracking table (passed straight to the query builder)
     */
    private $tableName;

    /**
     * @var Serializer serializer used to store the job object in the `job` column
     */
    private $serializer;

    /**
     * JobManager constructor.
     *
     * @param mixed      $db         anything Instance::ensure() can resolve to a Connection
     * @param mixed      $tableName  name of the job-tracking table
     * @param Serializer $serializer serializer for the job payload
     */
    public function __construct($db, $tableName, Serializer $serializer)
    {
        $this->db = Instance::ensure($db, Connection::class);
        $this->tableName = $tableName;
        $this->serializer = $serializer;
    }

    /**
     * Inserts the tracking row for a job in the "wait" state with zeroed progress.
     *
     * `createdAt` is written as a Unix timestamp from time(), whereas the other
     * timestamps in this class are Carbon date-time strings.
     * NOTE(review): confirm the column types make that difference intentional.
     *
     * NOTE(review): `Job` is not imported in this file, so the type hint resolves to
     * console\components\queue\Job — confirm that class/interface exists.
     *
     * @param Job   $job     job to register; its id, parent, runBy and getTaskName() are read
     * @param mixed $channel queue channel stored with the row
     *
     * @return void
     */
    public function initialize(Job $job, $channel)
    {
        $row = [
            'id' => $job->id,
            'taskName' => $job->getTaskName(),
            'parentTask' => $job->parent,
            'channel' => $channel,
            'job' => $this->serializer->serialize($job),
            'state' => JobStatus::STATUS_WAIT,
            'stage' => '',
            'percent' => 0,
            'retry' => 0,
            'report' => '',
            'pid' => null,
            'runBy' => $job->runBy,
            '[[createdAt]]' => time(),
            'timeout' => 0,
        ];

        $this->db->createCommand()->insert($this->tableName, $row)->execute();
    }

    /**
     * Marks a job as active: records the start time, the stage and the worker pid.
     *
     * @param mixed  $jobId id of the tracked job
     * @param mixed  $pid   process id stored in the `pid` column
     * @param string $stage stage label, 'init' by default
     *
     * @return void
     */
    public function activate($jobId, $pid, $stage = 'init')
    {
        $columns = [
            '[[startAt]]' => Carbon::now()->toDateTimeString(),
            'state' => JobStatus::STATUS_ACTIVE,
            'stage' => $stage,
            'pid' => $pid,
        ];

        $this->updateRow($columns, 'id=:id', [':id' => $jobId]);
    }

    /**
     * Records the finish time, sets progress to 100 and clears the pid.
     *
     * The `state` column is not touched here.
     *
     * @param mixed $jobId id of the tracked job
     *
     * @return void
     */
    public function finalize($jobId)
    {
        $columns = [
            '[[finishAt]]' => Carbon::now()->toDateTimeString(),
            'percent' => 100,
            'pid' => null,
        ];

        $this->updateRow($columns, 'id=:id', [':id' => $jobId]);
    }

    /**
     * Marks a job as failed and, when an error is given, stores its details as the report.
     *
     * The report is a JSON list of [code, message, "file:line"].
     *
     * @param mixed           $jobId id of the tracked job
     * @param \Throwable|null $error error that caused the failure, if known
     *
     * @return void
     */
    public function markFail($jobId, \Throwable $error = null)
    {
        $columns = [
            '[[finishAt]]' => Carbon::now()->toDateTimeString(),
            'state' => JobStatus::STATUS_FAIL,
            'pid' => null,
        ];

        if ($error !== null) {
            $location = $error->getFile() . ':' . $error->getLine();
            $columns['report'] = Json::encode([$error->getCode(), $error->getMessage(), $location]);
        }

        $this->updateRow($columns, 'id=:id', [':id' => $jobId]);
    }

    /**
     * Marks a job as aborted; the exception message is stored as the report (plain text, not JSON).
     *
     * @param mixed                            $jobId id of the tracked job
     * @param \console\Jobs\JobAbortException  $error abort exception
     *
     * @return void
     */
    public function markAbort($jobId, JobAbortException $error)
    {
        $columns = [
            '[[finishAt]]' => Carbon::now()->toDateTimeString(),
            'state' => JobStatus::STATUS_ABORT,
            'report' => $error->getMessage(),
            'pid' => null,
        ];

        $this->updateRow($columns, 'id=:id', [':id' => $jobId]);
    }

    /**
     * Updates the progress value and, when a truthy stage is given, the stage label.
     *
     * @param mixed $id      id of the tracked job
     * @param mixed $percent new value for the `percent` column
     * @param null  $stage   new stage label; falsy values leave the stored stage unchanged
     *
     * @return void
     */
    public function updateProgress($id, $percent, $stage = null)
    {
        $columns = ['percent' => $percent];
        if ($stage) {
            $columns['stage'] = $stage;
        }

        $this->updateRow($columns, ['id' => $id]);
    }

    /**
     * Overwrites the state of a job.
     *
     * @param mixed $id    id of the tracked job
     * @param mixed $state new value for the `state` column
     *
     * @return void
     */
    public function updateStatus($id, $state)
    {
        $this->updateRow(['state' => $state], ['id' => $id]);
    }

    /**
     * Replaces the report with the JSON-encoded summary and sets the state.
     *
     * A string summary is wrapped into a one-element array before encoding.
     *
     * @param mixed $id      id of the tracked job
     * @param mixed $summary summary payload (string or JSON-encodable value)
     * @param mixed $state   new value for the `state` column
     *
     * @return void
     */
    public function saveSummary($id, $summary, $state)
    {
        if (is_string($summary)) {
            $summary = [$summary];
        }

        $columns = [
            'state' => $state,
            'report' => Json::encode($summary),
        ];

        $this->updateRow($columns, 'id=:id', [':id' => $id]);
    }

    /**
     * Appends the JSON-encoded summary to the existing report using the SQL `||` operator.
     *
     * NOTE(review): `||` concatenates strings in PostgreSQL/SQLite but is a logical OR in
     * MySQL's default SQL mode — confirm the target database.
     * NOTE(review): appending one JSON document to another does not produce a single valid
     * JSON document — confirm that readers of `report` can cope with that.
     *
     * @param mixed $id      id of the tracked job
     * @param mixed $summary summary payload (string or JSON-encodable value)
     *
     * @return void
     */
    public function appendSummary($id, $summary)
    {
        if (is_string($summary)) {
            $summary = [$summary];
        }

        $concat = new Expression('report||:s', [':s' => Json::encode($summary)]);

        $this->updateRow(['report' => $concat], 'id=:id', [':id' => $id]);
    }

    /**
     * Runs a single UPDATE against the tracking table.
     *
     * @param array        $columns   column => value pairs to write
     * @param string|array $condition WHERE condition in any format Command::update() accepts
     * @param array        $params    parameters bound to a string condition
     *
     * @return void
     */
    private function updateRow(array $columns, $condition, array $params = [])
    {
        $this->db->createCommand()
            ->update($this->tableName, $columns, $condition, $params)
            ->execute();
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* Created by solly [09.04.17 7:48] | |
*/ | |
namespace console\components\queue; | |
use yii\helpers\ArrayHelper; | |
/**
 * Value object wrapping a job state string, with display labels and state predicates.
 *
 * All predicates compare strictly (===) against the STATUS_* constants, so only the
 * exact status strings match; values such as `true` or `0` never match a status.
 */
class JobStatus
{
    /**
     * Waiting to be started.
     */
    const STATUS_WAIT = 'wait';

    /**
     * Waiting to be restarted.
     */
    const STATUS_WAIT_RETRY = 'waitRetry';

    /**
     * Currently executing.
     */
    const STATUS_ACTIVE = 'active';

    /**
     * Completed successfully.
     */
    const STATUS_SUCCESS = 'success';

    /**
     * Aborted manually.
     */
    const STATUS_ABORT = 'abort';

    /**
     * Finished with an error.
     */
    const STATUS_FAIL = 'fail';

    /**
     * @var array status value => human-readable label
     */
    public static $variants = [
        self::STATUS_SUCCESS => 'Выполнено',
        self::STATUS_FAIL => 'Ошибка',
        self::STATUS_ABORT => 'Прервано',
        self::STATUS_ACTIVE => 'В процессе',
        self::STATUS_WAIT => 'Ожидание',
        self::STATUS_WAIT_RETRY => 'Перезапуск',
    ];

    /**
     * @var array status value => HTML label markup
     */
    public static $presents = [
        self::STATUS_SUCCESS => '<div class="label label-success">Выполнено</div>',
        self::STATUS_FAIL => '<div class="label label-danger">Ошибка</div>',
        self::STATUS_ABORT => '<div class="label bg-black">Прервано</div>',
        self::STATUS_ACTIVE => '<div class="label label-primary">В процессе</div>',
        self::STATUS_WAIT => '<div class="label bg-purple">Ожидание</div>',
        self::STATUS_WAIT_RETRY => '<div class="label bg-gray">Перезапуск</div>',
    ];

    /**
     * @var mixed wrapped status value
     */
    private $value;

    /**
     * JobStatus constructor.
     *
     * @param mixed $value raw status value, normally one of the STATUS_* constants
     */
    public function __construct($value)
    {
        $this->value = $value;
    }

    /**
     * Looks up the label for the wrapped value in $variants.
     *
     * @return mixed the label, or ArrayHelper::getValue()'s default when there is no entry
     */
    public function name()
    {
        return ArrayHelper::getValue(static::$variants, $this->value);
    }

    /**
     * Looks up the HTML markup for the wrapped value in $presents.
     *
     * @return mixed the markup, or ArrayHelper::getValue()'s default when there is no entry
     */
    public function present()
    {
        return ArrayHelper::getValue(static::$presents, $this->value);
    }

    /**
     * @return bool true when the value is exactly STATUS_WAIT
     */
    public function isWait()
    {
        return $this->value === self::STATUS_WAIT;
    }

    /**
     * @return bool true when the value is exactly STATUS_WAIT_RETRY
     */
    public function isWaitRetry()
    {
        return $this->value === self::STATUS_WAIT_RETRY;
    }

    /**
     * @return bool true when the value is exactly STATUS_ACTIVE
     */
    public function isActive()
    {
        return $this->value === self::STATUS_ACTIVE;
    }

    /**
     * @return bool true when the value is one of the terminal states: abort, success or fail
     */
    public function isDone()
    {
        $terminal = [self::STATUS_ABORT, self::STATUS_SUCCESS, self::STATUS_FAIL];

        // Strict mode keeps non-string values (true, 0) from matching a status string.
        return in_array($this->value, $terminal, true);
    }

    /**
     * @return bool true when the value is exactly STATUS_SUCCESS
     */
    public function isSuccess()
    {
        return $this->value === self::STATUS_SUCCESS;
    }

    /**
     * @return bool true when the value is exactly STATUS_FAIL
     */
    public function isFail()
    {
        return $this->value === self::STATUS_FAIL;
    }

    /**
     * @return bool true when the value is exactly STATUS_ABORT
     */
    public function isAbort()
    {
        return $this->value === self::STATUS_ABORT;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace console\components\queue; | |
use console\components\queue\JobAbortException; | |
use yii\base\Behavior; | |
use zhuravljov\yii\queue\Queue; | |
/**
 * Queue behavior that mirrors queue events into the job-tracking storage via JobManager.
 *
 * Handlers: before push -> register/reset the job, before exec -> mark active,
 * after exec -> finalize, exec error -> mark aborted or failed.
 *
 * NOTE(review): JobEvent and JobErrorEvent are not imported in this file, so the type
 * hints resolve to this namespace — confirm those classes exist here.
 */
class MixedQueueBehavior extends Behavior
{
    /** @var \console\components\queue\JobManager */
    public $jobManager;

    /** @var \zhuravljov\yii\queue\Queue */
    public $owner;

    /**
     * Maps queue events to the handler methods of this behavior.
     *
     * @return array event name => handler method name
     */
    public function events()
    {
        return [
            Queue::EVENT_BEFORE_PUSH => 'beforePush',
            Queue::EVENT_BEFORE_EXEC => 'beforeWork',
            Queue::EVENT_AFTER_EXEC => 'afterWork',
            Queue::EVENT_AFTER_EXEC_ERROR => 'onError',
        ];
    }

    /**
     * Records a failed execution: abort exceptions are stored as "abort", anything else as "fail".
     *
     * NOTE(review): JobManager::markAbort() is type-hinted against
     * console\Jobs\JobAbortException while this file imports
     * console\components\queue\JobAbortException — confirm both refer to the same class.
     *
     * @param JobErrorEvent $event event carrying the job and the thrown error
     *
     * @return void
     */
    public function onError(JobErrorEvent $event)
    {
        if ($event->error instanceof JobAbortException) {
            $this->jobManager->markAbort($event->job->id, $event->error);
        } else {
            $this->jobManager->markFail($event->job->id, $event->error);
        }
    }

    /**
     * Registers a new job before it is pushed, or resets a retried job back to "wait".
     *
     * @param JobEvent $event event carrying the job being pushed
     *
     * @return void
     */
    public function beforePush(JobEvent $event)
    {
        if ($event->job->isRetry === false) {
            $this->jobManager->initialize($event->job, $this->owner->channel);
        } else {
            $this->jobManager->updateStatus($event->job->id, JobStatus::STATUS_WAIT);
        }
    }

    /**
     * Marks the job as active right before it is executed.
     *
     * events() registers this handler for Queue::EVENT_BEFORE_EXEC; without the method
     * the event would be dispatched to an invalid callback.
     * The pid stored is that of the current PHP process.
     *
     * @param JobEvent $event event carrying the job about to run
     *
     * @return void
     */
    public function beforeWork(JobEvent $event)
    {
        $this->jobManager->activate($event->job->id, getmypid());
    }

    /**
     * Flushes the application logger and finalizes the job record after a successful run.
     *
     * @param JobEvent $event event carrying the job that has just run
     *
     * @return void
     */
    public function afterWork(JobEvent $event)
    {
        \Yii::$app->getLog()->getLogger()->flush();
        // finalize() binds its argument as the row id, so pass the id rather than the job object.
        $this->jobManager->finalize($event->job->id);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment