Simple Description: Asynchronous batch job processor for handling bulk transactions in Laravel using queues and real-time updates.
A Laravel-based system that processes large volumes of transactions asynchronously using queued batch jobs. It breaks down large transaction sets into smaller chunks, processes them in parallel, and provides real-time status updates.
βββ Jobs/
β βββ LoadSendTransactionsBatch.php # Main batch job loader
βββ Services/
β βββ SendJobTransactionService.php # Service orchestrator
<?php
namespace App\Jobs;
use App\Models\Transaction\Transaction;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class LoadSendTransactionsBatch implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
use Dispatchable;
use InteractsWithQueue;
use Queueable;
use SerializesModels;
use Batchable;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(
private int $transactionId,
private int $userId,
) {
}
/**
* @return string
*/
public function uniqueId(): string
{
return $this->transactionId;
}
/**
* Execute the job.
*
* @return void
*
* @throws Throwable
*/
public function handle()
{
if ($this->batch()->cancelled()) {
return;
}
Transaction::findOrFail($this->transactionId)
->getTransactionsToSend()
->lazy(500)
->each(fn ($transactionDetails) => $this->batch()
->add(
new SendTransactionsProcess(
transactionId: $this->transactionId,
transactionDetailId: $transactionDetails->id,
userId: $this->userId
)
)
);
}
}<?php
namespace App\Services\Transaction;
use App\Events\TransactionUpdated;
use App\Exceptions\TransactionException;
use App\Jobs\LoadSendTransactionsBatch;
use App\Models\Transaction\Transaction;
use Illuminate\Support\Facades\Bus;
use Throwable;
class SendJobTransactionService
{
protected Transaction $transaction;
protected int $userId;
/**
* add the transaction and transaction detail.
*
* @param int $transactionId
* @return $this
*/
public function to(int $transactionId): static
{
$this->transaction = Transaction::lockForUpdate()->findOrFail($transactionId);
return $this;
}
/**
* user who process transaction.
*
* @param int $userId
* @return $this
*/
public function sendBy(int $userId): static
{
$this->userId = $userId;
return $this;
}
/**
* make checks before handling functionality to Send Job transaction.
*
* @throws Throwable
*/
private function check(): void
{
throw_if(! $this->transaction->canSendTransaction(), TransactionException::cannotSendTransaction());
throw_if(! $this->transaction->isFinishedJob(), TransactionException::transactionJobNotCompleted());
throw_if(! $this->transaction->haveTransactionsToSend(), TransactionException::transactionNotHaveJobs());
}
/**
* handle the logic to send the transaction.
*
* @throws Throwable
*/
public function handle(): void
{
$this->check();
$batch = Bus::batch([
new LoadSendTransactionsBatch($this->transaction->id, $this->userId),
])
->allowFailures()
->name("transaction-{$this->transaction->id}")
->finally(fn () => broadcast(new TransactionUpdated($this->transaction->id)))
->dispatch();
$this->transaction->update([
'batch_id' => $batch->id,
]);
}
}| Feature | Description |
|---|---|
| Chunked Processing | Loads 500 records at a time to prevent memory issues |
| Unique Jobs | Prevents duplicate processing with ShouldBeUniqueUntilProcessing |
| Batch Cancellation | Stops processing if batch is cancelled |
| Real-time Updates | Broadcasts status via websockets |
| Error Handling | Custom exceptions with descriptive messages |
| Database Locking | Prevents race conditions with lockForUpdate() |