Python3中的asyncio

blackgoose 发布于2年前
0 条问题

最近想换换口味,在用 asyncio 写一个小东西,过程中碰到各种概念上、实践上的问题,悄悄 记在这里XD.

所谓的“异步”

回归到最初的定义的话,“异步”是指不同硬件之间可以工作在 不同的时钟信号下——试想一下要求所有硬件工作在相同时钟信 号下的系统该有多脆弱。所以同步总线通常出现在与系统本身 工作时钟接近的硬件接口上,而异步总线正好相反,用来连接 远远达不到系统工作频率的硬件。

这些概念投射到同根同源的软件上的话,由于软件命令最终都 是由硬件去执行的,所以软件的操作也有快有慢,“异步操作” 的重点就是协调“快”的操作和“慢”的操作。然后众所周知,最 慢的操作是IO.

与PC硬件里琳琅满目的总线速度不同,软件里一般只有两种速 度:指令执行速度和IO速度——像Erlang计算reduction的时候 也只考虑指令执行时间和IO时间,所有的指令和IO类型都是一 视同仁的。这是为神马呢?大概是因为人脑跟不上电脑吧……

“正确”的异步操作

网上到处都是这样的例子:

 
import socket
import multiprocessing

def handler(conn, addr):
    message = conn.recv(1024)
    while message:
        conn.send(message)
    conn.close()

def server(host, port):
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind((host, port))
    listener.listen(10)

    while True:
        conn, addr = listener.accept()
        process = multiprocessing.Process(target=handler, args=(conn, addr))
        process.start()

这是异步操作没错,用另一个进程来处理请求,只是粒度略大, 因为这只是将快的进程(handler进程)和慢的进程(server进 程)分开了而已,无论在handler还是server里都要等待IO. 而 且我在#python上贴出 这个程序的时候马上有一堆人出来说服务器不应该这样写之类 的XD.

于是武林中就有了下面这种模式:

 
import socket
import selectors

def handler(conn):
    message = conn.recv(1024)
    conn.send(message)

def server(host, port):
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind((host, port))
    listener.listen(10)

    selector = selectors.DefaultSelector()
    selector.register(listener, selectors.EVENT_READ)

    while True:
        event_list = selector.select()
        for key, events in event_list:
            conn = key.fileobj
            if conn == listener:
                new_conn, addr = conn.accept()
                selector.register(new_conn, selectors.EVENT_READ)
            else:
                handler(conn)

我将这个模式称为“在忙完指令之后等待IO”,坊间说的“异步IO” 一般也是特指这种两段式模式:一段等待IO,一段执行指令。别 看上面的程序很简单,很大一部分高性能的服务器或者框架(像 Tornado、Twisted和Erlang)都是基于这个模式的,因为它的资 源消耗比基于多进程/多线程的服务器实在是少太多了(参考Nginx 和Apache的对比),所以扩展性(scalability)也好太多了。

asyncio模块

asyncio是在Python 3.4中添加的新模块,实现了上面的“忙完指 令之后等待IO”模式。

这世上已经有好多异步框架和库了,Guido老爷子为什么要推行这 样一个新模块?他在 PEP 里说的原因是,这些第三方异步代码相互之间不兼容不能移植 blahblah……我倒觉得是Twisted的camelCaseNaming不合老爷子胃 口而已……

还是上面的echo server例子,用外星科技实现:

 
import asyncio

class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def connection_lost(self, exc):
        self.transport.close()

    def data_received(self, data):
        self.transport.write(data)

def server(host, port):
    loop = asyncio.get_event_loop()
    srv = loop.create_server(EchoProtocol, host, port)
    asyncio.async(srv)
    loop.run_forever()

熟悉Twisted的同学应该会有deja vu的感觉,我们提供一个接收 事件的对象(实际上是产生对象的factory),然后事件就源源不 断地自动出现了。

但是这看起来和之前selector的例子不像啊?提示:真正的循环 在 BaseEventLoop的_run_once方法里 :)

实践中使用asyncio的要点

好了上面的例子很美好,但其实只展示了asyncio一小部分的威力, 下面来一些私人干货。

在事件处理方法中创建TCP连接

由于实际上asyncio是在事件循环中调用asyncio.Protocol类 (或者子类)的data_received等方法的,这些事件处理方法 如果阻塞的话,会将整个事件循环也阻塞住,失去了所有“异 步IO”模式带来的好处,所以所有的事件处理方法——包括 data_received、connection_made、connection_lost等——都 不能调用任何可能阻塞的函数,包括socket对象的recv方法、 文件对象的read方法等,当然socket的connect方法由于域名 解析和网络延迟等也是会阻塞的……那我们要怎么从事件处理 方法里做connect操作呢?

答案是asyncio提供了一系列异步操作的、不会阻塞的接口, 当然也包括“创建TCP连接”。这些接口全部以coroutine的形 式提供,调用时要使用 yield from 语法。例如可以这样搭 建一个简单的代理服务器:

 
HOST = 'www.google.com'
PORT = '80'

@asyncio.coroutine
def new_connection(host, port):
    loop = asyncio.get_event_loop()
    client_transport, client_proto = yield from \
        loop.create_connection(ClientProtocol, host, port)
    return client_transport, client_proto

class ClientProtocol(asyncio.Protocol):
    ....

class ServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.client_task = asyncio.Task(new_connection(HOST, PORT))
        self.client_task.add_done_callback(self.client_connect_done)
        ....

    def client_connect_done(self, future):
        client_transport, client_proto = future.result()
        ....

def server(host, port):
    loop = asyncio.get_event_loop()
    srv = loop.create_server(ProxyProtocol, host, port)
    asyncio.async(srv)
    loop.run_forever()

ServerProtocol在收到一个新连接(connection_made)的时候用 asyncio.Task调度一个创建新连接的异步函数,这个函数会由asyncio 的事件循环在connection_made返回后择机执行,执行完成后事 件循环再去调用通过add_done_callback注册的处理函数 (client_connect_done),满满的javascript既视感啊有木有。

为什么yield from不能直接写在connection_made里,而需要另外 封装一个函数呢?因为asyncio的事件循环认定了Protocol对象的 事件处理方法是普通函数,如果yield from直接出现在 connection_made中的话,事件循环调用connection_made的时候 只会返回一个generator,connection_made的函数体完全不会被 执行,所以在事件处理方法中只能通过调度Task(或者使用 asyncio.async(...),效果一样)的方式执行异步操作。

替换事件循环使用的selector

Python 3.4中还有一个和asyncio配套的新模块:selectors. 这个模块将select、epoll、kqueue等等系统级异步IO接口抽象 成“selector”类型,规定了统一的对外接口,于是程序只管使 用selector的接口就行了,不用管底层的实现到底是select还 是epoll.

asyncio中用的就是selector模块, asyncio.selector_events.BaseSelectorEventLoop类的构造 函数有一个selector参数,通常使用默认值就可以了,但是 当然我们也可以把它给换成我们自己的类。比方说如果我们希 望事件循环能支持 ZeroMQ 的socket, 可以把selector的底层实现换成zmq.Poller():

 
class ZmqSelector(selectors._BaseSelectorImpl):
    def __init__(self, poller=None):
        super().__init__()
        if poller is not None:
            self._zmq_poller = poller
        else:
            self._zmq_poller = zmq.Poller()

    def _fileobj_lookup(self, fileobj):
        if isinstance(fileobj, zmq.Socket):
            return fileobj
        else:
            return super()._fileobj_lookup(fileobj)

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        flags = 0
        if events & selectors.EVENT_READ:
            flags |= zmq.POLLIN
        if events & selectors.EVENT_WRITE:
            flags |= zmq.POLLOUT
        self._zmq_poller.register(fileobj, flags)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        self._zmq_poller.unregister(fileobj)
        return key

    def select(self, timeout=None):
        if timeout