Last active
October 17, 2023 04:42
-
-
Save eileenmcnaughton/1e033f5e30fe6eb82981055226627367 to your computer and use it in GitHub Desktop.
helper file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Updgrader function | |
/** | |
* Delete now-orphaned records in civicrm_mailing. | |
* | |
* @return bool | |
*/ | |
public function upgrade_4385() : bool { | |
$sql = 'DELETE j | |
FROM civicrm_mailing_job j | |
LEFT JOIN civicrm_mailing_event_queue q ON q.job_id = j.id | |
WHERE q.id IS NULL | |
AND j.id BETWEEN %1 AND %2'; | |
$this->queueSQL($sql, [ | |
1 => [ | |
'value' => 0, | |
'type' => 'Integer', | |
'increment' => 2000, | |
], | |
2 => [ | |
'value' => 2000, | |
'type' => 'Integer', | |
'increment' => 2000, | |
], | |
], | |
[ | |
'sql_returns_none' => ' | |
SELECT j.id | |
FROM civicrm_mailing_job j | |
LEFT JOIN civicrm_mailing_event_queue q ON q.job_id = j.id | |
WHERE q.id IS NULL LIMIT 1' | |
], 20); | |
return TRUE; | |
} | |
/** | |
* Queue up an SQL update. | |
* | |
* @param string $sql | |
* @param array $queryParameters | |
* Parameters to interpolate in with the keys | |
* - value | |
* - type (Integer, String, etc) | |
* - increment (optional, if present this is added to the value on each subsequent run. | |
* @param array $doneCondition | |
* Criteria to determine when it is done. Currently supports one key | |
* - sql_returns_none - a query that should return empty when done. | |
* @param int $weight | |
*/ | |
private function queueSQL(string $sql, array $queryParameters = [], $doneCondition = [], int $weight = 0): void { | |
$queue = new QueueHelper(\Civi::queue('wmf_data_upgrades', [ | |
'type' => 'Sql', | |
'runner' => 'task', | |
// This is kinda high but while debugging I was seeing sometime coworker | |
// causing this to increment too quickly while debugging. This could be | |
// due to the break point process but let's go with this for now. | |
'retry_limit' => 100, | |
'reset' => FALSE, | |
'error' => 'abort', | |
])); | |
$queue->sql($sql, $queryParameters, empty($doneCondition) ? QueueHelper::ITERATE_UNTIL_DONE : QueueHelper::ITERATE_UNTIL_TRUE, $doneCondition, $weight); | |
} | |
<?php | |
namespace Civi\Queue; | |
use Civi\Api4\Queue; | |
use Civi\Core\Exception\DBQueryException; | |
/** | |
* Queue helper. | |
* | |
* This comes from https://gist.github.com/totten/fa830cfced9bb7a92dea485f5422055a | |
* & will hopefully be moved to core. | |
*/ | |
class QueueHelper { | |
protected $queue; | |
public const ITERATE_UNTIL_DONE = 1; | |
public const ITERATE_RUN_ONCE = 0; | |
public const ITERATE_UNTIL_TRUE = 2; | |
/** | |
* @var int|null | |
*/ | |
protected $runAs; | |
public function __construct(\CRM_Queue_Queue $queue) { | |
$this->queue = $queue; | |
} | |
/** | |
* @param int|null $runAs | |
* | |
* @return $this | |
*/ | |
public function setRunAs(?int $runAs): QueueHelper { | |
$this->runAs = $runAs; | |
return $this; | |
} | |
/** | |
* @param string $sql | |
* @param array $params | |
* @param int $iterate | |
* @params array $doneParameters | |
* @param int $weight | |
* | |
* @return $this | |
*/ | |
public function sql(string $sql, array $params = [], int $iterate = self::ITERATE_RUN_ONCE, $doneCondition = [], $weight = 0): QueueHelper { | |
$task = new \CRM_Queue_Task([self::class, 'doSql'], [ | |
$sql, | |
$params, | |
$iterate, | |
$doneCondition | |
]); | |
$task->runAs = $this->runAs; | |
$this->queue->createItem($task, ['weight' => $weight]); | |
return $this; | |
} | |
/** | |
* Api3 & 4 helpers not yet tested ... | |
* | |
* @param string $entity | |
* @param string $action | |
* @param array $params | |
* | |
* @return $this | |
* | |
public function api4(string $entity, string $action, array $params = []) { | |
$this->queue->createItem(new \CRM_Queue_Task([self::class, 'doApi4'], [$entity, $action, $params])); | |
return $this; | |
} | |
public function api3(string $entity, string $action, array $params = []) { | |
$this->queue->createItem(new \CRM_Queue_Task([self::class, 'doApi3'], [$entity, $action, $params])); | |
return $this; | |
} | |
*/ | |
/** | |
* Do SQL in a queue context. | |
* | |
* @param \CRM_Queue_TaskContext $taskContext | |
* @param string $sql | |
* @param array $queryParameters | |
* Values to interpolate into the sql. These are in the format | |
* [1 => [500, 'Integer], 2 => ['bob' => 'String']]. They can be incremented | |
* with the passing of incrementParams. | |
* @param int $iterate | |
* Either self::ITERATE_ONCE or self::ITERATE_UNTIL DONE | |
* @param array $incrementParameters | |
* Additional parameters when using ITERATE_UNTIL_DONE | |
* These are keyed the same as the queryParameters and will be added to the | |
* query parameters to provide batching. e.g if the queryParameters have a key | |
* [1 => [0, 'Integer']] and the incrementParameters have | |
* [1 => ['increment' => 200]] then on re-queueing the parameter will be set | |
* to 200 rather than 0, and 400 for the next iteration etc. | |
* @param array $doneCondition | |
* | |
* @return bool | |
* @internal only use from this class. | |
*/ | |
public static function doSql(\CRM_Queue_TaskContext $taskContext, string $sql, array $queryParameters, int $iterate, array $doneCondition = []): bool { | |
try { | |
$daoParams = []; | |
foreach ($queryParameters as $index => $queryParameter) { | |
// Flatten out the array. | |
$daoParams[$index] = [$queryParameter['value'], $queryParameter['type']]; | |
} | |
$result = \CRM_Core_DAO::executeQuery($sql, $daoParams); | |
} | |
catch (DBQueryException $e) { | |
\Civi::log('queue')->error('queued action failed to run {sql} with parameters {params} sql error {sql_error_code} {message} {exception}', [ | |
'sql' => $sql, | |
'params' => $queryParameters, | |
// @todo - add isDeadLock? Maybe at the error level. | |
'sql_error_code' => $e->getSQLErrorCode(), | |
'message' => $e->getMessage(), | |
'exception' => $e, | |
]); | |
return FALSE; | |
} | |
if (($iterate === self::ITERATE_UNTIL_DONE && $result->affectedRows() > 0) | |
|| ($iterate === self::ITERATE_UNTIL_TRUE && !self::isIterationComplete($doneCondition))) { | |
foreach ($queryParameters as $index => $queryParameter) { | |
// Each loop we add on the value from the increment to our replacement params, if passed | |
// note that there isn't validation at this stage as | |
// to whether we are incrementing an Integer/DateTime. As this | |
// code settles we might add it - just not sure this is the right place. | |
if (!empty($queryParameter['increment'])) { | |
$queryParameters[$index]['value'] += $queryParameter['increment']; | |
} | |
} | |
try { | |
// Not finished, queue another pass. | |
$task = new \CRM_Queue_Task([self::class, 'doSql'], [ | |
$sql, | |
$queryParameters, | |
$iterate, | |
$doneCondition | |
]); | |
$taskContext->queue->createItem($task, ['weight' => $taskContext->queue->getSpec('weight')]); | |
} | |
catch (\CRM_Core_Exception $e) { | |
\Civi::log('queue')->error('queued action failed to re-queue {message} {exception}', [ | |
'message' => $e->getMessage(), | |
'exception' => $e, | |
]); | |
} | |
} | |
return TRUE; | |
} | |
private static function isIterationComplete($doneParams): bool { | |
if (empty($doneParams)) { | |
return TRUE; | |
} | |
// We might nuance this so keying as 'sql_returns_none' for now to allow others. | |
// when the sql returns nothing the iteration is complete. | |
return !\CRM_Core_DAO::singleValueQuery($doneParams['sql_returns_none']); | |
} | |
/** | |
* Api3 & 4 helpers not yet working. | |
* | |
* Do apiv4 call in a queue context. | |
* | |
* @param \CRM_Queue_TaskContext $taskContext | |
* @param string $entity | |
* @param string $action | |
* @param array $params | |
* | |
* @return bool | |
* @internal only use from this class. | |
* | |
public static function doApi4(\CRM_Queue_TaskContext $taskContext, string $entity, string $action, array $params): bool { | |
try { | |
civicrm_api4($entity, $action, $params); | |
} | |
catch (\CRM_Core_Exception $e) { | |
\Civi::log('queue')->error('queued action failed {entity} {action} {params} {message} {exception}', [ | |
'entity' => $entity, | |
'action' => $action, | |
'params' => $params, | |
'message' => $e->getMessage(), | |
'exception' => $e, | |
]); | |
return FALSE; | |
} | |
return TRUE; | |
} | |
/** | |
* Do apiv3 call in a queue context. | |
* | |
* @param \CRM_Queue_TaskContext $taskContext | |
* @param string $entity | |
* @params string $action | |
* @param array $params | |
* | |
* @return bool | |
*@internal only use from this class. | |
* | |
* | |
public static function doApi3(\CRM_Queue_TaskContext $taskContext, string $entity, string $action, array $params): bool { | |
try { | |
civicrm_api3($entity, $action, $params); | |
} | |
catch (\CRM_Core_Exception $e) { | |
\Civi::log('queue')->error('queued action failed {entity} {action} {params} {message} {exception}', [ | |
'entity' => $entity, | |
'action' => $action, | |
'params' => $params, | |
'message' => $e->getMessage(), | |
'exception' => $e, | |
]); | |
return FALSE; | |
} | |
return TRUE; | |
} | |
*/ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment