laravel
5. 深入探讨
队列

介绍

在构建您的 Web 应用程序时,您可能会有一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行时间过长。值得庆幸的是,Laravel 允许您轻松创建可在后台处理的排队任务。通过将耗时的任务移到队列中,您的应用程序可以以极快的速度响应 Web 请求,并为您的客户提供更好的用户体验。

Laravel 队列在各种不同的队列后端(如 Amazon SQS (opens in a new tab)Redis (opens in a new tab) 或甚至关系数据库)上提供了统一的排队 API。

Laravel 的队列配置选项存储在您的应用程序的 config/queue.php 配置文件中。在这个文件中,您将找到框架所包含的每个队列驱动程序的连接配置,包括数据库、Amazon SQS (opens in a new tab)Redis (opens in a new tab)Beanstalkd (opens in a new tab) 驱动程序,以及一个将立即执行任务的同步驱动程序(用于本地开发期间)。还包括一个 null 队列驱动程序,它会丢弃排队的任务。

[!注意]
Laravel 现在提供了 Horizon,这是一个为您的 Redis 驱动的队列提供的漂亮的仪表板和配置系统。查看完整的 Horizon 文档 以获取更多信息。

连接与队列的区别

在开始使用 Laravel 队列之前,理解“连接”和“队列”之间的区别很重要。在您的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了到后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。然而,任何给定的队列连接都可以有多个“队列”,可以将其视为不同的任务栈或任务堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是当任务被发送到给定连接时将被分发到的默认队列。换句话说,如果您分发一个任务而没有明确定义它应该被分发到哪个队列,该任务将被放置在连接配置的 queue 属性中定义的队列上:

use App\Jobs\ProcessPodcast;

// 此任务被发送到默认连接的默认队列...
ProcessPodcast::dispatch();

// 此任务被发送到默认连接的“emails”队列...
ProcessPodcast::dispatch()->onQueue('emails');

有些应用程序可能不需要将任务推送到多个队列上,而是更愿意只有一个简单的队列。然而,将任务推送到多个队列对于希望对任务的处理进行优先级排序或分段的应用程序特别有用,因为 Laravel 队列工作器允许您按优先级指定它应该处理的队列。例如,如果您将任务推送到 high 队列,您可以运行一个工作器,为它们提供更高的处理优先级:

php artisan queue:work --queue=high,default

驱动注意事项及前提条件

数据库

为了使用 database 队列驱动程序,您需要一个数据库表来保存任务。通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php 数据库迁移 中;但是,如果您的应用程序不包含此迁移,您可以使用 make:queue-table Artisan 命令来创建它:

php artisan make:queue-table
 
php artisan migrate

Redis

为了使用 redis 队列驱动程序,您应该在 config/database.php 配置文件中配置一个 Redis 数据库连接。

[!警告]
redis 队列驱动程序不支持 serializercompression Redis 选项。

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含一个 键哈希标签 (opens in a new tab)。这是为了确保给定队列的所有 Redis 键都被放置在同一个哈希槽中:

'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', '{default}'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => null,
    'after_commit' => false,
],

阻塞

当使用 Redis 队列时,您可以使用 block_for 配置选项来指定在驱动程序遍历工作器循环并重新轮询 Redis 数据库之前,驱动程序应该等待任务可用的时间。

根据您的队列负载调整此值可以比不断轮询 Redis 数据库以获取新任务更有效。例如,您可以将值设置为 5,表示驱动程序在等待任务可用时应阻塞 5 秒:

'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', 'default'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => 5,
    'after_commit' => false,
],

[!警告]
block_for 设置为 0 将导致队列工作器无限期阻塞,直到有任务可用。这也将阻止诸如 SIGTERM 之类的信号在处理下一个任务之前被处理。

其他驱动程序前提条件

以下是列出的队列驱动程序所需的依赖项。这些依赖项可以通过 Composer 包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 或 phpredis PHP 扩展

创建任务

生成任务类

默认情况下,您的应用程序的所有可排队任务都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时将创建它:

php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,向 Laravel 表明该任务应被推送到队列中以异步方式运行。

[!注意]
可以使用 存根发布 自定义任务存根。

类结构

任务类非常简单,通常只包含一个 handle 方法,当队列处理任务时会调用该方法。首先,让我们看一个示例任务类。在这个例子中,我们假设我们管理一个播客发布服务,需要在发布之前处理上传的播客文件:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的任务实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行任务。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }
}

在这个例子中,请注意,我们能够将一个 Eloquent 模型 直接传递到排队任务的构造函数中。由于该任务使用的 Queueable 特征,当任务处理时,Eloquent 模型及其加载的关系将被优雅地序列化和反序列化。

如果您的排队任务在其构造函数中接受一个 Eloquent 模型,则只有该模型的标识符将被序列化到队列中。当任务实际被处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法允许将更小的任务负载发送到您的队列驱动程序。

handle 方法的依赖注入

当队列处理任务时,会调用 handle 方法。请注意,我们能够在任务的 handle 方法上进行类型提示依赖项。Laravel 的 服务容器 会自动注入这些依赖项。

如果您希望完全控制容器如何将依赖项注入到 handle 方法中,您可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收任务和容器。在回调中,您可以自由地以您希望的方式调用 handle 方法。通常,您应该从您的 App\Providers\AppServiceProvider 服务提供者boot 方法中调用此方法:

use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

[!警告]
二进制数据,如原始图像内容,在传递到排队任务之前应通过 base64_encode 函数进行处理。否则,当任务被放置在队列上时,可能无法正确序列化为 JSON。

排队关系

因为当任务排队时,所有加载的 Eloquent 模型关系也会被序列化,所以序列化的任务字符串有时会变得相当大。此外,当任务被反序列化并且从数据库中重新检索模型关系时,它们将被完整地检索。在任务排队过程中对模型进行序列化之前应用的任何先前的关系约束在任务反序列化时都不会应用。因此,如果您希望处理给定关系的子集,则应该在排队任务中重新约束该关系。

或者,为了防止关系被序列化,您可以在设置属性值时在模型上调用 withoutRelations 方法。此方法将返回一个没有加载关系的模型实例:

/**
 * 创建一个新的任务实例。
 */
public function __construct(
    Podcast $podcast,
) {
    $this->podcast = $podcast->withoutRelations();
}

如果您使用 PHP 构造函数属性提升,并且希望表明一个 Eloquent 模型不应序列化其关系,您可以使用 WithoutRelations 属性:

use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * 创建一个新的任务实例。
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast,
) {}

如果一个任务接收的是 Eloquent 模型的集合或数组而不是单个模型,则在任务反序列化和执行时,该集合中的模型的关系将不会被恢复。这是为了防止在处理大量模型的任务时过度使用资源。

唯一任务

[!警告]
唯一任务需要一个支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。此外,唯一任务约束不适用于批次中的任务。

有时,您可能希望确保在任何时候队列中只有一个特定任务的实例。您可以通过在任务类上实现 ShouldBeUnique 接口来实现此目的。此接口不需要您在类上定义任何其他方法:

<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
   ...
}

在上面的示例中,UpdateSearchIndex 任务是唯一的。因此,如果队列中已经存在该任务的另一个实例且尚未完成处理,则不会分发该任务。

在某些情况下,您可能想要定义一个特定的“键”以使任务唯一,或者您可能想要指定一个超时时间,超过该时间后任务不再保持唯一。要实现这一点,您可以在任务类上定义 uniqueIduniqueFor 属性或方法:

<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * 产品实例。
     *
     * @var \App\Product
     */
    public $product;

    /**
     * 作业的唯一锁定将在多少秒后释放。
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * 获取任务的唯一 ID。
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex 任务通过产品 ID 唯一。因此,对于具有相同产品 ID 的任何新的任务分发将被忽略,直到现有任务完成处理。此外,如果现有任务在一小时内未被处理,则唯一锁定将被释放,并且可以将具有相同唯一键的另一个任务分发到队列中。

[!警告]
如果您的应用程序从多个 Web 服务器或容器分发任务,您应该确保您的所有服务器都与同一个中央缓存服务器通信,以便 Laravel 能够准确地确定任务是否唯一。

在处理开始前保持任务唯一

默认情况下,唯一任务在任务完成处理或所有重试尝试失败后“解锁”。但是,在某些情况下,您可能希望您的任务在处理之前立即解锁。要实现这一点,您的任务应该实现 ShouldBeUniqueUntilProcessing 契约,而不是 `ShouldBeUnique

延迟调度

如果您希望指定一个任务不应立即被队列工作者处理,您可以在调度任务时使用delay方法。例如,我们指定一个任务在被调度后10分钟内不可被处理:

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/*... */);
 
        //...
 
        ProcessPodcast::dispatch($podcast)
                    ->delay(now()->addMinutes(10));
 
        return redirect('/podcasts');
    }
}

在某些情况下,任务可能具有默认的延迟配置。如果您需要绕过此延迟并立即调度一个任务进行处理,可以使用withoutDelay方法:

ProcessPodcast::dispatch($podcast)->withoutDelay();

[!WARNING]
Amazon SQS 队列服务的最大延迟时间为 15 分钟。

在响应发送到浏览器后调度

另外,如果您的 Web 服务器使用 FastCGI,dispatchAfterResponse方法可以将任务的调度延迟到 HTTP 响应发送到用户浏览器之后。这仍然允许用户开始使用应用程序,即使排队任务仍在执行。这通常只应用于大约需要一秒钟的任务,例如发送电子邮件。由于它们在当前 HTTP 请求内处理,因此以这种方式调度的任务不需要队列工作者运行即可进行处理:

use App\Jobs\SendNotification;
 
SendNotification::dispatchAfterResponse();

您也可以dispatch一个闭包,并将afterResponse方法链接到dispatch辅助函数上,以便在 HTTP 响应发送到浏览器后执行闭包:

use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
 
dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同步调度

如果您希望立即(同步地)调度一个任务,可以使用dispatchSync方法。使用此方法时,任务不会进入队列,而是在当前进程中立即执行:

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/*... */);
 
        // 创建播客...
 
        ProcessPodcast::dispatchSync($podcast);
 
        return redirect('/podcasts');
    }
}

任务与数据库事务

虽然在数据库事务中调度任务是完全可以的,但您应该特别注意确保您的任务实际上能够成功执行。在事务中调度任务时,可能会出现任务在父事务提交之前被工作者处理的情况。当这种情况发生时,在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能尚未存在于数据库中。

值得庆幸的是,Laravel 提供了几种解决此问题的方法。首先,您可以在队列连接的配置数组中设置after_commit连接选项:

'redis' => [
    'driver' =>'redis',
    //...
    'after_commit' => true,
],

after_commit选项为true时,您可以在数据库事务中调度任务;然而,Laravel 将等待开放的父数据库事务提交后才实际调度任务。当然,如果当前没有打开的数据库事务,任务将立即被调度。

如果由于事务期间发生的异常导致事务回滚,则在该事务期间调度的任务将被丢弃。

[!NOTE]
after_commit配置选项设置为true还将导致在所有打开的数据库事务提交后调度任何排队的事件监听器、可邮寄对象、通知和广播事件。

内联指定提交调度行为

如果您没有将after_commit队列连接配置选项设置为true,您仍然可以指示特定任务应在所有打开的数据库事务提交后进行调度。要实现此目的,您可以将afterCommit方法链接到您的调度操作上:

use App\Jobs\ProcessPodcast;
 
ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果after_commit配置选项设置为true,您可以指示特定任务应立即调度,而无需等待任何打开的数据库事务提交:

ProcessPodcast::dispatch($podcast)->beforeCommit();

任务链

任务链允许您指定一系列排队任务,这些任务应在主任务成功执行后按顺序运行。如果序列中的一个任务失败,其余任务将不会运行。要执行排队任务链,您可以使用Bus外观提供的chain方法。Laravel 的命令总线是一个较低级别的组件,排队任务调度是建立在其之上的:

use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
 
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链接任务类实例外,您还可以链接闭包:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/*... */);
    },
])->dispatch();

[!WARNING]
在任务中使用$this->delete()方法删除任务不会阻止链接任务的处理。只有当链中的任务失败时,链才会停止执行。

链的连接和队列

如果您想为链接任务指定应使用的连接和队列,可以使用onConnectiononQueue方法。这些方法指定了应使用的队列连接和队列名称,除非排队任务被明确分配了不同的连接/队列:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

向链中添加任务

有时,您可能需要从链中的另一个任务向现有任务链的开头或结尾添加一个任务。您可以使用prependToChainappendToChain方法来实现:

/**
 * 执行任务。
 */
public function handle(): void
{
    //...
 
    // 前置到当前链,在当前任务后立即运行任务...
    $this->prependToChain(new TranscribePodcast);
 
    // 追加到当前链,在链的末尾运行任务...
    $this->appendToChain(new TranscribePodcast);
}

链失败

在链接任务时,您可以使用catch方法指定一个闭包,如果链中的任务失败,应调用该闭包。给定的回调将接收导致任务失败的Throwable实例:

use Illuminate\Support\Facades\Bus;
use Throwable;
 
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // 链中的一个任务失败了...
})->dispatch();

[!WARNING]
由于链回调是由 Laravel 队列序列化并在稍后执行的,因此您不应在链回调中使用$this变量。

自定义队列和连接

调度到特定队列

通过将任务推送到不同的队列,您可以对排队任务进行“分类”,甚至可以确定分配给各个队列的工作者数量的优先级。请记住,这不会将任务推送到由您的队列配置文件定义的不同队列“连接”,而只是推送到单个连接内的特定队列。要指定队列,在调度任务时使用onQueue方法:

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/*... */);
 
        // 创建播客...
 
        ProcessPodcast::dispatch($podcast)->onQueue('processing');
 
        return redirect('/podcasts');
    }
}

或者,您可以在任务的构造函数中调用onQueue方法来指定任务的队列:

<?php
 
namespace App\Jobs;
 
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ProcessPodcast implements ShouldQueue
{
    use Queueable;
 
    /**
     * 创建一个新的任务实例。
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

调度到特定连接

如果您的应用程序与多个队列连接进行交互,您可以使用onConnection方法指定将任务推送到哪个连接:

<?php
 
namespace App\Http\Controllers;
 
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
 
class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/*... */);
 
        // 创建播客...
 
        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
 
        return redirect('/podcasts');
    }
}

您可以将onConnectiononQueue方法链接在一起,为任务指定连接和队列:

ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

或者,您可以在任务的构造函数中调用onConnection方法来指定任务的连接:

<?php
 
namespace App\Jobs;
 
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
 
class ProcessPodcast implements ShouldQueue
{
    use Queueable;
 
    /**
     * 创建一个新的任务实例。
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

指定最大任务尝试次数/超时值

最大尝试次数

如果您的排队任务遇到错误,您可能不希望它无限次地重试。因此,Laravel 提供了多种方法来指定一个任务可以尝试的次数或时间。

指定一个任务可以尝试的最大次数的一种方法是通过 Artisan 命令行的--tries开关。这将适用于工作者处理的所有任务,除非正在处理的任务指定了可以尝试的次数:

php artisan queue:work --tries=3

如果一个任务超过了其最大尝试次数,它将被视为“失败”任务。有关处理失败任务的更多信息,请参考处理失败任务的文档。如果向queue:work命令提供--tries=0,则任务将无限次重试。

您可以通过在任务类本身定义最大尝试次数来采取更细粒度的方法。如果在任务上指定了最大尝试次数,它将优先于在命令行上提供的--tries值:

<?php
 
namespace App\Jobs;
 
class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务可以尝试的次数。
     *
     * @var int
     */
    public $tries = 5;
}

如果您需要对特定任务的最大尝试次数进行动态控制,可以在任务上定义一个tries方法:

/**
 * 确定任务可以尝试的次数。
 */
public function tries(): int
{
    return 5;
}

基于时间的尝试

作为定义在任务失败前可以尝试的次数的替代方法,您可以定义一个任务不应再被尝试的时间。这允许在给定的时间范围内对任务进行任意次数的尝试。要定义任务不应再被尝试的时间,向您的任务类添加一个retryUntil方法。该方法应返回一个DateTime实例:

use DateTime;
 
/**
 * 确定任务应超时的时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}

[!NOTE]
您还可以在排队事件监听器上定义tries属性或retryUntil方法。

最大异常数

有时您可能希望指定一个任务可以尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是直接由release方法释放),则该任务应该失败。要实现此目的,您可以在任务类上定义一个maxExceptions属性:

<?php
 
namespace App\Jobs;
 
use Illuminate\Support\Facades\Redis;
 
class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务可以尝试的次数。
     *
     * @var int
     */
    public $tries = 25;
 
    /**
     * 允许的未处理异常的最大数量,超过此数量任务将失败。
     *
     * @var int
     */
    public $maxExceptions = 3;
 
    /**
     * 执行任务。
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // 获取到锁,处理播客...
        }, function () {
            // 无法获取锁...
            return $this->release(10);
        });
    }
}

在这个例子中,如果应用程序无法获取 Redis 锁,任务将被释放 10 秒,并将继续重试最多 25 次。然而,如果任务抛出了 3 个未处理的异常,任务将失败。

超时

通常,您大致知道您的排队任务需要多长时间。因此,Laravel 允许您指定一个“超时”值。默认情况下,超时值为 60 秒。如果一个任务的处理时间超过了超时值指定的秒数,处理该任务的工作者将以错误退出。通常,工作者将由您服务器上配置的进程管理器自动重新启动。

可以使用 Artisan 命令行的--timeout开关指定任务可以运行的最大秒数:

php artisan queue:work --timeout=30

如果任务由于持续超时而超过其最大尝试次数,它将被标记为失败。

您也可以在任务类本身定义一个任务应该被允许运行的最大秒数。如果在任务上指定了超时,它将优先于在命令行上指定的任何超时:

<?php
 
namespace App\Jobs;
 
class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务在超时前可以运行的秒数。
     *
     * @var int
     */
    public $timeout = 120;
}

有时,IO 阻塞进程,如套接字或传出的 HTTP 连接,可能不会尊重您指定的超时。因此,在使用这些功能时,您应该始终尝试使用它们的 API 指定超时。例如,在使用 Guzzle 时,您应该始终指定连接和请求超时值。

[!WARNING]
为了指定任务超时,必须安装pcntl PHP 扩展。此外,任务的“超时”值应始终小于其“重试后”值。否则,任务可能在实际完成执行或超时之前被重新尝试。

在超时时将任务标记为失败

如果您希望在超时时将任务标记为失败,您可以在任务类上定义$failOnTimeout属性:

/**
 * 指示在超时时任务是否应被标记为失败。
 *
 * @var bool
 */
public $failOnTimeout = true;

错误处理

如果在任务处理过程中抛出异常,任务将自动被释放回队列,以便可以再次尝试。任务将继续被释放,直到达到您的应用程序允许的最大尝试次数。最大尝试次数由在queue:work Artisan 命令中使用的--tries开关定义。或者,最大尝试次数也可以在任务类本身定义。有关运行队列工作者的更多信息可以在下面找到

手动将任务释放回队列

有时您可能希望手动将任务释放回队列,以便稍后可以再次尝试。您可以通过调用release方法来实现:

/**
 * 执行任务。
 */
public function handle(): void
{
   
## 队列闭包
 
除了将作业类分发到队列中,您还可以分发一个闭包。这对于需要在当前请求周期之外执行的快速、简单任务非常有用。当将闭包分发到队列时,闭包的代码内容会进行加密签名,以确保其在传输过程中不会被修改:
 
    $podcast = App\Podcast::find(1);
 
    dispatch(function () use ($podcast) {
        $podcast->publish();
    });
 
使用 `catch` 方法,您可以提供一个闭包,如果排队的闭包在耗尽您的队列的[配置重试尝试次数](#max-job-attempts-and-timeout)后仍未能成功完成,则应执行该闭包:
 
    use Throwable;
 
    dispatch(function () use ($podcast) {
        $podcast->publish();
    })->catch(function (Throwable $e) {
        // 此作业已失败...
    });
 
> [!WARNING]  
> 由于 `catch` 回调是由 Laravel 队列序列化并在稍后执行的,因此您不应在 `catch` 回调中使用 `$this` 变量。
 
 
## 运行队列工作者
 
 
### `queue:work` 命令
 
Laravel 包含一个 Artisan 命令,该命令将启动一个队列工作者,并在将新作业推送到队列时处理它们。您可以使用 `queue:work` Artisan 命令运行工作者。请注意,一旦 `queue:work` 命令启动,它将持续运行,直到手动停止或您关闭终端:
 
```shell
php artisan queue:work

[!NOTE]
为了使 queue:work 进程在后台永久运行,您应该使用进程监视器,如Supervisor,以确保队列工作者不会停止运行。

如果您希望在命令的输出中包含已处理的作业 ID,则在调用 queue:work 命令时可以包含 -v 标志:

php artisan queue:work -v

请记住,队列工作者是长期运行的进程,并将启动的应用程序状态存储在内存中。因此,在它们启动后,它们不会注意到您的代码库中的更改。因此,在您的部署过程中,务必重新启动您的队列工作者。此外,请记住,您的应用程序创建或修改的任何静态状态不会在作业之间自动重置。

或者,您可以运行 queue:listen 命令。当使用 queue:listen 命令时,当您想要重新加载更新的代码或重置应用程序状态时,您不必手动重新启动工作者;但是,此命令的效率明显低于 queue:work 命令:

php artisan queue:listen

运行多个队列工作者

要为一个队列分配多个工作者并同时处理作业,您只需启动多个 queue:work 进程。这可以在本地通过终端的多个选项卡完成,也可以在生产环境中使用您的进程管理器的配置设置完成。当使用 Supervisor 时,您可以使用 numprocs 配置值。

指定连接和队列

您还可以指定工作者应使用的队列连接。传递给 work 命令的连接名称应与您的 config/queue.php 配置文件中定义的连接之一相对应:

php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接上的默认队列的作业。但是,您可以通过仅为给定连接处理特定队列来进一步自定义您的队列工作者。例如,如果您的所有电子邮件都在 redis 队列连接的 emails 队列中处理,则可以发出以下命令来启动一个仅处理该队列的工作者:

php artisan queue:work redis --queue=emails

处理指定数量的作业

可以使用 --once 选项指示工作者仅从队列中处理一个作业:

php artisan queue:work --once

可以使用 --max-jobs 选项指示工作者处理给定数量的作业然后退出。当与Supervisor结合使用时,此选项可能很有用,以便在处理给定数量的作业后自动重新启动您的工作者,释放它们可能积累的任何内存:

php artisan queue:work --max-jobs=1000

处理所有排队的作业然后退出

可以使用 --stop-when-empty 选项指示工作者处理所有作业然后优雅地退出。如果您希望在队列为空后关闭 Docker 容器中处理的 Laravel 队列,此选项可能很有用:

php artisan queue:work --stop-when-empty

处理作业指定的秒数

可以使用 --max-time 选项指示工作者处理作业指定的秒数然后退出。当与Supervisor结合使用时,此选项可能很有用,以便在处理作业指定的时间后自动重新启动您的工作者,释放它们可能积累的任何内存:

# 处理作业一小时然后退出...
php artisan queue:work --max-time=3600

工作者睡眠持续时间

当队列中有作业时,工作者将不间断地处理作业。但是,sleep 选项决定了如果没有可用作业,工作者将“睡眠”多少秒。当然,在睡眠期间,工作者不会处理任何新作业:

php artisan queue:work --sleep=3

维护模式和队列

当您的应用程序处于维护模式时,不会处理排队的作业。一旦应用程序退出维护模式,作业将继续正常处理。

要强制您的队列工作者即使在启用维护模式的情况下也处理作业,您可以使用 --force 选项:

php artisan queue:work --force

资源考虑因素

守护进程队列工作者在处理每个作业之前不会“重新启动”框架。因此,您应该在每个作业完成后释放任何繁重的资源。例如,如果您使用 GD 库进行图像操作,则在处理完图像后应使用 imagedestroy 释放内存。

队列优先级

有时您可能希望确定队列的处理优先级。例如,在您的 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。但是,有时您可能希望将作业推送到 high 优先级队列,如下所示:

dispatch((new Job)->onQueue('high'));

要启动一个工作者,该工作者在继续处理 low 队列上的任何作业之前,先验证所有 high 队列作业是否已处理,请将队列名称的逗号分隔列表传递给 work 命令:

php artisan queue:work --queue=high,low

队列工作者和部署

由于队列工作者是长期运行的进程,如果不重新启动,它们不会注意到您的代码的更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重新启动工作者。您可以通过发出 queue:restart 命令来优雅地重新启动所有工作者:

php artisan queue:restart

此命令将指示所有队列工作者在完成当前作业的处理后优雅地退出,以确保不会丢失任何现有作业。由于在执行 queue:restart 命令时队列工作者将退出,因此您应该运行一个进程管理器,如Supervisor,以自动重新启动队列工作者。

[!NOTE]
队列使用缓存来存储重新启动信号,因此在使用此功能之前,您应该验证您的应用程序是否正确配置了缓存驱动程序。

作业过期和超时

作业过期

在您的 config/queue.php 配置文件中,每个队列连接都定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after 的值设置为 90,如果作业在未被释放或删除的情况下处理了 90 秒,则该作业将被释放回队列。通常,您应该将 retry_after 值设置为您的作业合理完成处理所需的最大秒数。

[!WARNING]
唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 默认可见性超时 (opens in a new tab) 重试作业,该超时在 AWS 控制台中进行管理。

工作者超时

queue:work Artisan 命令公开了一个 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果作业处理的时间超过超时值指定的秒数,则处理该作业的工作者将以错误退出。通常,服务器上配置的进程管理器会自动重新启动工作者:

php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项是不同的,但它们共同作用以确保作业不会丢失,并且作业仅成功处理一次。

[!WARNING]
--timeout 值应始终比您的 retry_after 配置值短至少几秒。这将确保处理冻结作业的工作者在作业重试之前始终被终止。如果您的 --timeout 选项比您的 retry_after 配置值长,您的作业可能会被处理两次。

Supervisor 配置

在生产环境中,您需要一种方法来保持您的 queue:work 进程运行。queue:work 进程可能由于多种原因停止运行,例如工作者超时或执行 queue:restart 命令。

因此,您需要配置一个进程监视器,该监视器可以检测到您的 queue:work 进程何时退出并自动重新启动它们。此外,进程监视器可以允许您指定要同时运行的 queue:work 进程的数量。Supervisor 是在 Linux 环境中常用的进程监视器,我们将在以下文档中讨论如何对其进行配置。

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监视器,如果 queue:work 进程失败,它将自动重新启动它们。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

sudo apt-get install supervisor

[!NOTE]
如果自己配置和管理 Supervisor 听起来令人生畏,可以考虑使用Laravel Forge (opens in a new tab),它将为您的生产 Laravel 项目自动安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在该目录中,您可以创建任意数量的配置文件,以指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf 文件,该文件启动并监视 queue:work 进程:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在这个例子中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监视它们所有,如果它们失败则自动重新启动它们。您应该更改配置的 command 指令,以反映您所需的队列连接和工作者选项。

[!WARNING]
您应该确保 stopwaitsecs 的值大于您最长运行作业所消耗的秒数。否则,Supervisor 可能会在作业完成处理之前杀死作业。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

sudo supervisorctl reread
 
sudo supervisorctl update
 
sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,请查阅Supervisor 文档 (opens in a new tab)

处理失败的作业

有时您的排队作业会失败。别担心,事情并不总是按计划进行的!Laravel 提供了一种方便的方式来指定作业应尝试的最大次数。在异步作业超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。同步分发的作业如果失败,则不会存储在此表中,它们的异常将立即由应用程序处理。

在新的 Laravel 应用程序中,通常已经存在创建 failed_jobs 表的迁移。但是,如果您的应用程序中没有为此表的迁移,您可以使用 make:queue-failed-table 命令创建迁移:

php artisan make:queue-failed-table
 
php artisan migrate

在运行队列工作者进程时,您可以使用 queue:work 命令的 --tries 开关指定作业应尝试的最大次数。如果您未为 --tries 选项指定值,则作业将仅尝试一次或根据作业类的 $tries 属性指定的次数进行尝试:

php artisan queue:work redis --tries=3

使用 --backoff 选项,您可以指定 Laravel 在遇到异常的作业重试之前应等待的秒数。默认情况下,作业会立即释放回队列,以便可以再次尝试:

php artisan queue:work redis --tries=3 --backoff=3

如果您希望在每个作业的基础上配置 Laravel 在遇到异常的作业重试之前应等待的秒数,您可以通过在作业类中定义 backoff 属性来实现:

/**
 * 等待重试作业的秒数。
 *
 * @var int
 */
public $backoff = 3;

如果您需要更复杂的逻辑来确定作业的退避时间,您可以在作业类中定义一个 backoff 方法:

/**
* 计算等待重试作业的秒数。
*/
public function backoff(): int
{
    return 3;
}

您可以通过从 backoff 方法返回一个退避值数组来轻松配置“指数”退避。在这个例子中,如果还有更多的尝试次数,第一次重试的重试延迟将为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,后续的每次重试都为 10 秒:

/**
* 计算等待重试作业的秒数。
*
* @return array<int, int>
*/
public function backoff(): array
{
    return [1, 5, 10];
}

失败作业的清理

当特定作业失败时,您可能希望向您的用户发送警报或还原作业部分完成的任何操作。要实现此目的,您可以在作业类中定义一个 failed 方法。导致作业失败的 Throwable 实例将传递给 failed 方法:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行作业。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }

    /**
     * 处理作业失败。
     */
    public function failed(?Throwable $exception): void
    {
        // 发送用户失败通知等...
    }
}

[!WARNING]
在调用 failed 方法之前,会实例化作业的一个新实例;因此,在 handle 方法中可能发生的任何类属性修改都将丢失。

重试失败的作业

要查看已插入到 failed_jobs 数据库表中的所有失败作业,您可以使用 queue:failed Artisan 命令:

php artisan queue:failed

queue:failed 命令将列出作业 ID、连接、队列、失败时间以及有关作业的其他信息。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败作业,请发出以下命令:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如果需要,您可以将多个 ID 传递给该命令:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c