一、概述

在Web开发中,我们经常会遇到需要批量处理任务的场景,比如群发邮件、秒杀资格获取等,我们将这些耗时或者高并发的操作放到队列中异步执行可以有效缓解系统压力、提高系统响应速度和负载能力。

二、配置文件

我们仍然从配置文件开始,首先我们需要在配置文件中配置默认队列驱动为Redis。lumen没有配置文件,可以从laravel项目中拷贝一份config目录过来。
队列配置文件是config/queue.php

return [

    'default' => env('QUEUE_DRIVER', 'sync'),

    'connections' => [
        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'expire' => 60,
        ],
        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => 'default',
            'expire' => 60,
        ],
    ],

    'failed' => [
        'database' => 'mysql', 'table' => 'failed_jobs',
    ],
];

配置文件第一个配置项default用于指定默认的队列驱动,修改.env中的QUEUE_DRIVER即可。

connections配置项包含了Laravel支持的所有队列驱动,我们使用Redis驱动,所以需要配置redis项:connection对应config/database.php中redis的default配置queue为默认队列名称;expire为队列任务过期时间(秒)。这里我们可以保持其默认配置不变。

failed配置项用于配置失败队列任务存放的数据库及数据表。这里我们需要按照自己的数据库配置对其做相应修改。

要使用 redis 队列驱动,需要在配置文件 config/database.php 中配置 Redis 数据库连接。

如果 Redis 队列连接使用 Redis Cluster(集群),队列名称必须包含 key hash tag,以确保给定队列对应的所有 Redis keys 都存放到同一个 hash slot

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

注:对一般中小型应用推荐使用 Redis 作为队列驱动。

三、驱动预备知识

数据库
要使用 database 队列驱动,你需要数据表保存任务信息(比如失败任务)。要生成创建这些表的迁移,可以在项目目录下运行 Artisan 命令 queue:table,迁移被创建之后,可以使用 migrate 命令生成这些表:

php artisan queue:table
php artisan migrate

运行后生成failed_jobsjobsmigrations三张表。

四、创建任务

1、生成任务类

通常,所有的任务类都保存在 app/Jobs 目录。laravel中 app/Jobs 不存在,在运行 Artisan 命令 make:job 的时候,它将会自动创建。你可以通过 Artisan CLI 来生成队列任务类:

php artisan make:job ProcessPodcast

生成的类都实现了 Illuminate\Contracts\Queue\ShouldQueue 接口, 告诉 Laravel 将该任务推送到队列,而不是立即运行:

clipboard.png

lumen中 app/Jobs目录已经存在,由于不能执行artisan命令,直接复制目录中的ExampleJob.php即可。该文件继承Job.php 从而实现了ShouldQueue

clipboard.png

clipboard.png

clipboard.png

2、任务类结构

任务类非常简单,通常只包含处理该任务的 handle 方法,在任务被处理的时候调用,注意我们可以在任务的 handle 方法中进行依赖注入。Laravel 服务容器会自动注入这些依赖。

3、分发任务

创建好任务类后,就可以通过任务自身的 dispatch 方法将其分发到队列。dispatch 方法需要的唯一参数就是该任务的实例:

clipboard.png

lumen中用法:

clipboard.png

4、指定最大失败次数

指定队列任务最大失败次数的一种实现方式是通过 Artisan 命令 --tries 切换:

php artisan queue:work --tries=3

不过,你还可以在任务类自身定义最大失败次数来实现更加细粒度的控制,如果最大失败次数在任务中指定,则其优先级高于命令行指定的数值:

   <?php
    
    namespace App\Jobs;
    
    class ProcessPodcast implements ShouldQueue
    {
        /**
         * The number of times the job may be attempted.
         *
         * @var int
         */
        public $tries = 5;
    }

5、超时

注:timeout 方法为 PHP7.1+ 和 pcntl 扩展做了优化。

类似的,队列任务最大运行时长(秒)可以通过 Artisan 命令上的 --timeout 开关来指定:

php artisan queue:work --timeout=30

同样,你也可以在任务类中定义该任务允许运行的最大时长(单位:秒),任务中指定的超时时间优先级也高于命令行定义的数值:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 120;
}

6、基于时间的尝试次数

除了定义在任务失败前的最大尝试次数外,还可以定义在指定时间内允许任务的最大尝试次数,这可以通过在任务类中添加 retryUntil 方法来实现:

/**
 * Determine the time at which the job should timeout.
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}

注:还可以在队列时间监听器中定义 retryUntil 方法。

7、频率限制

注:该功能要求应用可以与 Redis 服务器进行交互。

如果应用使用了 Redis,那么可以使用时间或并发来控制队列任务。该功能特性在队列任务与有频率限制的 API 交互时很有帮助,例如,通过 throttle 方法,你可以限定给定类型任务每 60 秒只运行 10 次。如果不能获取锁,需要将任务释放回队列以便可以再次执行:

Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});

注:在上面的例子中,上面的方法可能无法找到,但是直接复制即可使用(具体还不清楚,知道的大神可以留言指教)。key 可以是任意可以唯一标识你想要限定访问频率的任务类型的字符串。举个例子,这个键可以基于任务类名和操作 Eloquent 模型的 ID 进行构建。

8、最大进程数量

除此之外,还可以指定可以同时处理给定任务的最大进程数量。这个功能在队列任务正在编辑一次只能由一个任务进行处理的资源时很有用。例如,使用 funnel 方法你可以给定类型任务一次只能由一个工作进程进行处理:

Redis::funnel('key')->limit(1)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});

注:使用频率限制时,任务在运行成功之前需要的最大尝试次数很难权衡,因此,将频率限制和基于时间的尝试次数结合起来使用是个不错的选择。

9、运行队列进程

Laravel 自带了一个队列进程用来处理被推送到队列的新任务。你可以使用 queue:work 命令运行这个队列进程。请注意,队列进程开始运行后,会持续监听队列,直至你手动停止或关闭终端:

php artisan queue:work

注:为了保持队列进程 queue:work 持续在后台运行,需要使用进程守护程序,比如 Supervisor 来确保队列进程持续运行。简单处理可以使用 php artisan queue:work --daemon &

10、运行队列监听器

开始进行队列监听
laravel 包含了一个 Artisan 命令来运行推送到队列中的任务的执行。你可以使用 queue:listen 命令来运行监听器:

php artisan queue:listen

注意:queue:listen要比queue:work --daemon 性能差很多。

你也可以指定监听哪一个连接的队列:

php artisan queue:listen connection-name

请记住,队列进程是长生命周期的进程,会在启动后驻留内存。若应用有任何改动将不会影响到已经启动的进程。所以请在发布程序后,重启队列进程

可以通过 Aritisan 命令 queue:restart 来优雅地重启队列进程:

php artisan queue:restart

该命令将在队列进程完成正在进行的任务后,结束该进程,避免队列任务的丢失或错误。由于队列进程会在执行 queue:restart 命令后死掉,你仍然需要通过进程守护程序如 Supervisor 来自动重启队列进程。

注:队列使用缓存来存储重启信号,所以在使用此功能前你需要验证缓存驱动配置正确。

五、配置 Supervisor

  • 安装 Supervisor

Supervisor 是 Linux 系统中常用的进程守护程序。如果队列进程 queue:work 意外关闭,它会自动重启启动队列进程。在 Ubuntu 安装Supervisor 非常简单:

sudo apt-get install supervisor

注:如果自己配置 Supervisor 有困难,可以考虑使用 Laravel Forge,它会为 Laravel 项目自动安装并配置 Supervisor。

  • 配置 Supervisor

Supervisor 配置文件通常存放在 /etc/supervisor/conf.d 目录,在该目录下,可以创建多个配置文件指示 Supervisor 如何监视进程,例如,让我们创建一个开启并监视 queue:work 进程的 laravel-worker.conf 文件:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d 
command= php /home/forge/app.com/artisan queue:work redis --sleep=3 --tries=3 --daemon
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

在本例中,numprocs 指令让 Supervisor 运行 8 个 queue:work 进程并监视它们,如果失败的话自动重启。当然,你需要修改 queue:work sqs 的 command 指令来映射你的队列连接。

  • 启动 Supervisor

当成功创建配置文件后,需要刷新 Supervisor 的配置信息并使用如下命令启动进程:

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*

使用top 或者ps aux | grep php 命令可以看到启动的php进程。
你可以通过 Supervisor 官方文档获取更多信息。

CentOS中配置稍微有些区别:

yum -y install python-setuptools
easy_install supervisor

supervisor安装完成后会生成三个执行程序:

supervisortd supervisor的守护进程服务(用于接收进程管理命令)
supervisorctl 客户端(用于和守护进程通信,发送管理进程的指令)
echo_supervisord_conf 生成初始配置文件程序。

将配置文件重定向到/etc/目录下面

mkdir /etc/supervisor
echo_supervisord_conf > /etc/supervisor/supervisord.conf

默认配置文件在/etc/supervisor/supervisord.conf 。
编辑配置文件:找到最后一行,引入自定义配置文件

;[include]
;files = conf.d/*.ini

去掉[include]files前面的“;” include生效,在/etc/supervisor/下创建conf.d文件夹,在其中添加类似ubuntu中配置文件。

mkdir conf.d

启动:
supervisord 启动supervisor
supervisorctl 控制supervisord
启动后会看到一堆信息,但是不影响。

/usr/lib/python2.7/site-packages/supervisor/options.py:296: UserWarning: 
Supervisord is running as root and it is searching for its configuration file 
in default locations (including its current working directory); 
you probably want to specify a "-c" argument specifying an absolute path 
to a configuration file for improved security.
  'Supervisord is running as root and it is searching '

可指定配置文件: supervisord -c /etc/supervisord.conf

每次修改配置后都需要重启supervisor才能生效

supervisorctl reload 

监控状态:

supervisorctl status

附一个sqs错误处理,redis方式不使用sqs

In SqsConnector.php line 26:
                                       
  Class 'Aws\Sqs\SqsClient' not found 

clipboard.png

使用 composer 安装:

composer require aws/aws-sdk-php-laravel