Created
January 27, 2025 22:22
-
-
Save eileenmcnaughton/384c6699f5deea81ae76dc0080824718 to your computer and use it in GitHub Desktop.
queue-helper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| namespace Civi; | |
| use Civi\Core\Exception\DBQueryException; | |
| /** | |
| * Queue helper. | |
| * | |
| * This comes from https://gist.github.com/totten/fa830cfced9bb7a92dea485f5422055a | |
| * & will hopefully be moved to core. | |
| */ | |
| class QueueHelper { | |
| protected $queue; | |
| public const ITERATE_UNTIL_DONE = 1; | |
| public const ITERATE_RUN_ONCE = 0; | |
| public const ITERATE_UNTIL_TRUE = 2; | |
| /** | |
| * @var int|null | |
| */ | |
| protected $runAs; | |
| public function __construct(\CRM_Queue_Queue $queue) { | |
| $this->queue = $queue; | |
| } | |
| /** | |
| * @param int|null|array $runAs | |
| * | |
| * @return $this | |
| */ | |
| public function setRunAs($runAs): QueueHelper { | |
| $this->runAs = $runAs; | |
| return $this; | |
| } | |
| /** | |
| * @param string $sql | |
| * @param array $params | |
| * @param int $iterate | |
| * @param array $doneCondition | |
| * @param int $weight | |
| * | |
| * @return $this | |
| * | |
| * @params array $doneParameters | |
| */ | |
| public function sql(string $sql, array $params = [], int $iterate = self::ITERATE_RUN_ONCE, $doneCondition = [], $weight = 0): QueueHelper { | |
| $task = new \CRM_Queue_Task([self::class, 'doSql'], [ | |
| $sql, | |
| $params, | |
| $iterate, | |
| $doneCondition, | |
| ]); | |
| $task->runAs = $this->runAs; | |
| $this->queue->createItem($task, ['weight' => $weight]); | |
| return $this; | |
| } | |
| /** | |
| * Create a task to execute an api4 action in a queue context | |
| * | |
| * @param string $entity | |
| * @param string $action | |
| * @param array $params | |
| * @param array $options | |
| * @return $this | |
| */ | |
| public function api4(string $entity, string $action, array $params = [], array $incrementParameters = [], array $options = []) { | |
| $task = new \CRM_Queue_Task([self::class, 'doApi4'], [$entity, $action, $params, $incrementParameters]); | |
| $task->runAs = $this->runAs; | |
| $this->queue->createItem($task, $options); | |
| return $this; | |
| } | |
| /** | |
| * | |
| * | |
| * public function api3(string $entity, string $action, array $params = []) { | |
| * $this->queue->createItem(new \CRM_Queue_Task([self::class, 'doApi3'], [$entity, $action, $params])); | |
| * return $this; | |
| * } | |
| */ | |
| /** | |
| * Do SQL in a queue context. | |
| * | |
| * @param \CRM_Queue_TaskContext $taskContext | |
| * @param string $sql | |
| * @param array $queryParameters | |
| * Values to interpolate into the sql. These are in the format | |
| * [1 => [500, 'Integer], 2 => ['bob' => 'String']]. They can be incremented | |
| * with the passing of incrementParams. | |
| * @param int $iterate | |
| * Either self::ITERATE_ONCE or self::ITERATE_UNTIL DONE | |
| * @param array $incrementParameters | |
| * Additional parameters when using ITERATE_UNTIL_DONE | |
| * These are keyed the same as the queryParameters and will be added to the | |
| * query parameters to provide batching. e.g if the queryParameters have a key | |
| * [1 => [0, 'Integer']] and the incrementParameters have | |
| * [1 => ['increment' => 200]] then on re-queueing the parameter will be set | |
| * to 200 rather than 0, and 400 for the next iteration etc. | |
| * @param array $doneCondition | |
| * | |
| * @return bool | |
| * @internal only use from this class. | |
| */ | |
| public static function doSql(\CRM_Queue_TaskContext $taskContext, string $sql, array $queryParameters, int $iterate, array $doneCondition = []): bool { | |
| try { | |
| $daoParams = []; | |
| foreach ($queryParameters as $index => $queryParameter) { | |
| // Flatten out the array. | |
| $daoParams[$index] = [$queryParameter['value'], $queryParameter['type']]; | |
| } | |
| $result = \CRM_Core_DAO::executeQuery($sql, $daoParams); | |
| } | |
| catch (DBQueryException $e) { | |
| \Civi::log('queue')->error('queued action failed to run {sql} with parameters {params} sql error {sql_error_code} {message} {exception}', [ | |
| 'sql' => $sql, | |
| 'params' => $queryParameters, | |
| // @todo - add isDeadLock? Maybe at the error level. | |
| 'sql_error_code' => $e->getSQLErrorCode(), | |
| 'message' => $e->getMessage(), | |
| 'exception' => $e, | |
| ]); | |
| return FALSE; | |
| } | |
| if (($iterate === self::ITERATE_UNTIL_DONE && $result->affectedRows() > 0) | |
| || ($iterate === self::ITERATE_UNTIL_TRUE && !self::isIterationComplete($doneCondition))) { | |
| foreach ($queryParameters as $index => $queryParameter) { | |
| // Each loop we add on the value from the increment to our replacement params, if passed | |
| // note that there isn't validation at this stage as | |
| // to whether we are incrementing an Integer/DateTime. As this | |
| // code settles we might add it - just not sure this is the right place. | |
| if (!empty($queryParameter['increment'])) { | |
| $queryParameters[$index]['value'] += $queryParameter['increment']; | |
| } | |
| } | |
| try { | |
| // Not finished, queue another pass. | |
| $task = new \CRM_Queue_Task([self::class, 'doSql'], [ | |
| $sql, | |
| $queryParameters, | |
| $iterate, | |
| $doneCondition, | |
| ]); | |
| $taskContext->queue->createItem($task); | |
| } | |
| catch (\CRM_Core_Exception $e) { | |
| \Civi::log('queue')->error('queued action failed to re-queue {message} {exception}', [ | |
| 'message' => $e->getMessage(), | |
| 'exception' => $e, | |
| ]); | |
| } | |
| } | |
| return TRUE; | |
| } | |
| private static function isIterationComplete($doneParams): bool { | |
| if (empty($doneParams)) { | |
| return TRUE; | |
| } | |
| // We might nuance this so keying as 'sql_returns_none' for now to allow others. | |
| // when the sql returns nothing the iteration is complete. | |
| return !\CRM_Core_DAO::singleValueQuery($doneParams['sql_returns_none']); | |
| } | |
| /** | |
| * Queue calls using api v4, with optional increment parameters. | |
| * | |
| * Do apiv4 call in a queue context. | |
| * | |
| * @param \CRM_Queue_TaskContext $taskContext | |
| * @param string $entity | |
| * @param string $action | |
| * @param array $params api Parameters with (e.g.) '%1' in the where clause to be | |
| * replaced with the increment parameter 1 | |
| * @param array $incrementParameters Describe how to increment the values. | |
| * 1 => [ | |
| * 'value' => 0, | |
| * 'type' => 'Integer', | |
| * 'increment' => 2, | |
| * 'max' => $maxContactID, | |
| * ], | |
| * 2 => [ | |
| * 'value' => 2, | |
| * 'type' => 'Integer', | |
| * 'increment' => 2, | |
| * ], | |
| * | |
| * @return bool | |
| * @internal only use from this class. | |
| */ | |
| public static function doApi4(\CRM_Queue_TaskContext $taskContext, string $entity, string $action, array $params, $incrementParameters): bool { | |
| try { | |
| $runParams = $params; | |
| $runParams['checkPermissions'] = FALSE; | |
| foreach ($incrementParameters as $incrementKey => $incrementParameter) { | |
| foreach ($params['where'] ?? [] as $index => $where) { | |
| if (is_array($where[2])) { | |
| foreach ($where[2] as $key => $criteria) { | |
| if ($criteria === '%' . $incrementKey) { | |
| $runParams['where'][$index][2][$key] = $incrementParameter['value']; | |
| } | |
| } | |
| } | |
| else { | |
| $criteria = $where[2]; | |
| if ($criteria === '%' . $incrementKey) { | |
| $runParams['where'][$index][2] = $incrementParameter['value']; | |
| } | |
| } | |
| } | |
| } | |
| civicrm_api4($entity, $action, $runParams); | |
| $reQueue = TRUE; | |
| foreach ($incrementParameters as $key => $incrementParameter) { | |
| if (empty($incrementParameter['max']) || $incrementParameter['value'] < $incrementParameter['max']) { | |
| $incrementParameters[$key]['value'] += $incrementParameter['increment']; | |
| } | |
| else { | |
| // If max is set & met on any parameter we are done. | |
| $reQueue = FALSE; | |
| } | |
| } | |
| if ($reQueue) { | |
| // Not finished, queue another pass. | |
| $task = new \CRM_Queue_Task([self::class, 'doApi4'], [ | |
| $entity, | |
| $action, | |
| $params, | |
| $incrementParameters, | |
| ]); | |
| $taskContext->queue->createItem($task); | |
| } | |
| } | |
| catch (\CRM_Core_Exception $e) { | |
| \Civi::log('queue')->error('queued action failed {entity} {action} {params} {message} {exception}', [ | |
| 'entity' => $entity, | |
| 'action' => $action, | |
| 'params' => $params, | |
| 'message' => $e->getMessage(), | |
| 'exception' => $e, | |
| ]); | |
| return FALSE; | |
| } | |
| return TRUE; | |
| } | |
| /** | |
| * Do apiv3 call in a queue context. | |
| * | |
| * @internal only use from this class. | |
| * | |
| * | |
| * public static function doApi3(\CRM_Queue_TaskContext $taskContext, string $entity, string $action, array $params): bool { | |
| * try { | |
| * civicrm_api3($entity, $action, $params); | |
| * } | |
| * catch (\CRM_Core_Exception $e) { | |
| * \Civi::log('queue')->error('queued action failed {entity} {action} {params} {message} {exception}', [ | |
| * 'entity' => $entity, | |
| * 'action' => $action, | |
| * 'params' => $params, | |
| * 'message' => $e->getMessage(), | |
| * 'exception' => $e, | |
| * ]); | |
| * return FALSE; | |
| * } | |
| * return TRUE; | |
| * } | |
| */ | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment