Laravel Queues in Action is now available!

Job Batching in Laravel: How It Works

Updated: Sep 7, 2020 — 3 min Read#queues

Laravel 8 ships with a neat feature that allows us to dispatch a group of jobs to the queue to be executed in parallel. We can monitor these jobs and execute certain logic when any of the jobs fail or when all jobs are processed.

In this post, we're going to look into how this works under the hood.

Here's an example from the official docs:

$batch = Bus::batch([
    new ProcessPodcast(Podcast::find(1)),
    new ProcessPodcast(Podcast::find(2)),
    new ProcessPodcast(Podcast::find(3)),
    new ProcessPodcast(Podcast::find(4)),
    new ProcessPodcast(Podcast::find(5)),
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished executing...
})->name('Process Podcasts')
  ->allowFailures(false)
  ->onConnection('redis')
  ->onQueue('podcasts')
  ->dispatch();

👋 I recently published a book titled "Laravel Queues in Action" that covers everything I know about running queues in Laravel. Check it out for a crash course, a cookbook, a guide, and a reference.


Storing the Batch

When you call dispatch(), Laravel stores the batch information in the database:

$this->connection->table($this->table)->insert([
    'id' => $id,
    'name' => $batch->name,
    'total_jobs' => 0,
    'pending_jobs' => 0,
    'failed_jobs' => 0,
    'failed_job_ids' => '[]',
    'options' => serialize($batch->options),
    'created_at' => time(),
    'cancelled_at' => null,
    'finished_at' => null,
]);

This snippet is from the store() method of the Illuminate\Bus\DatabaseBatchRepository class. Each batch is assigned a UUID and an optional name. The batch options array is also stored as a string after being serialized.

That options array holds the closures provided to the then, catch, and finally methods. It also holds the values passed to allowFailures, onConnection, and onQueue.

The closures are serialized by the framework's implementation of Opis\Closure\SerializableClosure so they can be stored in the database.

Dispatching the Jobs

Now that the batch information is stored in the database, Laravel stores the batch ID in each job instance we pass to the batch and then dispatches the jobs to the queue:

$jobs->each->withBatchId($this->id);

$this->repository->incrementTotalJobs(
    $this->id, 
    count($jobs)
);

$this->queue
     ->connection($this->options['connection'] ?? null)
     ->bulk(
        $jobs->all(),
        $data = '',
        $this->options['queue'] ?? null
    );

This snippet is from the add() method of the Illuminate\Bus\Batch class.

While dispatching the jobs to the queue, Laravel increments the jobs count in the database row where the batch is stored. That way it can track the remaining jobs to check whether the batch has finished or not.

You can also notice that Laravel uses the bulk() method to dispatch the jobs. That way it can send all the jobs to the queue store in a single transaction instead of calling dispatch() for every single job.

Monitoring the Batch

After each successful job execution, the recordSuccessfulJob() method of the Illuminate\Bus\Batch class is called. This method decrements the number of pending batch jobs in the database and calls the proper callbacks:

$counts = $this->decrementPendingJobs($jobId);

if ($counts->pendingJobs === 0) {
    $this->repository->markAsFinished($this->id);
}

if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
    collect($this->options['then'])->each(/* Invoke */);
}

if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
    collect($this->options['finally'])->each(/* Invoke */);
}

If there aren't any pending jobs in the batch, Laravel will mark the batch as finished by updating the finished_at timestamp in the database. It'll also call any then callbacks if all jobs in the batch ran successfully, and any finally callbacks if all the jobs in the batch were attempted at least once.

Similarly, the recordFailedJob() method is called after any job failure. And by failure I mean consuming all the attempts of a job.

$counts = $this->incrementFailedJobs($jobId);

if ($counts->failedJobs === 1 && ! $this->allowsFailures()) {
    $this->cancel();
}

if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
    $batch = $this->fresh();

    collect($this->options['catch'])->each(/* Invoke */);
}

if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
    $batch = $this->fresh();

    collect($this->options['finally'])->each(/* Invoke */);
}

If the batch doesn't allow failure, it's going to be cancelled after the first recorded failure. The cancelled_at field in the database will be updated with the current timestamp.

In addition to that, any catch callbacks will be called it was the first recorded failure, and any finally callbacks will be called if all the jobs in the batch were attempted at least once.

The recordSuccessfulJob() and recordFailedJob() methods are called from within the Illuminate\Queue\CallQueuedHandler class.

Exploring Further

If you wish to learn further details on how batches works. You can check these classes and methods:

If you have more questions on Job Batching in Laravel, feel free to comment on this tweet.

Hey! 👋 If you want to receive updates on what I'm up to, I host a newsletter on my website themsaid.com and would love to have you.

You can also follow me on Twitter, I regularly post about all things Laravel including my latest video tutorials and blog posts.

By Mohamed Said

Hello! I'm a full-stack web developer working at Laravel. In this publication, I share everything I know about Laravel's core, packages, and tools.

You can find me on Twitter and Github.

This site was built using Wink. Follow the RSS Feed.