Threads and Boost.Asio

本文最后更新于:December 1, 2021 am

介绍了Boost.Asio下的线程模型

主要参考 Threads and Boost.Asio - 1.72.0

Thread

Thread Safety

In general, it is safe to make concurrent use of distinct objects, but unsafe to make concurrent use of a single object. However, types such as io_context provide a stronger guarantee that it is safe to use a single object concurrently.

一般情况下,并发操作不同的对象是线程安全的,而不加限制地并发操作同一个对象是不安全的。然而,例如io_context一类的类型,能在并发上提供一个更强的保证,程序员可以认为并发地操作一个单一的io_context对象是安全的。

Thread Pools

多个线程往往会调用io_context::run()来设置一个线程池,并且从它调用事件完成回调completion handler。 也可以使用post()来达到线程池内随机计算任务的效果。

需要注意的是,所有对io_context的线程池执行过join的线程都被视为等同的,io_context会用随机的方式来分派工作给这些线程。

Internal Threads

对于一个特定平台的库,实现上可能会使用一个或者多个内部线程来模拟异步性。 库会尽可能地使得这些线程对于使用者不可见。

这些线程尤其需要注意:

  • 禁止直接调用用户的代码
  • 必须阻塞所有的信号

通过保证 异步完成回调Asynchronous Completion Handler只会被正在调用io_context::run()的线程调用,进一步保证上述两点成立。

Consequently, it is the library user’s responsibility to create and manage all threads to which the notifications will be delivered.

因此,创建和管理所有会接收通知的线程就成了库使用者的职责。

这一实现方案的原因,包括以下几点:

  • 通过只从单线程中调用io_context::run(),用户代码可以避免管理线程之间同步上的复杂性。

    举例来说,一个使用者可以实现一个大小可以估计的”单线程“的服务器,尽管这里的单线程是从用户的角度来说。

  • 线程启动后,使用者往往需要紧接着做一些初始化操作,而这些操作应该在任何其它应用代码执行前被完成。

    举例来说,Microsoft’s COM 的使用者可能会在其它任何COM操作被当前线程调用之前,调用CoInitializeEx

  • 库的接口从线程的创建和管理中解耦,并且允许在不支持线程的平台上实现。

Strand

Strand被定义为一个严格的顺序执行的事件回调序列(没有并发调用)。使用Strand可以使多线程程序执行时不需要显式地使用锁(比如mutex)。

Strand可以是显式的,也可以是隐式的:

  • 仅仅从一个线程里调用io_context::run()表示,所有的事件回调函数都在一个隐式的strand里执行,因为io_context保证所有的回调都只在run()中被调用。
  • 当一个连接只有一条异步操作链时(半双工的协议,比如HTTP),不可能有并发的回调执行场景。这种情况下有一个隐式的Strand
  • 显式的Strand是一个strand<>或者io_context::strand实例。所有事件回调函数对象需要被绑定到这个Strand(使用boost::asio::bind_executor()),或者被posted/dispatched分派给这个Strand对象。

在组合异步操作的情况下,比如async_read()或者async_read_until(),只要完成回调completion handler经过了某个strand,那么所有的中间经过的回调也需要经过这个strand

为什么要这么做呢?考虑一个socket的情形,某线程(此处为caller)调用async_read()之后,接着会引发一个执行链(async_read()就是用组合操作的方式实现的),这个socket对象可能在多个函数中被共享。让所有的中间操作都经过同一个strand可以保证线程访问变量时的安全。

为了保证这一点,所有的异步操作都用过函数get_associated_executor来得到和回调相关联的的执行器。

比如boost::asio::associated_executor<Handler> a = boost::asio::get_associated_executor(h);

这个相关联的执行器必须满足Executor的要求。异步操作会用它来提交中间步骤的回调和最终步骤的回调。

此外,也可以为某一种回调类型定制一个执行器,这可以通过声明一个嵌套的类型executor_typeget_executor()成员函数来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class my_handler
{
public:
// Custom implementation of Executor type requirements.
typedef my_executor executor_type;

// Return a custom executor implementation.
executor_type get_executor() const noexcept
{
return my_executor();
}

void operator()() { ... }
};

在更复杂的情况下,可以直接偏特化associated_executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct my_handler
{
void operator()() { ... }
};

namespace boost { namespace asio {

template <class Executor>
struct associated_executor<my_handler, Executor>
{
// Custom implementation of Executor type requirements.
typedef my_executor type;

// Return a custom executor implementation.
static type get(const my_handler&,
const Executor& = Executor()) noexcept
{
return my_executor();
}
};

} } // namespace boost::asio

boost::asio::bind_executor()函数用于帮助绑定一个特定的执行器executor对象,比如将一个strand绑定到一个completion handler,可以写为

1
2
3
4
5
6
my_socket.async_read_some(my_buffer,
boost::asio::bind_executor(my_strand,
[](error_code ec, size_t length)
{
// ...
}));

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!