之前我们主要关注的是协程与外部调用者的交互,这次我们也关注一下对等的协程之间的通信。
实现目标 Go routine 的 Channel Go routine 当中有一个重要的特性就是 Channel。我们可以向 Channel 当中写数据,也可以从中读数据。例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 channel := make (chan int ) var readChannel <-chan int = channelvar writeChannel chan <- int = channelgo func () { fmt.Println("wait for read" ) for true { i, ok := <-readChannel if ok { fmt.Println("read" , i) } else { break } } fmt.Println("read end" ) }() go func () { for i := 0 ; i < 3 ; i++{ fmt.Println("write" , i) writeChannel <- i time.Sleep(time.Second) } close (writeChannel) }()
这个例子是我写 《深入理解 Kotlin 协程》 这本书时用到过的一个非常简单的 Go routine 的例子,它的运行输出如下:
1 2 3 4 5 6 7 8 wait for read write 0 read 0 write 1 read 1 write 2 read 2 read end
Go 当中的 Channel 默认是没有 buffer 的,我们也可以通过 make chan
在初始化 Channel 的时候指定 buffer。在 buffer 已满的情况下,写入者会先挂起等待读取者后再恢复执行,反之亦然。等待的过程中,所处的协程会挂起,执行调度的线程自然也会被释放用于调度其他逻辑。
C++ 协程的 Channel 实现设计 Kotlin 协程当中也有 Channel,与 Go 的不同之处在于 Kotlin 的 Channel 其实是基于协程最基本的 API 在框架层面实现的,并非语言原生提供的能力。C++ 的协程显然也可以采用这个思路,实际上整个这一系列 C++ 协程的文章都是在介绍如何使用 C++ 20 标准当中提供的基本的协程 API 在构建更复杂的框架支持。
我们来看一下我们最终的 Channel 的用例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 Task<void , LooperExecutor> Producer (Channel<int > &channel) { int i = 0 ; while (i < 10 ) { co_await channel.write (i++); co_await (channel << i++); } channel.close (); } Task<void , LooperExecutor> Consumer (Channel<int > &channel) { while (channel.is_active ()) { try { auto received = co_await channel.read (); int received; co_await (channel >> received); } catch (std::exception &e) { debug ("exception: " , e.what ()); } } }
我们的 Channel 也可以在构造的时候传入 buffer 的大小,默认没有 buffer。
co_await 表达式的支持 想要支持 co_await
表达式,只需要为 Channel 读写函数返回的 Awaiter 类型添加相应的 await_transform
函数。我们姑且认为 read
和 write
两个函数的返回值类型 ReaderAwaiter
和 WriterAwaiter
,接下来就添加一个非常简单的 await_transform
的支持:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 template <typename ResultType, typename Executor>struct TaskPromise { ... template <typename _ValueType> auto await_transform (ReaderAwaiter<_ValueType> reader_awaiter) { reader_awaiter.executor = &executor; return reader_awaiter; } template <typename _ValueType> auto await_transform (WriterAwaiter<_ValueType> writer_awaiter) { writer_awaiter.executor = &executor; return writer_awaiter; } ... }
由于 Channel
的 buffer 和对 Channel
的读写本身会决定协程是否挂起或恢复,因此这些逻辑我们都将在 Channel
当中给出,TaskPromise
能做的就是把调度器传过去,当协程恢复时使用。
Awaiter 的实现 Awaiter 负责在挂起时将自己存入 Channel
,并且在需要时恢复协程。因此除了前面看到需要在恢复执行协程时的调度器之外,Awaiter 还需要持有 Channel
、需要读写的值。
下面是 WriterAwaiter
的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 template <typename ValueType>struct WriterAwaiter { Channel<ValueType> *channel; AbstractExecutor *executor = nullptr ; ValueType _value; std::coroutine_handle<> handle; WriterAwaiter (Channel<ValueType> *channel, ValueType value) : channel (channel), _value(value) {} bool await_ready () { return false ; } auto await_suspend (std::coroutine_handle<> coroutine_handle) { this ->handle = coroutine_handle; channel->try_push_writer (this ); } void await_resume () { channel->check_closed (); } void resume () { if (executor) { executor->execute ([this ]() { handle.resume (); }); } else { handle.resume (); } } };
相对应的,还有 ReaderAwaiter
,实现类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 template <typename ValueType>struct ReaderAwaiter { Channel<ValueType> *channel; AbstractExecutor *executor = nullptr ; ValueType _value; ValueType* p_value = nullptr ; std::coroutine_handle<> handle; explicit ReaderAwaiter (Channel<ValueType> *channel) : channel(channel) { } bool await_ready () { return false ; } auto await_suspend (std::coroutine_handle<> coroutine_handle) { this ->handle = coroutine_handle; channel->try_push_reader (this ); } int await_resume () { channel->check_closed (); return _value; } void resume (ValueType value) { this ->_value = value; if (p_value) { *p_value = value; } resume (); } void resume () { if (executor) { executor->execute ([this ]() { handle.resume (); }); } else { handle.resume (); } } };
简单说来,Awaiter 的功能就是:
负责用协程的调度器在需要时恢复协程 处理读写的值的传递 Channel 的实现 接下来我们给出 Channel
当中根据 buffer 的情况来处理读写两端的挂起和恢复的逻辑。
Channel 的基本结构 我们先来看一下 Channel
的基本结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 template <typename ValueType>struct Channel { ... struct ChannelClosedException : std::exception { const char *what () const noexcept override { return "Channel is closed." ; } }; void check_closed () { if (!_is_active.load (std::memory_order_relaxed)) { throw ChannelClosedException (); } } explicit Channel (int capacity = 0 ) : buffer_capacity(capacity) { _is_active.store (true , std::memory_order_relaxed); } bool is_active () { return _is_active.load (std::memory_order_relaxed); } void close () { bool expect = true ; if (_is_active.compare_exchange_strong (expect, false , std::memory_order_relaxed)) { clean_up (); } } Channel (Channel &&channel) = delete ; Channel (Channel &) = delete ; Channel &operator =(Channel &) = delete ; ~Channel () { close (); } private : int buffer_capacity; std::queue<ValueType> buffer; std::list<WriterAwaiter<ValueType> *> writer_list; std::list<ReaderAwaiter<ValueType> *> reader_list; std::atomic<bool > _is_active; std::mutex channel_lock; std::condition_variable channel_condition; void clean_up () { std::lock_guard lock (channel_lock) ; for (auto writer : writer_list) { writer->resume (); } writer_list.clear (); for (auto reader : reader_list) { reader->resume (); } reader_list.clear (); decltype (buffer) empty_buffer; std::swap (buffer, empty_buffer); } };
通过了解 Channel
的基本结构,我们已经知道了 Channel
当中存了哪些信息。接下来我们就要填之前埋下的坑了:分别是在协程当中读写值用到的 read
和 write
函数,以及在挂起协程时 Awaiter 当中调用的 try_push_writer
和 try_push_reader
。
read 和 write 这两个函数也没什么实质的功能,就是把 Awaiter 创建出来,然后填充信息再返回:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 template <typename ValueType>struct Channel { auto write (ValueType value) { check_closed (); return WriterAwaiter <ValueType>(this , value); } auto operator <<(ValueType value) { return write (value); } auto read () { check_closed (); return ReaderAwaiter <ValueType>(this ); } auto operator >>(ValueType &value_ref) { auto awaiter = read (); awaiter.p_value = &value_ref; return awaiter; } ... }
这当中除了 operator>>
的实现需要多保存一个变量的地址以外,大家只需要注意一下对于 check_closed
的调用即可,它的功能很简单:在 Channel
关闭之后调用它会抛出 ChannelClosedException
。
try_push_writer
和 try_push_reader
这是 Channel
当中最为核心的两个函数了,他们的功能正好相反。
try_push_writer
调用时,意味着有一个新的写入者挂起准备写入值到 Channel
当中,这时候有以下几种情况:
Channel
当中有挂起的读取者,写入者直接将要写入的值传给读取者,恢复读取者,恢复写入者Channel
的 buffer 没满,写入者把值写入 buffer,然后立即恢复执行。Channel
的 buffer 已满,则写入者被存入挂起列表(writer_list)等待新的读取者读取时再恢复。了解了思路之后,它的实现就不难写出了,具体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void try_push_writer (WriterAwaiter<ValueType> *writer_awaiter) { std::unique_lock lock (channel_lock) ; check_closed (); if (!reader_list.empty ()) { auto reader = reader_list.front (); reader_list.pop_front (); lock.unlock (); reader->resume (writer_awaiter->_value); writer_awaiter->resume (); return ; } if (buffer.size () < buffer_capacity) { buffer.push (writer_awaiter->_value); lock.unlock (); writer_awaiter->resume (); return ; } writer_list.push_back (writer_awaiter); }
相对应的,try_push_reader
调用时,意味着有一个新的读取者挂起准备从 Channel
当中读取值,这时候有以下几种情况:
Channel
的 buffer 非空,读取者从 buffer 当中读取值,如果此时有挂起的写入者,需要去队头的写入者将值写入 buffer,然后立即恢复该写入者和当次的读取者。Channel
当中有挂起的写入者,写入者直接将要写入的值传给读取者,恢复读取者,恢复写入者Channel
的 buffer 为空,则读取者被存入挂起列表(reader_list)等待新的写入者写入时再恢复。接下来是具体的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 void try_push_reader (ReaderAwaiter<ValueType> *reader_awaiter) { std::unique_lock lock (channel_lock) ; check_closed (); if (!buffer.empty ()) { auto value = buffer.front (); buffer.pop (); if (!writer_list.empty ()) { auto writer = writer_list.front (); writer_list.pop_front (); buffer.push (writer->_value); lock.unlock (); writer->resume (); } else { lock.unlock (); } reader_awaiter->resume (value); return ; } if (!writer_list.empty ()) { auto writer = writer_list.front (); writer_list.pop_front (); lock.unlock (); reader_awaiter->resume (writer->_value); writer->resume (); return ; } reader_list.push_back (reader_awaiter); }
至此,我们已经完整给出 Channel
的实现。
说明 :我们当然也可以在 await_ready
的时候提前做一次判断,如果命中第 1、2 两种情况可以直接让写入/读取协程不挂起继续执行,这样可以避免写入/读取者的无效挂起。为了方便介绍,本文就不再做相关优化了。
监听协程的提前销毁 截止目前,我们给出的 Channel
仍然有个小小的限制,即 Channel
对象必须在持有 Channel
实例的协程退出之前关闭。
这主要是因为我们在 Channel
当中持有了已经挂起的读写协程的 Awaiter
的指针,一旦协程销毁,这些 Awaiter
也会被销毁,Channel
在关闭时试图恢复这些读写协程时就会出现程序崩溃(访问了野指针)。
为了解决这个问题,我们需要在 Awaiter
销毁时主动将自己的指针从 Channel
当中移除。以 ReaderAwaiter
为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 template <typename ValueType>struct ReaderAwaiter { ... ReaderAwaiter (ReaderAwaiter &&other) noexcept : channel (std::exchange (other.channel, nullptr )), executor (std::exchange (other.executor, nullptr )), _value(other._value), p_value (std::exchange (other.p_value, nullptr )), handle (other.handle) {} ... int await_resume () { channel->check_closed (); channel = nullptr ; return _value; } ... ~ReaderAwaiter () { if (channel) channel->remove_reader (this ); } };
我们在 ReaderAwaiter
的析构函数当中主动检查并移除了自己的指针,避免后续 Channel
对自身指针的无效访问。
对应的,Channel
当中也需要增加 remove_reader
函数:
1 2 3 4 5 6 7 8 9 10 11 template <typename ValueType>struct Channel { ... void remove_reader (ReaderAwaiter<ValueType> *reader_awaiter) { std::lock_guard lock (channel_lock) ; reader_list.remove (reader_awaiter); } }
WriterAwaiter
的修改类似,不再赘述。
这样修改之后,即使我们把正在等待读写 Channel
的协程提前结束销毁,也不会影响 Channel
的继续使用以及后续的正常关闭了。
小试牛刀 我们终于又实现了一个新的玩具,现在我们来给它通电试试效果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 using namespace std::chrono_literals;Task<void , LooperExecutor> Producer (Channel<int > &channel) { int i = 0 ; while (i < 10 ) { debug ("send: " , i); co_await (channel << i++); co_await 300 ms; } channel.close (); debug ("close channel, exit." ); } Task<void , LooperExecutor> Consumer (Channel<int > &channel) { while (channel.is_active ()) { try { int received; co_await (channel >> received); debug ("receive: " , received); co_await 2 s; } catch (std::exception &e) { debug ("exception: " , e.what ()); } } debug ("exit." ); } Task<void , LooperExecutor> Consumer2 (Channel<int > &channel) { while (channel.is_active ()) { try { auto received = co_await channel.read (); debug ("receive2: " , received); co_await 3 s; } catch (std::exception &e) { debug ("exception2: " , e.what ()); } } debug ("exit." ); } int main () { auto channel = Channel <int >(2 ); auto producer = Producer (channel); auto consumer = Consumer (channel); auto consumer2 = Consumer2 (channel); producer.get_result (); consumer.get_result (); consumer2.get_result (); return 0 ; }
例子非常简单,我们用一个写入者两个接收者向 Channel
当中读写数据,为了让示例更加凌乱,我们还加了一点点延时,运行结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 08:39:58.129 [Thread-26004] (main.cpp:15) Producer: send: 0 08:39:58.130 [Thread-27716] (main.cpp:31) Consumer: receive: 0 08:39:58.443 [Thread-26004] (main.cpp:15) Producer: send: 1 08:39:58.444 [Thread-17956] (main.cpp:45) Consumer2: receive2: 1 08:39:58.759 [Thread-26004] (main.cpp:15) Producer: send: 2 08:39:59.071 [Thread-26004] (main.cpp:15) Producer: send: 3 08:39:59.382 [Thread-26004] (main.cpp:15) Producer: send: 4 08:40:00.145 [Thread-27716] (main.cpp:31) Consumer: receive: 4 08:40:00.454 [Thread-26004] (main.cpp:15) Producer: send: 5 08:40:01.448 [Thread-17956] (main.cpp:45) Consumer2: receive2: 5 08:40:01.762 [Thread-26004] (main.cpp:15) Producer: send: 6 08:40:02.152 [Thread-27716] (main.cpp:31) Consumer: receive: 6 08:40:02.464 [Thread-26004] (main.cpp:15) Producer: send: 7 08:40:04.164 [Thread-27716] (main.cpp:31) Consumer: receive: 7 08:40:04.460 [Thread-17956] (main.cpp:45) Consumer2: receive2: 2 08:40:04.475 [Thread-26004] (main.cpp:15) Producer: send: 8 08:40:04.787 [Thread-26004] (main.cpp:15) Producer: send: 9 08:40:06.169 [Thread-27716] (main.cpp:31) Consumer: receive: 9 08:40:06.481 [Thread-26004] (main.cpp:22) Producer: close channel, exit. 08:40:07.464 [Thread-17956] (main.cpp:52) Consumer2: exit. 08:40:08.181 [Thread-27716] (main.cpp:38) Consumer: exit.
结果我就不分析了。
小结 本文给出了 C++ 协程版的 Channel
的 demo 实现,这进一步证明了 C++ 协程的基础 API 的设计足够灵活,能够支撑非常复杂的需求场景。
关于作者 霍丙乾 bennyhuo ,Google 开发者专家(Kotlin 方向);《深入理解 Kotlin 协程》 作者(机械工业出版社,2020.6);《深入实践 Kotlin 元编程》 作者(机械工业出版社,2023.8);移动客户端工程师,先后就职于腾讯地图、猿辅导、腾讯视频。