Optimizing Salesforce Queueable Jobs with Queueable Manager | Asynchronous Apex Best Practices

Salesforce Queueable Jobs are a powerful tool for handling asynchronous operations. However, managing multiple Queueable Jobs efficiently can be challenging, especially when dealing with governor limits and error handling. In this blog post, we'll explore how to optimize Salesforce Queueable Jobs using a Queueable Manager, a utility class designed to streamline the enqueuing process and improve error handling.

Introduction

Queueable Jobs in Salesforce allow developers to execute complex operations asynchronously. However, a single transaction can add only a limited number of jobs to the queue: up to 50 from synchronous Apex, and only one from code that is already running asynchronously. Exceed that and you get the "Too many queueable jobs added to the queue" exception. This is where a Queueable Manager comes into play. It enqueues as many jobs as the limit allows and stores the rest so they can be enqueued later.
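As a quick illustration of the limit described above, the following anonymous Apex sketch reports how many Queueable slots are still free in the current transaction. It uses only the standard Limits class; the variable names are chosen for this example.

```apex
// Anonymous Apex sketch: how much Queueable headroom is left in this transaction.
Integer allowed = Limits.getLimitQueueableJobs();  // jobs this transaction may enqueue in total
Integer used = Limits.getQueueableJobs();          // jobs already enqueued by this transaction
Integer remaining = allowed - used;

System.debug('Queueable jobs remaining: ' + remaining);
if (remaining <= 0) {
    // Calling System.enqueueJob() now would throw a LimitException, which cannot be caught.
    System.debug('No free slot: the job has to be deferred.');
}
```

Checking before enqueuing matters because limit exceptions cannot be handled with try/catch.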

Architecture Overview

The Queueable Manager is a utility class that acts as a wrapper around the standard System.enqueueJob() method. It adds error logging, a record of every job that can be used for retries, and the ability to accept more jobs than the current transaction is allowed to enqueue.

The diagram above illustrates how the Queueable Manager interacts with Salesforce's asynchronous framework. It serializes Queueable Jobs into JSON and stores them in a custom object (Async_Request__c). Jobs that do not fit within the limit stay in that object with the status New and are enqueued later, one at a time, as the running jobs finish.
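The store-and-restore idea can be sketched with the standard JSON and Type classes. EmailJobState is a hypothetical class invented for this illustration (it is not part of the Queueable Manager) and would need to be saved as its own top-level class so that Type.forName() can resolve it.

```apex
// Hypothetical top-level class; any class with JSON-serializable fields would do.
public class EmailJobState {
    public String recipient;
    public Integer attempts = 0;
}

// --- Anonymous Apex or a test method ---
EmailJobState original = new EmailJobState();
original.recipient = 'user@example.com';

String className = 'EmailJobState';           // the kind of value Apex_Class_Name__c holds
String payload = JSON.serialize(original);    // the kind of value Params__c holds

// Later, possibly in another transaction: rebuild the object from the two strings.
Type restoredType = Type.forName(className);  // null if the class cannot be resolved
EmailJobState restored = (EmailJobState) JSON.deserialize(payload, restoredType);
System.debug(restored.recipient);
```

Only state that survives JSON serialization is restored, so a job should keep its inputs in plain fields.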

Creating the Custom Object

To implement the Queueable Manager, you need to create a custom object called Async_Request__c. This object will store the details of each Queueable Job, including its status, parameters, and error messages (if any). Below are the fields you need to create:

Fields in Async_Request__c

  • Apex_Class_Name__c - Text(255): Stores the name of the Apex class for the Queueable Job.
  • Params__c - Long Text Area(131072): Stores the serialized JSON parameters of the Queueable Job.
  • Status__c - Picklist (New, Enqueued, Error): Tracks the status of the Queueable Job.
  • Async_Apex_Job_Id__c - Text(18): Stores the ID of the associated Async Apex Job. This field should be marked as Unique Case Insensitive.
  • Error_Message__c - Long Text Area(32768): Stores detailed error messages if the job fails.
  • Thread__c - Number (no decimal places): Stores the thread number assigned to the job, so that each chain of jobs only picks up its own records and multiple processes do not grab the same record.

Here’s how you can create the custom object in Salesforce:

  1. Go to Setup → Object Manager → Create → Custom Object.
  2. Label the object "Async Request" with the plural label "Async Requests", so that its API name is Async_Request__c.
  3. Add the fields listed above with the specified data types and settings.
  4. Save the object and ensure it is accessible to the profiles that will use the Queueable Manager.

Step-by-Step Implementation

1. Create the BaseQueueable Class

The BaseQueueable class is an abstract class that all your Queueable Jobs should extend. It provides a standardized way to handle execution and error logging.

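public abstract with sharing class BaseQueueable implements Queueable {
    // Thread (chain) number assigned by QueueableManager; it is serialized with the job.
    public Integer thread;

    public void execute(QueueableContext context) {
        Savepoint sp = Database.setSavepoint();
        try {
            doExecute(context);
            // Success: the tracking record is no longer needed.
            delete [SELECT Id FROM Async_Request__c WHERE Async_Apex_Job_Id__c = :context.getJobId()];
        } catch (Exception ex) {
            // Undo partial work, then record the failure on the tracking record.
            Database.rollback(sp);

            // Query into a list: a job enqueued outside QueueableManager has no tracking record,
            // and assigning an empty result to a single sObject would throw a QueryException.
            List<Async_Request__c> asyncRequests = [
                SELECT Id FROM Async_Request__c WHERE Async_Apex_Job_Id__c = :context.getJobId()
                LIMIT 1
            ];
            if (!asyncRequests.isEmpty()) {
                asyncRequests[0].Status__c = 'Error';
                // abbreviate() keeps the message within the 32,768-character field.
                asyncRequests[0].Error_Message__c = (ex.getTypeName() + ': ' + ex.getMessage() +
                    ' (Line ' + ex.getLineNumber() + ') ' + ex.getStackTraceString()).abbreviate(32768);
                update asyncRequests[0];
            }
        }

        // Whether the job succeeded or failed, continue with the next request on this thread.
        QueueableManager.enqueueNextJob(this.thread);
    }

    public abstract void doExecute(QueueableContext context);
}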

2. Implement the QueueableManager Class

The QueueableManager class is responsible for enqueuing jobs and handling retries. It checks the available limits and enqueues jobs accordingly. The thread logic is added to prevent multiple processes from grabbing the same record.

public with sharing class QueueableManager {
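    public static void enqueue(BaseQueueable queueable) {
        enqueue(new List<BaseQueueable>{ queueable });
    }

    public static void enqueue(List<BaseQueueable> queueables) {
        List<Async_Request__c> asyncRequestsToInsert = new List<Async_Request__c>();
        // Queueable slots still free in this transaction.
        Integer availableLimit = Limits.getLimitQueueableJobs() - Limits.getQueueableJobs();

        Integer threadCount = 0;
        Integer threadLimit = availableLimit; // one thread (chain) per free slot
        for (BaseQueueable queueable : queueables) {
            queueable.thread = threadCount;
            String paramsJSON = JSON.serialize(queueable);
            // Truncated JSON cannot be deserialized later, so an oversized payload is not stored.
            Boolean tooLong = paramsJSON.length() > 131072;

            Async_Request__c asyncRequest = new Async_Request__c(
                Apex_Class_Name__c = getObjectType(queueable),
                Params__c = tooLong ? null : paramsJSON,
                Thread__c = threadCount,
                Status__c = 'New' // set explicitly rather than relying on a picklist default
            );
            if (availableLimit > 0) {
                asyncRequest.Async_Apex_Job_Id__c = System.enqueueJob(queueable);
                asyncRequest.Status__c = 'Enqueued';
                availableLimit--;
            } else if (tooLong) {
                // A deferred job has to be rebuilt from Params__c, so fail fast here.
                asyncRequest.Status__c = 'Error';
                asyncRequest.Error_Message__c = 'Serialized job exceeds the 131,072-character Params__c field.';
            }
            asyncRequestsToInsert.add(asyncRequest);
            threadCount++;
            if (threadCount >= threadLimit) threadCount = 0;
        }
        insert asyncRequestsToInsert;
    }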

    
    /**
     * @description Enqueues the next Queueable Job which is serialized to Async_Request__c object. It is
     * intended to be called from execute() method when logic of current Queueable Job is completed.
     */
    public static void enqueueNextJob(Integer currentThread) {
        // A LimitException cannot be caught, so check for a free slot up front
        // (code that already runs asynchronously can enqueue only one job).
        if (Limits.getQueueableJobs() >= Limits.getLimitQueueableJobs()) {
            return;
        }

        Savepoint sp;
        try {
            Id currentUserId = UserInfo.getUserId();
            // FOR UPDATE locks the row so that no other transaction can claim the same request.
            List<Async_Request__c> asyncRequests = [
                SELECT Apex_Class_Name__c, Params__c, Thread__c
                FROM Async_Request__c
                WHERE Status__c = 'New'
                    AND (CreatedById = :currentUserId OR LastModifiedById = :currentUserId)
                    AND Thread__c = :currentThread
                LIMIT 1
                FOR UPDATE
            ];
            if (asyncRequests.isEmpty()) {
                return;
            }

            Async_Request__c asyncRequest = asyncRequests[0];
            BaseQueueable queueable;
            try {
                queueable = (BaseQueueable) JSON.deserialize(
                    asyncRequest.Params__c, Type.forName(asyncRequest.Apex_Class_Name__c)
                );
                queueable.thread = currentThread;
            } catch (Exception ex) {
                // The request cannot be rebuilt: mark it as failed and try the next one.
                asyncRequest.Status__c = 'Error';
                asyncRequest.Error_Message__c = (
                    ex.getTypeName() + ': ' + ex.getMessage() + '; cause: ' + ex.getCause() +
                    '; line number: ' + ex.getLineNumber() + '; stack trace: ' + ex.getStackTraceString()
                ).abbreviate(32768);
                update asyncRequest;

                enqueueNextJob(currentThread);
                return;
            }

            sp = Database.setSavepoint();
            Id jobId = System.enqueueJob(queueable);
            asyncRequest.Status__c = 'Enqueued';
            asyncRequest.Async_Apex_Job_Id__c = jobId;
            update asyncRequest;
        } catch (Exception ex) {
            // sp is still null if the failure happened before the savepoint was set.
            if (sp != null) {
                Database.rollback(sp);
            }
            System.debug(LoggingLevel.ERROR, 'Error enqueuing job: ' + ex.getMessage());
        }
    }

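    // Derives the class name of an object at runtime. Casting to an unrelated type fails with
    // "Invalid conversion from runtime type <ClassName> to Datetime", and the class name is
    // parsed out of that message. This depends on the exact wording of the platform message.
    private static String getObjectType(Object obj) {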
        String result = 'DateTime';
        try {
            DateTime typeCheck = (DateTime) obj;
        }
        catch(System.TypeException typeException) {
            String message = typeException.getMessage().substringAfter('Invalid conversion from runtime type ');
            result = message.substringBefore(' to Datetime');
        }

        return result;
    }
}

3. Enqueue Jobs Using the QueueableManager

To enqueue jobs, simply create a list of BaseQueueable instances and pass them to the QueueableManager.enqueue() method.

List<BaseQueueable> queueables = new List<BaseQueueable> {
    new MyQueueable(), new MyQueueable(), new MyQueueable()
};
QueueableManager.enqueue(queueables);
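MyQueueable is not defined in the snippet above. A minimal sketch of what such a job could look like follows; the message field and the body of doExecute() are assumptions made for illustration.

```apex
// Hypothetical example job. Only the class name comes from the snippet above.
public with sharing class MyQueueable extends BaseQueueable {
    // Job input kept in a plain field so that it is stored in Params__c and restored later.
    public String message = 'Hello from MyQueueable';

    // The override keyword is required when implementing the abstract method.
    public override void doExecute(QueueableContext context) {
        // Business logic goes here. An exception thrown from this method is caught by
        // BaseQueueable.execute(), which rolls back and marks the request as Error.
        System.debug(message);
    }
}
```

A job written this way never implements execute() itself, so the bookkeeping in BaseQueueable always runs.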

Thread Logic for Preventing Deadlocks

One common issue when enqueuing multiple Queueable Jobs is that several jobs finishing at the same time might try to claim the same pending record, leading to lock contention and deadlocks. To prevent this, the Queueable Manager uses thread logic. Here's how it works:

  • Each Queueable Job is assigned a thread number when it is passed to the Queueable Manager. Numbers are handed out round-robin, with one thread per Queueable slot that was free at that moment.
  • The Thread__c field in the Async_Request__c object stores this thread number.
  • When a job finishes and enqueues the next one, it only picks up pending records with its own thread number, so two chains of jobs never compete for the same record.

This keeps each chain of jobs independent of the others, which prevents deadlocks and improves the reliability of the Queueable Manager. Note that records within a thread are not picked up in a guaranteed order, because the query in enqueueNextJob() has no ORDER BY clause.
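The assignment of thread numbers can be traced in isolation with the anonymous Apex sketch below. It repeats the counter logic from QueueableManager.enqueue() without touching the database; the figures of 3 free slots and 7 jobs are assumptions chosen for the example.

```apex
// Anonymous Apex sketch of the round-robin thread assignment.
Integer threadLimit = 3;  // assumed number of free Queueable slots
Integer jobCount = 7;     // assumed number of jobs passed to enqueue()

Map<Integer, List<Integer>> jobsByThread = new Map<Integer, List<Integer>>();
Integer threadCount = 0;
for (Integer jobIndex = 0; jobIndex < jobCount; jobIndex++) {
    if (!jobsByThread.containsKey(threadCount)) {
        jobsByThread.put(threadCount, new List<Integer>());
    }
    jobsByThread.get(threadCount).add(jobIndex);

    // Same wrap-around as in QueueableManager.enqueue().
    threadCount++;
    if (threadCount >= threadLimit) threadCount = 0;
}

// Thread 0 gets jobs 0, 3, 6; thread 1 gets jobs 1, 4; thread 2 gets jobs 2, 5.
System.debug(jobsByThread);
```

With these numbers, jobs 0 to 2 would be enqueued immediately and start the three chains, while jobs 3 to 6 wait as New records until a job on their thread finishes.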

Best Practices

  • Monitor Governor Limits: Always check the available limits before enqueuing jobs to avoid hitting the Queueable Job limit.
  • Use Custom Objects for Job Tracking: Store job details in a custom object to allow for retries and better error handling.
  • Implement Retry Logic: Use the Async_Request__c object to track failed jobs and retry them later.
  • Optimize JSON Serialization: Ensure that your Queueable Jobs are serializable and avoid storing large amounts of data in the Params__c field.
  • Test in Sandbox First: Always test your Queueable Jobs in a sandbox environment before deploying to production.
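The retry advice above can be sketched as a small helper that rebuilds failed jobs from their stored JSON and hands them back to the Queueable Manager. AsyncRequestRetry and retryFailed() are hypothetical names that are not part of the classes shown earlier, and this is one possible approach rather than the definitive one.

```apex
// Hypothetical helper, sketched under the assumption that failed jobs are safe to run again.
public with sharing class AsyncRequestRetry {
    public static void retryFailed(Integer maxRecords) {
        List<Async_Request__c> failed = [
            SELECT Id, Apex_Class_Name__c, Params__c
            FROM Async_Request__c
            WHERE Status__c = 'Error'
            LIMIT :maxRecords
        ];

        List<BaseQueueable> jobs = new List<BaseQueueable>();
        List<Async_Request__c> replaced = new List<Async_Request__c>();
        for (Async_Request__c request : failed) {
            try {
                Type jobType = Type.forName(request.Apex_Class_Name__c);
                jobs.add((BaseQueueable) JSON.deserialize(request.Params__c, jobType));
                replaced.add(request);
            } catch (Exception ex) {
                // The job cannot be rebuilt (unknown class, missing or invalid JSON):
                // leave the record in Error for manual review.
                System.debug(LoggingLevel.WARN, 'Skipping ' + request.Id + ': ' + ex.getMessage());
            }
        }

        // enqueue() creates fresh tracking records, so the old Error records are removed.
        QueueableManager.enqueue(jobs);
        delete replaced;
    }
}
```

A job that fails every time would be retried indefinitely with this sketch. A retry counter field on Async_Request__c, which is not part of the object described above, would be one way to cap the attempts.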

Common Errors and How to Avoid Them

  • Too Many Queueable Jobs: This error occurs when you exceed the governor limit for Queueable Jobs. Use the Queueable Manager to enqueue jobs only when limits allow.
  • Serialization Errors: Ensure that your Queueable Jobs are serializable and avoid using non-serializable objects.
  • Deadlocks: Use the FOR UPDATE clause and thread logic to prevent multiple processes from accessing the same record simultaneously.
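For the serialization errors listed above, a pre-flight check can reveal problems before a job is handed to the Queueable Manager. QueueablePreflight and validate() are hypothetical names used for this sketch. The length constant mirrors the Params__c field definition given earlier.

```apex
// Hypothetical pre-flight check; returns null when the job looks safe to store.
public with sharing class QueueablePreflight {
    public static final Integer MAX_PARAMS_LENGTH = 131072; // length of Params__c

    public static String validate(BaseQueueable job) {
        try {
            String payload = JSON.serialize(job);
            if (payload.length() > MAX_PARAMS_LENGTH) {
                return 'Serialized job has ' + payload.length() +
                    ' characters, more than Params__c can hold.';
            }
            return null;
        } catch (Exception ex) {
            // For example a System.JSONException for a field type that cannot be serialized.
            return ex.getTypeName() + ': ' + ex.getMessage();
        }
    }
}
```

Calling validate() in a unit test for each job class is a cheap way to catch a non-serializable field before it reaches production.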

Performance, Security, and Limit Considerations

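When using Queueable Jobs, it's important to consider performance, security, and governor limits. Every job handled by the Queueable Manager costs extra SOQL queries and DML statements for its tracking record, so account for these in the limits of your own logic. Params__c holds the serialized state of each job and Error_Message__c holds stack traces, so restrict access to Async_Request__c if jobs carry sensitive data. Finally, a job that is stopped by an uncatchable limit exception never reaches the error handling in BaseQueueable, so its record stays in the Enqueued status and should be monitored separately.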
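One way to monitor tracked jobs is to compare the Async_Request__c records with the platform's own AsyncApexJob records. The anonymous Apex sketch below assumes the object and fields described earlier; the limit of 200 records is arbitrary.

```apex
// Anonymous Apex sketch: look up the platform status of jobs that are tracked as Enqueued.
Set<Id> jobIds = new Set<Id>();
for (Async_Request__c request : [
    SELECT Async_Apex_Job_Id__c
    FROM Async_Request__c
    WHERE Status__c = 'Enqueued' AND Async_Apex_Job_Id__c != null
    LIMIT 200
]) {
    jobIds.add((Id) request.Async_Apex_Job_Id__c);
}

for (AsyncApexJob job : [
    SELECT Id, Status, ExtendedStatus, NumberOfErrors, CreatedDate
    FROM AsyncApexJob
    WHERE Id IN :jobIds AND JobType = 'Queueable'
]) {
    // A job reported as Failed here while its request is still Enqueued never reached
    // the catch block in BaseQueueable, for example because of a limit exception.
    System.debug(job.Id + ' | ' + job.Status + ' | ' + job.ExtendedStatus);
}
```

Running a check like this on a schedule would make it possible to flag stuck requests for review.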

Conclusion

Optimizing Salesforce Queueable Jobs with a Queueable Manager can significantly improve the efficiency and reliability of your asynchronous operations. By following the best practices outlined in this post, you can avoid common pitfalls and ensure that your Queueable Jobs run smoothly, even under heavy loads.

If you found this post helpful, please share it with your network and leave a comment below.
