Queue System / Preparing Jobs For Queue


Every job we push to the queue is kept in some storage space, sorted by order of execution. This storage could be a MySQL database, a Redis store, or a third-party service like Amazon SQS.

Later in this dive we're going to explore how workers fetch these jobs and start executing them. Before that, let's see how we store our jobs. Here are the attributes we keep for every job we store:

  • What queue it should be running through
  • The number of times this job was attempted (Initially zero)
  • The time when the job was reserved by a worker
  • The time when the job becomes available for a worker to pick up
  • The time when the job was created
  • The payload of the job
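As a rough illustration, the attributes above could be represented as a plain PHP array. This is only a sketch: the buildJobRecord() helper and the key names are assumptions chosen to mirror the list, and the payload is encoded as JSON purely for the example.

```php
<?php
// Hypothetical helper: builds the record a queue store might keep for one job.
// The key names mirror the attribute list above and are assumptions.
function buildJobRecord(string $queue, string $payload, int $delay = 0): array
{
    $now = time();

    return [
        'queue'        => $queue,        // which queue the job runs through
        'attempts'     => 0,             // starts at zero, incremented on every run
        'reserved_at'  => null,          // set once a worker picks the job
        'available_at' => $now + $delay, // earliest time a worker may pick it up
        'created_at'   => $now,          // when the job was pushed
        'payload'      => $payload,      // encoded description of the job
    ];
}

// A job pushed to an "emails" queue that becomes available after 60 seconds.
$record = buildJobRecord('emails', json_encode(['job' => 'SendInvoice']), 60);
```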

What do you mean by queue?

Your application sends several types of jobs to the queue. By default all jobs are put in a single queue; however, you might want to put mail jobs in a different queue and instruct a dedicated worker to run jobs only from that queue. This ensures that other jobs won't delay sending emails, since the mail queue has a dedicated worker of its own.

The number of attempts

By default, if a job fails, the queue manager keeps trying to run it forever unless you set a maximum number of attempts. When the job's attempts reach that number, the job is marked as failed and workers won't try to run it again. The attempts counter starts at zero and is incremented every time the job runs.
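The rule can be sketched as a small function. This is not Laravel's worker code; shouldMarkAsFailed() is a hypothetical name, and a null limit stands for "no maximum configured":

```php
<?php
// Hypothetical helper illustrating the retry rule described above.
function shouldMarkAsFailed(int $attempts, ?int $maxTries): bool
{
    // No limit configured: the job is retried indefinitely.
    if ($maxTries === null) {
        return false;
    }

    // The job fails for good once its attempts reach the limit.
    return $attempts >= $maxTries;
}

var_dump(shouldMarkAsFailed(1, 3));      // bool(false)
var_dump(shouldMarkAsFailed(3, 3));      // bool(true)
var_dump(shouldMarkAsFailed(500, null)); // bool(false)
```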

The reservation time

Once a worker picks a job, we mark it as reserved and store the timestamp of when that happened. The next time a worker looks for a new job, it won't pick a reserved one unless the reservation lock has expired, but more on that later.
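A minimal sketch of that check, assuming each stored job is an array with a reserved_at timestamp and that the lock lasts a fixed number of seconds. The isReserved() name and the $lockSeconds parameter are illustrative only, not Laravel's own:

```php
<?php
// Hypothetical check: is this job still locked by the worker that picked it?
function isReserved(array $job, int $now, int $lockSeconds): bool
{
    // A job that was never picked has no reservation timestamp.
    if ($job['reserved_at'] === null) {
        return false;
    }

    // The lock holds until $lockSeconds have passed since the reservation.
    return $job['reserved_at'] + $lockSeconds > $now;
}

$now = 1000;

var_dump(isReserved(['reserved_at' => null], $now, 90)); // bool(false)
var_dump(isReserved(['reserved_at' => 950], $now, 90));  // bool(true)
var_dump(isReserved(['reserved_at' => 900], $now, 90));  // bool(false), lock expired
```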

The availability time

By default a job is available as soon as it's pushed to the queue, and workers can pick it up right away. Sometimes, though, you might need to delay running a job for some time. You can do that by providing a delay while pushing the job, using the later() method instead of push():

Queue::later(60, new SendInvoice());

The later() method uses the availableAt() method to determine the availability time:

protected function availableAt($delay = 0)
{
    return $delay instanceof DateTimeInterface
                        ? $delay->getTimestamp()
                        : Carbon::now()->addSeconds($delay)->getTimestamp();
}

As you can see, you can pass an instance of DateTimeInterface to set the exact time, or you can pass a number of seconds and Laravel will calculate the availability time for you under the hood.
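To try both input styles outside of Laravel, here is a standalone sketch of the same idea. It is an approximation, not the framework method: it swaps Carbon for PHP's built-in time() and is written as a plain function.

```php
<?php
// Standalone approximation of availableAt(): plain PHP, no Carbon.
function availableAt($delay = 0): int
{
    return $delay instanceof DateTimeInterface
        ? $delay->getTimestamp()   // exact point in time
        : time() + (int) $delay;   // number of seconds from now
}

// An exact date: the timestamp of that date is used as-is.
$exact = new DateTimeImmutable('2030-01-01 00:00:00 UTC');
$fromDate = availableAt($exact);

// A delay in seconds: the timestamp is calculated relative to now.
$fromSeconds = availableAt(60);
```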

The payload

The push() and later() methods use the createPayload() method internally to build the information needed to execute the job when it's picked up by a worker. You can pass a job to the queue in two formats:

// Pass an object
Queue::push(new SendInvoice($order));

// Pass a string
Queue::push('App\Jobs\SendInvoice@handle', ['order_id' => $orderId]);

In the second example, when the worker picks up the job, Laravel uses the container to create an instance of the given class. This gives you the chance to require any dependencies in the job's constructor.

Creating the payload of a string job

createPayload() calls createPayloadArray() internally, which in turn calls the createStringPayload() method when the job is not an object:

protected function createStringPayload($job, $data)
{
    return [
        'displayName' => is_string($job) ? explode('@', $job)[0] : null,
        'job' => $job, 'maxTries' => null,
        'timeout' => null, 'data' => $data,
    ];
}

The displayName of a job is a string you can use to identify the job that's running. For non-object job definitions, we use the job class name (the part before the @) as the displayName.

Notice also that we store the given data in the job payload.
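To see what the resulting array looks like, the method can be copied into a plain function and called directly. The job string and order id below are example values only:

```php
<?php
// Copy of the method above as a standalone function, for experimentation.
function createStringPayload($job, $data)
{
    return [
        'displayName' => is_string($job) ? explode('@', $job)[0] : null,
        'job' => $job, 'maxTries' => null,
        'timeout' => null, 'data' => $data,
    ];
}

// Example values: a "Class@method" job string and a small data array.
$payload = createStringPayload('App\Jobs\SendInvoice@handle', ['order_id' => 42]);

echo $payload['displayName'], PHP_EOL; // App\Jobs\SendInvoice
echo $payload['job'], PHP_EOL;         // App\Jobs\SendInvoice@handle
```

Note that maxTries and timeout are always null here, since a string gives us no instance to read them from.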

Creating the payload of an object job

Here's how an object-based job payload is created:

protected function createObjectPayload($job)
{
    return [
        'displayName' => $this->getDisplayName($job),
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => isset($job->tries) ? $job->tries : null,
        'timeout' => isset($job->timeout) ? $job->timeout : null,
        'data' => [
            'commandName' => get_class($job),
            'command' => serialize(clone $job),
        ],
    ];
}

Since we already have the instance of the job, we can extract some useful information from it. For example, the getDisplayName() method looks for a displayName() method on the job instance and, if found, uses its return value as the job name. That means you can add such a method to your job class to control the name of your job in the queue.

protected function getDisplayName($job)
{
    return method_exists($job, 'displayName')
                    ? $job->displayName() : get_class($job);
}
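A small standalone sketch of both branches follows. The two job classes are made up for the example, and getDisplayName() is copied out as a plain function:

```php
<?php
// Example job that names itself through a displayName() method.
class SendInvoice
{
    public function displayName(): string
    {
        return 'Send invoice email';
    }
}

// Example job without displayName(): the class name is used instead.
class PruneLogs
{
}

// Standalone copy of the method above.
function getDisplayName($job)
{
    return method_exists($job, 'displayName')
                    ? $job->displayName() : get_class($job);
}

echo getDisplayName(new SendInvoice()), PHP_EOL; // Send invoice email
echo getDisplayName(new PruneLogs()), PHP_EOL;   // PruneLogs
```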

We can also extract the maximum number of times a job should be attempted and the timeout for the job. If you define these values as the tries and timeout properties of your job class, Laravel stores them in the payload for the workers to use later.

As for the data attribute, Laravel stores the class name of the job as well as a serialized version of that job.
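The following sketch puts these pieces together outside the framework. The SendInvoice class and its property values are invented for the example, and createObjectPayload() is rewritten as a plain function with the display name simplified to the class name:

```php
<?php
// Example job: $tries and $timeout are read into the payload if present.
class SendInvoice
{
    public $tries = 3;
    public $timeout = 120;
    public $orderId;

    public function __construct($orderId)
    {
        $this->orderId = $orderId;
    }
}

// Simplified standalone version of createObjectPayload().
function createObjectPayload($job)
{
    return [
        'displayName' => get_class($job),
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => isset($job->tries) ? $job->tries : null,
        'timeout' => isset($job->timeout) ? $job->timeout : null,
        'data' => [
            'commandName' => get_class($job),
            'command' => serialize(clone $job),
        ],
    ];
}

$payload = createObjectPayload(new SendInvoice(42));

// The serialized command can be turned back into a job instance later.
$restored = unserialize($payload['data']['command']);
```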

Then how can I pass my own data?

If you choose to pass an object-based job, you can't provide a data array. Instead, store any data you need inside the job instance and it will be available once the job is unserialized.

Why do we pass a different class as the "job" parameter?

Queue\CallQueuedHandler@call is a special class Laravel uses while running object-based jobs. We'll look into it at a later stage.

Serializing jobs

Serializing a PHP object generates a string that holds information about the class the object is an instance of, as well as the state of that object. This string can be used later to re-create the instance.

In our case we serialize the job object so that we can easily store it somewhere until a worker is ready to pick it up and run it. While creating the payload for the job, we serialize a clone of the job object:
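As a quick illustration, here is a round trip with a tiny example class (the Customer class is made up for the demonstration):

```php
<?php
// Example class with a single piece of state.
class Customer
{
    public $name;

    public function __construct($name)
    {
        $this->name = $name;
    }
}

// The string records the class name and the value of every property.
$serialized = serialize(new Customer('Jane'));

echo $serialized, PHP_EOL; // O:8:"Customer":1:{s:4:"name";s:4:"Jane";}

// unserialize() re-creates an instance with the same state.
$copy = unserialize($serialized);

echo $copy->name, PHP_EOL; // Jane
```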

serialize(clone $job);

But why a clone? Why not serialize the object itself?

While serializing the job we might need to transform some of the job's properties, or properties of any of the instances the job is using. If we pass the job instance itself, those transformations are applied to the original instances, which might not be desired. Let's take a look at an example:

class SendInvoice
{
    public function __construct(Invoice $invoice)
    {
        $this->invoice = $invoice;
    }
}

class Invoice
{
    public function __construct($pdf, $customer)
    {
        $this->pdf = $pdf;
        $this->customer = $customer;
    }
}

While creating the payload for the SendInvoice job, we're going to serialize that instance and all of its child objects, in this case the Invoice object. However, PHP can't serialize resources such as open file handles, and the Invoice object has a property called $pdf which holds a file. To work around that, we can store the file somewhere, keep only a reference to it in our instance while serializing, and then bring the file back using that reference when we unserialize.

class Invoice
{
    public function __construct($pdf, $customer)
    {
        $this->pdf = $pdf;
        $this->customer = $customer;
    }

    public function __sleep()
    {
        $this->pdf = stream_get_meta_data($this->pdf)['uri'];

        return ['customer', 'pdf'];
    }

    public function __wakeup()
    {
        $this->pdf = fopen($this->pdf, 'a');
    }
}

The __sleep() method is automatically called before PHP starts serializing our object. In our example, we transform the pdf property to hold the path to the PDF resource instead of the resource itself, and inside __wakeup() we convert that path back into a resource. That way we can safely serialize the object.

Now if we dispatch our job to the queue, we know that it's going to be serialized under the hood:

$invoice = new Invoice($pdf, 'Customer #123');

Queue::push(new SendInvoice($invoice));

dd($invoice->pdf);

However, if we look at the invoice's pdf property after sending the job to the queue, we'll find that it holds the path to the resource rather than the resource itself. That's because, while serializing the job, PHP serialized the original Invoice instance as well: the SendInvoice instance holds a reference to that same object, not a copy of it.

This is where cloning comes in handy. When cloning the SendInvoice object, PHP creates a new instance of that object, but the new instance still holds a reference to the original Invoice instance. We can change that:
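The side effect can be reproduced without Laravel. In this sketch, a temporary file stands in for the PDF and serialize(clone $job) stands in for what happens when the job is pushed; both are assumptions made for the demonstration:

```php
<?php
class Invoice
{
    public $pdf;
    public $customer;

    public function __construct($pdf, $customer)
    {
        $this->pdf = $pdf;
        $this->customer = $customer;
    }

    public function __sleep()
    {
        // Swap the file handle for its path so the object can be serialized.
        $this->pdf = stream_get_meta_data($this->pdf)['uri'];

        return ['customer', 'pdf'];
    }
}

class SendInvoice
{
    public $invoice;

    public function __construct(Invoice $invoice)
    {
        $this->invoice = $invoice;
    }
}

// A temporary file stands in for the PDF.
$path = tempnam(sys_get_temp_dir(), 'inv');
$invoice = new Invoice(fopen($path, 'a'), 'Customer #123');

// The shallow clone still points at the same Invoice object.
$serialized = serialize(clone new SendInvoice($invoice));

var_dump(is_resource($invoice->pdf)); // bool(false)
var_dump(is_string($invoice->pdf));   // bool(true): now a path

unlink($path);
```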

class SendInvoice
{
    public function __construct(Invoice $invoice)
    {
        $this->invoice = $invoice;
    }

    public function __clone()
    {
        $this->invoice = clone $this->invoice;
    }
}

Here we instruct PHP that whenever it clones an instance of SendInvoice, the new instance should use a clone of the invoice property rather than the original one. That way the original Invoice object is not affected while we serialize.
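A standalone sketch confirms the difference. As before, a temporary file stands in for the PDF and serialize(clone $job) stands in for pushing the job; these are assumptions made for the demonstration:

```php
<?php
class Invoice
{
    public $pdf;
    public $customer;

    public function __construct($pdf, $customer)
    {
        $this->pdf = $pdf;
        $this->customer = $customer;
    }

    public function __sleep()
    {
        $this->pdf = stream_get_meta_data($this->pdf)['uri'];

        return ['customer', 'pdf'];
    }

    public function __wakeup()
    {
        $this->pdf = fopen($this->pdf, 'a');
    }
}

class SendInvoice
{
    public $invoice;

    public function __construct(Invoice $invoice)
    {
        $this->invoice = $invoice;
    }

    public function __clone()
    {
        // Give the cloned job its own copy of the invoice.
        $this->invoice = clone $this->invoice;
    }
}

$path = tempnam(sys_get_temp_dir(), 'inv');
$invoice = new Invoice(fopen($path, 'a'), 'Customer #123');

// Only the cloned invoice is transformed by __sleep().
$serialized = serialize(clone new SendInvoice($invoice));

var_dump(is_resource($invoice->pdf)); // bool(true): original untouched

// __wakeup() reopens the file on the restored copy.
$restored = unserialize($serialized);
var_dump(is_resource($restored->invoice->pdf)); // bool(true)

unlink($path);
```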


I'm Mohamed Said, a web developer from Hurghada, Egypt. I work with the Laravel core team trying to deliver the best developer experience.
Find me as @themsaid

