How to group queued jobs using Laravel 8's new Batch class
Laravel 8 offers a shiny new way to group multiple jobs into one batch. This allows you to easily check how many jobs there are in a batch and what the total progress is, and even to cancel all jobs in a batch.
In this blog post, I'd like to share how we will use this feature in the upcoming v3 of Mailcoach. We'll also take a look at how batches are implemented under the hood in Laravel.
Introducing Mailcoach
In this blog post, we're going to take a look at some of the source code of Mailcoach. You can think of Mailcoach as a self-hosted version of Mailchimp. It sends mails via external mail service providers such as Amazon SES, Mailgun, and Sendgrid. Those services are pretty cheap to use, even for large volumes.
This makes using Mailcoach much more affordable than Mailchimp. If you want to know more about Mailcoach itself, head over to the site, or read the docs.
Making sending a campaign fast and restartable
In Mailcoach, you can send a campaign to a list of subscribers. To send a mail to each subscriber of a list, a naive approach would be to loop over the subscribers and immediately send an email.
```php
protected function sendMailsForCampaign(Campaign $campaign)
{
    $subscribers = $campaign->list->subscribers;

    $subscribers->each(function (Subscriber $subscriber) {
        // send a mail directly here
    });
}
```
This approach has some drawbacks. Under the hood, sending a mail is an API call to an external service, and such a call can be slow. When emails are sent one after the other, it can take quite some time before all of them are sent.
A second drawback is that this process is not restartable. Imagine that there is a problem in the middle of sending a campaign. You'll have a hard time figuring out which emails were sent and which were not. If you executed SendCampaignAction again, some subscribers would get the campaign twice, which is very bad.
To avoid these problems, Mailcoach does not send emails directly from within a loop. Instead, it loops over each subscriber of a list and creates a Send model. This model represents an email that should be sent (or has been sent). For each created Send model, a SendMailJob is dispatched. That job sends the actual mail.
Here's the relevant code from Mailcoach. I've simplified it a bit for brevity.
```php
// in SendCampaignAction.php
protected function sendMailsForCampaign(Campaign $campaign)
{
    $subscribers = $campaign->list->subscribers;

    $subscribers->each(function (Subscriber $subscriber) use ($campaign) {
        $pendingSend = $this->createSend($campaign, $subscriber);

        dispatch(new SendMailJob($pendingSend));
    });

    dispatch(new MarkCampaignAsSentJob($campaign));
}

protected function createSend(Campaign $campaign, Subscriber $subscriber): Send
{
    /** @var \Spatie\Mailcoach\Models\Send $pendingSend */
    $pendingSend = $campaign->sends()
        ->where('subscriber_id', $subscriber->id)
        ->first();

    // reuse an existing Send so a restarted campaign never creates duplicates
    if ($pendingSend) {
        return $pendingSend;
    }

    return $campaign->sends()->create([
        'subscriber_id' => $subscriber->id,
        'uuid' => (string) Str::uuid(),
    ]);
}
```
With this approach, the two drawbacks of the naive approach are addressed. Mails are now being sent from a queue. When using multiple workers to handle the queue, emails will be sent in parallel.
SendCampaignAction is now restartable. You can see that createSend will not create a new Send model if one already exists for the given subscriber. When the SendMailJob actually sends out a mail for the given Send, it marks the Send as processed. It will not send emails for a Send that was already processed.
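The following framework-free sketch in plain PHP makes the restartability argument concrete. SendStore, firstOrCreate(), process() and sendCampaign() are hypothetical names. They stand in for the Send model, createSend and SendMailJob, but this is not Mailcoach's code, and it ignores queues and concurrency.

```php
<?php
// Framework-free sketch of the "restartable" idea. SendStore and
// sendCampaign() are hypothetical stand-ins for the Send model, createSend()
// and SendMailJob; they are not Mailcoach code.

final class SendStore
{
    /** @var array<int, bool> subscriber id => has the send been processed? */
    private array $sends = [];

    /** @var int[] subscriber ids that actually received a mail */
    public array $mailed = [];

    // Like createSend(): reuse an existing send instead of creating a second one.
    public function firstOrCreate(int $subscriberId): int
    {
        if (!array_key_exists($subscriberId, $this->sends)) {
            $this->sends[$subscriberId] = false;
        }

        return $subscriberId;
    }

    // Like SendMailJob: never mail a send that was already processed.
    public function process(int $subscriberId): void
    {
        if ($this->sends[$subscriberId]) {
            return;
        }

        $this->mailed[] = $subscriberId;
        $this->sends[$subscriberId] = true;
    }

    public function count(): int
    {
        return count($this->sends);
    }
}

function sendCampaign(SendStore $store, array $subscriberIds, ?int $crashAt = null): void
{
    foreach ($subscriberIds as $i => $id) {
        if ($i === $crashAt) {
            return; // simulate the process dying halfway through the list
        }

        $store->process($store->firstOrCreate($id));
    }
}

$store = new SendStore();
sendCampaign($store, [1, 2, 3, 4], 2); // dies after mailing subscribers 1 and 2
sendCampaign($store, [1, 2, 3, 4]);    // restart: only 3 and 4 are mailed now
```

Running the campaign twice over the same list mails every subscriber exactly once. That is the property the existence check in createSend protects.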
From dispatching individual jobs...
When Mailcoach has sent emails to each subscriber, it marks the campaign as fully sent. This is done by the MarkCampaignAsSentJob, which you saw being dispatched at the end of the sendMailsForCampaign function. Let's take a look at what this job looks like in Mailcoach v2.
```php
namespace Spatie\Mailcoach\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Spatie\Mailcoach\Events\CampaignSentEvent;
use Spatie\Mailcoach\Models\Campaign;
use Spatie\Mailcoach\Support\Config;

class MarkCampaignAsSentJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public Campaign $campaign;

    /** @var string */
    public $queue;

    /** Retry this every minute for an entire day */
    public int $tries = 60 * 24;

    public function __construct(Campaign $campaign)
    {
        $this->campaign = $campaign;

        $this->queue = config('mailcoach.perform_on_queue.send_campaign_job');

        $this->connection = $this->connection ?? Config::getQueueConnection();
    }

    public function handle()
    {
        if (! $this->allMailsHaveBeenSent()) {
            // put the job back on the queue and check again in 60 seconds
            $this->release(60);

            return;
        }

        $this->campaign->markAsSent($this->campaign->sends()->count());

        event(new CampaignSentEvent($this->campaign));
    }

    protected function allMailsHaveBeenSent(): bool
    {
        return (int) $this->campaign->sendsCount() === (int) $this->campaign->fresh()->sent_to_number_of_subscribers;
    }
}
```
In this job, we use the queue to poll whether all mails have been sent. If not all emails have been sent, the job is put back on the queue by releasing it, and it will get picked up again in 60 seconds.
This approach works, but it feels rather dirty. Luckily, Laravel 8 contains some new queueing functionality that makes this a lot easier.
... to dispatching jobs in a batch
Laravel 8 introduces a new batch method that allows you to dispatch multiple jobs in one go. Let's take a look at how we can refactor the code above using batch.
```php
protected function sendMailsForCampaign(Campaign $campaign)
{
    // cursor() needs a query, so we call the subscribers() relation here
    $jobs = $campaign->list->subscribers()
        ->cursor()
        ->map(fn (Subscriber $subscriber) => $this->createSendMailJob($campaign, $subscriber, $segment))
        ->filter()
        ->toArray();

    $batch = Bus::batch($jobs)
        ->allowFailures()
        ->finally(function () use ($campaign) {
            $campaign->markAsSent($campaign->sends()->count());

            event(new CampaignSentEvent($campaign));
        })
        ->dispatch();

    $campaign->update(['send_batch_id' => $batch->id]);
}
```
By default, the entire batch of jobs is cancelled when one of the jobs fails. In this case, we don't want one failing mail to stop all others. We can prevent that from happening by calling allowFailures.
Instead of polling to check if all mails are sent, we can pass a callable to the finally method. That callable will be executed when all jobs in the batch have been processed. The MarkCampaignAsSentJob from Mailcoach v2 isn't needed anymore and is removed in v3.
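The following toy model shows what allowFailures and finally change. MiniBatch is a hypothetical, in-memory class, not Laravel's Batch. In Laravel, the remaining jobs of a cancelled batch still run and are expected to bail out themselves. The sketch simply counts them as skipped.

```php
<?php
// Hypothetical, framework-free model of a batch. It is NOT Laravel's Batch
// class; it only illustrates what allowFailures() and finally() change.

final class MiniBatch
{
    public int $succeeded = 0;
    public int $failed = 0;
    public int $skipped = 0;
    public bool $cancelled = false;
    public int $finallyCalls = 0;

    private bool $allowFailures;
    /** @var callable */
    private $finally;

    public function __construct(bool $allowFailures, callable $finally)
    {
        $this->allowFailures = $allowFailures;
        $this->finally = $finally;
    }

    /** @param callable[] $jobs each job returns true on success, false on failure */
    public function run(array $jobs): void
    {
        foreach ($jobs as $job) {
            if ($this->cancelled) {
                // Models a job that bails out because its batch was cancelled.
                $this->skipped++;
                continue;
            }

            if ($job()) {
                $this->succeeded++;
                continue;
            }

            $this->failed++;

            if (!$this->allowFailures) {
                $this->cancelled = true; // default: the first failure cancels the batch
            }
        }

        // Runs once, after every job has been handled, so no polling is needed.
        ($this->finally)($this);
    }
}

$ok = fn () => true;
$fail = fn () => false;
$jobs = [$ok, $fail, $ok, $ok];

$strict = new MiniBatch(false, function (MiniBatch $b) { $b->finallyCalls++; });
$strict->run($jobs);  // 1 succeeded, 1 failed, 2 skipped

$lenient = new MiniBatch(true, function (MiniBatch $b) { $b->finallyCalls++; });
$lenient->run($jobs); // 3 succeeded, 1 failed, 0 skipped
```

With allowFailures, one failing mail no longer prevents the other three from being sent. In both cases, the finally callback runs exactly once.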
Solving memory issues
The code above introduces a new problem. Imagine that the email list you're sending a campaign to contains many subscribers, let's say half a million. Using the code above, the $jobs variable would hold half a million jobs. Memory will likely run out if we leave things like this. Let's fix that!
Instead of adding all jobs at once, let's add them one by one. This can be done using the add method.
```php
$batch = Bus::batch([])
    ->allowFailures()
    ->finally(function () use ($campaign) {
        $campaign->markAsSent($campaign->sends()->count());

        event(new CampaignSentEvent($campaign));
    })
    ->dispatch();

$campaign->update(['send_batch_id' => $batch->id]);

$subscribersQuery
    ->cursor()
    ->map(fn (Subscriber $subscriber) => $this->createSendMailJob($campaign, $subscriber, $segment))
    ->filter()
    ->each(fn (SendMailJob $sendMailJob) => $batch->add($sendMailJob));
```
The cursor method retrieves models one at a time and keeps memory usage low. It returns an instance of LazyCollection. To learn more about that kind of collection, check out Joseph Silber's excellent blog post that explains the ins and outs of this class.
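The memory savings come from generators: each step pulls one item, processes it, and hands it on. This plain-PHP sketch uses hypothetical helpers (fakeCursor, lazyMap, lazyFilter) to mimic cursor()->map()->filter() without Laravel. It is not LazyCollection's actual implementation.

```php
<?php
// Plain-PHP sketch of why cursor() plus a lazy pipeline keeps memory low:
// generators yield one item at a time, so no step builds a full array.
// fakeCursor(), lazyMap() and lazyFilter() are hypothetical helpers.

function fakeCursor(int $rows): Generator
{
    for ($id = 1; $id <= $rows; $id++) {
        yield ['id' => $id]; // only this one "row" exists at this moment
    }
}

function lazyMap(iterable $items, callable $fn): Generator
{
    foreach ($items as $item) {
        yield $fn($item);
    }
}

function lazyFilter(iterable $items): Generator
{
    foreach ($items as $item) {
        if ($item) { // drop falsy values, like ->filter() without a callback
            yield $item;
        }
    }
}

$jobs = lazyFilter(lazyMap(fakeCursor(500000), function (array $row) {
    // Pretend subscribers with an even id were already sent to: no job needed.
    return $row['id'] % 2 === 0 ? null : "SendMailJob#{$row['id']}";
}));

$dispatched = 0;
foreach ($jobs as $job) {
    $dispatched++; // stand-in for $batch->add($job)
}
```

Nothing here ever holds half a million jobs at once. Only the current row and the current job exist at any given moment.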
Adding the jobs one by one solves the memory issue, but it introduces another problem. The finally callback gets executed each time all jobs currently in the batch have been processed. So if the first job added to the batch is processed before the next one is added, finally will already be executed, and the campaign will be marked as sent. That's not our intention.
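The following hypothetical simulation makes the problem visible. GrowingBatch is not a Laravel class. It only tracks a pending counter. A worker that is faster than the loop adding jobs brings that counter back to zero after every single job.

```php
<?php
// Hypothetical simulation (not Laravel code) of the race described above:
// when jobs are added one at a time, a fast worker can drain the batch before
// the next job is added, so "all jobs processed" is reached more than once.

final class GrowingBatch
{
    public int $pending = 0;
    public int $finallyRuns = 0;

    public function add(): void
    {
        $this->pending++;
    }

    // A worker finishing one job; the batch looks "finished" when pending hits 0.
    public function finishOne(): void
    {
        $this->pending--;

        if ($this->pending === 0) {
            $this->finallyRuns++; // this is where the campaign would be marked as sent
        }
    }
}

$batch = new GrowingBatch();

foreach (range(1, 3) as $subscriber) {
    $batch->add();
    $batch->finishOne(); // the worker is faster than the loop adding jobs
}
// finallyRuns is 3: the campaign was "sent" right after the very first mail.
```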
We only want the campaign marked as sent after jobs for all subscribers have been added to the batch and the batch is fully processed. In Mailcoach, we've solved this by adding another job, called MarkCampaignAsFullyDispatchedJob, to the batch after all SendMailJobs have been added. This is what MarkCampaignAsFullyDispatchedJob looks like.
```php
namespace Spatie\Mailcoach\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Spatie\Mailcoach\Models\Campaign;

class MarkCampaignAsFullyDispatchedJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public Campaign $campaign;

    public function __construct(Campaign $campaign)
    {
        $this->campaign = $campaign;
    }

    public function handle()
    {
        $this->campaign->update(['all_jobs_added_to_batch_at' => now()]);
    }
}
```
In this job, we update the all_jobs_added_to_batch_at attribute of the campaign. Notice that we use the new Illuminate\Bus\Batchable trait, which should be added to all jobs you put in a batch.
In the finally callback of the batch in SendCampaignAction, we should now check if all_jobs_added_to_batch_at was set. If it is set, we know for sure that all sends of the campaign were dispatched and handled.
```php
$campaign->update(['all_jobs_added_to_batch_at' => null]);

$batch = Bus::batch([])
    ->allowFailures()
    ->finally(function () use ($campaign) {
        // not all jobs have been added yet: the batch only looks finished
        if (! $campaign->refresh()->all_jobs_added_to_batch_at) {
            return;
        }

        $campaign->markAsSent($campaign->sends()->count());

        event(new CampaignSentEvent($campaign));
    })
    ->dispatch();

$campaign->update(['send_batch_id' => $batch->id]);

$subscribersQuery
    ->cursor()
    ->map(fn (Subscriber $subscriber) => $this->createSendMailJob($campaign, $subscriber, $segment))
    ->filter()
    ->each(fn (SendMailJob $sendMailJob) => $batch->add($sendMailJob));

$batch->add(new MarkCampaignAsFullyDispatchedJob($campaign));
```
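The sentinel job can be modelled in a small simulation. A last job flips a flag that plays the role of all_jobs_added_to_batch_at, and the finally logic ignores every run that happens before that. GuardedBatch is a hypothetical class. It processes each job immediately after adding it, which is the worst case for the race described earlier.

```php
<?php
// Hypothetical simulation of the sentinel fix: the last job sets a flag, and
// the "finally" logic ignores every run that happens before the flag is set.
// GuardedBatch is illustrative only; it is not a Laravel or Mailcoach class.

final class GuardedBatch
{
    public int $pending = 0;
    public bool $allJobsAdded = false; // plays the role of all_jobs_added_to_batch_at
    public int $markedAsSent = 0;

    public function add(callable $job): void
    {
        $this->pending++;

        // Assume a worker picks the job up immediately (the worst case).
        $job($this);
        $this->pending--;

        if ($this->pending === 0) {
            $this->runFinallyCallback();
        }
    }

    private function runFinallyCallback(): void
    {
        if (!$this->allJobsAdded) {
            return; // the batch only looks finished: more jobs are still coming
        }

        $this->markedAsSent++;
    }
}

$batch = new GuardedBatch();

foreach (range(1, 3) as $subscriber) {
    $batch->add(function (GuardedBatch $b) {
        // send one mail
    });
}

// The MarkCampaignAsFullyDispatchedJob equivalent is added last.
$batch->add(function (GuardedBatch $b) {
    $b->allJobsAdded = true;
});
```

Even though the pending count hits zero four times, the campaign is marked as sent only once, after the sentinel has run.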
With MarkCampaignAsFullyDispatchedJob in place, SendCampaignAction can handle campaigns that are sent to large email lists. Let's make one final optimization. Right now, jobs are added to the batch one by one. It might be better for performance to add multiple jobs to the batch in one go.
Luckily, the LazyCollection, which we are using to loop over each subscriber, can be chunked. Let's use that to our advantage.
```php
$subscribersQuery
    ->cursor()
    ->map(fn (Subscriber $subscriber) => $this->createSendMailJob($campaign, $subscriber, $segment))
    ->filter()
    ->chunk(1000)
    ->each(function (LazyCollection $jobs) use ($batch) {
        $batch->add($jobs); // 1000 jobs are now added in one go
    });
```
So we get subscribers one by one via a MySQL cursor and convert each one to a job. When we have done that a thousand times, the jobs are added to the batch and the next subscribers are retrieved. In my opinion, the fact that you can chunk a LazyCollection is pretty mind-blowing.
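Chunking a lazy sequence can be sketched with a plain PHP generator. lazyChunk() is a hypothetical helper that only approximates what LazyCollection's chunk does. It still pulls items one at a time, but it hands them on in groups.

```php
<?php
// Plain-PHP sketch of chunking a lazy sequence: items are still pulled one at
// a time, but they are handed on in groups of $size.
// lazyChunk() and jobs() are hypothetical helpers, not Laravel's implementation.

function lazyChunk(iterable $items, int $size): Generator
{
    $chunk = [];

    foreach ($items as $item) {
        $chunk[] = $item;

        if (count($chunk) === $size) {
            yield $chunk;
            $chunk = []; // start a fresh group; the previous one can be freed
        }
    }

    if ($chunk !== []) {
        yield $chunk; // the last, possibly smaller, group
    }
}

function jobs(int $count): Generator
{
    for ($i = 1; $i <= $count; $i++) {
        yield "SendMailJob#$i";
    }
}

$addCalls = [];

foreach (lazyChunk(jobs(2500), 1000) as $group) {
    $addCalls[] = count($group); // stand-in for $batch->add($group)
}
// $addCalls is [1000, 1000, 500]: three add() calls instead of 2,500.
```

At most one group of a thousand jobs is held in memory at a time, which keeps the memory win of the cursor while cutting down the number of add calls.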
At the moment of writing, I have two nitpicks. First, I don't like that it is required to pass an empty array to batch. Secondly, it would be nice if add accepted a closure, so MarkCampaignAsFullyDispatchedJob could be refactored away.
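```php
$subscribersQuery
    // ...
    ->each(fn (SendMailJob $sendMailJob) => $batch->add($sendMailJob));

// wished-for API: add() accepting a closure instead of a job class
$batch->add(function () {
    // update campaign
});
```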
I'm sure the Laravel team will improve on this in the future.
Working with a batch
Did you notice that in the code above we saved the batch id to the database?
```php
$campaign->update(['send_batch_id' => $batch->id]);
```
This id will help us retrieve an instance of Illuminate\Bus\Batch, which contains many helpful methods. You could, for example, use that instance to cancel all jobs in a batch.
In Mailcoach, you can cancel sending a campaign while it is being sent. This is what that looks like in the UI:
This is the code that will get executed when cancel is clicked.
```php
namespace Spatie\Mailcoach\Http\App\Controllers\Campaigns;

use Illuminate\Support\Facades\Bus;
use Spatie\Mailcoach\Enums\CampaignStatus;
use Spatie\Mailcoach\Models\Campaign;
use Spatie\Mailcoach\Traits\UsesMailcoachModels;

class CancelSendingCampaignController
{
    use UsesMailcoachModels;

    public function __invoke(Campaign $campaign)
    {
        $batch = Bus::findBatch(
            $campaign->send_batch_id
        );

        $batch->cancel();

        $campaign->update([
            'status' => CampaignStatus::CANCELLED,
            'sent_at' => now(),
        ]);

        flash()->success(__('Sending successfully cancelled.'));

        return redirect()->back();
    }
}
```
Batch has these helpful properties and functions:

- totalJobs: the total number of jobs in the batch
- pendingJobs: the number of jobs that have not been handled yet
- failedJobs: the number of failed jobs
- progress(): the percentage of jobs that have been processed
There are a few more properties and functions available. You can discover them in the docs or by browsing the source code.
These properties and functions are used in various places in the Mailcoach code base.
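The following sketch shows how these counters relate to progress(). The formula assumes that processed jobs are total minus pending, expressed as a rounded percentage. That is our reading of Laravel 8's Batch class, not a quote of it, so double-check it against the version you run. batchProgress is a hypothetical helper.

```php
<?php
// Sketch of how the counters relate to progress(). The formula
// (processed = total - pending, as a rounded percentage) is our reading of
// Laravel 8's Batch class; double-check it against the version you run.

function batchProgress(int $totalJobs, int $pendingJobs): int
{
    if ($totalJobs === 0) {
        return 0; // an empty batch reports 0% instead of dividing by zero
    }

    $processedJobs = $totalJobs - $pendingJobs;

    return (int) round($processedJobs / $totalJobs * 100);
}

// Half a million sends, of which 125,000 are still waiting for a worker.
echo batchProgress(500000, 125000), PHP_EOL; // 75
```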
How it works under the hood
The basic implementation of batches is quite simple. All batches get stored in a repository. The default repository is a table called job_batches. Here's the migration for that table.
```php
Schema::create('job_batches', function (Blueprint $table) {
    $table->string('id')->primary();
    $table->string('name');
    $table->integer('total_jobs');
    $table->integer('pending_jobs');
    $table->integer('failed_jobs');
    $table->text('failed_job_ids');
    $table->text('options')->nullable();
    $table->integer('cancelled_at')->nullable();
    $table->integer('created_at');
    $table->integer('finished_at')->nullable();
});
```
When calling Bus::batch, a new instance of PendingBatch is created. PendingBatch is a class that can be used to configure a batch. In this video, I dive deeper into how these pending objects can avoid large function signatures.
When you've configured your batch by calling methods like allowFailures() and finally on PendingBatch, you start the batch by calling dispatch.
```php
Bus::batch($jobs)
    ->allowFailures()
    ->finally(function () {
        // runs when all jobs in the batch have been processed
    })
    ->dispatch();
```
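The "pending object" idea can be shown with a minimal fluent builder. PendingBatchSketch and its methods are hypothetical and are not Laravel's PendingBatch. The sketch only illustrates why collecting options through chained calls beats one dispatch function with a long list of parameters.

```php
<?php
// A minimal sketch of the "pending object" idea behind PendingBatch: a fluent
// builder collects options, and only dispatch() acts on them. The class is
// hypothetical; it is not Laravel's PendingBatch.

final class PendingBatchSketch
{
    private array $jobs;
    private bool $allowFailures = false;
    /** @var callable[] */
    private array $finallyCallbacks = [];

    public function __construct(array $jobs)
    {
        $this->jobs = $jobs;
    }

    public function allowFailures(bool $allow = true): self
    {
        $this->allowFailures = $allow;

        return $this;
    }

    public function finally(callable $callback): self
    {
        $this->finallyCallbacks[] = $callback;

        return $this;
    }

    // Replaces a signature like dispatch($jobs, $allowFailures, $finally, ...).
    public function dispatch(): array
    {
        return [
            'total_jobs' => count($this->jobs),
            'allow_failures' => $this->allowFailures,
            'finally_callbacks' => count($this->finallyCallbacks),
        ];
    }
}

$batch = (new PendingBatchSketch(['job-1', 'job-2']))
    ->allowFailures()
    ->finally(function () {})
    ->dispatch();
```

New options can be added as extra chainable methods without touching the signature of dispatch or breaking existing callers.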
Let's take a look at what the dispatch method looks like.
```php
$repository = $this->container->make(BatchRepository::class);

try {
    $batch = $repository->store($this);

    $batch = $batch->add($this->jobs);
} catch (Throwable $e) {
    // don't leave a half-created batch behind
    if (isset($batch)) {
        $repository->delete($batch->id);
    }

    throw $e;
}

$this->container->make(EventDispatcher::class)->dispatch(
    new BatchDispatched($batch)
);

return $batch;
```
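The try/catch in dispatch follows a common pattern. The record is created first and the jobs are added next. If adding fails, the half-created record is deleted and the exception is rethrown. Here's a hypothetical, in-memory sketch of that pattern. InMemoryBatchRepository and dispatchBatch are illustrative names, not Laravel's classes.

```php
<?php
// Hypothetical sketch of the pattern in dispatch(): create the batch record
// first, then add the jobs; if adding fails, delete the half-created record
// and rethrow, so no orphaned batch is left behind. Not Laravel's code.

final class InMemoryBatchRepository
{
    /** @var array<string, array{total_jobs: int}> */
    public array $rows = [];
    private int $nextId = 1;

    public function store(): string
    {
        $id = 'batch-' . $this->nextId++;
        $this->rows[$id] = ['total_jobs' => 0];

        return $id;
    }

    public function addJobs(string $id, array $jobs): void
    {
        foreach ($jobs as $job) {
            if (!is_callable($job)) {
                throw new InvalidArgumentException('Not a job');
            }
        }

        $this->rows[$id]['total_jobs'] += count($jobs);
    }

    public function delete(string $id): void
    {
        unset($this->rows[$id]);
    }
}

function dispatchBatch(InMemoryBatchRepository $repository, array $jobs): string
{
    $id = null;

    try {
        $id = $repository->store();
        $repository->addJobs($id, $jobs);
    } catch (Throwable $e) {
        if ($id !== null) {
            $repository->delete($id); // roll back the record created above
        }

        throw $e;
    }

    return $id;
}

$repository = new InMemoryBatchRepository();
$ok = dispatchBatch($repository, [fn () => null, fn () => null]);

try {
    dispatchBatch($repository, [fn () => null, 'not a job']);
} catch (InvalidArgumentException $e) {
    // the failed batch's record has already been deleted again
}
```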
In dispatch, a record is first created for the batch in the repository. This is the implementation of store in DatabaseBatchRepository. Notice that an ordered UUID is used as the id of a batch.
```php
public function store(PendingBatch $batch)
{
    $id = (string) Str::orderedUuid();

    $this->connection->table($this->table)->insert([
        'id' => $id,
        'name' => $batch->name,
        'total_jobs' => 0,
        'pending_jobs' => 0,
        'failed_jobs' => 0,
        'failed_job_ids' => '[]',
        'options' => serialize($batch->options),
        'created_at' => time(),
        'cancelled_at' => null,
        'finished_at' => null,
    ]);

    return $this->find($id);
}
```
In the dispatch method, the jobs are added to the stored batch by calling the add method.
```php
$batch = $batch->add($this->jobs);
```
Inside the add method, the batch id gets injected into the payload of each job added to the batch.
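Here's a framework-free sketch of that idea. add stamps each job with the batch's id before queueing it and bumps the batch's counters. BatchSketch and JobSketch are hypothetical stand-ins, not the real Batch class or a real job.

```php
<?php
// Hypothetical sketch of "injecting the batch id": add() stamps each job with
// the batch's id before queueing it, so a job can later find its batch.
// BatchSketch and JobSketch are illustrative, not Laravel classes.

final class JobSketch
{
    public ?string $batchId = null;
    public string $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}

final class BatchSketch
{
    public string $id;
    public int $totalJobs = 0;
    public int $pendingJobs = 0;
    /** @var JobSketch[] stand-in for the queue */
    public array $queued = [];

    public function __construct(string $id)
    {
        $this->id = $id;
    }

    /** @param JobSketch[] $jobs */
    public function add(array $jobs): self
    {
        foreach ($jobs as $job) {
            $job->batchId = $this->id; // the id travels with the job's payload
            $this->queued[] = $job;
        }

        $this->totalJobs += count($jobs);
        $this->pendingJobs += count($jobs);

        return $this;
    }
}

$batch = (new BatchSketch('example-batch-id'))
    ->add([new JobSketch('a'), new JobSketch('b')]);
```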
Now let's take a look at how cancelling a batch is implemented. Calling cancel results in calling cancel on the repository. The cancelled_at and finished_at columns are filled with the current Unix timestamp.
```php
// in `DatabaseBatchRepository.php`
public function cancel(string $batchId)
{
    $this->connection->table($this->table)->where('id', $batchId)->update([
        'cancelled_at' => time(),
        'finished_at' => time(),
    ]);
}
```
With the batch id injected into the job, we can easily retrieve the batch by using the Batchable trait on the job.
```php
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class YourJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function handle()
    {
        // batch() returns null when the job was not dispatched in a batch
        if (optional($this->batch())->canceled()) {
            // optionally perform some clean up if necessary
            return;
        }

        // do the actual work
    }
}
```
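Putting the last two pieces together: cancel only records timestamps and doesn't pull queued jobs off the queue, so each job has to check the flag itself. This framework-free sketch uses hypothetical BatchRecord and MailJob classes to show the effect when someone clicks cancel after the first mail.

```php
<?php
// Framework-free sketch of cancelling: cancel() only records timestamps (as
// the repository's cancel() does); queued jobs stay on the queue and each one
// checks the flag itself. BatchRecord and MailJob are hypothetical names.

final class BatchRecord
{
    public ?int $cancelledAt = null;
    public ?int $finishedAt = null;

    public function cancel(): void
    {
        $this->cancelledAt = time();
        $this->finishedAt = time();
    }

    public function cancelled(): bool
    {
        return $this->cancelledAt !== null;
    }
}

final class MailJob
{
    private ?BatchRecord $batch;

    public function __construct(?BatchRecord $batch)
    {
        $this->batch = $batch;
    }

    /** Returns true when a mail was "sent". */
    public function handle(): bool
    {
        // Like optional($this->batch())->canceled(): a job without a batch just runs.
        if ($this->batch !== null && $this->batch->cancelled()) {
            return false; // batch cancelled: skip the work
        }

        return true; // do the actual work
    }
}

$batch = new BatchRecord();
$jobs = [new MailJob($batch), new MailJob($batch), new MailJob($batch)];

$sent = 0;
foreach ($jobs as $i => $job) {
    if ($i === 1) {
        $batch->cancel(); // someone clicks "cancel" after the first mail
    }

    $sent += $job->handle() ? 1 : 0;
}
```

Only the first mail goes out. The other two jobs still get handled, but they return early because their batch was cancelled.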
Closing thoughts
Batches are an excellent addition to the framework. At the moment of writing, Laravel 8 isn't released yet, but I think it's fairly safe to assume that the API for Batch won't change anymore.
The Mailcoach code snippets in this blog post were a bit simplified for brevity. If you want to see the actual code, consider picking up a license at the Mailcoach website.
Even if you don't need a hosted solution to send out an email campaign, you'll probably learn some cool things by reading the source code and watching the videos.
If you want to see another example of Laravel's batched jobs being used, consider picking up Mohamed's book on queues. I read a proof copy and can highly recommend it to anyone who uses Laravel queues.