Skip to content

Instantly share code, notes, and snippets.

@eileenmcnaughton
Last active October 17, 2023 04:42
Show Gist options
  • Save eileenmcnaughton/1e033f5e30fe6eb82981055226627367 to your computer and use it in GitHub Desktop.
Save eileenmcnaughton/1e033f5e30fe6eb82981055226627367 to your computer and use it in GitHub Desktop.
helper file
Upgrader function
/**
 * Delete orphaned rows from civicrm_mailing_job.
 *
 * A job counts as orphaned when no civicrm_mailing_event_queue row points at
 * it. The delete is queued and works through the job ids in windows of 2000
 * (both bounds advance by 2000 on each pass), repeating until the check query
 * finds no orphaned job.
 *
 * @return bool
 */
public function upgrade_4385() : bool {
  // Width of the id window handled by each queued pass.
  $batchSize = 2000;

  $deleteOrphans = 'DELETE j
FROM civicrm_mailing_job j
LEFT JOIN civicrm_mailing_event_queue q ON q.job_id = j.id
WHERE q.id IS NULL
AND j.id BETWEEN %1 AND %2';

  // Once this returns nothing there are no orphans left and iteration stops.
  $findRemainingOrphan = '
SELECT j.id
FROM civicrm_mailing_job j
LEFT JOIN civicrm_mailing_event_queue q ON q.job_id = j.id
WHERE q.id IS NULL LIMIT 1';

  // %1 / %2 are the lower / upper bound of the id window.
  $idWindow = [
    1 => ['value' => 0, 'type' => 'Integer', 'increment' => $batchSize],
    2 => ['value' => $batchSize, 'type' => 'Integer', 'increment' => $batchSize],
  ];

  $this->queueSQL($deleteOrphans, $idWindow, ['sql_returns_none' => $findRemainingOrphan], 20);
  return TRUE;
}
/**
 * Add an SQL statement to the 'wmf_data_upgrades' queue.
 *
 * @param string $sql
 *   The statement to run, with %1, %2 ... placeholders for the parameters.
 * @param array $queryParameters
 *   Placeholder values keyed by placeholder number. Each entry is an array
 *   with the keys
 *   - value
 *   - type (Integer, String, etc)
 *   - increment (optional) - added to the value before each subsequent run.
 * @param array $doneCondition
 *   How to decide the work is finished. Currently supports one key
 *   - sql_returns_none - a query that returns nothing once the work is done.
 *   When empty, the statement is repeated for as long as it affects rows.
 * @param int $weight
 *   Weight given to the queued item.
 */
private function queueSQL(string $sql, array $queryParameters = [], $doneCondition = [], int $weight = 0): void {
  $queueSpec = [
    'type' => 'Sql',
    'runner' => 'task',
    // Deliberately generous: while debugging, the retry count was seen to
    // climb too quickly. That may have been an artefact of the breakpoint
    // process, but go with this for now.
    'retry_limit' => 100,
    'reset' => FALSE,
    'error' => 'abort',
  ];

  // With an explicit done-condition iterate until it is satisfied,
  // otherwise iterate until the statement stops affecting rows.
  $iterationMode = QueueHelper::ITERATE_UNTIL_TRUE;
  if (empty($doneCondition)) {
    $iterationMode = QueueHelper::ITERATE_UNTIL_DONE;
  }

  $helper = new QueueHelper(\Civi::queue('wmf_data_upgrades', $queueSpec));
  $helper->sql($sql, $queryParameters, $iterationMode, $doneCondition, $weight);
}
<?php
namespace Civi\Queue;
use Civi\Api4\Queue;
use Civi\Core\Exception\DBQueryException;
/**
 * Queue helper.
 *
 * Wraps a CRM_Queue_Queue so SQL statements can be queued as tasks, optionally
 * re-queueing themselves (with incremented parameters) until the work is done.
 *
 * This comes from https://gist.github.com/totten/fa830cfced9bb7a92dea485f5422055a
 * & will hopefully be moved to core.
 */
class QueueHelper {

  /**
   * The queue that tasks are added to.
   *
   * @var \CRM_Queue_Queue
   */
  protected $queue;

  /**
   * Re-queue the statement for as long as it affects at least one row.
   */
  public const ITERATE_UNTIL_DONE = 1;

  /**
   * Run the statement a single time (never re-queue).
   */
  public const ITERATE_RUN_ONCE = 0;

  /**
   * Re-queue the statement until the supplied done-condition is met.
   */
  public const ITERATE_UNTIL_TRUE = 2;

  /**
   * Value copied onto the runAs property of each task created by sql().
   *
   * @var int|null
   */
  protected $runAs;

  /**
   * @param \CRM_Queue_Queue $queue
   */
  public function __construct(\CRM_Queue_Queue $queue) {
    $this->queue = $queue;
  }

  /**
   * @param int|null $runAs
   *
   * @return $this
   */
  public function setRunAs(?int $runAs): QueueHelper {
    $this->runAs = $runAs;
    return $this;
  }

  /**
   * Queue an SQL statement.
   *
   * @param string $sql
   * @param array $params
   *   Placeholder values keyed by placeholder number, each in the format
   *   ['value' => 0, 'type' => 'Integer', 'increment' => 200] where
   *   'increment' is optional.
   * @param int $iterate
   *   One of the self::ITERATE_* constants.
   * @param array $doneCondition
   *   Only consulted for self::ITERATE_UNTIL_TRUE. Supports one key
   *   - sql_returns_none - a query that returns nothing once the work is done.
   * @param int $weight
   *   Weight of the queued item; also applied to any re-queued pass.
   *
   * @return $this
   */
  public function sql(string $sql, array $params = [], int $iterate = self::ITERATE_RUN_ONCE, $doneCondition = [], $weight = 0): QueueHelper {
    $task = new \CRM_Queue_Task([self::class, 'doSql'], [
      $sql,
      $params,
      $iterate,
      $doneCondition,
      // Carried in the task arguments so that re-queued passes keep the
      // same weight as the first one.
      is_numeric($weight) ? (int) $weight : NULL,
    ]);
    $task->runAs = $this->runAs;
    $this->queue->createItem($task, ['weight' => $weight]);
    return $this;
  }

  /**
   * Api3 & 4 helpers not yet tested ...
   *
   * @param string $entity
   * @param string $action
   * @param array $params
   *
   * @return $this
   *
  public function api4(string $entity, string $action, array $params = []) {
    $this->queue->createItem(new \CRM_Queue_Task([self::class, 'doApi4'], [$entity, $action, $params]));
    return $this;
  }
  public function api3(string $entity, string $action, array $params = []) {
    $this->queue->createItem(new \CRM_Queue_Task([self::class, 'doApi3'], [$entity, $action, $params]));
    return $this;
  }
  */

  /**
   * Do SQL in a queue context.
   *
   * Runs the statement once and then, depending on $iterate, queues another
   * pass with each parameter's 'increment' added to its 'value'.
   *
   * @param \CRM_Queue_TaskContext $taskContext
   * @param string $sql
   * @param array $queryParameters
   *   Values to interpolate into the sql, keyed by placeholder number, e.g.
   *   [1 => ['value' => 0, 'type' => 'Integer', 'increment' => 200]].
   *   When another pass is queued, every parameter with a non-empty
   *   'increment' has it added to 'value' - so the example above runs with
   *   0, then 200, then 400 etc.
   * @param int $iterate
   *   One of
   *   - self::ITERATE_RUN_ONCE - never re-queue.
   *   - self::ITERATE_UNTIL_DONE - re-queue while the statement affects rows.
   *   - self::ITERATE_UNTIL_TRUE - re-queue until $doneCondition is met.
   * @param array $doneCondition
   *   Only consulted for self::ITERATE_UNTIL_TRUE. Supports one key
   *   - sql_returns_none - a query that returns nothing once the work is done.
   * @param int|null $weight
   *   Weight to give a re-queued pass. NULL (e.g. an item queued before this
   *   argument existed) falls back to the queue spec's 'weight' value.
   *
   * @return bool
   *   FALSE if the statement failed with a DBQueryException, otherwise TRUE
   *   (including when queueing the next pass failed, which is only logged).
   * @internal only use from this class.
   */
  public static function doSql(\CRM_Queue_TaskContext $taskContext, string $sql, array $queryParameters, int $iterate, array $doneCondition = [], ?int $weight = NULL): bool {
    try {
      $daoParams = [];
      foreach ($queryParameters as $index => $queryParameter) {
        // Flatten out the array to the [value, type] pairs the DAO expects.
        $daoParams[$index] = [$queryParameter['value'], $queryParameter['type']];
      }
      $result = \CRM_Core_DAO::executeQuery($sql, $daoParams);
    }
    catch (DBQueryException $e) {
      \Civi::log('queue')->error('queued action failed to run {sql} with parameters {params} sql error {sql_error_code} {message} {exception}', [
        'sql' => $sql,
        'params' => $queryParameters,
        // @todo - add isDeadLock? Maybe at the error level.
        'sql_error_code' => $e->getSQLErrorCode(),
        'message' => $e->getMessage(),
        'exception' => $e,
      ]);
      return FALSE;
    }

    $needsAnotherPass = ($iterate === self::ITERATE_UNTIL_DONE && $result->affectedRows() > 0)
      || ($iterate === self::ITERATE_UNTIL_TRUE && !self::isIterationComplete($doneCondition));
    if (!$needsAnotherPass) {
      return TRUE;
    }

    foreach ($queryParameters as $index => $queryParameter) {
      // Each loop we add on the value from the increment to our replacement params, if passed
      // note that there isn't validation at this stage as
      // to whether we are incrementing an Integer/DateTime. As this
      // code settles we might add it - just not sure this is the right place.
      if (!empty($queryParameter['increment'])) {
        $queryParameters[$index]['value'] += $queryParameter['increment'];
      }
    }

    try {
      // Not finished, queue another pass.
      // NOTE(review): the re-queued task does not carry the runAs value set
      // on the original task - confirm whether that is intended.
      $task = new \CRM_Queue_Task([self::class, 'doSql'], [
        $sql,
        $queryParameters,
        $iterate,
        $doneCondition,
        $weight,
      ]);
      // Prefer the weight the item was originally queued with; the queue spec
      // lookup is kept only as the fallback for items without one.
      $taskContext->queue->createItem($task, ['weight' => $weight ?? $taskContext->queue->getSpec('weight')]);
    }
    catch (\CRM_Core_Exception $e) {
      \Civi::log('queue')->error('queued action failed to re-queue {message} {exception}', [
        'message' => $e->getMessage(),
        'exception' => $e,
      ]);
    }
    return TRUE;
  }

  /**
   * Has the done-condition been met?
   *
   * @param array $doneParams
   *   Supports one key
   *   - sql_returns_none - a query that returns nothing once the work is done.
   *
   * @return bool
   *   TRUE when there is no usable condition or when the query returns an
   *   empty value.
   */
  private static function isIterationComplete($doneParams): bool {
    // An absent condition (or one without the only supported key) can never
    // be evaluated, so treat it as complete rather than running a null query
    // or re-queueing without end.
    if (empty($doneParams) || empty($doneParams['sql_returns_none'])) {
      return TRUE;
    }
    // We might nuance this so keying as 'sql_returns_none' for now to allow others.
    // when the sql returns nothing the iteration is complete.
    return !\CRM_Core_DAO::singleValueQuery($doneParams['sql_returns_none']);
  }

  /**
   * Api3 & 4 helpers not yet working.
   *
   * Do apiv4 call in a queue context.
   *
   * @param \CRM_Queue_TaskContext $taskContext
   * @param string $entity
   * @param string $action
   * @param array $params
   *
   * @return bool
   * @internal only use from this class.
   *
  public static function doApi4(\CRM_Queue_TaskContext $taskContext, string $entity, string $action, array $params): bool {
    try {
      civicrm_api4($entity, $action, $params);
    }
    catch (\CRM_Core_Exception $e) {
      \Civi::log('queue')->error('queued action failed {entity} {action} {params} {message} {exception}', [
        'entity' => $entity,
        'action' => $action,
        'params' => $params,
        'message' => $e->getMessage(),
        'exception' => $e,
      ]);
      return FALSE;
    }
    return TRUE;
  }
  /**
   * Do apiv3 call in a queue context.
   *
   * @param \CRM_Queue_TaskContext $taskContext
   * @param string $entity
   * @param string $action
   * @param array $params
   *
   * @return bool
   * @internal only use from this class.
   *
   *
  public static function doApi3(\CRM_Queue_TaskContext $taskContext, string $entity, string $action, array $params): bool {
    try {
      civicrm_api3($entity, $action, $params);
    }
    catch (\CRM_Core_Exception $e) {
      \Civi::log('queue')->error('queued action failed {entity} {action} {params} {message} {exception}', [
        'entity' => $entity,
        'action' => $action,
        'params' => $params,
        'message' => $e->getMessage(),
        'exception' => $e,
      ]);
      return FALSE;
    }
    return TRUE;
  }
  */

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment