Skip to content

队列

介绍

在构建您的Web应用程序时,您可能会有一些任务,例如解析和存储上传的CSV文件,这些任务在典型的Web请求中执行时间过长。幸运的是,Laravel允许您轻松创建可以在后台处理的排队作业。通过将耗时的任务移到队列中,您的应用程序可以以惊人的速度响应Web请求,并为客户提供更好的用户体验。

Laravel队列提供了一个统一的队列API,支持多种不同的队列后端,例如Amazon SQSRedis或甚至关系数据库。

Laravel的队列配置选项存储在您应用程序的config/queue.php配置文件中。在此文件中,您将找到与框架中包含的每个队列驱动程序的连接配置,包括数据库、Amazon SQSRedisBeanstalkd驱动程序,以及一个同步驱动程序,该驱动程序将立即执行作业(用于本地开发)。还包括一个null队列驱动程序,该驱动程序会丢弃排队的作业。

lightbulb

Laravel现在提供Horizon,这是一个美观的仪表板和配置系统,用于您的Redis驱动的队列。有关更多信息,请查看完整的Horizon文档

连接与队列

在开始使用Laravel队列之前,了解“连接”和“队列”之间的区别非常重要。在您的config/queue.php配置文件中,有一个connections配置数组。此选项定义了与后端队列服务(例如Amazon SQS、Beanstalk或Redis)的连接。然而,任何给定的队列连接可能有多个“队列”,可以将其视为不同的堆栈或排队作业的堆。

请注意,queue配置文件中的每个连接配置示例都包含一个queue属性。这是作业在发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确指定它应该调度到哪个队列,则该作业将被放置在连接配置的queue属性中定义的队列上:

php
use App\Jobs\ProcessPodcast;

// 此作业被发送到默认连接的默认队列...
ProcessPodcast::dispatch();

// 此作业被发送到默认连接的“emails”队列...
ProcessPodcast::dispatch()->onQueue('emails');

某些应用程序可能不需要将作业推送到多个队列,而是更喜欢有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序尤其有用,因为Laravel队列工作者允许您指定它应该按优先级处理的队列。例如,如果您将作业推送到high队列,您可以运行一个工作者,使其具有更高的处理优先级:

shell
php artisan queue:work --queue=high,default

驱动程序说明和前提条件

数据库

为了使用database队列驱动程序,您需要一个数据库表来保存作业。通常,这在Laravel的默认0001_01_01_000002_create_jobs_table.php 数据库迁移中包含;但是,如果您的应用程序不包含此迁移,您可以使用make:queue-table Artisan命令来创建它:

shell
php artisan make:queue-table

php artisan migrate

Redis

为了使用redis队列驱动程序,您应该在config/database.php配置文件中配置Redis数据库连接。

exclamation

serializercompression Redis选项不受redis队列驱动程序支持。

Redis集群

如果您的Redis队列连接使用Redis集群,则队列名称必须包含键哈希标签。这是为了确保给定队列的所有Redis键都放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', '{default}'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => null,
    'after_commit' => false,
],

阻塞

使用Redis队列时,您可以使用block_for配置选项来指定驱动程序在迭代工作循环并重新轮询Redis数据库之前应等待多长时间,以便作业变得可用。

根据您的队列负载调整此值可能比不断轮询Redis数据库以获取新作业更有效。例如,您可以将值设置为5,以指示驱动程序在等待作业变得可用时应阻塞五秒:

php
'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', 'default'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => 5,
    'after_commit' => false,
],
exclamation

block_for设置为0将导致队列工作者在作业可用之前无限期阻塞。这也将阻止在下一个作业处理之前处理诸如SIGTERM之类的信号。

其他驱动程序前提条件

以下依赖项是所列队列驱动程序所需的。这些依赖项可以通过Composer包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0或phpredis PHP扩展
  • MongoDB: mongodb/laravel-mongodb

创建作业

生成作业类

默认情况下,您应用程序的所有可排队作业都存储在app/Jobs目录中。如果app/Jobs目录不存在,则在您运行make:job Artisan命令时将创建它:

shell
php artisan make:job ProcessPodcast

生成的类将实现Illuminate\Contracts\Queue\ShouldQueue接口,指示Laravel该作业应被推送到队列中以异步运行。

lightbulb

作业存根可以使用存根发布进行自定义。

类结构

作业类非常简单,通常只包含一个在队列处理作业时调用的handle方法。让我们看一个示例作业类。在这个例子中,我们假装我们管理一个播客发布服务,需要处理上传的播客文件,然后才能发布它们:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行作业。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }
}

在这个例子中,请注意我们能够将Eloquent模型直接传递到排队作业的构造函数中。由于作业使用的Queueable特性,Eloquent模型及其加载的关系将在处理作业时优雅地序列化和反序列化。

如果您的排队作业在其构造函数中接受Eloquent模型,则只有模型的标识符将被序列化到队列中。当作业实际处理时,队列系统将自动从数据库中重新获取完整的模型实例及其加载的关系。这种模型序列化的方法允许将更小的作业有效负载发送到您的队列驱动程序。

handle方法依赖注入

handle方法在作业被队列处理时调用。请注意,我们能够在作业的handle方法中类型提示依赖项。Laravel 服务容器会自动注入这些依赖项。

如果您希望完全控制容器如何将依赖项注入到handle方法中,您可以使用容器的bindMethod方法。bindMethod方法接受一个回调,该回调接收作业和容器。在回调中,您可以自由地以任何方式调用handle方法。通常,您应该在App\Providers\AppServiceProvider 服务提供者boot方法中调用此方法:

php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});
exclamation

二进制数据,例如原始图像内容,应在传递给排队作业之前通过base64_encode函数进行编码。否则,作业在放入队列时可能无法正确序列化为JSON。

排队关系

由于所有加载的Eloquent模型关系在作业排队时也会被序列化,因此序列化的作业字符串有时可能会变得相当大。此外,当作业被反序列化并且模型关系从数据库中重新获取时,它们将被完全检索。任何在作业排队过程中应用于模型的先前关系约束在作业反序列化时将不再应用。因此,如果您希望处理给定关系的子集,则应在排队作业中重新约束该关系。

或者,为了防止关系被序列化,您可以在设置属性值时调用模型上的withoutRelations方法。此方法将返回一个没有加载关系的模型实例:

php
/**
 * 创建一个新的作业实例。
 */
public function __construct(
    Podcast $podcast,
) {
    $this->podcast = $podcast->withoutRelations();
}

如果您使用PHP构造函数属性提升并希望指示Eloquent模型不应序列化其关系,则可以使用WithoutRelations属性:

php
use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * 创建一个新的作业实例。
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast,
) {}

如果作业接收一组或数组的Eloquent模型而不是单个模型,则该集合中的模型在作业反序列化和执行时将不会恢复其关系。这是为了防止处理大量模型的作业消耗过多资源。

唯一作业

exclamation

唯一作业需要支持的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray缓存驱动程序支持原子锁。此外,唯一作业约束不适用于批处理中的作业。

有时,您可能希望确保在任何时候队列中只有一个特定作业实例。您可以通过在作业类上实现ShouldBeUnique接口来做到这一点。此接口不要求您在类上定义任何额外的方法:

php
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...
}

在上面的示例中,UpdateSearchIndex作业是唯一的。因此,如果队列中已经存在另一个实例的作业并且尚未完成处理,则该作业将不会被调度。

在某些情况下,您可能希望定义一个特定的“键”,使作业唯一,或者您可能希望指定一个超时,超时后作业不再保持唯一。为此,您可以在作业类上定义uniqueIduniqueFor属性或方法:

php
<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * 产品实例。
     *
     * @var \App\Product
     */
    public $product;

    /**
     * 作业唯一锁将在多少秒后释放。
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * 获取作业的唯一ID。
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex作业通过产品ID唯一。因此,任何具有相同产品ID的新调度作业将被忽略,直到现有作业完成处理。此外,如果现有作业在一小时内未处理,则唯一锁将被释放,可以调度具有相同唯一键的另一个作业。

exclamation

如果您的应用程序从多个Web服务器或容器调度作业,则应确保所有服务器都与同一个中央缓存服务器通信,以便Laravel能够准确确定作业是否唯一。

保持作业唯一直到处理开始

默认情况下,唯一作业在作业完成处理或失败所有重试尝试后“解锁”。但是,在某些情况下,您可能希望作业在处理之前立即解锁。为此,您的作业应实现ShouldBeUniqueUntilProcessing合同,而不是ShouldBeUnique合同:

php
<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

唯一作业锁

在后台,当调度ShouldBeUnique作业时,Laravel会尝试使用uniqueId键获取。如果未获取锁,则不会调度作业。此锁在作业完成处理或失败所有重试尝试时释放。默认情况下,Laravel将使用默认缓存驱动程序来获取此锁。但是,如果您希望使用其他驱动程序来获取锁,则可以定义一个uniqueVia方法,返回应使用的缓存驱动程序:

php
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...

    /**
     * 获取唯一作业锁的缓存驱动程序。
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}
lightbulb

如果您只需要限制作业的并发处理,请使用WithoutOverlapping作业中间件。

加密作业

Laravel允许您通过加密确保作业数据的隐私和完整性。要开始,只需将ShouldBeEncrypted接口添加到作业类中。添加此接口后,Laravel将在将作业推送到队列之前自动加密您的作业:

php
<?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
    // ...
}

作业中间件

作业中间件允许您在排队作业的执行周围包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下handle方法,它利用Laravel的Redis速率限制功能,允许每五秒处理一个作业:

php
use Illuminate\Support\Facades\Redis;

/**
 * 执行作业。
 */
public function handle(): void
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('获取锁...');

        // 处理作业...
    }, function () {
        // 无法获取锁...

        return $this->release(5);
    });
}

虽然这段代码是有效的,但handle方法的实现变得嘈杂,因为它充满了Redis速率限制逻辑。此外,必须为我们希望速率限制的任何其他作业重复此速率限制逻辑。

我们可以定义一个处理速率限制的作业中间件,而不是在处理方法中进行速率限制。Laravel没有默认的作业中间件位置,因此您可以将作业中间件放置在应用程序中的任何位置。在此示例中,我们将中间件放在app/Jobs/Middleware目录中:

php
<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * 处理排队作业。
     *
     * @param  \Closure(object): void  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
                ->block(0)->allow(1)->every(5)
                ->then(function () use ($job, $next) {
                    // 获取锁...

                    $next($job);
                }, function () use ($job) {
                    // 无法获取锁...

                    $job->release(5);
                });
    }
}

正如您所看到的,像路由中间件一样,作业中间件接收正在处理的作业和一个回调,该回调应被调用以继续处理作业。

创建作业中间件后,可以通过从作业的middleware方法返回它们来将其附加到作业。此方法在通过make:job Artisan命令生成的作业上不存在,因此您需要手动将其添加到作业类中:

php
use App\Jobs\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited];
}
lightbulb

作业中间件也可以分配给可排队的事件监听器、邮件和通知。

速率限制

尽管我们刚刚演示了如何编写自己的速率限制作业中间件,但Laravel实际上包括一个速率限制中间件,您可以利用它来对作业进行速率限制。像路由速率限制器一样,作业速率限制器使用RateLimiter门面的方法for定义。

例如,您可能希望允许用户每小时备份一次数据,而对高级客户则没有此限制。为此,您可以在AppServiceProviderboot方法中定义一个RateLimiter

php
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * 启动任何应用程序服务。
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
                    ? Limit::none()
                    : Limit::perHour(1)->by($job->user->id);
    });
}

在上面的示例中,我们定义了一个每小时的速率限制;但是,您可以轻松地使用perMinute方法定义基于分钟的速率限制。此外,您可以将任何值传递给速率限制的by方法;但是,这个值通常用于按客户分段速率限制:

php
return Limit::perMinute(50)->by($job->user->id);

一旦定义了速率限制,您可以使用Illuminate\Queue\Middleware\RateLimited中间件将速率限制器附加到作业。每当作业超过速率限制时,此中间件将根据速率限制持续时间将作业释放回队列。

php
use Illuminate\Queue\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

将速率限制作业释放回队列仍将增加作业的总attempts数量。您可能希望相应地调整作业类上的triesmaxExceptions属性。或者,您可能希望使用retryUntil方法定义作业不再尝试的时间。

如果您不希望作业在速率限制时被重试,您可以使用dontRelease方法:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->dontRelease()];
}
lightbulb

如果您使用Redis,您可以使用Illuminate\Queue\Middleware\RateLimitedWithRedis中间件,该中间件经过微调以适应Redis,比基本的速率限制中间件更高效。

防止作业重叠

Laravel包括一个Illuminate\Queue\Middleware\WithoutOverlapping中间件,允许您根据任意键防止作业重叠。当排队作业正在修改一个资源时,这可能很有用,该资源应该一次只由一个作业修改。

例如,假设您有一个排队作业,更新用户的信用评分,您希望防止对同一用户ID的信用评分更新作业重叠。为此,您可以从作业的middleware方法返回WithoutOverlapping中间件:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

任何相同类型的重叠作业将被释放回队列。您还可以指定必须经过的秒数,然后释放的作业将再次尝试:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果您希望立即删除任何重叠作业,以便它们不会被重试,您可以使用dontRelease方法:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping中间件由Laravel的原子锁功能提供支持。有时,您的作业可能会以意外的方式失败或超时,以至于锁未被释放。因此,您可以使用expireAfter方法显式定义锁的过期时间。例如,下面的示例将指示Laravel在作业开始处理后3分钟释放WithoutOverlapping锁:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
exclamation

WithoutOverlapping中间件需要支持的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray缓存驱动程序支持原子锁。

节流异常

Laravel包括一个Illuminate\Queue\Middleware\ThrottlesExceptions中间件,允许您节流异常。一旦作业抛出给定数量的异常,所有进一步尝试执行作业的操作都将延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的作业特别有用。

例如,假设一个排队作业与第三方API交互,开始抛出异常。要节流异常,您可以从作业的middleware方法返回ThrottlesExceptions中间件。通常,此中间件应与实现基于时间的尝试的作业配对:

php
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(10, 5 * 60)];
}

/**
 * 确定作业应超时的时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(30);
}

中间件接受的第一个构造函数参数是作业可以抛出的异常数量,第二个构造函数参数是作业在被节流后应再次尝试的秒数。在上面的代码示例中,如果作业抛出10个连续的异常,我们将在5分钟后再次尝试作业,受30分钟的时间限制。

当作业抛出异常但异常阈值尚未达到时,作业通常会立即重试。但是,您可以在将中间件附加到作业时调用backoff方法,指定此类作业应延迟的分钟数:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}

在内部,此中间件使用Laravel的缓存系统实现速率限制,作业的类名用作缓存“键”。您可以在将中间件附加到作业时通过调用by方法覆盖此键。如果您有多个作业与同一第三方服务交互,并希望它们共享一个公共的节流“桶”,这可能会很有用:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}

默认情况下,此中间件将节流每个异常。您可以通过在将中间件附加到作业时调用when方法来修改此行为。只有当提供给when方法的闭包返回true时,异常才会被节流:

php
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->when(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

如果您希望将节流的异常报告给应用程序的异常处理程序,可以在将中间件附加到作业时调用report方法。可选地,您可以向report方法提供一个闭包,只有当给定的闭包返回true时,异常才会被报告:

php
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->report(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}
lightbulb

如果您使用Redis,您可以使用Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis中间件,该中间件经过微调以适应Redis,比基本的异常节流中间件更高效。

跳过作业

Skip中间件允许您指定作业应跳过/删除,而无需修改作业的逻辑。Skip::when方法将在给定条件评估为true时删除作业,而Skip::unless方法将在条件评估为false时删除作业:

php
use Illuminate\Queue\Middleware\Skip;

/**
* 获取作业应通过的中间件。
*/
public function middleware(): array
{
    return [
        Skip::when($someCondition),
    ];
}

您还可以将一个Closure传递给whenunless方法,以进行更复杂的条件评估:

php
use Illuminate\Queue\Middleware\Skip;

/**
* 获取作业应通过的中间件。
*/
public function middleware(): array
{
    return [
        Skip::when(function (): bool {
            return $this->shouldSkip();
        }),
    ];
}

调度作业

一旦您编写了作业类,您可以使用作业本身的dispatch方法调度它。传递给dispatch方法的参数将传递给作业的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

如果您希望有条件地调度作业,可以使用dispatchIfdispatchUnless方法:

php
ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的Laravel应用程序中,sync驱动程序是默认的队列驱动程序。此驱动程序在当前请求的前台同步执行作业,这在本地开发期间通常很方便。如果您希望实际开始排队作业以进行后台处理,您可以在应用程序的config/queue.php配置文件中指定不同的队列驱动程序。

延迟调度

如果您希望指定作业在调度后不立即可供队列工作者处理,您可以在调度作业时使用delay方法。例如,让我们指定一个作业在调度后10分钟内不可用:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
                    ->delay(now()->addMinutes(10));

        return redirect('/podcasts');
    }
}

在某些情况下,作业可能具有默认延迟配置。如果您需要绕过此延迟并调度作业以进行立即处理,可以使用withoutDelay方法:

php
ProcessPodcast::dispatch($podcast)->withoutDelay();
exclamation

Amazon SQS队列服务的最大延迟时间为15分钟。

在响应发送到浏览器后调度

另外,dispatchAfterResponse方法延迟调度作业,直到HTTP响应发送到用户的浏览器,如果您的Web服务器使用FastCGI。这将允许用户开始使用应用程序,即使排队作业仍在执行。通常,这仅应用于大约一秒的作业,例如发送电子邮件。由于它们在当前HTTP请求中处理,因此以这种方式调度的作业不需要运行队列工作者才能处理:

php
use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

您还可以调度一个闭包,并将afterResponse方法链接到dispatch助手,以在HTTP响应发送到浏览器后执行一个闭包:

php
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同步调度

如果您希望立即(同步)调度作业,可以使用dispatchSync方法。当使用此方法时,作业将不会被排队,而是在当前进程中立即执行:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

作业与数据库事务

虽然在数据库事务中调度作业是完全可以的,但您应特别注意确保您的作业能够成功执行。当在事务中调度作业时,作业可能会在父事务提交之前被工作者处理。在这种情况下,您在数据库事务中所做的模型或数据库记录的更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能在数据库中不存在。

幸运的是,Laravel提供了几种方法来解决此问题。首先,您可以在队列连接的配置数组中设置after_commit连接选项:

php
'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

after_commit选项为true时,您可以在数据库事务中调度作业;但是,Laravel将在实际调度作业之前等待打开的父数据库事务提交。当然,如果当前没有打开的数据库事务,则作业将立即调度。

如果由于在事务中发生的异常导致事务回滚,则在该事务中调度的作业将被丢弃。

lightbulb

after_commit配置选项设置为true还将导致所有排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后调度。

以行内方式指定提交调度行为

如果您没有将after_commit队列连接配置选项设置为true,您仍然可以指示特定作业在所有打开的数据库事务提交后调度。为此,您可以在调度操作上链接afterCommit方法:

php
use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果将after_commit配置选项设置为true,您可以指示特定作业立即调度,而无需等待任何打开的数据库事务提交:

php
ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链

作业链允许您指定一系列排队作业,这些作业应在主作业成功执行后按顺序运行。如果链中的一个作业失败,则其余作业将不会运行。要执行排队作业链,您可以使用Bus门面提供的chain方法。Laravel的命令总线是排队作业调度构建的底层组件:

php
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链式作业类实例外,您还可以链式闭包:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();
exclamation

在作业中使用$this->delete()方法删除作业不会阻止链式作业的处理。链将仅在链中的作业失败时停止执行。

链连接和队列

如果您希望指定应为链式作业使用的连接和队列,可以使用onConnectiononQueue方法。这些方法指定应使用的队列连接和队列名称,除非排队作业显式分配了不同的连接/队列:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

向链中添加作业

偶尔,您可能需要从链中的另一个作业中预先添加或附加作业到现有作业链。您可以使用prependToChainappendToChain方法来实现:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    // 预先添加到当前链中,在当前作业后立即运行作业...
    $this->prependToChain(new TranscribePodcast);

    // 附加到当前链中,在链的末尾运行作业...
    $this->appendToChain(new TranscribePodcast);
}

链失败

在链式作业中,您可以使用catch方法指定一个闭包,该闭包将在链中的作业失败时调用。给定的回调将接收导致作业失败的Throwable实例:

php
use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // 链中的作业失败...
})->dispatch();
exclamation

由于链回调是序列化的,并由Laravel队列稍后执行,因此您不应在链回调中使用$this变量。

自定义队列和连接

调度到特定队列

通过将作业推送到不同的队列,您可以“分类”排队作业,甚至优先考虑分配给各种队列的工作者数量。请记住,这不会将作业推送到不同的队列“连接”,如您的队列配置文件中定义的,而只是推送到单个连接中的特定队列。要指定队列,请在调度作业时使用onQueue方法:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

或者,您可以通过在作业的构造函数中调用onQueue方法来指定作业的队列:

php
<?php

namespace App\Jobs;

 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

调度到特定连接

如果您的应用程序与多个队列连接交互,您可以使用onConnection方法指定将作业推送到哪个连接:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

您可以将onConnectiononQueue方法链接在一起,以指定作业的连接和队列:

php
ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

或者,您可以通过在作业的构造函数中调用onConnection方法来指定作业的连接:

php
<?php

namespace App\Jobs;

 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

指定最大作业尝试/超时值

最大尝试次数

如果您的排队作业遇到错误,您可能不希望它无限期重试。因此,Laravel提供了多种方法来指定作业可以尝试的次数或持续时间。

指定作业可以尝试的最大次数的一种方法是在Artisan命令行上使用--tries开关。这将适用于工作者处理的所有作业,除非正在处理的作业指定了它可以尝试的次数:

shell
php artisan queue:work --tries=3

如果作业超过其最大尝试次数,它将被视为“失败”作业。有关处理失败作业的更多信息,请参阅失败作业文档。如果在queue:work命令中提供--tries=0,则作业将无限期重试。

您可以通过在作业类上定义最大尝试次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,则它将优先于命令行上提供的--tries值:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 5;
}

如果您需要动态控制特定作业的最大尝试次数,可以在作业上定义一个tries方法:

php
/**
 * 确定作业可以尝试的次数。
 */
public function tries(): int
{
    return 5;
}

基于时间的尝试

作为定义作业在失败之前可以尝试的次数的替代方法,您可以定义作业不再尝试的时间。这允许作业在给定的时间框架内尝试任意次数。要定义作业不再尝试的时间,请在作业类中添加一个retryUntil方法。此方法应返回一个DateTime实例:

php
use DateTime;

/**
 * 确定作业应超时的时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}
lightbulb

您还可以在排队事件监听器上定义tries属性或retryUntil方法。

最大异常

有时,您可能希望指定作业可以尝试多次,但如果重试是由给定数量的未处理异常触发,则应失败。为此,您可以在作业类上定义一个maxExceptions属性:

php
<?php

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 25;

    /**
     * 允许的未处理异常的最大数量。
     *
     * @var int
     */
    public $maxExceptions = 3;

    /**
     * 执行作业。
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // 获取锁,处理播客...
        }, function () {
            // 无法获取锁...
            return $this->release(10);
        });
    }
}

在此示例中,如果应用程序无法获取Redis锁,则作业将在十秒后释放,并将继续重试最多25次。但是,如果作业抛出三次未处理的异常,则作业将失败。

超时

通常,您大致知道排队作业需要多长时间。因此,Laravel允许您指定“超时”值。默认情况下,超时值为60秒。如果作业处理时间超过指定的超时值,处理作业的工作者将以错误退出。通常,工作者将由配置在服务器上的进程管理器自动重启。

作业可以运行的最大秒数可以在作业类上定义。如果在作业上指定了超时,则它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以运行的最大秒数。
     *
     * @var int
     */
    public $timeout = 120;
}

有时,IO阻塞过程,例如套接字或外发HTTP连接,可能不会遵循您指定的超时。因此,在使用这些功能时,您应始终尝试使用其API指定超时。例如,在使用Guzzle时,您应始终指定连接和请求超时值。

exclamation

必须安装pcntl PHP扩展才能指定作业超时。此外,作业的“超时”值应始终小于其“重试后”值。否则,作业可能会在实际完成执行或超时之前被重新尝试。

超时失败

如果您希望指示作业在超时时应标记为失败,则可以在作业类中定义$failOnTimeout属性:

php
/**
 * 指示作业在超时时应标记为失败。
 *
 * @var bool
 */
public $failOnTimeout = true;

错误处理

如果在处理作业时抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续释放,直到它已被应用程序允许的最大次数尝试。最大尝试次数由queue:work Artisan命令使用的--tries开关定义。或者,最大尝试次数可以在作业类本身上定义。有关运行队列工作者的更多信息,请参阅运行队列工作者

手动释放作业

有时,您可能希望手动将作业释放回队列,以便稍后可以再次尝试。您可以通过调用release方法来实现:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    $this->release();
}

默认情况下,release方法将立即将作业释放回队列以供处理。但是,您可以通过将整数或日期实例传递给release方法,指示队列在给定的秒数过去之前不将作业提供给处理:

php
$this->release(10);

$this->release(now()->addSeconds(10));

手动标记作业为失败

偶尔,您可能需要手动将作业标记为“失败”。为此,您可以调用fail方法:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    $this->fail();
}

如果您希望因为捕获的异常而将作业标记为失败,您可以将异常传递给fail方法。或者,出于方便,您可以传递一个字符串错误消息,系统会为您将其转换为异常:

php
$this->fail($exception);

$this->fail('发生了错误。');
lightbulb

有关失败作业的更多信息,请查看处理失败作业的文档

作业批处理

Laravel的作业批处理功能允许您轻松执行一批作业,然后在批处理作业完成执行时执行某些操作。在开始之前,您应该创建一个数据库迁移,以构建一个表,该表将包含有关作业批处理的元信息,例如其完成百分比。此迁移可以使用make:queue-batches-table Artisan命令生成:

shell
php artisan make:queue-batches-table

php artisan migrate

定义可批处理作业

要定义可批处理作业,您应该像正常一样创建可排队作业;但是,您应该在作业类中添加Illuminate\Bus\Batchable特性。此特性提供对batch方法的访问,该方法可用于检索作业正在执行的当前批处理:

php
<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ImportCsv implements ShouldQueue
{
    use Batchable, Queueable;

    /**
     * 执行作业。
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // 确定批处理是否已取消...

            return;
        }

        // 导入CSV文件的一部分...
    }
}

调度批处理

要调度一批作业,您应该使用Bus门面的batch方法。当然,批处理在与完成回调结合使用时最有用。因此,您可以使用thencatchfinally方法为批处理定义完成回调。每个这些回调在调用时将接收一个Illuminate\Bus\Batch实例。在此示例中,我们将想象我们正在排队一批作业,每个作业处理CSV文件的给定行数:

php
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->before(function (Batch $batch) {
    // 批处理已创建,但尚未添加任何作业...
})->progress(function (Batch $batch) {
    // 单个作业已成功完成...
})->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->catch(function (Batch $batch, Throwable $e) {
    // 检测到批处理作业失败...
})->finally(function (Batch $batch) {
    // 批处理已完成执行...
})->dispatch();

return $batch->id;

批处理的ID可以通过$batch->id属性访问,可用于查询Laravel命令总线以获取有关批处理的信息。

exclamation

由于批处理回调是序列化的,并由Laravel队列稍后执行,因此您不应在回调中使用$this变量。此外,由于批处理作业被包装在数据库事务中,因此在作业中不应执行触发隐式提交的数据库语句。

命名批处理

某些工具,例如Laravel Horizon和Laravel Telescope,可能会为批处理提供更友好的调试信息,如果批处理被命名。要为批处理分配任意名称,您可以在定义批处理时调用name方法:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->name('Import CSV')->dispatch();

批处理连接和队列

如果您希望指定应为批处理作业使用的连接和队列,可以使用onConnectiononQueue方法。所有批处理作业必须在同一连接和队列中执行:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->onConnection('redis')->onQueue('imports')->dispatch();

链与批处理

您可以通过将链式作业放置在数组中,在批处理中定义一组链式作业。例如,我们可以并行执行两条作业链,并在两条作业链都完成处理时执行回调:

php
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

相反,您可以在中运行作业批处理,通过在链中定义批处理作业。例如,您可以首先运行一批作业以发布多个播客,然后运行一批作业以发送发布通知:

php
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new FlushPodcastCache,
    Bus::batch([
        new ReleasePodcast(1),
        new ReleasePodcast(2),
    ]),
    Bus::batch([
        new SendPodcastReleaseNotification(1),
        new SendPodcastReleaseNotification(2),
    ]),
])->dispatch();

向批处理中添加作业

有时,您可能希望从批处理作业中添加额外的作业。此模式在您需要批处理数千个作业时可能很有用,这可能在Web请求期间花费太长时间。因此,您可能希望调度一批初始的“加载”作业,以填充批处理更多作业:

php
$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->name('Import Contacts')->dispatch();

在此示例中,我们将使用LoadImportBatch作业填充批处理更多作业。为此,我们可以使用可以通过作业的batch方法访问的批处理实例上的add方法:

php
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * 执行作业。
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}
exclamation

您只能从属于同一批处理的作业中向批处理添加作业。

检查批处理

提供给批处理完成回调的Illuminate\Bus\Batch实例具有多种属性和方法,以帮助您与给定的作业批处理进行交互和检查:

php
// 批处理的UUID...
$batch->id;

// 批处理的名称(如果适用)...
$batch->name;

// 分配给批处理的作业数量...
$batch->totalJobs;

// 尚未处理的作业数量...
$batch->pendingJobs;

// 失败的作业数量...
$batch->failedJobs;

// 到目前为止已处理的作业数量...
$batch->processedJobs();

// 批处理的完成百分比(0-100)...
$batch->progress();

// 指示批处理是否已完成执行...
$batch->finished();

// 取消批处理的执行...
$batch->cancel();

// 指示批处理是否已被取消...
$batch->cancelled();

从路由返回批处理

所有Illuminate\Bus\Batch实例都是JSON可序列化的,这意味着您可以直接从应用程序的某个路由返回它们,以检索包含有关批处理的信息的JSON有效负载,包括其完成进度。这使得在应用程序的UI中显示有关批处理完成进度的信息变得方便。

要通过其ID检索批处理,您可以使用Bus门面的findBatch方法:

php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

取消批处理

有时,您可能需要取消给定批处理的执行。可以通过调用Illuminate\Bus\Batch实例上的cancel方法来实现:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    if ($this->user->exceedsImportLimit()) {
        return $this->batch()->cancel();
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

正如您在前面的示例中所注意到的,批处理作业通常应在继续执行之前确定其对应的批处理是否已被取消。然而,为了方便,您可以将SkipIfBatchCancelled 中间件分配给作业。顾名思义,此中间件将指示Laravel在批处理被取消时不处理作业:

php
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * 获取作业应通过的中间件。
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

批处理失败

当批处理作业失败时,将调用catch回调(如果分配)。此回调仅在批处理中的第一个作业失败时调用。

允许失败

当批处理中的作业失败时,Laravel将自动将批处理标记为“取消”。如果您愿意,您可以禁用此行为,以便作业失败不会自动标记批处理为取消。可以通过在调度批处理时调用allowFailures方法来实现:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->allowFailures()->dispatch();

重试失败的批处理作业

为了方便,Laravel提供了一个queue:retry-batch Artisan命令,允许您轻松重试给定批处理的所有失败作业。queue:retry-batch命令接受应重试其失败作业的批处理的UUID:

shell
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批处理

如果不修剪,job_batches表可能会迅速积累记录。为此,您应该调度 queue:prune-batches Artisan命令每天运行:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches')->daily();

默认情况下,所有完成的批处理记录超过24小时将被修剪。您可以在调用命令时使用hours选项来确定保留批处理数据的时间。例如,以下命令将删除所有在48小时之前完成的批处理:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48')->daily();

有时,您的jobs_batches表可能会积累未成功完成的批处理记录,例如未成功重试的批处理。您可以指示queue:prune-batches命令修剪这些未完成的批处理记录,使用unfinished选项:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,您的jobs_batches表也可能积累取消的批处理记录。您可以指示queue:prune-batches命令修剪这些取消的批处理记录,使用cancelled选项:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

在DynamoDB中存储批处理

Laravel还提供了在DynamoDB中存储批处理元信息的支持,而不是关系数据库。但是,您需要手动创建一个DynamoDB表来存储所有批处理记录。

通常,此表应命名为job_batches,但您应根据应用程序的queue.batching.table配置值命名该表。

DynamoDB批处理表配置

job_batches表应具有一个名为application的字符串主分区键和一个名为id的字符串主排序键。application部分的键将包含您应用程序的名称,该名称由应用程序的name配置值定义。由于应用程序名称是DynamoDB表的键的一部分,因此您可以使用同一表存储多个Laravel应用程序的作业批处理。

此外,如果您希望利用DynamoDB的自动批处理修剪,则可以为您的表定义ttl属性。

DynamoDB配置

接下来,安装AWS SDK,以便您的Laravel应用程序可以与Amazon DynamoDB通信:

shell
composer require aws/aws-sdk-php

然后,将queue.batching.driver配置选项的值设置为dynamodb。此外,您应该在批处理配置数组中定义keysecretregion配置选项。这些选项将用于与AWS进行身份验证。当使用dynamodb驱动程序时,queue.batching.database配置选项是不必要的:

php
'batching' => [
    'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

在DynamoDB中修剪批处理

当利用DynamoDB存储作业批处理信息时,通常用于修剪存储在关系数据库中的批处理的命令将不起作用。相反,您可以利用DynamoDB的本地TTL功能自动删除旧批处理记录。

如果您为DynamoDB表定义了ttl属性,则可以定义配置参数,以指示Laravel如何修剪批处理记录。queue.batching.ttl_attribute配置值定义了保存TTL的属性的名称,而queue.batching.ttl配置值定义了批处理记录在最后一次更新后可以被删除的秒数:

php
'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7天...
],

队列闭包

除了调度作业类到队列外,您还可以调度闭包。这对于需要在当前请求周期之外执行的快速、简单的任务非常有用。调度闭包到队列时,闭包的代码内容会被加密签名,以便在传输过程中无法修改:

php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

使用catch方法,您可以提供一个闭包,如果排队的闭包未能在耗尽所有队列的配置重试尝试后成功完成,则应执行该闭包:

php
use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // 此作业已失败...
});
exclamation

由于catch回调是序列化的,并由Laravel队列稍后执行,因此您不应在catch回调中使用$this变量。

运行队列工作者

queue:work 命令

Laravel 包含一个 Artisan 命令,可以启动一个队列工作者并处理新推送到队列的作业。您可以使用 queue:work Artisan 命令运行工作者。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

shell
php artisan queue:work
lightbulb

为了使 queue:work 进程在后台永久运行,您应该使用进程监控工具,例如 Supervisor,以确保队列工作者不会停止运行。

如果您希望在调用 queue:work 命令时将处理的作业 ID 包含在命令的输出中,可以包含 -v 标志:

shell
php artisan queue:work -v

请记住,队列工作者是长时间运行的进程,并将启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保 重启您的队列工作者。此外,请记住,您应用程序创建或修改的任何静态状态在作业之间不会自动重置。

另外,您可以运行 queue:listen 命令。当使用 queue:listen 命令时,您不必在想要重新加载更新的代码或重置应用程序状态时手动重启工作者;然而,此命令的效率显著低于 queue:work 命令:

shell
php artisan queue:listen

运行多个队列工作者

要将多个工作者分配给一个队列并并发处理作业,您只需启动多个 queue:work 进程。这可以在本地通过终端中的多个标签页完成,或在生产中使用您的进程管理器的配置设置完成。使用 Supervisor 时,您可以使用 numprocs 配置值。

指定连接和队列

您还可以指定工作者应使用哪个队列连接。传递给 work 命令的连接名称应对应于您 config/queue.php 配置文件中定义的连接之一:

shell
php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接的默认队列中的作业。然而,您可以通过仅处理特定连接的特定队列进一步自定义队列工作者。例如,如果您所有的电子邮件都在 redis 队列连接的 emails 队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:

shell
php artisan queue:work redis --queue=emails

处理指定数量的作业

--once 选项可用于指示工作者仅处理队列中的单个作业:

shell
php artisan queue:work --once

--max-jobs 选项可用于指示工作者处理给定数量的作业,然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便在处理给定数量的作业后自动重启工作者,释放它们可能积累的任何内存:

shell
php artisan queue:work --max-jobs=1000

处理所有排队作业然后退出

--stop-when-empty 选项可用于指示工作者处理所有作业,然后优雅地退出。此选项在处理 Laravel 队列时,如果您希望在队列为空后关闭 Docker 容器,则可能很有用:

shell
php artisan queue:work --stop-when-empty

处理给定数量的秒数的作业

--max-time 选项可用于指示工作者处理作业的给定秒数,然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便在处理作业给定时间后自动重启工作者,释放它们可能积累的任何内存:

shell
# 处理作业一小时然后退出...
php artisan queue:work --max-time=3600

工作者睡眠持续时间

当队列中有作业可用时,工作者将继续处理作业,而不在作业之间有延迟。然而,sleep 选项决定了如果没有可用作业,工作者将“睡眠”多少秒。当然,在睡眠期间,工作者不会处理任何新作业:

shell
php artisan queue:work --sleep=3

维护模式和队列

当您的应用程序处于 维护模式 时,将不会处理任何排队作业。一旦应用程序退出维护模式,作业将继续正常处理。

要强制您的队列工作者即使在启用维护模式时也处理作业,您可以使用 --force 选项:

shell
php artisan queue:work --force

资源考虑

守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,则应在处理完图像后使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列。例如,在您的 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。但是,偶尔您可能希望将作业推送到 high 优先级队列,如下所示:

php
dispatch((new Job)->onQueue('high'));

要启动一个工作者,确保所有 high 队列作业在继续处理任何 low 队列作业之前被处理,请将逗号分隔的队列名称传递给 work 命令:

shell
php artisan queue:work --queue=high,low

队列工作者和部署

由于队列工作者是长时间运行的进程,它们在未重启的情况下不会注意到代码的更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重启工作者。您可以通过发出 queue:restart 命令优雅地重启所有工作者:

shell
php artisan queue:restart

此命令将指示所有队列工作者在完成处理当前作业后优雅地退出,以便不会丢失任何现有作业。由于队列工作者将在执行 queue:restart 命令时退出,因此您应该运行一个进程管理器,例如 Supervisor,以自动重启队列工作者。

lightbulb

队列使用 cache 存储重启信号,因此在使用此功能之前,您应该验证您的应用程序是否正确配置了缓存驱动程序。

作业过期和超时

作业过期

在您的 config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待多少秒。例如,如果 retry_after 的值设置为 90,则如果作业在 90 秒内未被释放或删除,它将被释放回队列。通常,您应该将 retry_after 值设置为作业完成处理所需的最大秒数。

exclamation

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 默认可见性超时 重试作业,该超时在 AWS 控制台中管理。

工作者超时

queue:work Artisan 命令公开了一个 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果作业处理时间超过超时值指定的秒数,处理该作业的工作者将以错误退出。通常,工作者将由 配置在您的服务器上的进程管理器 自动重启:

shell
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项是不同的,但协同工作以确保作业不会丢失,并且作业仅成功处理一次。

exclamation

--timeout 值应始终比您的 retry_after 配置值短几秒。这将确保处理冻结作业的工作者在作业被重试之前始终被终止。如果您的 --timeout 选项长于 retry_after 配置值,您的作业可能会被处理两次。

Supervisor 配置

在生产中,您需要一种方法来保持 queue:work 进程运行。queue:work 进程可能因多种原因停止运行,例如超出工作者超时或执行 queue:restart 命令。

因此,您需要配置一个进程监控器,可以检测到您的 queue:work 进程退出并自动重启它们。此外,进程监控器可以让您指定希望并发运行的 queue:work 进程数量。Supervisor 是一个常用于 Linux 环境的进程监控器,我们将在以下文档中讨论如何配置它。

安装 Supervisor

Supervisor 是一个用于 Linux 操作系统的进程监控器,如果 queue:work 进程失败,它将自动重启。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

shell
sudo apt-get install supervisor
lightbulb

如果自己配置和管理 Supervisor 听起来令人不知所措,请考虑使用 Laravel Forge,它将自动为您的生产 Laravel 项目安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监控您的进程。例如,让我们创建一个 laravel-worker.conf 文件,启动并监控 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监控所有这些进程,如果它们失败则自动重启。您应该更改配置的 command 指令,以反映您所需的队列连接和工作者选项。

exclamation

您应该确保 stopwaitsecs 的值大于您最长运行作业所消耗的秒数。否则,Supervisor 可能会在作业完成之前杀死作业。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

shell
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,请参阅 Supervisor 文档

处理失败的作业

有时,您的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来 指定作业应尝试的最大次数。在异步作业超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。同步调度的作业 失败时不会存储在此表中,其异常会立即由应用程序处理。

创建 failed_jobs 表的迁移通常在新的 Laravel 应用程序中已经存在。但是,如果您的应用程序没有此表的迁移,您可以使用 make:queue-failed-table 命令创建迁移:

shell
php artisan make:queue-failed-table

php artisan migrate

在运行 队列工作者 进程时,您可以使用 --tries 开关指定作业应尝试的最大次数。 如果您未指定 --tries 选项的值,则作业将仅尝试一次,或尝试作业类的 $tries 属性指定的次数:

shell
php artisan queue:work redis --tries=3

使用 --backoff 选项,您可以指定 Laravel 在遇到异常时重试作业之前应等待多少秒。默认情况下,作业会立即释放回队列,以便可以再次尝试:

shell
php artisan queue:work redis --tries=3 --backoff=3

如果您希望在每个作业的基础上配置 Laravel 在遇到异常时重试作业之前应等待多少秒,您可以在作业类中定义 backoff 属性:

php
/**
 * 重试作业之前等待的秒数。
 *
 * @var int
 */
public $backoff = 3;

如果您需要更复杂的逻辑来确定作业的回退时间,您可以在作业类中定义 backoff 方法:

php
/**
* 计算重试作业之前等待的秒数。
*/
public function backoff(): int
{
    return 3;
}

您可以通过从 backoff 方法返回回退值数组轻松配置“指数”回退。在此示例中,第一次重试的延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,之后的每次重试为 10 秒,如果还有更多尝试:

php
/**
* 计算重试作业之前等待的秒数。
*
* @return array<int, int>
*/
public function backoff(): array
{
    return [1, 5, 10];
}

清理失败的作业

当特定作业失败时,您可能希望向用户发送警报或撤销作业部分完成的任何操作。为此,您可以在作业类中定义 failed 方法。导致作业失败的 Throwable 实例将传递给 failed 方法:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行作业。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }

    /**
     * 处理作业失败。
     */
    public function failed(?Throwable $exception): void
    {
        // 向用户发送失败通知等...
    }
}
exclamation

在调用 failed 方法之前,会实例化作业的新实例;因此,在 handle 方法中可能发生的任何类属性修改将丢失。

重试失败的作业

要查看已插入到 failed_jobs 数据库表中的所有失败作业,您可以使用 queue:failed Artisan 命令:

shell
php artisan queue:failed

queue:failed 命令将列出作业 ID、连接、队列、失败时间和有关作业的其他信息。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败作业,请发出以下命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如果需要,您可以将多个 ID 传递给命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

您还可以重试特定队列的所有失败作业:

shell
php artisan queue:retry --queue=name

要重试所有失败的作业,请执行 queue:retry 命令并将 all 作为 ID 传递:

shell
php artisan queue:retry all

如果您希望删除失败的作业,可以使用 queue:forget 命令:

shell
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
lightbulb

使用 Horizon 时,您应该使用 horizon:forget 命令删除失败的作业,而不是 queue:forget 命令。

要从 failed_jobs 表中删除所有失败的作业,您可以使用 queue:flush 命令:

shell
php artisan queue:flush

忽略缺失的模型

在作业中注入 Eloquent 模型时,模型会在放入队列之前自动序列化,并在作业被处理时从数据库中重新检索。然而,如果模型在作业等待被工作者处理时已被删除,则您的作业可能会因 ModelNotFoundException 而失败。

为了方便起见,您可以通过将作业的 deleteWhenMissingModels 属性设置为 true,自动删除缺失模型的作业。当此属性设置为 true 时,Laravel 将安静地丢弃作业,而不会引发异常:

php
/**
 * 如果其模型不再存在,则删除作业。
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

修剪失败的作业

您可以通过调用 queue:prune-failed Artisan 命令修剪应用程序的 failed_jobs 表中的记录:

shell
php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败作业记录将被修剪。如果您向命令提供 --hours 选项,则仅保留在过去 N 小时内插入的失败作业记录。例如,以下命令将删除所有超过 48 小时的失败作业记录:

shell
php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的作业

Laravel 还提供了支持将失败作业记录存储在 DynamoDB 中,而不是关系数据库表。然而,您必须手动创建一个 DynamoDB 表来存储所有失败的作业记录。通常,此表应命名为 failed_jobs,但您应根据应用程序的 queue.failed.table 配置值命名表。

failed_jobs 表应具有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。application 部分的键将包含您应用程序的名称,该名称由应用程序的 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一表存储多个 Laravel 应用程序的失败作业。

此外,请确保安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,您还应在失败作业配置数组中定义 keysecretregion 配置选项。这些选项将用于与 AWS 进行身份验证。使用 dynamodb 驱动程序时,queue.failed.database 配置选项是不必要的:

php
'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

禁用失败作业存储

您可以指示 Laravel 在不存储的情况下丢弃失败的作业,将 queue.failed.driver 配置选项的值设置为 null。通常,可以通过 QUEUE_FAILED_DRIVER 环境变量实现:

ini
QUEUE_FAILED_DRIVER=null

失败作业事件

如果您希望注册一个事件监听器,在作业失败时调用,您可以使用 Queue facade 的 failing 方法。例如,我们可以在 Laravel 附带的 AppServiceProviderboot 方法中附加一个闭包到此事件:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     */
    public function register(): void
    {
        // ...
    }

    /**
     * 启动任何应用程序服务。
     */
    public function boot(): void
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

从队列中清除作业

lightbulb

使用 Horizon 时,您应该使用 horizon:clear 命令从队列中清除作业,而不是 queue:clear 命令。

如果您希望删除默认连接的默认队列中的所有作业,可以使用 queue:clear Artisan 命令:

shell
php artisan queue:clear

您还可以提供 connection 参数和 queue 选项,以从特定连接和队列中删除作业:

shell
php artisan queue:clear redis --queue=emails
exclamation

从队列中清除作业仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在您清除队列后 60 秒内发送到 SQS 队列的作业也可能被删除。

监控您的队列

如果您的队列接收到突然涌入的作业,它可能会变得不堪重负,导致作业完成的等待时间很长。如果您愿意,Laravel 可以在您的队列作业数量超过指定阈值时提醒您。

要开始,您应该安排 queue:monitor 命令 每分钟运行。该命令接受您希望监控的队列名称以及您希望的作业数量阈值:

shell
php artisan queue:monitor redis:default,redis:deployments --max=100

仅调度此命令不足以触发通知,提醒您队列的过载状态。当命令遇到作业数量超过阈值的队列时,将调度一个 Illuminate\Queue\Events\QueueBusy 事件。您可以在应用程序的 AppServiceProvider 中监听此事件,以便向您或您的开发团队发送通知:

php
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * 启动任何应用程序服务。
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
                ->notify(new QueueHasLongWaitTime(
                    $event->connection,
                    $event->queue,
                    $event->size
                ));
    });
}

测试

在测试调度作业的代码时,您可能希望指示 Laravel 不实际执行作业本身,因为作业的代码可以直接测试,并且与调度它的代码分开。当然,要测试作业本身,您可以实例化作业实例并在测试中直接调用 handle 方法。

您可以使用 Queue facade 的 fake 方法来防止排队的作业实际被推送到队列。在调用 Queue facade 的 fake 方法后,您可以断言应用程序尝试将作业推送到队列:

php
<?php

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

test('orders can be shipped', function () {
    Queue::fake();

    // 执行订单发货...

    // 断言没有作业被推送...
    Queue::assertNothingPushed();

    // 断言作业被推送到给定队列...
    Queue::assertPushedOn('queue-name', ShipOrder::class);

    // 断言作业被推送两次...
    Queue::assertPushed(ShipOrder::class, 2);

    // 断言作业未被推送...
    Queue::assertNotPushed(AnotherJob::class);

    // 断言闭包被推送到队列...
    Queue::assertClosurePushed();

    // 断言推送的作业总数...
    Queue::assertCount(3);
});
php
<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // 执行订单发货...

        // 断言没有作业被推送...
        Queue::assertNothingPushed();

        // 断言作业被推送到给定队列...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // 断言作业被推送两次...
        Queue::assertPushed(ShipOrder::class, 2);

        // 断言作业未被推送...
        Queue::assertNotPushed(AnotherJob::class);

        // 断言闭包被推送到队列...
        Queue::assertClosurePushed();

        // 断言推送的作业总数...
        Queue::assertCount(3);
    }
}

您可以将闭包传递给 assertPushedassertNotPushed 方法,以断言推送的作业通过给定的“真值测试”。如果至少有一个作业被推送并通过给定的真值测试,则断言将成功:

php
Queue::assertPushed(function (ShipOrder $job) use ($order) {
    return $job->order->id === $order->id;
});

假装一部分作业

如果您只需要假装特定作业,同时允许其他作业正常执行,您可以将应假装的作业类名传递给 fake 方法:

php
test('orders can be shipped', function () {
    Queue::fake([
        ShipOrder::class,
    ]);

    // 执行订单发货...

    // 断言作业被推送两次...
    Queue::assertPushed(ShipOrder::class, 2);
});
php
public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // 执行订单发货...

    // 断言作业被推送两次...
    Queue::assertPushed(ShipOrder::class, 2);
}

您可以使用 except 方法假装所有作业,除了指定的一组作业:

php
Queue::fake()->except([
    ShipOrder::class,
]);

测试作业链

要测试作业链,您需要利用 Bus facade 的假装功能。Bus facade 的 assertChained 方法可用于断言 作业链 被调度。assertChained 方法接受一个作业链的数组作为第一个参数:

php
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class
]);

如上例所示,作业链的数组可以是作业类名的数组。但是,您也可以提供实际作业实例的数组。在这样做时,Laravel 将确保作业实例属于同一类,并且具有与您应用程序调度的作业相同的属性值:

php
Bus::assertChained([
    new ShipOrder,
    new RecordShipment,
    new UpdateInventory,
]);

您可以使用 assertDispatchedWithoutChain 方法断言作业在没有作业链的情况下被推送:

php
Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试链修改

如果链中的作业 在现有链中添加或附加作业,您可以使用作业的 assertHasChain 方法断言作业具有预期的剩余作业链:

php
$job = new ProcessPodcast;

$job->handle();

$job->assertHasChain([
    new TranscribePodcast,
    new OptimizePodcast,
    new ReleasePodcast,
]);

assertDoesntHaveChain 方法可用于断言作业的剩余链为空:

php
$job->assertDoesntHaveChain();

测试链批处理

如果您的作业链 包含一批作业,您可以通过在链断言中插入 Bus::chainedBatch 定义来断言链批处理符合您的预期:

php
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::assertChained([
    new ShipOrder,
    Bus::chainedBatch(function (PendingBatch $batch) {
        return $batch->jobs->count() === 3;
    }),
    new UpdateInventory,
]);

测试作业批处理

Bus facade 的 assertBatched 方法可用于断言 作业批处理 被调度。传递给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 实例,可以用来检查批处理中的作业:

php
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'import-csv' &&
           $batch->jobs->count() === 10;
});

您可以使用 assertBatchCount 方法断言调度了给定数量的批处理:

php
Bus::assertBatchCount(3);

您可以使用 assertNothingBatched 断言没有调度任何批处理:

php
Bus::assertNothingBatched();

测试作业/批处理交互

此外,您可能偶尔需要测试单个作业与其底层批处理的交互。例如,您可能需要测试作业是否取消了其批处理的进一步处理。为此,您需要通过 withFakeBatch 方法将假批处理分配给作业。withFakeBatch 方法返回一个包含作业实例和假批处理的元组:

php
[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

测试作业/队列交互

有时,您可能需要测试排队作业 将自己释放回队列。或者,您可能需要测试作业是否删除了自己。您可以通过实例化作业并调用 withFakeQueueInteractions 方法来测试这些队列交互。

一旦作业的队列交互被假装,您可以在作业上调用 handle 方法。调用作业后,可以使用 assertReleasedassertDeletedassertNotDeletedassertFailedassertFailedWithassertNotFailed 方法对作业的队列交互进行断言:

php
use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;

$job = (new ProcessPodcast)->withFakeQueueInteractions();

$job->handle();

$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();

作业事件

使用 Queue facade 上的 beforeafter 方法,您可以指定在处理排队作业之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计数据的绝佳机会。通常,您应该在 服务提供者boot 方法中调用这些方法。例如,我们可以在 Laravel 附带的 AppServiceProvider 中:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     */
    public function register(): void
    {
        // ...
    }

    /**
     * 启动任何应用程序服务。
     */
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

使用 looping 方法在 Queue facade 上,您可以指定在工作者尝试从队列获取作业之前执行的回调。例如,您可能会注册一个闭包,以回滚之前失败的作业留下的任何打开事务:

php
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});