基于 Asio 的 C++ 网络编程

whitekoala 发布于2年前 阅读14124次
0 条评论

 

概述

近期学习 Boost.Asio,依葫芦画瓢,写了不少例子,对这个「轻量级」的网络库算是有了一定理解。但是秉着理论与实践结合的态度,决定写一篇教程,把脑子里一知半解的东西,试图说清楚。

Asio,即「异步 IO」(Asynchronous Input/Output),本是一个 独立的 C++ 网络程序库 ,似乎并不为人所知,后来因为被 Boost 相中,才声名鹊起。

从设计上来看,Asio 相似且重度依赖于 Boost,与 thread、bind、smart pointers 等结合时,体验顺滑。从使用上来看,依然是重组合而轻继承,一贯的 C++ 标准库风格。

什么是「异步 IO」?

简单来说,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

当然这种表述是不精确的,操作系统并没有直接提供这样的机制。以 Unix 为例,有五种 IO 模型可用:

  • 阻塞 I/O

  • 非阻塞 I/O

  • I/O 多路复用(multiplexing)( selectpoll

  • 信号驱动 I/O( SIGIO

  • 异步 I/O(POSIX aio_ 系列函数)

这五种模型的定义和比较,详见「Unix Network Programming, Volume 1: The Sockets Networking API」一书 6.2 节,或者可参考这篇笔记。

Asio 封装的正是「I/O 多路复用」。具体一点, epoll 之于 Linux, kqueue 之于 Mac 和 BSD。 epollkqueueselectpoll 更高效。当然在 Windows 上封装的则是 IOCP(完成端口)。

Asio 的「I/O 操作」,主要还是指「网络 IO」,比如 socket 读写。由于网络传输的特性,「网络 IO」相对比较费时,设计良好的服务器,不可能同步等待一个 IO 操作的结束,这太浪费 CPU 了。

对于普通的「文件 IO」,操作系统并没有提供“异步”读写机制,libuv 的做法是用线程模拟异步,为网络和文件提供了一致的接口。Asio 并没有这样做,它专注于网络。提供机制而不是策略,这很符合 C++ 哲学。

下面以示例,由浅到深,由简单到复杂,逐一介绍 Asio 的用法。

注:简明起见, include 语句一概从略。

I/O Service

每个 Asio 程序都至少有一个 io_service 对象,它代表了操作系统的 I/O 服务,并把你的程序和这些服务链接起来。

下面这个程序空有 io_service 对象,却没有任何异步操作,所以它其实什么也没做,也没有任何输出。

int main() {
  boost::asio::io_service io_service;
  io_service.run();
  return 0;
}

io_service.run 是一个阻塞调用,姑且把它想象成一个 loop,直到所有异步操作完成后,loop 才结束, run 才返回。 run 的返回值正是被执行的异步操作的个数。

Timer

有了 io_service 还不足以完成 I/O 操作,用户一般也不跟 io_service 直接交互。

根据 I/O 操作的不同,Asio 提供了不同的 I/O 对象,比如 timer(定时器),socket,等等。Timer 可以用来实现 watcher,是最简单的一种 I/O 对象。

void SayHello(const boost::system::error_code&) {
  std::cout << "Hello, world!" << std::endl;
}

int main() {
  boost::asio::io_service io_service;

  boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(3));
  timer.async_wait(&SayHello);

  io_service.run();

  return 0;
}

此例创建了一个 deadline_timer ,指定时间 3 秒,然后异步等待这个 timer,3 秒后,timer 超时结束, SayHello 被调用。

以下几点需要注意:

  • 所有 I/O 对象都依赖 io_service ,由构造时指定。

  • async_wait 初始化了一个异步操作,但是这个异步操作的执行,要等到 io_service.run 时才开始。

  • Timer 除了异步等待( async_wait ),还可以同步等待( wait )。同步等待是阻塞的,直到 timer 超时结束。基本上所有 I/O 对象的操作都有同步和异步两个版本(出于设计上的完整性),我们主要考虑异步操作。

  • async_wait 的参数是一个函数对象,异步操作完成时它会被调用,所以也叫 completion handler,简称 handler,可以作回调函数理解。

  • 所有 I/O 对象的 async_xyz 函数都有 handler 参数,对于 handler 的签名,不同的异步操作有不同的要求,官方文档里有明确说明。

如果 Handler 的签名与需求不一致,就得用 bind 绑定额外参数。不妨修改一下 SayHello ,让它每隔一秒打印一次 Hello, world! ,直到 count 递减至 0

void SayHello(const boost::system::error_code&,
              boost::asio::deadline_timer* timer,
              int* count) {
  std::cout << "Hello, world!" << std::endl;
  if (--(*count) > 0) {
    timer->expires_at(timer->expires_at() + boost::posix_time::seconds(1));
    timer->async_wait(boost::bind(SayHello, _1, timer, count));
  }
}

与前版相比, SayHello 多了两个参数,以便访问当前计数及重启 timer。

int main() {
  boost::asio::io_service io_service;
  boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(1));
  int count = 3;
  timer.async_wait(boost::bind(SayHello, _1, &timer, &count));
  io_service.run();
  return 0;
}

调用 bind 时,使用了占位符(placeholder) _1 ,占位符共有 9 个, _1 - _9 。官方文档里用的是 boost::asio::placeholders::error ,但是 _1 更为简洁。

Echo Server

Socket 也是一种 I/O 对象,这一点前面已经提及。相比于 timer,socket 更为常用,毕竟这是一个网络程序库。

下面以经典的 echo 程序为例,实现一个 TCP server。所谓 echo,就是 server 把 client 发来的内容原封不动发回给 client。

先从同步方式开始,异步太复杂,慢慢来。

同步方式

using boost::asio::ip::tcp;

int main() {
  boost::asio::io_service io_service;

  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 2016));

  boost::array<char, 1024> data;

  while (true) {
    tcp::socket socket(io_service);
    acceptor.accept(socket);

    boost::system::error_code ec;
    size_t bytes = socket.read_some(boost::asio::buffer(data), ec);
    if (!ec && bytes > 0) {
      boost::asio::write(socket, boost::asio::buffer(data, bytes), ec);
    }
  }

  return 0;
}

先用 netcat 测试一下:

$ nc localhost 2016
hello
hello

以下几点需要注意:

  • tcp::acceptor 也是一个 I/O 对象,用来接收一个 TCP 连接,端口号由 tcp::endpoint 指定。

  • 数据 buffer 以 boost::array<char, 1024> data 表示,也可以用 char data[1024] ,或 std::vector<char> data(1024) 。事实上,用 std::vector 是最推荐的,因为它可以支持 Buffer Debugging

  • 同步方式下,没有调用 io_service.run ,因为 acceptread_somewrite 都是阻塞的。这也意味着一次只能处理一个 client 连接,并且 echo 完了连接也就断了(socket 对象被销毁)。

  • 写回数据时,没有直接调用 socket.write_some ,因为它不能保证一次写完所有数据,但是 boost::asio::write 可以。我觉得这是 Asio 接口设计不周,应该提供 socket.write

异步方式

异步方式下,麻烦在于对象的生命周期,解决方法也不难,用 boost::shared_ptr

为了同时处理多个 client 连接,需要保留每个连接的 socket,不能再像前面那样,echo 完就把 socket 销毁了。于此抽象出一个表示连接的类,叫 Connection

class Connection {
public:
  Connection(boost::asio::io_service& io_service)
      : socket_(io_service) {
  }

private:
  tcp::socket socket_;
  boost::array<char, 1024> data_;
};

Connection 有两个成员变量, socket_ 与 client 通信, data_ 是接收 client 数据的缓存。只要 Connection 对象在,socket 就在,连接就不断。

Socket 需要 io_service ,通过 Connection 的构造函数传进来。

下面为 Connection 添加 echo 的逻辑:

class Connection {
public:
  void Start() {
    auto handler = boost::bind(&Connection::HandleRead, this, _1, _2);
    socket_.async_read_some(boost::asio::buffer(data_), handler);
  }

private:
  void HandleRead(const boost::system::error_code& ec,
                  size_t bytes_transferred) {
    if (ec) {
      return;
    }

    auto handler = boost::bind(&Connection::HandleWrite, this, _1);

    boost::asio::async_write(socket_,
                             boost::asio::buffer(data_, bytes_transferred),
                             handler);
  }

  void HandleWrite(const boost::system::error_code& ec) {
  }
};

基本上就是把 read_some 换成 async_read_some ,把 write 换成 async_write ,对应的 Completion Handler 为 HandleReadHandleWrite

接收 Client 连接的代码,提取出来,抽象成一个类 Server:

class Server {
public:
  Server(boost::asio::io_service& io_service, short port)
      : io_service_(io_service)
      , acceptor_(io_service, tcp::endpoint(tcp::v4(), port)) {
    StartAccept();
  }

private:
  void StartAccept() {
    boost::shared_ptr<Connection> conn(new Connection(io_service_));

    auto handler = boost::bind(&Server::HandleAccept, this, conn, _1);
    acceptor_.async_accept(conn->socket(), handler);
  }

  void HandleAccept(boost::shared_ptr<Connection> conn,
                    const boost::system::error_code& ec) {
    if (!ec) {
      conn->Start();
    }
    StartAccept();
  }

private:
  boost::asio::io_service& io_service_;
  tcp::acceptor acceptor_;
};

同样, async_accept 替换了 acceptasync_accept 不再阻塞, StartAccept 即刻就会返回,为了保证连接继续存在,使用 boost::shared_ptr<Connection> 代替普通的栈对象,也不要直接 new 出来,因为根本不知道在什么时候 delete

但是上面的代码其实还有问题, conn->Start 之后, conn 也就销毁了,因为没有其他地方再引用它,引用计数为 0。问题出在 Connection::Start :

void Start() {
  auto handler = boost::bind(&Connection::HandleRead, this, _1, _2);
  socket_.async_read_some(boost::asio::buffer(data_), handler);
}

此处, bind 应该传入 boost::shared_ptr<Connection> 而不是 this ,但是这里根本没办法知道 this 对象是不是被 boost::shared_ptr 所管理,即使知道也没办法访问。除非像下面这样:

class Connection {
public:
  void set_weak_this(boost::shared_ptr<Connection> shared_this) {
    weak_this_ = shared_this;
  }

  boost::shared_ptr<Connection> shared_from_this() {
    if (!weak_this_.expired()) {
      return weak_this_.lock();
    }
    return boost::shared_ptr<Connection>();
  }

private:
  boost::weak_ptr<Connection> weak_this_;
};

添加一个成员变量 weak_this_ ,通过 set_weak_this 手动设置,必要时通过 shared_from_this 转成 boost::shared_ptr<Connection>

class Server {
private:
  void StartAccept() {
    boost::shared_ptr<Connection> conn(new Connection(io_service_));
    conn->set_weak_this(conn);
    ...
  }
};
auto handler = boost::bind(&Connection::HandleRead, shared_from_this(), _1, _2);
auto handler = boost::bind(&Connection::HandleWrite, shared_from_this(), _1);

手动写这些代码太麻烦了,好在有 boost::enable_shared_from_this 可用,只要让 Connection 继承自它就可以了:

class Connection
  : public boost::enable_shared_from_this<Connection> {
};

至此,异步方式下对象生命周期的问题就算解决了。

关于 Connection ,值得改进的地方是,在 HandleWrite (目前为空)中继续调用 async_read_some ,这样跟 client 的连接就不会在一次 echo 后断掉,可以连续 echo。

Echo Client

虽然用 netcat 测试 Echo Server 非常方便,但是自己动手写一个 Echo Client 仍然十分必要。老规矩,先考虑同步方式。

同步方式

首先定义 io_service 对象:

boost::asio::io_service io_service;

然后通过 hostport 解析出 endpoint(s):

tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), "localhost", "2016");
tcp::resolver::iterator it = resolver.resolve(query);

简单起见, hostport 都是 hardcode,可以改成由命令行参数指定。

接着创建 socket,建立连接:

tcp::socket socket(io_service);
boost::asio::connect(socket, it);

这里没有直接调用 socket.connect ,因为 it 可能指向多个 endpoints, boost::asio::connect 会挨个尝试,逐一调用 socket.connect 直到成功连接到某个 endpoint。

从标准输入读一行数据,并发给 server:

char req_buf[MAX_SIZE];
std::cin.getline(req_buf, MAX_SIZE);
size_t req_len = strlen(req_buf);
boost::asio::write(socket, boost::asio::buffer(req_buf, req_len));

从 server 接收返回的数据并输出:

boost::array<char, MAX_SIZE> res_buf;
size_t res_len = socket.read_some(boost::asio::buffer(res_buf));
std::cout.write(res_buf.data(), res_len);
std::cout << std::endl;

发送和接收的代码可以放在循环里。

值得注意的是,我们并没有为各函数指定输出参数 boost::system::error_code 。代码块没有拆分,紧挨在一起,使用异常似乎更易于错误处理。

try {
  // ...
} catch (std::exception& e) {
  std::cerr << e.what() << std::endl;
}

Asio 里的函数,基本上都有重载,同时提供 error_code 和异常两个版本。

异步方式下,使用 error_code 则更方便一些。

异步方式

首先,就 client 来说,异步也许并非必要,除非想同时连接多个 server。

其次,异步读写前面都已经演示过,这里不再赘述。剩下 async_resolveasync_connect ,先看 async_connect

打着面向对象的幌子,我们把 client 封装成一个类:

class Client {
public:
  Client(boost::asio::io_service& io_service)
      : socket_(io_service) {
  }

  void Connect(const std::string& host, const std::string& port) {
    // TODO
  }

private:
  tcp::socket socket_;
};

用法如下:

int main() {
  boost::asio::io_service io_service;
  Client client(io_service);
  client.Connect("localhost", "2016");
  io_service.run();
  return 0;
}

async_connect 及其 handler:

auto handler = boost::bind(&Client::HandleConnect, this, _1);
socket_.async_connect(*it, handler);
void HandleConnect(const boost::system::error_code& ec) {
  if (ec) {
    return;
  }

  DoEcho();
}

DoEcho 从标准输入读一行数据,发给 server,再接收 server 发回的数据并输出。从略。

async_resolve 就比较麻烦了,下面这样是不够的:

void Connect(const std::string& host, const std::string& port) {
  tcp::resolver resolver(socket_.get_io_service());
  tcp::resolver::query query(tcp::v4(), host, port);
    
  auto handler = boost::bind(&Client::HandleResolve, this, _1, _2);
  resolver.async_resolve(query, handler);  // Oops!
}

前面已经说明,异步方式难在对象的生命周期,这里 resolverquery 都有这个问题。等到 async_resolve 所初始化的异步操作执行时,这两个栈对象已经销毁了。

应该把它们作为 Client 的成员变量:

boost::scoped_ptr<tcp::resolver> resolver_;
boost::scoped_ptr<tcp::resolver::query> query_;
void Connect(const std::string& host, const std::string& port) {
  resolver_.reset(new tcp::resolver(socket_.get_io_service()));
  query_.reset(new tcp::resolver::query(tcp::v4(), host, port));

  auto handler = boost::bind(&Client::HandleResolve, this, _1, _2);
  resolver_->async_resolve(*query_, handler);
}

但是这样就意味着一个 Client 对象同时只能 resolve 一个 host + port ,所以 Connect 函数也没必要提供了(至少不应公开)。于是最终得到下面这个版本:

class Client {
public:
  Client(boost::asio::io_service& io_service,
         const std::string& host,
         const std::string& port)
      : socket_(io_service) {
    resolver_.reset(new tcp::resolver(socket_.get_io_service()));
    query_.reset(new tcp::resolver::query(tcp::v4(), host, port));

    auto handler = boost::bind(&Client::HandleResolve, this, _1, _2);
    resolver_->async_resolve(*query_, handler);
  }

private:
  void HandleResolve(const boost::system::error_code& ec,
                     tcp::resolver::iterator it) {
    if (!ec) {
      auto handler = boost::bind(&Client::HandleConnect, this, _1);
      socket_.async_connect(*it, handler);
    }
  }

  void HandleConnect(const boost::system::error_code& ec) {
    if (!ec) {
      DoEcho();
    }
  }

private:
  tcp::socket socket_;
  boost::scoped_ptr<tcp::resolver> resolver_;
  boost::scoped_ptr<tcp::resolver::query> query_;
};

至此,异步方式的 echo client 就算实现了。

为了避免文章太长,Asio 的介绍暂时先告一段落。后续若有补遗,自当另成篇章记录。

完整的示例代码位于 GitHub,请移步: https://github.com/sprinfall/...

需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。