Have you ever run into the "Too many queueable jobs added to the queue: 2" exception? If so, this little framework may be of interest to you.
A utility class to enqueue Queueable jobs. It is intended as a replacement for the System.enqueueJob() method. Advantages over the system method:
- Lets you enqueue a Queueable job even after you have exhausted the governor limit on the number of Queueable jobs. This helps when you need to enqueue multiple Queueable jobs from an Apex Batch job, a future method, or another Queueable job.
- Better error handling. The error message contains the full stack trace, and failed Queueable jobs can be retried.
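The first advantage depends on knowing when the current transaction has no Queueable slots left. A minimal sketch of such a check, using the standard Limits methods; the class name QueueableLimitCheck is hypothetical and is not part of the framework:

```apex
// Hypothetical helper, not part of the framework: inspects how much
// Queueable capacity is left in the current transaction.
public with sharing class QueueableLimitCheck {
    // Returns true while System.enqueueJob() can still be called in this
    // transaction without hitting the governor limit.
    public static Boolean canEnqueue() {
        return Limits.getQueueableJobs() < Limits.getLimitQueueableJobs();
    }
}
```

When canEnqueue() returns false, a direct System.enqueueJob() call would throw the exception quoted at the top, which is the situation the framework is meant to handle.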
The Queueable job must be serializable to a JSON string. The framework uses JSON.serializePretty() and JSON.deserialize() to store the job in the custom field Params__c on Async_Request__c and to restore it later.
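To check whether a job survives this round trip, something along the following lines may help. GreetingJob and its roundTrip() method are hypothetical and exist only for illustration. Resolving the type with Type.forName() is an assumption about how a class name could be turned back into a type, not a description of the framework's internals.

```apex
// Hypothetical job used only to illustrate the JSON round trip.
public with sharing class GreetingJob extends BaseQueueable {
    // State the job needs at execution time; it must survive serialization.
    public String name;

    public override void doExecute(QueueableContext context) {
        System.debug('Hello, ' + name);
    }

    // Serializes the job and rebuilds it from the resulting string.
    public static GreetingJob roundTrip(GreetingJob job) {
        String serializedJob = JSON.serializePretty(job);
        // Assumption: the concrete class is looked up by its name.
        Type jobType = Type.forName('GreetingJob');
        return (GreetingJob) JSON.deserialize(serializedJob, jobType);
    }
}
```

If a field holds something that cannot be rebuilt from JSON, the restored job will differ from the original, so it is worth running such a check for every new job class.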
Queueable jobs should be derived from the BaseQueueable abstract class. If you need to enqueue an existing job, you can create a QueueableWrapper class that is derived from BaseQueueable and takes the Queueable job as a constructor parameter.
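A minimal sketch of such a wrapper is shown below. LegacyJob is a hypothetical stand-in for an existing Queueable class. The field is declared with the concrete type rather than the Queueable interface on the assumption that JSON.deserialize() needs a concrete class to rebuild the wrapped job; adjust this to your own job classes.

```apex
// File: LegacyJob.cls
// Hypothetical existing job that does not extend BaseQueueable.
public with sharing class LegacyJob implements Queueable {
    public void execute(QueueableContext context) {
        System.debug('LegacyJob.execute()');
    }
}

// File: QueueableWrapper.cls
// Sketch of a wrapper that lets the framework run LegacyJob.
public with sharing class QueueableWrapper extends BaseQueueable {
    // Assumption: a concrete type is used so the job can be restored from JSON.
    private LegacyJob job;

    public QueueableWrapper(LegacyJob job) {
        this.job = job;
    }

    public override void doExecute(QueueableContext context) {
        // Delegate to the wrapped job, passing the context through.
        job.execute(context);
    }
}
```

The wrapped job can then be enqueued like any other BaseQueueable, for example with QueueableManager.enqueue(new List<BaseQueueable> { new QueueableWrapper(new LegacyJob()) });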
Create an Apex class derived from BaseQueueable:
public with sharing class MyQueueable extends BaseQueueable {
    public override void doExecute(QueueableContext context) {
        System.debug('doExecute()');
    }
}
Enqueue the Queueable class:
// Build 205 jobs, far more than a single transaction can enqueue directly
List<BaseQueueable> queueables = new List<BaseQueueable>();
for (Integer i = 0; i < 205; i++) {
    queueables.add(new MyQueueable());
}
QueueableManager.enqueue(queueables);
It is recommended to derive all Queueable jobs in your project from the BaseQueueable class and to use QueueableManager to start new Queueable jobs. That increases the chance that a given Queueable job will be picked up by the framework even if the limits for Queueable jobs are exceeded in the current transaction.
To restart failed jobs, change the status on the corresponding Async Request records from Error to null and run QueueableManager.enqueueNextJob(); from Anonymous Apex. Either CreatedById or LastModifiedById on the records must match the Id of the user who runs the script.
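A possible Anonymous Apex script for the restart is sketched below. The field name Status__c is an assumption (the text only says "status"), so replace it with the actual API name in your org.

```apex
// Anonymous Apex sketch. Status__c is an assumed field name.
List<Async_Request__c> failedRequests = [
    SELECT Id, Status__c
    FROM Async_Request__c
    WHERE Status__c = 'Error'
];

// Clear the status so the framework treats the records as pending again.
for (Async_Request__c asyncRequest : failedRequests) {
    asyncRequest.Status__c = null;
}

// The update also sets LastModifiedById to the running user.
update failedRequests;

// Ask the framework to pick up the next pending request.
QueueableManager.enqueueNextJob();
```

Because the update stamps the running user as LastModifiedById, the ownership condition described above is met for every record the script touches.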
- The framework serializes BaseQueueable to JSON so that it can restore it later and execute the corresponding Queueable job. The length of the serialized JSON is limited by the Async_Request__c.Params__c field, a Long Text Area that holds at most 131,072 characters. A possible workaround is to change the implementation so that large serialized JSON is written to a file instead of the Long Text Area field.
- Implement alternative approaches to enqueue async tasks even if the limits for Queueable jobs are exceeded. One option is to create a static Boolean variable hasEnqueuedAnyJobInCurrentTransaction inside QueueableManager to indicate whether QueueableManager was used previously to enqueue any job. If the limits for Queueable jobs are exceeded and the framework was not called previously, then either a future method or a Schedulable can be used as a backup mechanism to enqueue the jobs under the current user. An alternative approach is to use Transaction Finalizers (Beta) for this.
- Implement a Schedulable job to monitor the status of Async Requests and send a corresponding email.
- Use Transaction Finalizers (Beta) for error handling, enqueueing the next Queueable job from an Async Request, and removing the Async Request record of a successfully completed Queueable job.
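The finalizer idea from the last item could look roughly like the sketch below. It is not part of the framework. AsyncRequestFinalizer and the Status__c field name are assumptions, and whether QueueableManager.enqueueNextJob() behaves as intended inside a finalizer would need to be verified.

```apex
// Hypothetical finalizer, not part of the framework.
public with sharing class AsyncRequestFinalizer implements Finalizer {
    // Id of the Async_Request__c record that the parent job was created from.
    private Id asyncRequestId;

    public AsyncRequestFinalizer(Id asyncRequestId) {
        this.asyncRequestId = asyncRequestId;
    }

    public void execute(FinalizerContext context) {
        if (context.getResult() == ParentJobResult.SUCCESS) {
            // The job completed, so its request record is no longer needed.
            delete [SELECT Id FROM Async_Request__c WHERE Id = :asyncRequestId];
        } else {
            // The job failed with an unhandled exception.
            Exception failure = context.getException();
            System.debug(failure.getMessage() + '\n' + failure.getStackTraceString());
            // Status__c is an assumed field name.
            update new Async_Request__c(Id = asyncRequestId, Status__c = 'Error');
        }
        // Continue with the next pending request in either case.
        QueueableManager.enqueueNextJob();
    }
}
```

A job would attach it at the start of its execution with System.attachFinalizer(new AsyncRequestFinalizer(requestId)); where requestId is the Id of its Async_Request__c record.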
Implementation ideas were taken from:
- Async Queue Framework by Jitendra Zaa
- Going Asynchronous with Queueable Apex by Dan Appleman in Advanced Apex Programming in Salesforce
- Salesforce Asynchronous by Jitendra Zaa on SalesforceWay Podcast (part 1, part 2)
- Apex Queueable and easy chaining - Medium
Hi,
Love the Async_Request__c approach. I've implemented it, and it works really well most of the time. One particular use case did cause an issue. The "FOR UPDATE" flag didn't prevent two threads from grabbing the same record which resulted in two processes performing the same operation.
To address this, we added a thread attribute in order to prevent this from happening. Here are the additions and changes:
Async_Request__c Object:
Added a new number field: Thread__c
BaseQueueable class:
NEW:
public integer thread;
CHANGE:
WAS:
QueueableManager.enqueueNextJob();
IS:
QueueableManager.enqueueNextJob(this.thread);
QueueableManager Class:
CHANGED for loop within enqueue method:
CHANGED enqueueNextJob Method:
public static void enqueueNextJob(Integer currentThread) {
I thought the "FOR UPDATE" operation would prevent this type of issue, but it doesn't seem to be working for us.
With the above code changes, now as each queueable job finishes, it only grabs the next job with the same thread number. This prevents multiple processes from grabbing the same record.
Hope this helps!
Mike