基于swoole的swoolefy实现类似go的waitGroup多并发协程调度

WindsorSaxon 发布于1年前

swoolefy是一个基于swoole实现的轻量级高性能的常驻内存型的API和Web应用服务框架, 高度封装了http,websocket,udp服务器,以及基于tcp实现可扩展的rpc服务, 同时支持composer包方式安装部署项目。基于实用,swoolefy抽象Event事件处理类, 实现与底层的回调的解耦,支持协程调度,同步|异步调用,全局事件注册,心跳检查,异步任务,多进程(池)等, 内置view、log、session、mysql、redis、mongodb等常用组件等。

目前swoolefy4.2+版本完全支持swoole4.2.13+的协程,推荐使用swoole4.3+

GitHub:https://github.com/bingcool/s...

下面主要讲解一下如何实现了类似go的waitGroup的功能
1、定义GoWaitGroup的类:

<?php
/**
+----------------------------------------------------------------------
| swoolefy framework bases on swoole extension development, we can use it easily!
+----------------------------------------------------------------------
| Licensed ( https://opensource.org/licenses/MIT )
+----------------------------------------------------------------------
| Author: bingcool <bingcoolhuang@gmail.com || 2437667702@qq.com>
+----------------------------------------------------------------------
 */

namespace Swoolefy\Core;

use Swoole\Coroutine\Channel;

class GoWaitGroup {
    /**
     * @var int
     */
    private $count = 0;

    /**
     * @var Channel
     */
    private $chan;

    /**
     * @var array
     */
    private $result = [];

    /**
     * WaitGroup constructor
     */
    public function __construct() {
        $this->chan = new Channel;
    }

    /**
     * add
     */
    public function go(\Closure $go_func = null) {
        $this->count++;
        if($go_func instanceof \Closure) {
            go($go_func);
        }
    }

    /**
     * start
     */
    public function start() {
        $this->count++;
        return $this->count;
    }

    /**
     * done
     */
    public function done(string $key, $data = null) {
        if(!empty($data)) {
            $this->result[$key] = $data;
        }
        $this->chan->push(1);
    }

    /**
     * wait
     */
    public function wait() {
        while($this->count--) {
            $this->chan->pop();
        }
        $result = $this->result;
        $this->result = [];
        $this->count = 0;
        return $result;
    }

}

2、在swoolefy中调用

class GroupController extends BController {
    public function waitgroup() {
        // 创建一个waitGroup实例
        $wg = new \Swoolefy\Core\GoWaitGroup();
         
         // 第一种方式,直接$wg->go()函数中执行go的协程函数
        $wg->go(function() use ($wg) {
            // 挂起协程
            $fp = stream_socket_client("tcp://www.baidu.com:80", $errno, $errstr, 30);
            // 协程返回的数据
            $wg->done('mysql', 'mysql');
        });

        $wg->go(function() use ($wg) {
            sleep(1);
            $wg->done('tengxun', 'weixin and qq');
        });

        //挂起当前协程,等待所有任务完成后恢复
        //$result = $wg->wait();
        //这里 $result 包含了 1 个任务执行结果
        //var_dump($result);
        
        //第二种方式,添加$wg->start(),启动协程,然后使用swoole的原生go执行协程函数
        $wg->start();
        go(function () use ($wg) {
            // 挂起协程
            sleep(1);
            $wg->done('taobao', 'ali baba');
        });
        
         //第二种方式,添加$wg->start(),启动协程,然后使用swoole的原生go执行协程函数
        $wg->start();
        go(function () use ($wg) {
            // 挂起协程
            sleep(1);
            $wg->done('baidu', 'baidu');
        });
        // 以上三个协程将会并发调用,wait()函数实现等待三个协程数据返回
        //挂起当前协程,让出cpu控制权,cpu可以做其他的事情,直到待所有任务完成后恢复
        $result = $wg->wait();
        //这里 $result 包含了 2 个任务执行结果
        var_dump($result);
    }
}

至此一个最简单的并发调用就完成了,你可以愉快使用gowaitGroup的协程调用了

查看原文: 基于swoole的swoolefy实现类似go的waitGroup多并发协程调度

  • crazylion