高并发实例分享:Swoole通过聚合请求高效实现业务

 2848

本篇文章给大家分享一个Swoole高并发聚合请求实例,介绍在高并发场景下如何通过聚合请求,充分利用数据库的批量处理更高效地实现业务功能。此示例仅用作抛砖引玉,希望能激发大家更深入的思考。


高并发实例分享:Swoole通过聚合请求高效实现业务


本示例选取的背景是并发下单业务。常规情况下,后端创建订单是逐条 insert 的操作。在并发较低的时候,数据库的 insert 操作的确能保持不错的效率,但是当遇到请求数量增多,数据库 频繁地单次 insert 就会让下单业务整体效率变低 (本文简单地假设1次下单=1个 insert

通过上面的描述,其实已经很容易想到需要优化的地方了。类比现实生活中乘坐电梯的场景:一架电梯 装满后再上行,可以最快地缓解人流压力。

下面我们就来用代码简单实现一下我们思路:

  1. <?php
  2.  
  3. Swoole\Runtime::enableCoroutine($flags = SWOOLE_HOOK_ALL);
  4.  
  5. // 最大等待次数
  6. const MAX_TIMES = 10;
  7. // 按批处理时, 每一批的最大请求暂留数量
  8. const MAX_REQUEST = 3;
  9. // 服务端最大超时时间, 避免客户端一直等待
  10. const MAX_TIMEOUT = 5;
  11.  
  12. Co\run(function () {
  13.     // 请求传输的channel, 原因是不要在swoole的协程环境中, 使用多个协程修改同一个全局变量
  14.     // 如果是golang, 当然是可以不定义这里的$rqChannel
  15.     // 只需要简单的将下面的$rqQueue和$times定义为全局变量即可达到一样的效果
  16.     // 但是最好的方式任然是是通过channel共享内存
  17.     $rqChannel = new Swoole\Coroutine\Channel(MAX_REQUEST);
  18.  
  19.     // 模拟创建订单
  20.     $createOrder = function () use ($rqChannel) {
  21.         // 使用数组模拟请求暂留队列
  22.         $rqQueue = [];
  23.         // 使用等待次数模拟tick效果
  24.         $times = MAX_TIMES;
  25.         while (true) {
  26.             $times--;
  27.             // 必须带上timeout参数, 否则channel是阻塞的
  28.             $rq = $rqChannel->pop(1);
  29.             // 保存1个正常的请求数据
  30.             if (!empty($rq)) {
  31.                 $rqQueue[] = $rq;
  32.             }
  33.             // 请求数量未达上限或者还有等待次数时, 提前进入下一次循环
  34.             if ($times > 0 && count($rqQueue) < MAX_REQUEST) {
  35.                 continue;
  36.             }
  37.             // 重置等待次数
  38.             $times = MAX_TIMES;
  39.             // 初始化SQL
  40.             $sql = "INSERT INTO orders VALUES ";
  41.             $inserts = [];
  42.             // 模拟数据验证
  43.             $validator = function ($input): bool {
  44.                 // 为了缩减代码, 没有真的做数据验证的处理
  45.                 array_filter($input);
  46.                 return true;
  47.             };
  48.             // $rqQueue在协程上下文是并发安全的, 所以遍历时不用担心
  49.             foreach ($rqQueue as $index => $rq) {
  50.                 list($data, $chan) = $rq;
  51.                 // 这里可以考虑后置执行, 原因是后面可以有一些补救逻辑
  52.                 unset($rqQueue[$index]);
  53.                 // 判断$chan是否关闭å
  54.                 if ($chan->errCode === SWOOLE_CHANNEL_CLOSED) {
  55.                     $data = null;
  56.                     continue;
  57.                 }
  58.                 $bool = $validator($data);
  59.                 if ($bool) {
  60.                     $inserts[] = "({$data['user_name']}, {$data['amount']}, {$data['mobile']})";
  61.                     $chan->push(['state' => 1]);
  62.                 } else {
  63.                     $chan->push(['state' => 0]);
  64.                 }
  65.                 // unset($rqQueue[$index]);
  66.             }
  67.             $sql .= (implode(',', $inserts) . ';');
  68.             // 模拟创建订单落库的逻辑
  69.             echo $sql;
  70.         }
  71.     };
  72.  
  73.     // 新手要注意这一句代码的位置, 原因是 $server->start() 之后的代码不会执行
  74.     go($createOrder);
  75.  
  76.     // 路由处理器
  77.     $orderHandler = function ($rq, $res) use ($rqChannel) {
  78.         $chan = new Swoole\Coroutine\Channel(1);
  79.         // 使用timeout参数模拟超时
  80.         $bool = $rqChannel->push([$rq->post, $chan], MAX_TIMEOUT);
  81.         if (!$bool) {
  82.             // 关闭$chan
  83.             $chan->close();
  84.             $res->end('timeout');
  85.         }
  86.         if (!empty($data = $chan->pop())) {
  87.             // 关闭$chan
  88.             $chan->close();
  89.             // 区分成功或失败状态再输出响应
  90.             if ($data['state'] === 1) {
  91.                 $res->end(microtime());
  92.             } else {
  93.                 $res->end('error');
  94.             }
  95.         }
  96.     };
  97.  
  98.     $server = new Co\Http\Server("0.0.0.0", 9502, false);
  99.  
  100.     $server->handle('/order/create', $orderHandler);
  101.     // 当前协程容器的终点
  102.     $server->start();
  103. });

代码整体上还是很容易理解的,变量 $rqQueue 就是类比电梯,暂留请求等待一定时间的次数 $times 就是类比电梯需要等待人流依次进入。当然最在希望读者注意的一点是:在协程环境下,不要使用共享内存而通信,应该使用通信来共享内存


本文网址:https://www.zztuku.com/detail-12033.html
站长图库 - 高并发实例分享:Swoole通过聚合请求高效实现业务
申明:如有侵犯,请 联系我们 删除。

评论(0)条

您还没有登录,请 登录 后发表评论!

提示:请勿发布广告垃圾评论,否则封号处理!!

    编辑推荐