
How to group queued jobs using Laravel 8's new Batch class

Original – by Freek Van der Herten – 14 minute read

Laravel 8 offers a shiny new way to group multiple jobs into one batch. This allows you to easily check how many jobs there are in a batch, what the total progress is, and even cancel all jobs in a batch.

In this blog post, I'd like to share how we will use this feature in the upcoming v3 of Mailcoach. We'll also take a look at how batches are implemented under the hood in Laravel.

Introducing Mailcoach

In this blog post, we're going to take a look at some of the source code of Mailcoach. You can think of Mailcoach as a self-hosted version of Mailchimp. It sends mail via external email service providers such as Amazon SES, Mailgun, Sendgrid, ... Those services are pretty cheap to use, even for large volumes.

This makes using Mailcoach much more affordable than Mailchimp. If you want to know more about Mailcoach itself, head over to the site, or read the docs.

Making sending a campaign fast and restartable

In Mailcoach, you can send a campaign to a list of subscribers. To send a mail to each subscriber of a list, a naive approach would be to loop over the subscribers and immediately send an email.

protected function sendMailsForCampaign(Campaign $campaign)
{
    $subscribers = $campaign->list->subscribers;

    $subscribers->each(function (Subscriber $subscriber) {
        // send a mail directly here
    });
}

This approach has some drawbacks. Under the hood, sending a mail is an API call to an external service. Such a call can be slow. When emails are sent one after the other, it can take quite some time before all of them have gone out.

A second drawback is that this process is not restartable. Imagine that there is a problem in the middle of sending a campaign. You'll have a hard time figuring out which emails were sent and which were not. If you executed SendCampaignAction again, some subscribers would get the campaign twice, which is very bad.

To avoid these problems, Mailcoach does not directly send emails from within a loop. Instead, it will loop over each subscriber of a list and create a Send model. This model represents an email that should be sent (or has been sent). For each created Send model, a SendMailJob is dispatched. That job will send the actual mail.

Here's the relevant code from Mailcoach. I've simplified it a bit for brevity.

// in SendCampaignAction.php

protected function sendMailsForCampaign(Campaign $campaign)
{
    $subscribers = $campaign->list->subscribers;

    $subscribers->each(function (Subscriber $subscriber) use ($campaign) {
        $pendingSend = $this->createSend($campaign, $subscriber);

        dispatch(new SendMailJob($pendingSend));
    });

    dispatch(new MarkCampaignAsSentJob($campaign));
}

protected function createSend(Campaign $campaign, Subscriber $subscriber): Send
{
    /** @var \Spatie\Mailcoach\Models\Send $pendingSend */
    $pendingSend = $campaign->sends()
        ->where('subscriber_id', $subscriber->id)
        ->first();

    if ($pendingSend) {
        return $pendingSend;
    }

    return $campaign->sends()->create([
        'subscriber_id' => $subscriber->id,
        'uuid' => (string)Str::uuid(),
    ]);
}

With this approach, the two drawbacks of the naive approach are addressed. Mails are now being sent from a queue. When using multiple workers to handle the queue, emails will be sent in parallel.

SendCampaignAction is now restartable. You can see that createSend will not create a new Send model if one already exists for the given subscriber. When the SendMailJob actually sends out a mail for the given Send, it will mark the Send as processed. It will not send emails for a Send that was already processed.
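The restartable behaviour boils down to a "find or create" keyed on the subscriber, combined with a processed flag. Here is a minimal plain-PHP sketch of that idea, assuming an in-memory array in place of the sends table. The InMemorySends class and its method names are hypothetical and are not part of Mailcoach.

```php
<?php

// Hypothetical in-memory stand-in for the `sends` table; not Mailcoach code.
final class InMemorySends
{
    /** @var array<int, array{subscriber_id: int, sent: bool}> rows keyed by subscriber id */
    private array $rows = [];

    /** @var int[] subscriber ids that were actually mailed, in order */
    public array $delivered = [];

    // Return the existing row for a subscriber, or create one if none exists.
    public function firstOrCreate(int $subscriberId): array
    {
        if (! isset($this->rows[$subscriberId])) {
            $this->rows[$subscriberId] = ['subscriber_id' => $subscriberId, 'sent' => false];
        }

        return $this->rows[$subscriberId];
    }

    // "Send" the mail once; a second call for the same subscriber is a no-op.
    public function sendOnce(int $subscriberId): void
    {
        $this->firstOrCreate($subscriberId);

        if ($this->rows[$subscriberId]['sent']) {
            return;
        }

        $this->delivered[] = $subscriberId;
        $this->rows[$subscriberId]['sent'] = true;
    }
}

$sends = new InMemorySends();

// The first run is interrupted after two subscribers.
foreach ([1, 2] as $id) {
    $sends->sendOnce($id);
}

// Restarting over the full list does not mail subscribers 1 and 2 again.
foreach ([1, 2, 3] as $id) {
    $sends->sendOnce($id);
}
```

After both runs, every subscriber appears exactly once in $sends->delivered, which is the property that makes a restart safe.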

From dispatching individual jobs...

When Mailcoach has sent emails to each subscriber, it will mark the campaign as fully sent. This is done by the MarkCampaignAsSentJob, which you saw being dispatched at the end of the sendMailsForCampaign function. Let's take a look at what this job looks like in Mailcoach v2.

namespace Spatie\Mailcoach\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Spatie\Mailcoach\Events\CampaignSentEvent;
use Spatie\Mailcoach\Models\Campaign;
use Spatie\Mailcoach\Support\Config;

class MarkCampaignAsSentJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public Campaign $campaign;

    /** @var string */
    public $queue;

    /** We will retry this on each minute for an entire day */
    public int $tries = 60 * 24;

    public function __construct(Campaign $campaign)
    {
        $this->campaign = $campaign;

        $this->queue = config('mailcoach.perform_on_queue.send_campaign_job');

        $this->connection = $this->connection ?? Config::getQueueConnection();
    }

    public function handle()
    {
        if (! $this->allMailsHaveBeenSent()) {
            $this->release(60);

            return;
        }

        $this->campaign->markAsSent($this->campaign->sends()->count());

        event(new CampaignSentEvent($this->campaign));
    }

    protected function allMailsHaveBeenSent(): bool
    {
        return (int) $this->campaign->sendsCount() === (int) $this->campaign->fresh()->sent_to_number_of_subscribers;
    }
}

You can see that this job polls to check whether all mails have been sent. If not all emails have been sent yet, the job is re-added to the queue by releasing it. It will get picked up again in 60 seconds.

This approach works, but it feels rather dirty. Luckily, Laravel 8 contains some new queueing functionality that makes this a lot easier.

... to dispatching jobs in a batch

Laravel 8 introduces a new batch method that allows you to dispatch multiple jobs in one go. Let's take a look at how we can refactor the code above using batch.

protected function sendMailsForCampaign(Campaign $campaign)
{
    $jobs = $campaign->list->subscribers()
        ->cursor()
        ->map(fn (Subscriber $subscriber) => $this->createSendMailJob($campaign, $subscriber, $segment))
        ->filter()
        ->toArray();

    $batch = Bus::batch($jobs)
        ->allowFailures()
        ->finally(function () use ($campaign) {
            $campaign->markAsSent($campaign->sends()->count());

            event(new CampaignSentEvent($campaign));
        })
        ->dispatch();

    $campaign->update(['send_batch_id' => $batch->id]);
}

By default, the entire batch of jobs would be canceled when one of the jobs fails. In this case, we don't want one failing mail to stop all others. We can prevent that from happening by calling allowFailures.

Instead of polling to check if all mails are sent, we can pass a callable to the finally method. That callable will be executed when all jobs in the batch have been processed. The MarkCampaignAsSentJob from Mailcoach v2 isn't needed anymore and is removed in v3.

Solving memory issues

The code above introduces a new problem. Imagine that the email list you're sending a campaign to contains many subscribers, let's say half a million. Using the code above, the jobs variable would hold half a million jobs. Memory will likely run out if we leave things like this. Let's fix that!

Instead of scheduling all jobs at once, let's add them one by one. This can be done using the add method.

$batch = Bus::batch([])
    ->allowFailures()
    ->finally(function () use ($campaign) {
        $campaign->markAsSent($campaign->sends()->count());

        event(new CampaignSentEvent($campaign));
    })
    ->dispatch();

$campaign->update(['send_batch_id' => $batch->id]);

$subscribersQuery
    ->cursor()
    ->map(fn (Subscriber $subscriber) => $this->createSendMailJob($campaign, $subscriber, $segment))
    ->filter()
    ->each(fn (SendMailJob $sendMailJob) => $batch->add($sendMailJob));

The cursor method retrieves models in an efficient way and keeps memory usage low. It returns an instance of LazyCollection. To learn more about that kind of collection, check out Joseph Silber's excellent blog post that explains the ins and outs of this class.

In the code snippet above, the memory issue is solved, but another problem was introduced. The finally callback is executed whenever all jobs currently in the batch have been processed. So if the first job added to the batch gets executed before another one is added, finally will already be executed, and the campaign will be marked as sent. That's not our intention.
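To make the timing problem concrete, here is a small plain-PHP model of a batch: a pending counter plus a finally callback that fires whenever the counter reaches zero. This is an illustration under that simplifying assumption, not Laravel's Batch implementation, and the TinyBatch class is hypothetical.

```php
<?php

// Simplified model of a batch: a pending counter plus a "finally" callback.
// Illustration only; this is not Laravel's Batch implementation.
final class TinyBatch
{
    private int $pending = 0;

    /** @var callable */
    private $finally;

    public function __construct(callable $finally)
    {
        $this->finally = $finally;
    }

    // Register newly added jobs as pending.
    public function add(int $numberOfJobs): void
    {
        $this->pending += $numberOfJobs;
    }

    // Called when a worker finishes one job.
    public function jobFinished(): void
    {
        $this->pending--;

        if ($this->pending === 0) {
            ($this->finally)();
        }
    }
}

$timesFinallyRan = 0;

$batch = new TinyBatch(function () use (&$timesFinallyRan) {
    $timesFinallyRan++;
});

$batch->add(1);        // first job added
$batch->jobFinished(); // a fast worker handles it before the next one is added
$batch->add(1);        // second job added
$batch->jobFinished(); // second job handled
```

In this model the callback runs twice for two jobs, because the counter hits zero after each one. With a real campaign, the first of those runs would mark the campaign as sent far too early.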

We only want the campaign marked as sent after jobs for all subscribers have been added to the batch, and the batch is fully processed. In Mailcoach, we've solved this by adding another job to the batch called MarkCampaignAsFullyDispatchedJob after all SendMailJobs have been added. This is what MarkCampaignAsFullyDispatchedJob looks like.

namespace Spatie\Mailcoach\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Spatie\Mailcoach\Models\Campaign;

class MarkCampaignAsFullyDispatchedJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public Campaign $campaign;

    public function __construct(Campaign $campaign)
    {
        $this->campaign = $campaign;
    }

    public function handle()
    {
        $this->campaign->update(['all_jobs_added_to_batch_at' => now()]);
    }
}

In this job we'll update the all_jobs_added_to_batch_at attribute of the campaign. Notice that we use the new Illuminate\Bus\Batchable trait, which should be added to all jobs you put in a batch.

In the finally method of the batch in SendCampaignAction, we should now check if all_jobs_added_to_batch_at was set. If it is set, we know for sure that all sends of the campaign were dispatched and handled.

$campaign->update(['all_jobs_added_to_batch_at' => null]);

$batch = Bus::batch([])
    ->allowFailures()
    ->finally(function () use ($campaign) {
        if (! $campaign->refresh()->all_jobs_added_to_batch_at) {
            return;
        }

        $campaign->markAsSent($campaign->sends()->count());

        event(new CampaignSentEvent($campaign));
    })
    ->dispatch();

$campaign->update(['send_batch_id' => $batch->id]);

$subscribersQuery
    ->cursor()
    ->map(fn (Subscriber $subscriber) => $this->createSendMailJob($campaign, $subscriber, $segment))
    ->filter()
    ->each(fn (SendMailJob $sendMailJob) => $batch->add($sendMailJob));

$batch->add(new MarkCampaignAsFullyDispatchedJob($campaign));
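To see why the guard works, here is a plain-PHP sketch of the same idea: a pending counter, a finally step, and a boolean that plays the role of all_jobs_added_to_batch_at. The GuardedBatch class is hypothetical and only models the behaviour described above; it is not Laravel or Mailcoach code.

```php
<?php

// Illustration only: a pending counter, a "finally" step, and a flag that
// plays the role of all_jobs_added_to_batch_at.
final class GuardedBatch
{
    public bool $allJobsAdded = false;
    public int $timesMarkedAsSent = 0;
    private int $pending = 0;

    // Register newly added jobs as pending.
    public function add(int $numberOfJobs): void
    {
        $this->pending += $numberOfJobs;
    }

    // Called when a worker finishes one job.
    public function jobFinished(): void
    {
        $this->pending--;

        if ($this->pending === 0) {
            $this->runFinally();
        }
    }

    private function runFinally(): void
    {
        // Not every job has been added yet, so this run came too early.
        if (! $this->allJobsAdded) {
            return;
        }

        $this->timesMarkedAsSent++;
    }
}

$batch = new GuardedBatch();

$batch->add(1);
$batch->jobFinished();       // pending hits zero early; the guard returns

$batch->add(1);              // another send job
$batch->add(1);              // the marker job, added last
$batch->jobFinished();       // the send job is handled
$batch->allJobsAdded = true; // the marker job's handle() sets the flag...
$batch->jobFinished();       // ...and then completes, so finally runs for real
```

Because the marker job is only added after every send job, the counter can reach zero with the flag set only once, at the very end.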

This code is now able to handle campaigns that are sent to large email lists. Let's make one final optimization. Right now, all jobs are added to the queue one by one. It might be better for performance to add multiple jobs to the batch in one go.

Luckily, the LazyCollection that we are using to loop over each subscriber can be chunked. Let's use that to our advantage.

$subscribersQuery
    ->cursor()
    ->map(fn (Subscriber $subscriber) => $this->createSendMailJob($campaign, $subscriber, $segment))
    ->filter()
    ->chunk(1000)
    ->each(function (LazyCollection $jobs) use ($batch) {
        $batch->add($jobs); // 1000 jobs are now added in one go
    });

So we get subscribers one by one via a MySQL cursor and convert each one to a job. When we have done that a thousand times, the jobs are added to the batch and the next subscribers will be retrieved. In my opinion, the fact that you can chunk a LazyCollection is pretty mind-blowing.
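If lazy chunking feels like magic, the underlying idea can be sketched with plain PHP generators. The functions subscriberIds and lazyChunk below are hypothetical helpers written for this illustration; they are not how LazyCollection is implemented, but they show how items can be grouped without ever holding the full list in memory.

```php
<?php

// Yield subscriber ids one at a time, much like a database cursor would.
function subscriberIds(int $count): Generator
{
    for ($id = 1; $id <= $count; $id++) {
        yield $id;
    }
}

// Group any iterable into arrays of at most $size items without
// loading the whole sequence into memory.
function lazyChunk(iterable $items, int $size): Generator
{
    $chunk = [];

    foreach ($items as $item) {
        $chunk[] = $item;

        if (count($chunk) === $size) {
            yield $chunk;
            $chunk = [];
        }
    }

    // Emit the final, possibly smaller, chunk.
    if ($chunk !== []) {
        yield $chunk;
    }
}

$chunkSizes = [];

foreach (lazyChunk(subscriberIds(2500), 1000) as $chunk) {
    // In the real code, this is where the chunk would be added to the batch.
    $chunkSizes[] = count($chunk);
}
```

At any point, only the current chunk of at most 1000 items exists in memory. For 2500 subscribers, the loop body runs three times.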

At the moment of writing, I have two nitpicks. First, I don't like that it is required to pass an empty array to batch. Secondly, it would be nice if add would accept a closure, so that MarkCampaignAsFullyDispatchedJob can be refactored away.

$subscribersQuery
    // ...
    ->each(fn (SendMailJob $sendMailJob) => $batch->add($sendMailJob));

// wished-for API: add a closure instead of a dedicated job class
$batch->add(function () {
    // update campaign
});

I'm sure the Laravel team will improve on this in the future.

Working with a batch

Did you notice that in the code above we saved the batch id to the database?

$campaign->update(['send_batch_id' => $batch->id]);

This id will help us retrieve an instance of Illuminate\Bus\Batch, which contains many helpful methods. You could, for example, use that instance to cancel all jobs in a batch.

In Mailcoach, you can cancel sending a campaign while it is being sent. This is what that looks like in the UI:

screenshot

This is the code that will get executed when cancel is clicked.

namespace Spatie\Mailcoach\Http\App\Controllers\Campaigns;

use Illuminate\Support\Facades\Bus;
use Spatie\Mailcoach\Enums\CampaignStatus;
use Spatie\Mailcoach\Models\Campaign;
use Spatie\Mailcoach\Traits\UsesMailcoachModels;

class CancelSendingCampaignController
{
    use UsesMailcoachModels;

    public function __invoke(Campaign $campaign)
    {
        $batch = Bus::findBatch(
            $campaign->send_batch_id
        );

        $batch->cancel();

        $campaign->update([
            'status' => CampaignStatus::CANCELLED,
            'sent_at' => now(),
        ]);

        flash()->success(__('Sending successfully cancelled.'));

        return redirect()->back();
    }
}

Batch has these helpful properties and functions:

  • totalJobs: the total number of jobs in the batch
  • pendingJobs: the number of jobs that have not been handled yet
  • failedJobs: the number of failed jobs
  • progress(): the percentage of jobs that have been processed

There are a few more properties and functions available. You can discover them in the docs or by browsing the source code.
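As a rough illustration of how these counters relate to each other, here is a plain-PHP sketch that derives a progress percentage from the total and pending job counts. The batchProgress function is hypothetical, and rounding to a whole number is an assumption made for this example; check the framework source for the exact behaviour.

```php
<?php

// Sketch of how a progress percentage can be derived from the two counters.
// The function name is hypothetical; rounding to a whole number is an assumption.
function batchProgress(int $totalJobs, int $pendingJobs): int
{
    // An empty batch has no meaningful progress; avoid dividing by zero.
    if ($totalJobs === 0) {
        return 0;
    }

    $processedJobs = $totalJobs - $pendingJobs;

    return (int) round(($processedJobs / $totalJobs) * 100);
}

// 150 of 200 jobs have been handled.
$progress = batchProgress(200, 50);
```

A value like this is what you would typically feed into a progress bar while a campaign is being sent.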

These properties and functions are used in various places in the Mailcoach code base.

How it works under the hood

The basic implementation of batches is quite simple. All batches get stored in a repository. The default repository is a table called job_batches. Here's the migration for that table.

Schema::create('job_batches', function (Blueprint $table) {
    $table->string('id')->primary();
    $table->string('name');
    $table->integer('total_jobs');
    $table->integer('pending_jobs');
    $table->integer('failed_jobs');
    $table->text('failed_job_ids');
    $table->text('options')->nullable();
    $table->integer('cancelled_at')->nullable();
    $table->integer('created_at');
    $table->integer('finished_at')->nullable();
});

When calling Bus::batch, a new instance of PendingBatch is created. PendingBatch is a class that can be used to configure a batch. In this video, I dive deeper into how these pending objects can avoid large function signatures.

Once you've configured your batch by calling methods like allowFailures and finally on PendingBatch, you start the batch by calling dispatch.

Bus::batch($jobs)
    ->allowFailures()
    ->finally(function () {
        // runs when all jobs in the batch have been processed
    })
    ->dispatch();

Let's take a look at what the dispatch method looks like.

$repository = $this->container->make(BatchRepository::class);

try {
    $batch = $repository->store($this);

    $batch = $batch->add($this->jobs);
} catch (Throwable $e) {
    if (isset($batch)) {
        $repository->delete($batch->id);
    }

    throw $e;
}

$this->container->make(EventDispatcher::class)->dispatch(
    new BatchDispatched($batch)
);

return $batch;

First, a record is created for the batch in the repository. This is the implementation of store in DatabaseBatchRepository. Notice that a UUID is used as the id of a batch.

public function store(PendingBatch $batch)
{
    $id = (string) Str::orderedUuid();

    $this->connection->table($this->table)->insert([
        'id' => $id,
        'name' => $batch->name,
        'total_jobs' => 0,
        'pending_jobs' => 0,
        'failed_jobs' => 0,
        'failed_job_ids' => '[]',
        'options' => serialize($batch->options),
        'created_at' => time(),
        'cancelled_at' => null,
        'finished_at' => null,
    ]);

    return $this->find($id);
}

In the dispatch method, the jobs are added to the batch by calling the add method.

$batch = $batch->add($this->jobs);

Inside the add method, the batch id will get injected into the payload of the job added to the batch.

Now let's take a look at how canceling a batch is implemented. Calling cancel will result in calling cancel on the repository. The cancelled_at and finished_at columns will be filled with the current Unix timestamp.

// in `DatabaseBatchRepository.php`

public function cancel(string $batchId)
{
    $this->connection->table($this->table)->where('id', $batchId)->update([
        'cancelled_at' => time(),
        'finished_at' => time(),
    ]);
}

With the batch id injected in the job, we can easily retrieve the batch by using the Batchable trait on the job.

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class YourJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function handle()
    {
        // batch() returns null when the job was not dispatched as part of a batch
        if (optional($this->batch())->canceled()) {
            // optionally perform some clean up if necessary
            return;
        }

        // do the actual work
    }
}
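The interplay between cancel on the repository and the check inside the job can be modelled in a few lines of plain PHP. In this sketch, an array plays the role of the job_batches table. The InMemoryBatchRepository class and the handleJob function are hypothetical names used only for illustration; they are not framework classes.

```php
<?php

// Illustration only: an array plays the role of the job_batches table.
final class InMemoryBatchRepository
{
    /** @var array<string, array{cancelled_at: ?int, finished_at: ?int}> */
    private array $batches = [];

    public function store(string $id): void
    {
        $this->batches[$id] = ['cancelled_at' => null, 'finished_at' => null];
    }

    // Cancelling fills both columns with the current Unix timestamp.
    public function cancel(string $id): void
    {
        $now = time();

        $this->batches[$id]['cancelled_at'] = $now;
        $this->batches[$id]['finished_at'] = $now;
    }

    public function isCancelled(string $id): bool
    {
        // isset() is false for unknown batches and for a null timestamp.
        return isset($this->batches[$id]['cancelled_at']);
    }
}

// A job that carries its batch id and checks it before doing any work.
function handleJob(InMemoryBatchRepository $repository, string $batchId): string
{
    if ($repository->isCancelled($batchId)) {
        return 'skipped';
    }

    return 'handled';
}

$repository = new InMemoryBatchRepository();
$repository->store('batch-1');

$before = handleJob($repository, 'batch-1'); // the batch is still active
$repository->cancel('batch-1');
$after = handleJob($repository, 'batch-1');  // the batch was cancelled
```

Note that cancelling does not remove anything from the queue in this model. Jobs that are already queued still get picked up, and it is the check at the top of the handler that makes them return early.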

Closing thoughts

Batches are an excellent addition to the framework. At the moment of writing Laravel 8 isn't released yet, but I think it's fairly safe to assume that the API for Batch won't change anymore.

The Mailcoach code snippets in this blogpost were a bit simplified for brevity. If you want to see the actual code, consider picking up a license at the Mailcoach website.

Even if you don't need a self-hosted solution to send out email campaigns, you'll probably learn some cool things by reading the source code and watching the videos.

If you want to see another example of Laravel's batched jobs being used, consider picking up Mohamed's book on queues. I read a proof copy and can highly recommend it to anyone that uses Laravel queues.
