一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列

 6292

本篇文章给大家带来了关于thinkphp的相关知识,其中主要介绍了关于使用think-queue来实现普通队列延迟队列的相关内容,think-queue是thinkphp官方提供的一个消息队列服务,下面一起来看一下,希望对大家有帮助。

一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列

###TP6 队列

TP6 中使用 think-queue 可以实现普通队列和延迟队列。

think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等

队列的多队列, 内存限制 ,启动,停止,守护等

消息队列可降级为同步执行

消息队列实现过程

1、通过生产者推送消息到消息队列服务中

2、消息队列服务将收到的消息存入redis队列中(zset)

3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条

4、处理获取下来的消息调用业务类进行处理相关业务

5、业务处理后,需要从队列中删除消息


composer 安装 think-queue

  1. composer require topthink/think-queue

配置文件

安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。

tp6中提供了多种消息队列的实现方式,默认使用sync,我这里选择使用Redis。

  1. return [
  2.     'default'     => 'redis',
  3.     'connections' => [
  4.         'sync'     => [
  5.             'type' => 'sync',
  6.         ],
  7.         'database' => [
  8.             'type'       => 'database',
  9.             'queue'      => 'default',
  10.             'table'      => 'jobs',
  11.             'connection' => null,
  12.         ],
  13.         'redis'    => [
  14.             'type'       => 'redis',
  15.             'queue'      => 'default',
  16.             'host'       => env('redis.host', '127.0.0.1'),
  17.             'port'       => env('redis.port', '6379'),
  18.             'password'   => env('redis.password','123456'),
  19.             'select'     => 0,
  20.             'timeout'    => 0,
  21.             'persistent' => false,
  22.         ],
  23.     ],
  24.     'failed'      => [
  25.         'type'  => 'none',
  26.         'table' => 'failed_jobs',
  27.     ],
  28. ];

创建目录及队列消费类文件

在 app 目录下创建 queue 目录,然后在该目录下新建一个抽象类 Queue.php 文件,作为基础类

  1. <?php
  2. namespace app\queue;
  3. use think\facade\Cache;
  4. use think\queue\Job;
  5. use think\facade\Log;
  6. /**
  7.  * Class Queue 队列消费基础类
  8.  * @package app\queue
  9.  */
  10. abstract class Queue
  11. {
  12.     /**
  13.      * @describe:fire是消息队列默认调用的方法
  14.      * @param \think\queue\Job $job
  15.      * @param $message
  16.      */
  17.     public function fire(Job $job, $data)
  18.     {
  19.         if (empty($data)) {
  20.             Log::error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__));
  21.             return ;
  22.         }
  23.  
  24.         $jobId = $job->getJobId(); // 队列的数据库id或者redis key
  25.         // $jobClassName = $job->getName(); // 队列对象类
  26.         // $queueName = $job->getQueue(); // 队列名称
  27.  
  28.         // 如果已经执行中或者执行完成就不再执行了
  29.         if (!$this->checkJob($jobId, $data)) {
  30.             $job->delete();
  31.             Cache::store('redis')->delete($jobId);
  32.             return ;
  33.         }
  34.  
  35.         // 执行业务处理
  36.         if ($this->execute($data)) {
  37.             Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
  38.             $job->delete(); // 任务执行成功后删除
  39.             Cache::store('redis')->delete($jobId); // 删除redis中的缓存
  40.         } else {
  41.             // 检查任务重试次数
  42.             if ($job->attempts() > 3) {
  43.                 Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
  44.                  // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
  45.                 //$job->release(10); 
  46.                 // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
  47.                 //$job->failed();   
  48.                 // 第3种处理方式:删除任务
  49.                 $job->delete(); // 任务执行后删除
  50.                 Cache::store('redis')->delete($jobId); // 删除redis中的缓存
  51.             }
  52.         }
  53.     }
  54.  
  55.     /**
  56.      * 消息在到达消费者时可能已经不需要执行了
  57.      * @param  string  $jobId
  58.      * @param $message
  59.      * @return bool 任务执行的结果
  60.      * @throws \Psr\SimpleCache\InvalidArgumentException
  61.      */
  62.     protected function checkJob(string $jobId, $message): bool
  63.     {
  64.         // 查询redis
  65.         $data = Cache::store('redis')->get($jobId);
  66.         if (!empty($data)) {
  67.             return false;
  68.         }
  69.         Cache::store('redis')->set($jobId, $message);
  70.         return true;
  71.     }
  72.  
  73.     /**
  74.      * @describe: 根据消息中的数据进行实际的业务处理
  75.      * @param $data 数据
  76.      * @return bool 返回结果
  77.      */
  78.     abstract protected function execute($data): bool;
  79. }

所有真正的消费类继承基础抽象类

  1. <?php
  2. namespace app\queue\test;
  3. use app\queue\Queue;
  4. class Test extends Queue
  5. {
  6.     protected function execute($data): bool
  7.     {
  8.        // 具体消费业务逻辑
  9.     }
  10. }

生产者逻辑

  1. use think\facade\Queue;
  2.  
  3. // 普通队列生成调用方式
  4. Queue::push($job, $data, $queueName);
  5. // 例:
  6. Queue::push(Test::class, $data, $queueName);
  7.  
  8. // 延时队列生成调用方式
  9. Queue::later($delay, $job, $data, $queueName);
  10. // 例如使用延时队列 10 秒后执行:
  11. Queue::later(10 , Test::class, $data, $queueName);

开启进程监听任务并执行

  1. php think queue:listen
  2. php think queue:work


命令模式介绍

命令模式

queue:work 命令

work 命令: 该命令将启动一个 work 进程来处理消息队列。

  1. php think queue:work --queue TestQueue

queue:listen 命令

listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过 proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

  1. php think queue:listen --queue TestQueue


命令行参数

Work 模式

  1. php think queue:work \
  2. --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
  3. --queue  helloJobQueue  //要处理的队列的名称
  4. --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
  5. --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
  6. --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
  7. --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
  8. --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0

Listen 模式

  1. php think queue:listen \
  2. --queue  helloJobQueue \   //监听的队列的名称
  3. --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
  4. --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
  5. --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
  6. --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
  7. --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位

可以看到 listen 模式下,不包含 --deamon 参数,原因下面会说明

消息队列的开始,停止与重启

开始一个消息队列:

  1. php think queue:work

停止所有的消息队列:

  1. php think queue:restart

重启所有的消息队列:

  1. php think queue:restart 
  2. php think queue:work


本文网址:https://www.zztuku.com/detail-11674.html
站长图库 - 一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列
申明:本文转载于《CSDN》,如有侵犯,请 联系我们 删除。

评论(0)条

您还没有登录,请 登录 后发表评论!

提示:请勿发布广告垃圾评论,否则封号处理!!

    编辑推荐

    电竞游戏装备矢量素材