Laravel中Kafka的使用详解

 4047

这篇文章主要介绍了LaravelKafka的使用详解,kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力,有对于消息队列感兴趣的同学可以参考下

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类。

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

  1. <?php
  2. namespace App\Tools;
  3.  
  4. use Illuminate\Config\Repository;
  5.  
  6. use Illuminate\Support\Facades\DB;
  7. use Monolog\Logger;
  8. use Monolog\Handler\StreamHandler;
  9.  
  10. use Illuminate\Http\Request;
  11.  
  12. class Kafka
  13. {
  14.     public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka
  15.     public $topic = 'test';//管道名称
  16.     public $partition = 0;
  17.  
  18.     protected $producer = null;
  19.     protected $consumer = null;
  20.  
  21.     public function __construct()
  22.     {
  23.         if (empty($this->broker_list)) {
  24.             throw new InvalidConfigException("broker not config");
  25.         }
  26.         $rk = new \RdKafka\Producer();
  27.         if (empty($rk)) {
  28.             throw new InvalidConfigException("producer error");
  29.         }
  30.         $rk->setLogLevel(LOG_DEBUG);
  31.         if (!$rk->addBrokers($this->broker_list)) {
  32.             throw new InvalidConfigException("producer error");
  33.         }
  34.         $this->producer = $rk;
  35.     }
  36.  
  37.     /**
  38.      * 生产者
  39.      * @param array $messages
  40.      * @return mixed
  41.      */
  42.     public function send($messages = [],$topic)
  43.     {
  44.         $topic = $this->producer->newTopic($topic);
  45.         return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
  46.     }
  47.  
  48.     /**
  49.      * 消费者
  50.      */
  51.     public function consumer($object, $callback){
  52.         $conf = new \RdKafka\Conf();
  53.         $conf->set('group.id', 0);
  54.         $conf->set('metadata.broker.list', $this->broker_list);
  55.      
  56.         $topicConf = new \RdKafka\TopicConf();
  57.         $topicConf->set('auto.offset.reset', 'smallest');
  58.      
  59.         $conf->setDefaultTopicConf($topicConf);
  60.      
  61.         $consumer = new \RdKafka\KafkaConsumer($conf);
  62.      
  63.         $consumer->subscribe([$this->topic]);
  64.      
  65.         echo "waiting for messages.....\n";
  66.         while(true) {
  67.             $message = $consumer->consume(120*1000);
  68.             switch ($message->err) {
  69.                 case RD_KAFKA_RESP_ERR_NO_ERROR:
  70.                 echo "message payload....";
  71.                 $object->$callback($message->payload);
  72.                 break;
  73.             }
  74.             sleep(1);
  75.         }
  76.     }
  77. }
  78. ?>

在控制器中如何使用:

首先再头部导入这个类:use App\Tools\Kafka;

下面是使用生产者实例:

  1. public function test(){ 
  2.     $topic = 'tool';//输入使用管道名称
  3.     $data['shop_id'] = 58;
  4.     $data['bar_code']=586;
  5.     $data['goods_num'] = 1;
  6.     $data['goods_unit'] = '个';
  7.  
  8.     $Kafka = new Kafka();
  9.     $Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
  10.     var_dump($Error_Msg); 
  11. }

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

  1. <?php
  2.  
  3. $conf = new RdKafka\Conf();
  4.  
  5. $conf->set('group.id', 'myConsumerGroup');
  6.  
  7. $rk = new RdKafka\Consumer($conf);
  8. $rk->addBrokers("localhost:9092");
  9.  
  10. $topicConf = new RdKafka\TopicConf();
  11. $topicConf->set('auto.commit.interval.ms', 100);
  12. $topicConf->set('offset.store.method', 'file');
  13. $topicConf->set('offset.store.path', sys_get_temp_dir());
  14. $topicConf->set('auto.offset.reset', 'smallest');
  15.  
  16. $topic = $rk->newTopic("tool", $topicConf);//读取的管道
  17.  
  18. // Start consuming partition 0
  19. $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
  20.  
  21. while (true) {
  22.     $message = $topic->consume(0, 120*10000);
  23.     switch ($message->err) {
  24.         case RD_KAFKA_RESP_ERR_NO_ERROR:
  25.             //没有错误打印信息
  26.             $message = json_decode(json_encode($message),true);
  27.             $data = json_decode($message['payload'],true);
  28.             var_dump($data);
  29.             break;
  30.         case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  31.             echo "等待接收信息\n";
  32.             break;
  33.         case RD_KAFKA_RESP_ERR__TIMED_OUT:
  34.             echo "超时\n";
  35.             break;
  36.         default:
  37.             throw new \Exception($message->errstr(), $message->err);
  38.             break;
  39.     }
  40.     sleep(1);
  41. }
  42.  
  43. ?>



TAG标签:
本文网址:https://www.zztuku.com/index.php/detail-8748.html
站长图库 - Laravel中Kafka的使用详解
申明:如有侵犯,请 联系我们 删除。

评论(0)条

您还没有登录,请 登录 后发表评论!

提示:请勿发布广告垃圾评论,否则封号处理!!

    编辑推荐