Job batching in Laravel: How it works

Sep 7, 2020 3 min read

Laravel 8 ships with a neat feature that allows us to dispatch a group of jobs to the queue to be executed in parallel. We can monitor these jobs and run certain logic when any of the jobs fails or when all jobs have been processed.

In this post, we're going to look into how this works under the hood.

Here's an example from the official docs:

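use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([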
    new ProcessPodcast(Podcast::find(1)),
    new ProcessPodcast(Podcast::find(2)),
    new ProcessPodcast(Podcast::find(3)),
    new ProcessPodcast(Podcast::find(4)),
    new ProcessPodcast(Podcast::find(5)),
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished executing...
})->name('Process Podcasts')
  ->allowFailures(false)
  ->onConnection('redis')
  ->onQueue('podcasts')
  ->dispatch();

Storing the Batch

When you call dispatch(), Laravel stores the batch information in the database:

$this->connection->table($this->table)->insert([
    'id' => $id,
    'name' => $batch->name,
    'total_jobs' => 0,
    'pending_jobs' => 0,
    'failed_jobs' => 0,
    'failed_job_ids' => '[]',
    'options' => serialize($batch->options),
    'created_at' => time(),
    'cancelled_at' => null,
    'finished_at' => null,
]);

This snippet is from the store() method of the Illuminate\Bus\DatabaseBatchRepository class. Each batch is assigned a UUID and an optional name. The batch options array is also stored as a string after being serialized.

That options array holds the closures provided to the then, catch, and finally methods. It also holds the values passed to allowFailures, onConnection, and onQueue.

The closures are serialized by the framework's implementation of Opis\Closure\SerializableClosure so they can be stored in the database.
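To see why a wrapper is needed at all, here is a minimal sketch in plain PHP. The shape of the $options array is an assumption based on the description above (callbacks grouped under then, catch, and finally, plus the scalar settings), and the values are illustrative only. It shows that PHP's native serialize() refuses to handle closures:

```php
<?php

// Assumed shape of the batch options; the keys mirror the methods described above.
$options = [
    'then' => [function () { /* all jobs succeeded */ }],
    'catch' => [function () { /* first failure */ }],
    'finally' => [function () { /* batch finished */ }],
    'allowFailures' => false,
    'connection' => 'redis',
    'queue' => 'podcasts',
];

// Native serialization fails as soon as it meets a Closure instance.
try {
    serialize($options);
    $serializable = true;
} catch (Throwable $e) {
    $serializable = false;
}

// Without the closures, the remaining scalar options serialize fine.
$scalars = array_diff_key($options, array_flip(['then', 'catch', 'finally']));
$roundTrip = unserialize(serialize($scalars));
```

Wrapping each closure in a serializable object is what makes it possible to store the whole array in a single text column.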

Dispatching the Jobs

Now that the batch information is stored in the database, Laravel stores the batch ID in each job instance we pass to the batch and then dispatches the jobs to the queue:

$jobs->each->withBatchId($this->id);

$this->repository->incrementTotalJobs(
    $this->id,
    count($jobs)
);

$this->queue
    ->connection($this->options['connection'] ?? null)
    ->bulk(
        $jobs->all(),
        $data = '',
        $this->options['queue'] ?? null
    );

This snippet is from the add() method of the Illuminate\Bus\Batch class.

While dispatching the jobs to the queue, Laravel increments the total and pending job counts in the database row where the batch is stored. That way it can track the remaining jobs and check whether the batch has finished.
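The counting can be pictured with a small in-memory model. This is a sketch, not Laravel's repository: the InMemoryBatchRow class and its properties are hypothetical, and it assumes that adding jobs raises the total and pending counts together.

```php
<?php

// Hypothetical in-memory stand-in for the batch row in the database.
final class InMemoryBatchRow
{
    public int $totalJobs = 0;
    public int $pendingJobs = 0;
    public int $failedJobs = 0;

    // Every newly added job starts out pending, so both counters move together.
    public function incrementTotalJobs(int $amount): void
    {
        $this->totalJobs += $amount;
        $this->pendingJobs += $amount;
    }
}

$row = new InMemoryBatchRow();
$row->incrementTotalJobs(5); // the five ProcessPodcast jobs from the example
$row->incrementTotalJobs(2); // jobs added to the same batch later on
```

Because the counters are incremented rather than set, more jobs can be added to a batch after it was first dispatched.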

Notice also that Laravel uses the bulk() method to dispatch the jobs. That way it hands all the jobs to the queue connection in one call instead of calling dispatch() for every single job, and a driver that supports it can write them to the queue store in a single operation.
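The difference between the two approaches can be sketched with a toy queue that counts how many writes reach its store. CountingQueueSketch is hypothetical and is not one of Laravel's queue drivers; whether a real driver performs a single write depends on that driver.

```php
<?php

// Hypothetical in-memory queue that counts writes to its backing store.
final class CountingQueueSketch
{
    public int $writes = 0;

    /** @var string[] */
    public array $stored = [];

    // One write per job.
    public function push(string $job): void
    {
        $this->stored[] = $job;
        $this->writes++;
    }

    // One write for the whole list of jobs.
    public function bulk(array $jobs): void
    {
        foreach ($jobs as $job) {
            $this->stored[] = $job;
        }
        $this->writes++;
    }
}

$jobs = ['podcast-1', 'podcast-2', 'podcast-3'];

$oneByOne = new CountingQueueSketch();
foreach ($jobs as $job) {
    $oneByOne->push($job);
}

$inBulk = new CountingQueueSketch();
$inBulk->bulk($jobs);
```

Both queues end up holding the same jobs; only the number of round trips differs.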

Monitoring the Batch

After each successful job execution, the recordSuccessfulJob() method of the Illuminate\Bus\Batch class is called. This method decrements the number of pending batch jobs in the database and calls the proper callbacks:

$counts = $this->decrementPendingJobs($jobId);

if ($counts->pendingJobs === 0) {
    $this->repository->markAsFinished($this->id);
}

if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
    collect($this->options['then'])->each(/* Invoke */);
}

if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
    collect($this->options['finally'])->each(/* Invoke */);
}

If there aren't any pending jobs in the batch, Laravel will mark the batch as finished by updating the finished_at timestamp in the database. It'll also call any then callbacks if all jobs in the batch ran successfully, and any finally callbacks if all the jobs in the batch were attempted at least once.
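The success path can be simulated without a queue. The SuccessPathSketch class below is hypothetical and heavily simplified. It assumes that the allJobsHaveRanExactlyOnce() check amounts to "pending minus failed equals zero", and it records callback names instead of invoking real closures.

```php
<?php

// Hypothetical, simplified model of the success path; not Laravel's Batch class.
final class SuccessPathSketch
{
    public int $pendingJobs;
    public int $failedJobs = 0;
    public bool $finished = false;

    /** @var string[] names of the callbacks that would have been invoked */
    public array $invoked = [];

    public function __construct(int $totalJobs)
    {
        $this->pendingJobs = $totalJobs;
    }

    public function recordSuccessfulJob(): void
    {
        $this->pendingJobs--;

        if ($this->pendingJobs === 0) {
            $this->finished = true;    // stands in for markAsFinished()
            $this->invoked[] = 'then';
        }

        // Assumed meaning of allJobsHaveRanExactlyOnce().
        if ($this->pendingJobs - $this->failedJobs === 0) {
            $this->invoked[] = 'finally';
        }
    }
}

$batch = new SuccessPathSketch(3);
$batch->recordSuccessfulJob();
$batch->recordSuccessfulJob();
$afterTwo = $batch->invoked; // nothing has fired yet

$batch->recordSuccessfulJob(); // the last job triggers both callbacks
```

Only the job that brings the pending count to zero triggers the callbacks, which is why they run once per batch no matter how many workers process it.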

Similarly, the recordFailedJob() method is called after any job failure. By failure, I mean a job that has consumed all of its attempts.

$counts = $this->incrementFailedJobs($jobId);

if ($counts->failedJobs === 1 && ! $this->allowsFailures()) {
    $this->cancel();
}

if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
    $batch = $this->fresh();

    collect($this->options['catch'])->each(/* Invoke */);
}

if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
    $batch = $this->fresh();

    collect($this->options['finally'])->each(/* Invoke */);
}

If the batch doesn't allow failures, it's going to be cancelled after the first recorded failure. The cancelled_at field in the database will be updated with the current timestamp.

In addition to that, any catch callbacks will be called if it was the first recorded failure, and any finally callbacks will be called if all the jobs in the batch were attempted at least once.
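Here is a rough simulation of how failures interact with the counters. FailurePathSketch is a hypothetical class, not Laravel's Batch. It assumes that a failure increments the failed count without decrementing the pending count, and that the finally check is "pending minus failed equals zero".

```php
<?php

// Hypothetical, simplified model of the success and failure paths together.
final class FailurePathSketch
{
    public int $pendingJobs;
    public int $failedJobs = 0;
    public bool $cancelled = false;

    /** @var string[] names of the callbacks that would have been invoked */
    public array $invoked = [];

    private bool $allowFailures;

    public function __construct(int $totalJobs, bool $allowFailures)
    {
        $this->pendingJobs = $totalJobs;
        $this->allowFailures = $allowFailures;
    }

    public function recordSuccessfulJob(): void
    {
        $this->pendingJobs--;

        if ($this->pendingJobs === 0) {
            $this->invoked[] = 'then';
        }
        if ($this->pendingJobs - $this->failedJobs === 0) {
            $this->invoked[] = 'finally';
        }
    }

    public function recordFailedJob(): void
    {
        $this->failedJobs++; // assumption: pending stays untouched on failure

        if ($this->failedJobs === 1 && ! $this->allowFailures) {
            $this->cancelled = true;
        }
        if ($this->failedJobs === 1) {
            $this->invoked[] = 'catch';
        }
        if ($this->pendingJobs - $this->failedJobs === 0) {
            $this->invoked[] = 'finally';
        }
    }
}

// A batch that tolerates failures: success, failure, success.
$tolerant = new FailurePathSketch(3, true);
$tolerant->recordSuccessfulJob();
$tolerant->recordFailedJob();
$tolerant->recordSuccessfulJob();

// A batch that does not: the first failure cancels it.
$strict = new FailurePathSketch(2, false);
$strict->recordFailedJob();
```

In the tolerant batch the then callback never fires, because the failed job keeps the pending count above zero, while finally still runs once every job has been accounted for.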

The recordSuccessfulJob() and recordFailedJob() methods are called from within the Illuminate\Queue\CallQueuedHandler class.

Exploring Further

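If you wish to learn more about how batches work, the classes mentioned in this post are a good place to start: Illuminate\Bus\DatabaseBatchRepository, Illuminate\Bus\Batch, and Illuminate\Queue\CallQueuedHandler.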


I'm Mohamed Said. I work with companies and teams all over the world to build and scale web applications in the cloud. Find me on twitter @themsaid.

