FutureTask与fork

在多线程代码中我们经常会遇到这种模型,将一个耗时任务, new一个新的Thread或者通常放到线程池后台执行,当前线程执行另外任务,之后通过某个api接口阻塞获取后台任务结果。

Java童鞋应该对这个概念非常熟悉——JDK给予直接支持的Future。

同样的模型我们可以利用channel对多个协程进行同步来实现,代码很简单:

<?php

go(function() {
    $start = microtime(true);

    $ch = chan();

    // 开启一个新的协程,异步执行耗时任务
    spawn(function() use($ch) {
        yield delay(1000);
        yield $ch->send(42); // 通知+传送结果
    });

    yield delay(500);
    $r = (yield $ch->recv()); // 阻塞等待结果
    echo $r; // 42

    // 我们这里两个耗时任务并发执行,总耗时约1000ms
    echo "cost ", microtime(true) - $start, "\n";
});

事实上我们也很容易把Future模型移植过来:

<?php
final class FutureTask {
    const PENDING = 1;
    const DONE = 2;
    private $cc;

    public $state;
    public $result;
    public $ex;

    public function __construct(\Generator $gen) {
        $this->state = self::PENDING;

        $asyncTask = new AsyncTask($gen);
        $asyncTask->begin(function($r, $ex = null) {
            $this->state = self::DONE;
            if ($cc = $this->cc) {
                // 有cc,说明有call get方法挂起协程,在此处唤醒
                $cc($r, $ex);
            } else {
                // 无挂起,暂存执行结果
                $this->result = $r;
                $this->ex = $ex;
            }
        });
    }

    public function get() {
        return callcc(function($cc) use($timeout) {
            if ($this->state === self::DONE) {
                // 获取结果时,任务已经完成,同步返回结果
                // 这里也可以考虑用defer实现,异步返回结果,首先先释放php栈,降低内存使用
                $cc($this->result, $this->ex);
            } else {
                // 获取结果时未完成,保存$cc,挂起等待
                $this->cc = $cc;
            }
        });
    }
}


// helper
function fork($task, ...$args) {
    $task = await($task); // 将task转换为生成器
    yield new FutureTask($task);
}

还是刚才那个例子, 我们改写为FutureTask版本:

<?php
go(function() {
    $start = microtime(true);

    // fork 子协程执行耗时任务
    /** @var $future FutureTask */
    $future = (yield fork(function() {
        yield delay(1000);
        yield 42;
    }));

    yield delay(500);

    // 阻塞等待结果
    $r = (yield $future->get());
    echo $r; // 42

    // 总耗时仍旧只有1000ms
    echo "cost ", microtime(true) - $start, "\n";
});

再进一步,我们扩充FutureTask的状态,阻塞获取结果加入超时选项:

<?php

final class FutureTask {
    const PENDING = 1;
    const DONE = 2;
    const TIMEOUT = 3;

    private $timerId;
    private $cc;

    public $state;
    public $result;
    public $ex;

    // 我们这里加入新参数,用来链接futureTask与caller父任务
    // 这样的好处比如可以共享父子任务上下文
    public function __construct(\Generator $gen, AsyncTask $parent = null) {
        $this->state = self::PENDING;

        if ($parent) {
            $asyncTask = new AsyncTask($gen, $parent);
        } else {
            $asyncTask = new AsyncTask($gen);
        }

        $asyncTask->begin(function($r, $ex = null) {
            // PENDING or TIMEOUT
            if ($this->state === self::TIMEOUT) {
                return;
            }

            // PENDING -> DONE
            $this->state = self::DONE;

            if ($cc = $this->cc) {
                if ($this->timerId) {
                    swoole_timer_clear($this->timerId);
                }
                $cc($r, $ex);
            } else {
                $this->result = $r;
                $this->ex = $ex;
            }
        });
    }

    // 这里超时时间0为永远阻塞,
    // 否则超时未获取到结果,将向父任务传递超时异常
    public function get($timeout = 0) {
        return callcc(function($cc) use($timeout) {
            // PENDING or DONE
            if ($this->state === self::DONE) {
                $cc($this->result, $this->ex);
            } else {
                // 获取结果时未完成,保存$cc,开启定时器(如果需要),挂起等待
                $this->cc = $cc;
                $this->getResultTimeout($timeout);
            }
        });
    }

    private function getResultTimeout($timeout) {
        if (!$timeout) {
            return;
        }

        $this->timerId = swoole_timer_after($timeout, function() {
            assert($this->state === self::PENDING);
            $this->state = self::TIMEOUT;
            $cc = $this->cc;
            $cc(null, new AsyncTimeoutException());
        });
    }
}

因为引入parentTask参数,需要将父任务隐式传参,而我们执行通过Syscall与执行当前生成器的父任务交互,所以我们重写fork辅助函数,改用Syscall实现:

<?php

/** * @param $task * @return Syscall */
function fork($task) {
    $task = await($task);
    return new Syscall(function(AsyncTask $parent) use($task) {
        return new FutureTask($task, $parent);
    });
}

下面看一些关于超时的示例:

<?php

go(function() {
    $start = microtime(true);

    /** @var $future FutureTask */
    $future = (yield fork(function() {
        yield delay(500);
        yield 42;
    }));

    // 阻塞等待超时,捕获到超时异常
    try {
        $r = (yield $future->get(100));
        var_dump($r);
    } catch (\Exception $ex) {
        echo "get result timeout\n";
    }

    yield delay(1000);

    // 因为我们只等待子任务100ms,我们的总耗时只有 1100ms
    echo "cost ", microtime(true) - $start, "\n";
});

go(function() {
    $start = microtime(true);

    /** @var $future FutureTask */
    $future = (yield fork(function() {
        yield delay(500);
        yield 42;
        throw new \Exception();
    }));

    yield delay(1000);

    // 子任务500ms前发生异常,已经处于完成状态
    // 我们调用get会当即引发异常
    try {
        $r = (yield $future->get());
        var_dump($r);
    } catch (\Exception $ex) {
        echo "something wrong in child task\n";
    }

    // 因为耗时任务并发执行,这里总耗时仅1000ms
    echo "cost ", microtime(true) - $start, "\n";
});
powered by Gitbook 该教程制作时间: 2017-04-12 13:42:37