c++20出来有一段时间了。其中一大功能就是终于支持协程了(c++作为行业大哥大级别的语言,居然到C++20才开始支持协程,我也是无力吐槽了,让多少人等了多少年,等了多少青春)但千呼万唤他终于还是来了,c++标准委员会的谨慎态度也造就了c++20的给出来协程:“性能之优秀”,“开发之灵活”和让人劝退的“门槛之高”。
不过话说回来,c++从出身就注定了背负性能使命,他不是为简单为应用层维度开发的语言(如果应用层你大可以用python java ruby lua等语言),他是一门可以开发其他语言的语言,所以追逐高性能和灵活性,舍弃矫情的低门槛,毕竟C++不是设计来给所有人用的语言。
之前用过python的协程,协程易用程度高,所以c++20到来也想尝试c++状态下的协程,但是接触以后发现问题,c++20的协程状态是:只有基础设施,也就是实现了无栈协程的所有机制和功能,但没有封装到具体的应用层标准库STL。此时大部分人就只能干瞪眼了,由于复杂的协程运作机制,没有实现标准库的情况下,说要用上协程你是在开玩笑,网上一致的意见c++20是半成品,要真的用上c++协程得等c++23协程标准库完善后才行。
一贯本着不作死就不会死得态度,只会用库不懂底层机制,那不是用c++的态度,所以深入学习c++20协程,半个月时间,写了一个简单的协程库,在此过程中也对复杂的c++协程机制有了深入的了解。话说asio和cppcoro两个库已经支持了c++20协程,但是我觉得还是庞大和复杂,对于想通过看库源代码学习c++协程的同学,我觉得还是算了,在不懂协程机理的情况下,你连看源代码都看不懂好吧!!有人会说有源代码了你都看不懂,你是吹牛。那还真不是,c++协程在语法上会有些颠覆你的三观,我们来举个例子:
A func(int a){
co_await std::suspend_always{};
co_yield a;
co_return 0;
}
int main(){
auto co = func(1);
co.hd.resume();
int a = co.hd.resume();
int b = co.hd.resume();
}
有人说func是一个协程函数,main中的func运行后会返回0,也就是 co是一个int变量值为0;
如果你按常规代码理解,没错。但是在c++协程的世界,他完全不是上面说的情况。
正确的情况是: func在这里是一个协程生成器(这个概念很重要,他不是函数)返回值co是一个协程管理类A关联了具体协程执行体后的协程实例的控制句柄(的包装对象)。明确co不是协程实例(协程帧),是协程实例的控制句柄的包装对象,在func(1)执行之后他只是“动态”生成了一个协程实例,并把控制句柄返回给用户,但此时这个协程是挂起的,协程体{}代码块还没有被执行过,所以不存在返回值。这非常的绕,让人难以理解(后面还有更难理解的)。
在三次co.hd.resume();调用后协程才被完全执行完毕,此时a=1,b=0;
返回值保存在协程的实例(协程帧)中,通过协程管理类A的内部流程控制函数管理着返回值(A的promise_type定义了所有的协程控制行为)。
总结几点 (重要,不要混淆):
1、“协程管理类A是包含协程行为控制的类定义 ,A不是协程,形如 A func(int a, …){ … } 才是一个完整的协程定义”;所以A func1(){}; A func2(){}; A func3(){}; 都可以与同一个协程控制A绑定,但他们是3个不同的协程定义,只是协程控制行为都为A。好处是,你可以用一个std::vector< A > 保存下这3个不同的协程,他们的主协程体(功能实现)各不相同。要让A为一个协程管理类,必须包含struct promise_type{}定义,和一个控制句柄对象std::coroutine_handle< promise_type > hd; 特别的,A可以不实现await_xxx接口,他可以不是可等待体。
2、代码块体中有co_await ,co_yield,co_return关键字,则为协程体代码块,运行到关键字位置会**“触发协程挂起” ** ,此时原调用者代码阻塞在resume函数位置,运行权重新回到调用者,此时resume会返回,调用者继续执行;
3、特别的:
co_await可以与可等待对象配合,形成更为复杂的协程挂起行为:一般异步IO操作,都是通过co_await + io可等待对象,完成异步操作后挂起协程,等待异步io完成后,再由**“调度器”**恢复协程继续运行,从而发挥异步的意义,形成io复杂度向cpu复杂度的转移。因此,协程解决的是问题是“异步”而不是“并行”,要实现并行只能考虑多线程或多进程,协程可以将单个线程cpu效率发挥到最大,而不会被io阻塞浪费掉当前线程的cpu算能,那问题来了,如果我们用 协程 + 多线程/多进程 结合模式呢,那恭喜你,世界都将是你的;
co_yield实现简单挂起,简单的立即放弃运行权,返回调用者,可恢复(异步应用场景相对较少,多用于循环生成器);
co_return实现最后一次简单挂起,立即放弃运行权,返回调用者,协程后续不再可恢复(应用于协程退出);
4、可等待体(类形如 struct B{ await_ready();await_suspend();await_resume(); } 实现 三个await_xxx接口的类B是一个可等待体定义),他的实例是一个可等待对象;其中await_suspend()在执行后(不是执行前),会触发当前协程挂起(记住,此处不是可等待对象挂起,是co_await 此可等待对象的当前协程挂起,不能混淆,由于概念不清,我在这个位置耽误了很久的时间)
5、协程管理类A,和可等待体B,他们没有直接关系,是两个不同的东西。可等待体B控制挂起时点的行为是局部的,协程控制A控制协程整体创建,运行,挂起,销毁,异常捕获等过程的行为是整体的;协程只对应有一个控制类A,但是内部可以有多次挂起操作,每次操作对应一个可等待对象;
库开发
本文重点是库实战开发,关于协程框架中的 3大概念:协程定义类及promise_type{},可等待体awaitable,协程控制句柄std::coroutine_handle< > ,此处不做介绍,自行了解。
但是要介绍一下协程调度的运行逻辑,以此加深库开发过程的理解。这个过程在多线程下面是由内核管理的我们很少会了解,但是到了协程,你还要自己写库,那必须自己实现协程的调度算法和event loop模式
在此,我打个形象比喻:
现在一个家中有5个儿子,他们能力各不相同(工作者协程),还有一个妈妈(调度者协程),现在只有一台电脑(单线程时间片),同一时刻,这台电脑只能被老妈分给其中一个儿子来使用(协程抢占),其中一个儿子首先得到电脑开始工作(协程恢复),其他儿子只能等着无法工作(协程等待状态),有电脑的儿子工作一会后此时他发送一封对外邮件(可等待对象)但要等待邮件回复后才能继续工作(io等待完成),因为其他人此时还在等着用电脑而自己此时不具备继续工作的条件,所以他识趣的放弃电脑的使用权,并把电脑交还给老妈(协程挂起等待,执行权交还caller)并等着老妈下次再把电脑给他使用,老妈拿到电脑后(调度协程恢复执行)检查是否有回复邮件到来(调度协程检查事件完成,对应事件循环iocp/epoll),如果有了,老妈检查这封回复邮件是回复给哪个儿子的,并叫来对应的儿子(协程调度),把电脑交给他(协程恢复),得到电脑的儿子打开回复邮件拿到结果(await_resume() 返回异步io结果)继续工作,…, 不断循环。至此,完成一个协程完整调度流程。
要实现一个协程库,他需要几个东西:
1、实现具体的异步操作的可等待体(类似比喻中的发邮件操作,定义是否将电脑归还,获取回复后打开查询结果等行为);
2、协程控制类A(他是一个协程任务task),A的promise_type中应该记录协程的相关状态,记录挂起点的可等待对象的指针(很重要),可等待对象也可以充当task和调度协程,信息交换的媒介,可等待对象指针通过 await_suspend() 过程传递给task的promise做记录并保存。调度协程通过可等待对象指针在异步操作完成时将异步操作结果传回给等待的task。
3、 如总结和比喻所说,最重要的,还需要一个“协程调度器”。第一、他有一个主调度协程,调度协程具有一系列的调度算法,他的工作就是监测io异步完成事件的到来和分配执行权给task,第二,他维护有一个task协程队列(可以多种方法实现),队列记录着所有的协程实例的句柄,这个队列是为了协程调度准备的。
(注:之所以C++20无法直接使用的原因,其实就是,以上3个具体的工具没有现成的库,由于高度灵活,c++希望使用者自己实现以上组件,这让用惯成品库的我们非常难受,望而却步,天天喊着等c++23的标准库,但c++23也不能将所有的需求都囊括,遇到特殊需求还是要自己写)
实现思路
调度器:
1 调度协程中的event loop本例是在Windows下采用的iocp模型(linux下可以使用epoll也很好改,原理一样)
2、调度算法采用简单的等权重调度,也就是挂入协程队列的task,轮流调度,每个被调度的task被调度的机会相同;
3、完成事件标记和task恢复,业务分开,这样目的是使得通过co_yield简单挂起的任务有重新执行的机会(因为co_yeild不会在后续触发完成事件)
4、调度器中记录着协程队列的开始task和末尾task的handle,以便调度协程;
可等待体:
1、文件file异步read,write操作;
2、网络套接字,tcp协议下异步listen,accept, send, recv 操作;
3、网络套接字,udp协议下异步sendto, recvfrom 操作;
4、协程休眠,实现sleepFor,sleepForEx操作,分别实现协程任务的毫秒和微秒级休眠;
5、在iocp模型下以上api都提供了重叠io操作,此时将api执行成功的重叠io操作,将对应的可等待体指针记录到当前协程变量中(promise_type中的变量),一旦完成事件到来,调度协程就会设置可等待对象的完成标记状态为true,调度协程只要在轮询中逐个检查task保存的可等待对象指针,检查完成标记是否为true,为true恢复执行该协程,为false则跳过该协程,继续轮询 event loop;
任务定义(task协程):
1、task协程的promise_type中定义3个变量,
2、保存当前挂起的可等待提指针,如果当前协程不是io挂起或者是没有挂起,该指针应该为null
3、保存当前协程自身所属调度器Scheduler的指针;
4、保存此刻协程队列中的前一个协程task的handle和后一个协程task的handle;
5、若当前task的可等待对象完成标记为true,则调度协程会将该task的before task和behind task链接,将该task的handle移动到协程队列尾部,并且resume task,完成调度和恢复;
启动协程调度:
1、实例化调度器 CoScheduler;
2、通过lambda表达方式定义task协程,并加入到调度器的协程队列;
3、通过run方法启动调度器调度运行各协程任务;
4、task协程中又可以动态嵌套生产新的task协程加入到调度队列;
先看测试效果:(后面会有源码)
案例1:tcp 服务器/客户端模型测试
除调度协程外,协程队列中会产生4个task,一个服务监听器task,一个客户端生成器task,服务端task,客户端task
Main coro scheduler started ...
Main coro scheduler: Iocp loop started ... //0 调度协程执行
Iocp: New task arrived and first run, tid=26064
Tcp server coro started ... //1 监听器task执行
Server listener accept wait ... --》 在accept异步挂起
Iocp: New task arrived and first run, tid=26064 //0 调度协程执行 (event loop段)
Clients creater coro started. //2 客户端生成器task执行
Clients creater make client task 1. --》 动态生成客户端task加入队列
Clients creater yield run privilege to coro scheduler. --> 通过co_yield返回调度协程
Iocp: New task arrived and first run, tid=26064 //0 调度协程执行
Iocp: New task arrived and first run, tid=26064 --》 调度新到来的task
Client[1] connect wait ... //3 客户端task执行 在connect异步挂起
Iocp: IoFileAwaitable[TConnect] completed 0 bytes, tid=26064 //0 调度协程 执行 检测到connect完成事件
Clients creater fetch run privilege again. //2 客户端生成器task 执行
Clients creater yield run privilege to coro scheduler.
Client[1] send wait ...
Iocp: IoFileAwaitable[TAccept] completed 47 bytes, tid=26064 //0 调度协程执行 检测到accept完成事件
Server listener accept wait ... //1 服务端监听task执行 在accept异步挂起
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 调度协程 执行
Clients creater fetch run privilege again. //2 客户端生成器task执行
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=26064 //0 调度协程执行
Server[1] send wait ... //4 服务端task执行
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 调度协程执行 检测到send完成事件
Client[1] recv wait ... //3 客户端task执行
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 调度协程执行 检测到recv完成事件
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv wait ... //4 服务端task执行 在recv异步挂起
Client[1] recv server msg = //3 客户端task执行
Hello client. this is server 1. 1st response. --》打印服务端发来的消息
Client[1] send wait ... --》在send异步挂起
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 调度协程执行 检测到recv完成事件
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 --》 检测到send完成事件
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv client msg = //4 服务端task执行
Helle server, this is client 1: 2st request. -->打印客户端发来的消息
Server[1] send wait ...
多个协程任务的异步交替执行,就是在一个协程遇到 可挂起的异步操作时,比如connect accept send recv等,把运行权限归还给调度器,当完成事件到来,调度器又把执行权返回给task,形成执行权在调度器和task之间反复横跳的情况,实现cpu的多任务复用;
案例2:udp 广播模式测试
Main coro scheduler started ... // 同案例1 调度启动,分别产生3个服务和3个客户端
Main coro scheduler: Iocp loop started ...
Iocp: New task arrived and first run, tid=31188
Servers creater coro started.
Servers creater make server task 1.
Servers creater make server task 2.
Servers creater make server task 3.
Servers creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Clients creater coro started.
Clients creater make broadcastor client task 1.
Clients creater make broadcastor client task 2.
Clients creater make broadcastor client task 3.
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Iocp: New task arrived and first run, tid=31188
Udp server[1] coro started bind port = 33000...
Udp server[1] recvfrom wait ... //服务端1 异步接收
Iocp: New task arrived and first run, tid=31188
Udp server[2] coro started bind port = 33001...
Udp server[2] recvfrom wait ... //服务端2 异步接收
Iocp: New task arrived and first run, tid=31188
Udp server[3] coro started bind port = 33002...
Udp server[3] recvfrom wait ... //服务端3 异步接收
Iocp: New task arrived and first run, tid=31188
Broadcastor[1] send wait ... //客户端1 异步发送
Iocp: New task arrived and first run, tid=31188
Broadcastor[2] send wait ... //客户端2 异步发送
Iocp: New task arrived and first run, tid=31188
Broadcastor[3] send wait ... //客户端3 异步发送
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188 //调度器 recvfrom事件完成
Servers creater fetch run privilege again.
Servers creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188 //调度器 sendto事件完成
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Udp server[2] recvfrom 1st broadcast 75 bytes data, msg = //服务端2 收到并打印消息
Helle server, this is broadcastor 1: 1st randon broadcast to port=33001.
Udp server[2] recvfrom wait ... --》 在recvfrom异步挂起
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Udp server[3] recvfrom 1st broadcast 75 bytes data, msg =
Helle server, this is broadcastor 2: 1st randon broadcast to port=33002.
Udp server[3] recvfrom wait ...
Broadcastor[1] sendto server msg =
Helle server, this is broadcastor 1: 1st randon broadcast to port=33001.
Broadcastor[1] send wait ...
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Broadcastor[2] sendto server msg =
Helle server, this is broadcastor 2: 1st randon broadcast to port=33002.
Broadcastor[2] send wait ...
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Broadcastor[3] sendto server msg =
Helle server, this is broadcastor 3: 1st randon broadcast to port=33001.
再看看性能测试:
同样采用案例1和案例2的模型,但这次tcp采用100个server/client共200个task,udp采用20个braodcast/reciever共40个task来测试并发效果,做一下统计;效果如下
Tcp server coro started ...
Clients creater coro started.
Clients creater make client task 1.
...
Clients creater make client task 100.
Summary coro count 203: total handle 92752 times (spend time 3.06413s), 30902.3 times/per-second.
Summary coro count 203: total handle 185852 times (spend time 6.06633s), 31010.6 times/per-second.
Summary coro count 203: total handle 278601 times (spend time 9.06766s), 30902.6 times/per-second.
Summary coro count 203: total handle 371901 times (spend time 12.0696s), 31080.1 times/per-second.
Summary coro count 203: total handle 466752 times (spend time 15.0719s), 31592 times/per-second.
按server和client一次完整的send和recv,也就是4此tcp通信,记录为一次有效通讯记录,记为1times,
则结果显示,在coro=200时候,单个线程平均每秒将完成3万次有效通讯(虽然是自导自演,但是协程的功能完整实现了,性能可观)
Servers creater coro started.
Servers creater make server task 1.
...
Servers creater make server task 20.
Clients creater coro started.
Clients creater make broadcastor client task 1.
...
Clients creater make broadcastor client task 20.
Udp server[1] coro started bind port = 33000...
...
Udp server[20] coro started bind port = 33019...
Summary coro count 43: total handle 541730 times (spend time 3.02587s), 180571 times/per-second.
Summary coro count 43: total handle 1082377 times (spend time 6.02621s), 180196 times/per-second.
Summary coro count 43: total handle 1623102 times (spend time 9.02651s), 180223 times/per-second.
Summary coro count 43: total handle 2165716 times (spend time 12.0268s), 180853 times/per-second.
Summary coro count 43: total handle 2731919 times (spend time 15.0271s), 188714 times/per-second.
由于udp是单向非链接协议,速度会比tcp快得多,按一次sendto和recvfrom记为一次有效通讯,则在coro=40时候,单线程每秒有效通讯18万次。
最后
c++协程理解之后并不是很难,并且只要api提供异步方案,都可以实现协程库的封装,比如mysql,redis等异步操作,后续都可以依葫芦画瓢,很快实现c++协程库的开发。
本库开发只是为记录c++协程学习的经历,很多功能后续还需完善。目前支持在windows下的各位file socket sleep的异步操作,后续可扩展支持linux的epoll模型。
代码
头文件CLCoroutine.h 其中的void test_coroutine_tcp_server()和void test_coroutine_udp_random_broadcast()就是案例1和案例2的测试代码。
#ifndef __CL_COROUTINE__
#define __CL_COROUTINE__
#if (defined(__cplusplus) && __cplusplus >= 202002L) || (defined(_HAS_CXX20) && _HAS_CXX20)
#ifndef CLUseCorotine
#define CLUseCorotine 1
#endif
#endif
#if (defined(CLUseCorotine) && CLUseCorotine)
#include
#include
#include
#include "../_cl_common/CLCommon.h"
#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN //精简windows包含库的大小
#define WIN32_LEAN_AND_MEAN
#endif // !WIN32_LEAN_AND_MEAN
#include "Windows.h"
#include "Winsock2.h"
#include "WS2tcpip.h"
#include "MSWSock.h"
#pragma comment(lib, "ws2_32.lib")
#else
#endif
struct CoScheduler;
//(协程)任务单元
struct CoTask {
using return_type = void;
struct promise_type;
using handle = std::coroutine_handle;
struct promise_type {
CoTask get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() { }
template
std::suspend_always yield_value(const U& val) {
pAwaitableFile = nullptr;
return {};
}
CoScheduler* sc = 0;
handle before = 0, behind = 0;
void* pAwaitableFile = 0;
};
bool resume();
handle hd;
};
//(协程)任务调度器。包含主调度协程和事件循环,维护挂起的(协程)任务队列
struct CoScheduler {
struct MainCoro {
using return_type = void;
struct promise_type;
using handle = std::coroutine_handle;
struct promise_type {
MainCoro get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() { }
CoScheduler* sc = 0;
};
constexpr bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<>) { }
auto await_resume() const { }
handle hd;
};
CoScheduler()
: m_curTid(std::this_thread::get_id())
, m_hIocp(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0))
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
}
bool registe(HANDLE hFile) {
if (!hFile || hFile == INVALID_HANDLE_VALUE || !m_hIocp || ::CreateIoCompletionPort(hFile, m_hIocp, 0, 0) != m_hIocp)
return false;
else
return true;
}
bool registe(SOCKET sock) {
return registe((HANDLE)sock);
}
// 创建task并等待后续调度执行
template
void gather(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward(args)...);
appendNewTaskToEnd(coro);
::PostQueuedCompletionStatus(m_hIocp, 0, (ULONG_PTR)coro.hd.address(), 0);
}
// 创建task并立即调度执行
template
void createTask(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward(args)...);
appendNewTaskToEnd(coro);
coro.resume();
}
size_t taskCount() const { return m_taskCount; }
// 执行协程调度
void run();
private:
void appendNewTaskToEnd(CoTask& cur) {
auto& cprm = cur.hd.promise();
cprm.sc = this;
if (m_end.hd) {
cprm.before = m_end.hd;
cprm.behind = 0;
m_end.hd.promise().behind = cur.hd;
}
m_end.hd = cur.hd;
++m_taskCount;
if (m_begin.hd == 0) {
m_begin.hd = cur.hd;
cprm.before = 0;
}
}
void moveTaskToEnd(CoTask::handle h) {
if (removeDoneTask())
return;
if (!h)
return;
auto& cprm = h.promise();
if (h == m_begin.hd) {
m_begin.hd = cprm.behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
else if (h == m_end.hd) {
}
else {
cprm.behind.promise().before = cprm.before;
cprm.before.promise().behind = cprm.behind;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
}
bool removeDoneTask() {
bool ret = false;
while (m_begin.hd && m_begin.hd.done()) {
auto h = m_begin.hd;
m_begin.hd = h.promise().behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
h.destroy();
--m_taskCount;
ret = true;
}
return ret;
}
HANDLE m_hIocp;
const std::thread::id m_curTid;
MainCoro m_main;
CoTask m_begin, m_end;
std::atomic m_taskCount = 0;
};
// IO文件操作类型
enum IoFileType :int {
TUnknown = 0,
TRead,
TWrite,
TListen,
TAccept,
TConnect,
TSend,
TRecv,
TSendto,
TRecvfrom,
TSleep,
};
// IO文件调度优先级
enum IoFilePriority : int {
WaitingForPolling = 0, // 等待顺序轮询调度
DispatchImmediately, // 立即调度
};
// 支持异步挂起的可等待文件对象(基类)
template
struct IoFileAwaitable : OVERLAPPED {
operator HANDLE() const { return m_hFile; }
operator SOCKET() const { return (SOCKET)m_hFile; }
bool isRegisted() const { return m_isRegisted; }
bool isCompleted() const { return m_isCompleted; }
void setCompleted() { m_isCompleted = true; }
void resetCompleted() {
memset(this, 0, sizeof(OVERLAPPED));
m_isCompleted = 0;
}
void setReturn(Ret ret) { m_ret = ret; }
Ret getReturn() const { return m_ret; }
IoFileType& type() { return m_fileType; }
const char* typeName() const {
#define _TypeNameItem( tp ) case tp: return #tp;
switch (m_fileType)
{
_TypeNameItem(TUnknown);
_TypeNameItem(TRead);
_TypeNameItem(TWrite);
_TypeNameItem(TListen);
_TypeNameItem(TAccept);
_TypeNameItem(TConnect);
_TypeNameItem(TSend);
_TypeNameItem(TRecv);
_TypeNameItem(TSendto);
_TypeNameItem(TRecvfrom);
_TypeNameItem(TSleep);
default:
return "TUnknown";
}
}
void* getTransferredBytesCountBuffer() const {
return m_transferredBytesCount;
}
void setTransferredBytesCountRecvBuffer(void* countBuf) {
m_transferredBytesCount = countBuf;
}
bool close() {
if (m_hFile) {
return CloseHandle(detach());
}
return true;
}
HANDLE detach() {
HANDLE ret = *this;
m_hFile = 0;
m_isRegisted = 0;
return ret;
}
HANDLE attach(CoScheduler& sc, HANDLE s) {
HANDLE ret = *this;
m_hFile = s;
m_isRegisted = sc.registe(m_hFile);
return ret;
}
int getLastError() const { return m_lastError; }
void setLastError(int err) { m_lastError = err; }
CoTask::handle& onwer() { return m_owner; }
auto getPriority() const { return m_priority; }
void setPriority(IoFilePriority priority) { m_priority = priority; }
// awaitable methed
bool await_ready() const { return isCompleted(); }
void await_suspend(CoTask::handle h) {
h.promise().pAwaitableFile = this;
m_owner = h;
}
Ret await_resume() {
setTransferredBytesCountRecvBuffer(nullptr);
return getReturn();
}
protected:
IoFileAwaitable()
: m_hFile((HANDLE)0)
, m_isRegisted(false)
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, HANDLE hFile)
: m_hFile(hFile)
, m_isRegisted(sc.registe(m_hFile))
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, SOCKET sock)
: m_hFile((HANDLE)sock)
, m_isRegisted(sc.registe(sock))
{
resetCompleted();
}
HANDLE m_hFile;
bool m_isRegisted;
bool m_isCompleted;
IoFileType m_fileType = IoFileType::TUnknown;
void* m_transferredBytesCount = nullptr;
int m_lastError = ERROR_SUCCESS;
IoFilePriority m_priority = IoFilePriority::WaitingForPolling;
CoTask::handle m_owner;
Ret m_ret = 0;
};
// 支持异步挂起的套接字(基类)
template
struct AsyncSocket :public IoFileAwaitable {
using base = IoFileAwaitable;
~AsyncSocket() { close(); }
sockaddr_in localAddress() const { return m_local; }
sockaddr_in remoteAddress() const { return m_remote; }
sockaddr_in* localAddress() { return &m_local; }
sockaddr_in* remoteAddress() { return &m_remote; }
int close() {
int ret = 0;
if (base::m_hFile) {
if (base::m_hFile != (HANDLE)INVALID_SOCKET) {
ret = closesocket(detach());
}
else {
base::m_hFile = 0;
base::m_isRegisted = 0;
}
}
return ret;
}
SOCKET detach() {
return (SOCKET)base::detach();
}
SOCKET attach(CoScheduler& sc, SOCKET s) {
return (SOCKET)base::attach(sc, (HANDLE)s);
}
protected:
AsyncSocket(CoScheduler& sc, SOCKET sock)
:base(sc, sock)
{ }
sockaddr_in m_local = { 0 };
sockaddr_in m_remote = { 0 };
};
struct AsyncAcceptor;
// 支持异步挂起的服务端监听器,是一个等待连接到来的TCP监听套接字
struct AsyncListener :public AsyncSocket {
AsyncListener(CoScheduler& sc, unsigned long addr, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = addr;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
AsyncListener(CoScheduler& sc, const char* ip, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_port = htons(port);
InetPton(AF_INET, ip, &m_local.sin_addr);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
sockaddr_in listenAddress() const { return localAddress(); }
// 返回值true成功,false失败
AsyncAcceptor& accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
};
// 支持异步挂起的TCP连接(基类)
struct AsyncTcp :public AsyncSocket {
// 返回值0成功,SOCKET_ERROR失败
AsyncTcp& send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend);
// 返回值0成功,SOCKET_ERROR失败
AsyncTcp& recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv);
protected:
AsyncTcp(CoScheduler& sc)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{ }
};
// 支持异步挂起的服务端接收器,是一个接受端TCP套接字
struct AsyncAcceptor : public AsyncTcp {
AsyncAcceptor(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TAccept;
}
// 解析到来连接的地址信息,保存在内部地址变量
void perseAddress(void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer) {
if (lpAcceptBuffer == 0 || nNumberOfBytesAcceptBuffer == 0)
throw std::logic_error("perseAddress parm is invalid.");
static LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptSockAddrs = 0;
if (!lpfnGetAcceptSockAddrs) {
GUID GuidGetAcceptexSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
*this,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptexSockAddrs,
sizeof(GuidGetAcceptexSockAddrs),
&lpfnGetAcceptSockAddrs,
sizeof(lpfnGetAcceptSockAddrs),
&dwBytes, NULL, NULL))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener GetAcceptexSockAddrs error.");
}
}
int localLen = 0, remoteLen = 0;
lpfnGetAcceptSockAddrs(
lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(LPSOCKADDR*)localAddress(),
&localLen,
(LPSOCKADDR*)remoteAddress(),
&remoteLen
);
}
// 返回值true成功,false失败
AsyncAcceptor& accept(AsyncListener& sockListener, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
int await_resume() {
setPriority(IoFilePriority::WaitingForPolling);
return AsyncTcp::await_resume();
}
};
// 支持异步挂起的用户端连接器,是一个发起端TCP套接字
struct AsyncConnector : public AsyncTcp {
AsyncConnector(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
}
AsyncConnector(CoScheduler& sc, const char* ip, unsigned short port)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
setConnectRemoteAddress(ip, port);
bindConnectLocalPort(0);
}
void setConnectRemoteAddress(const char* ip, unsigned short port) {
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_port = htons(port);
InetPton(AF_INET, ip, &m_remote.sin_addr);
}
int bindConnectLocalPort(unsigned short port = 0) {
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = INADDR_ANY;
m_local.sin_port = htons(port);
return ::bind(*this, (const sockaddr*)&m_local, sizeof(m_local));
}
// 返回值true成功,false失败
AsyncConnector& connect(const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// 返回值true成功,false失败
AsyncConnector& connect(const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// 返回值true成功,false失败
AsyncConnector& connect(void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
};
// 作为服务端Acceptor应该具有事件完成并立即调度优先级,保证吞吐量
// 返回值true成功,false失败
inline
AsyncAcceptor&
accept(AsyncListener& sockListener, AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted) {
static LPFN_ACCEPTEX lpfnAcceptEx = 0;
sockListener.type() = IoFileType::TListen;
sockAccept.type() = IoFileType::TAccept;
sockAccept.resetCompleted();
sockAccept.setTransferredBytesCountRecvBuffer(lpNumberOfBytesAccepted);
sockAccept.setPriority(IoFilePriority::DispatchImmediately);//设置为立即调度优先级
if (lpNumberOfBytesAccepted)
*lpNumberOfBytesAccepted = 0;
if (!lpfnAcceptEx) {
GUID GuidAcceptEx = WSAID_ACCEPTEX; // GUID,这个是识别AcceptEx函数必须的
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockListener,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&lpfnAcceptEx,
sizeof(lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
lpfnAcceptEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Accept get AcceptEx function address error.");
}
}
bool ret = lpfnAcceptEx(
sockListener,
sockAccept,
(char*)lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
lpNumberOfBytesAccepted,
&sockAccept
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockAccept.setReturn(ret);
return sockAccept;
}
sockAccept.setReturn(false);
sockAccept.setCompleted();
sockAccept.setPriority(IoFilePriority::WaitingForPolling);
return sockAccept;
}
// 返回值true成功,false失败
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
static LPFN_CONNECTEX lpfnConnectEx = 0;
sockCon.type() = IoFileType::TConnect;
sockCon.resetCompleted();
if (lpdwBytesSent)
*lpdwBytesSent = 0;
if (!lpfnConnectEx) {
GUID GuidConnectEx = WSAID_CONNECTEX; // GUID,这个是识别AcceptEx函数必须的
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockCon,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidConnectEx,
sizeof(GuidConnectEx),
&lpfnConnectEx,
sizeof(lpfnConnectEx),
&dwBytes, NULL, NULL))
{
lpfnConnectEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Connect get ConnectEx function address error.");
}
}
sockCon.setTransferredBytesCountRecvBuffer(lpdwBytesSent);
bool ret = lpfnConnectEx(
sockCon,
name,
namelen,
lpSendBuffer,
dwSendDataLength,
lpdwBytesSent,
&sockCon
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockCon.setReturn(ret);
return sockCon;
}
sockCon.setReturn(false);
sockCon.setCompleted();
return sockCon;
}
// 返回值true成功,false失败
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
sockCon.setConnectRemoteAddress(ip, port);
sockCon.bindConnectLocalPort(0);
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
// 返回值true成功,false失败
inline
AsyncConnector&
connect(AsyncConnector& sockCon, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
// 返回值0成功,SOCKET_ERROR失败
inline
AsyncTcp&
send(AsyncTcp& sock, const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend) {
sock.type() = IoFileType::TSend;
sock.resetCompleted();
if (lpNumberOfBytesSend)
*lpNumberOfBytesSend = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSend);
WSABUF wsaBuf{ nNumberOfBytesSendBuffer , (char*)lpSendBuffer };
auto ret = WSASend(sock, &wsaBuf, 1, lpNumberOfBytesSend, 0, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// 返回值0成功,SOCKET_ERROR失败
inline
AsyncTcp&
recv(AsyncTcp& sock, void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv) {
sock.type() = IoFileType::TRecv;
sock.resetCompleted();
if (lpNumberOfBytesRecv)
*lpNumberOfBytesRecv = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecv);
WSABUF wsaBuf{ nNumberOfBytesRecvBuffer , (char*)lpRecvBuffer };
unsigned long dwFlag = 0;
auto ret = WSARecv(sock, &wsaBuf, 1, NULL, &dwFlag, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// 支持异步挂起的UDP(非连接)套接字
struct AsyncUdp : public AsyncSocket {
// 设置失败返回-1;返回1设置为广播模式(client端),返回0则为接收端(server端)
int status() const { return m_isBroadCast; }
int* remoteLen() { return &m_remoteLen; }
protected:
//isBroadCast = true则为发送端udp(client端),使用sendTo,此时可以在sendTo阶段动态指定广播目的地址
//isBroadCast = false则为接受端udp(server端),使用recvFrom,构造时必须指定绑定的广播接收地址
AsyncUdp(CoScheduler& sc, bool isBroadCast = true, const char* ip = 0, unsigned short port = 0)
: AsyncSocket(sc, WSASocketW(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
setBroadCast(isBroadCast, ip, port);
}
// 设置失败返回-1;返回1设置为广播模式(client端),返回0则为接收端(server端)
int setBroadCast(bool isBroadCast, const char* ip, unsigned short port) {
if (*this && *this != INVALID_SOCKET)
{
m_isBroadCast = isBroadCast;
if (::setsockopt(*this, SOL_SOCKET, SO_BROADCAST, (char*)&m_isBroadCast, sizeof(m_isBroadCast)) == 0) {
if (isBroadCast) {
setBindAddress(0, 0);
setBroadcastAddress(ip, port);
}
else {
setBindAddress(ip, port);
}
return m_isBroadCast;
}
}
return m_isBroadCast = -1;
}
// 设置接收器绑定的收听本地地址
bool setBindAddress(const char* ip, unsigned short port)
{
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp set reuse address error.");
}
if (::bind(*this, (const sockaddr*)&m_local, sizeof(sockaddr_in)))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp bind address error.");
}
return true;
}
// 设置发送要广播到的目标地址(远端地址)
void setBroadcastAddress(const char* ip, unsigned short port)
{
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_remote.sin_port = htons(port);
}
int m_remoteLen = 0;
int m_isBroadCast = -1;
};
// 支持异步挂起的UDP协议广播器套接字(发送端,client端)
struct AsyncBroadcastor :public AsyncUdp {
AsyncBroadcastor(CoScheduler& sc, const char* ip = 0, unsigned short port = 0)
:AsyncUdp(sc, true, ip, port)
{
type() = IoFileType::TSendto;
}
// 发送端udp(client端)向内部已保存的指定的广播地址发送数据(未设置广播地址将失败)
// 返回值0成功,SOCKET_ERROR失败
AsyncBroadcastor& sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
// 发送端udp(client端)向动态指定的广播地址发送数据
// 返回值0成功,SOCKET_ERROR失败
AsyncBroadcastor& sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
bool isValidBroadcastor() const { return status() == 1; }
using AsyncUdp::setBroadcastAddress;
};
// 支持异步挂起的UDP协议接收器套接字(接收端,server端)
struct AsyncReceiver :public AsyncUdp {
AsyncReceiver(CoScheduler& sc, const char* ip, unsigned short port)
:AsyncUdp(sc, false, ip, port)
{
type() = IoFileType::TRecvfrom;
}
// 接收端udp(server端)向绑定的本地地址获取广播数据
// 返回值0成功,SOCKET_ERROR失败
AsyncReceiver& recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd);
bool isValidReceiver() const { return status() == 0; }
using AsyncUdp::setBindAddress;
};
// 返回值0成功,SOCKET_ERROR失败
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.type() = IoFileType::TSendto;
sock.resetCompleted();
if (lpNumberOfBytesSent)
*lpNumberOfBytesSent = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSent);
WSABUF wsaBuf{ nNumberOfBytesSentBuffer , (char*)lpSentBuffer };
auto ret = WSASendTo(sock, &wsaBuf, 1, lpNumberOfBytesSent, 0,
(const sockaddr*)sock.remoteAddress(), (int)sizeof(sockaddr_in), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// 返回值0成功,SOCKET_ERROR失败
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const char* ip, unsigned short port,
const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.setBroadcastAddress(ip, port);
return ::sendTo(sock, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);;
}
// 返回值0成功,SOCKET_ERROR失败
inline
AsyncReceiver&
recvFrom(AsyncReceiver& sock, void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd) {
sock.type() = IoFileType::TRecvfrom;
sock.resetCompleted();
if (lpNumberOfBytesRecvd)
*lpNumberOfBytesRecvd = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecvd);
WSABUF wsaBuf{ nNumberOfBytesRecvdBuffer , (char*)lpRecvdBuffer };
DWORD dwFlag = 0;
*sock.remoteLen() = sizeof(sockaddr_in);
auto ret = WSARecvFrom(sock, &wsaBuf, 1, NULL, &dwFlag,
(sockaddr*)sock.remoteAddress(), sock.remoteLen(), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
struct AsyncFile : public IoFileAwaitable {
AsyncFile(CoScheduler& sc, const char* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileA(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
AsyncFile(CoScheduler& sc, const wchar_t* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileW(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
~AsyncFile() { close(); }
AsyncFile& read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead);
AsyncFile& write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten);
};
// 返回值true成功,false失败
inline
AsyncFile&
read(AsyncFile& file, void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead) {
file.type() = IoFileType::TWrite;
file.resetCompleted();
if (lpNumberOfBytesRead)
*lpNumberOfBytesRead = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRead);
auto ret = ReadFile(file, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}
// 返回值true成功,false失败
inline
AsyncFile&
write(AsyncFile& file, const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten) {
file.type() = IoFileType::TWrite;
file.resetCompleted();
if (lpNumberOfBytesWritten)
*lpNumberOfBytesWritten = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesWritten);
auto ret = WriteFile(file, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}
struct AsyncSleepor :public IoFileAwaitable {
AsyncSleepor(long long microOrMilliSeconds = 0, bool useMicroSeconds = false)
: microOrMilliSeconds(microOrMilliSeconds)
, useMicroSeconds(useMicroSeconds)
{
type() = IoFileType::TSleep;
start();
}
void start()
{
tp = std::chrono::steady_clock::now();
}
auto getSpendMicroSeconds() const {
constexpr auto div = std::nano::den / std::micro::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
auto getSpendMilliSeconds() const {
constexpr auto div = std::nano::den / std::milli::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
bool isCompleted() {
setReturn(useMicroSeconds ? getSpendMicroSeconds() : getSpendMilliSeconds());
return (m_isCompleted = getReturn() >= microOrMilliSeconds);
}
protected:
long long microOrMilliSeconds;
bool useMicroSeconds;
std::chrono::steady_clock::time_point tp;
};
//毫秒妙级别休眠,返回实际休眠的毫妙数
inline
AsyncSleepor
sleepFor(long long milliSeconds) {
return AsyncSleepor{ milliSeconds };
}
//微妙级别休眠,返回实际休眠的微妙数
inline
AsyncSleepor
sleepForEx(long long microSeconds) {
return AsyncSleepor{ microSeconds, true };
}
void test_coroutine_tcp_server(unsigned short serverPort = 33100, int totalClientCount = 100, bool dumpTestInfo = 0);
void test_coroutine_udp_random_broadcast(unsigned short broadCastPort = 33000, int totalClientBroadcastCount = 20, bool dumpTestInfo = 0);
#endif
#endif
实现文件
CLCoroutine.cpp
#include "CLCoroutine.h"
#if (defined(CLUseCorotine) && CLUseCorotine)
#include "../_cl_common/CLCommon.h"
#include "../_cl_string/CLString.h"
#include "../_cl_logger/CLLogger.h"
void CoScheduler::run() {
auto coro = [this]() ->MainCoro {
//logger.debug("nMain coro scheduler started ...");
#ifdef _WIN32
if (m_hIocp) {
CLString err;
DWORD dwMilliseconds = 0;
//logger.debug("nMain coro scheduler: Iocp loop started ...");
while (1) {
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* pOverlapped = 0;
while (GetQueuedCompletionStatus(m_hIocp, &numberOfBytesTransferred, &completionKey, &pOverlapped, dwMilliseconds))
{
if (pOverlapped) { //io完成事件
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(ERROR_SUCCESS);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = numberOfBytesTransferred;
// 根据可等待对象的优先级,决定是否立即调度或是轮流调度让每个任务的权重相同
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //立即调度
break;
default:
moveTaskToEnd(m_begin.hd); //轮询调度
break;
}
m_end.resume();
}
else { //新task来到,立即调度
if (numberOfBytesTransferred == 0 && completionKey) {
auto h = CoTask::handle::from_address((void*)completionKey);
moveTaskToEnd(h);
h.resume();
}
else {
auto lr = GetLastError();
logger.warning("Iocp: get status in event loop: ",err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
}
}
auto lr = GetLastError();
if (lr == WSA_WAIT_TIMEOUT) {
moveTaskToEnd(m_begin.hd); //轮询调度
m_end.resume(); //执行resume,此刻所有等待io均未完成不会执行,但yeild让渡的协程得到执行;
}
else if(pOverlapped) {
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(lr);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = 0;
IoFileType fileType = pFile->type();
switch (fileType)
{
case TUnknown:
break;
case TRead:
case TWrite:
case TListen:
case TAccept:
case TConnect:
pFile->setReturn(false);
break;
case TSend:
case TRecv:
case TSendto:
case TRecvfrom:
pFile->setReturn(SOCKET_ERROR);
break;
case TSleep:
break;
default:
break;
}
switch (lr)
{
case ERROR_NETNAME_DELETED: //64 指定的网络名不再可用
break;
case ERROR_SEM_TIMEOUT://121 信号灯超时
break;
default:
logger.error("Iocp: get status out event loop: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
break;
}
// 根据可等待对象的优先级,决定是否立即调度或是轮流调度让每个任务的权重相同
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //立即调度
break;
default:
moveTaskToEnd(m_begin.hd); //轮询调度
break;
}
m_end.resume();
}
else {
logger.error("Iocp: get status out event loop no completed: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
if (taskCount() == 0)
break;
}
CloseHandle(m_hIocp);
m_hIocp = 0;
//logger.debug("nMain coro scheduler: Iocp loop has done ...");
}
#endif
//logger.debug("nMain coro scheduler quit ...");
co_return;
};
m_main = coro();
m_main.hd.promise().sc = this;
m_main.hd.resume();
m_main.hd.destroy();
}
bool CoTask::resume() {
if (!hd)
return true;
else if (hd.done()) {
return false;
}
else {
auto pFile = (IoFileAwaitable<>*) hd.promise().pAwaitableFile;
if (!pFile) //第一次调度或者yield的协程
hd.resume();
else {
if (pFile->type() == IoFileType::TSleep) { //休眠调度
if (((AsyncSleepor*)pFile)->isCompleted()) {
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
else if (pFile->isCompleted()) { //io完成调度
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
return true;
}
}
#ifdef _WIN32
#else // Windows
#endif // Linux
AsyncAcceptor& AsyncListener::accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(* this, sockAccept, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}
AsyncAcceptor& AsyncAcceptor::accept(AsyncListener& sListen, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(sListen, *this, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}
AsyncConnector& AsyncConnector::connect(const sockaddr* name, int namelen, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncConnector& AsyncConnector::connect(const char* ip, unsigned short port, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, ip, port, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncConnector& AsyncConnector::connect(void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncTcp& AsyncTcp::send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend)
{
return ::send(*this, lpSendBuffer, nNumberOfBytesSendBuffer, lpNumberOfBytesSend);
}
AsyncTcp& AsyncTcp::recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv)
{
return ::recv(*this, lpRecvBuffer, nNumberOfBytesRecvBuffer, lpNumberOfBytesRecv);
}
AsyncBroadcastor& AsyncBroadcastor::sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}
AsyncBroadcastor& AsyncBroadcastor::sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, ip, port, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}
AsyncReceiver& AsyncReceiver::recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd)
{
return ::recvFrom(*this, lpRecvdBuffer, nNumberOfBytesRecvdBuffer, lpNumberOfBytesRecvd);
}
AsyncFile& AsyncFile::read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead)
{
return ::read(*this, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead);
}
AsyncFile& AsyncFile::write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten)
{
return ::write(*this, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten);
}
#include
void test_coroutine_tcp_server(unsigned short serverPort, int totalClientCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);
CoScheduler sc;
int servRun = 0;
int totals = 0;
CLTick tk;
// 服务端监听器task
sc.gather([&]()->CoTask {
logger.info("nTcp server coro started ...");
AsyncListener listener(sc, ADDR_ANY, serverPort);
// loop accept
std::vector acceptbuf(260);
AsyncAcceptor* pAcceptor = 0;
int servId = 0;
while (true)
{
AsyncAcceptor& acceptor = pAcceptor ? *pAcceptor : *(pAcceptor = new AsyncAcceptor(sc));
DWORD nValidAccept;
logger.debug("nServer listener accept wait ...");
bool ret = co_await listener.accept(acceptor, acceptbuf.data(), acceptbuf.size(), &nValidAccept);
if (ret) {
//create server task
acceptor.perseAddress(acceptbuf.data(), acceptbuf.size());
servRun++;
// 服务端task
sc.gather([&](AsyncAcceptor* pAcceptor, int idx) ->CoTask {
AsyncAcceptor& acp = *pAcceptor;
std::vector bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::sprintf(bufSend.data(), "nHello client. this is server %d. %dst response.", idx, total);
logger.debug("nServer[%d] send wait ...", idx);
int ret = co_await acp.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
logger.debug("nServer[%d] recv wait ...", idx);
ret = co_await acp.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (nbytesRecv == 0)
break;
logger.debug("nServer[%d] recv client msg = %s", idx, bufRecv.data());
total++;
totals++;
}
logger.debug("nServer[%d] recv client close msg", idx);
delete pAcceptor;
servRun--;
}, pAcceptor, ++servId);
pAcceptor = 0;
}
}
logger.info("nTcp server coro quit.%d", GetCurrentThreadId());
});
// 客户端生成器
sc.gather([&]()->CoTask {
logger.info("nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientCount) {
i++;
logger.info("nClients creater make client task %d.", i);
nClient++;
// 客户端task
sc.gather([&](int idx)->CoTask {
AsyncConnector con(sc);
logger.debug("nClient[%d] connect wait ...", idx);
auto ret = co_await con.connect("127.0.0.1", serverPort);
if (!ret) {
logger.debug("nClinet[%d] connect server fail, %s", idx, CLString().getLastErrorString(GetLastError()));
co_return;
}
std::vector bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::snprintf(bufSend.data(), bufSend.size(), "nHelle server, this is client %d: %dst request.", idx, total);
logger.debug("nClient[%d] send wait ...", idx);
auto ret = co_await con.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
if (!(ret == SOCKET_ERROR || nbytesSend == 0)) {
logger.debug("nClient[%d] recv wait ...", idx);
ret = co_await con.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0)
break;
logger.debug("nClient[%d] recv server msg = %s", idx, bufRecv.data());
}
total++;
}
logger.debug("nClient[%d] get server close msg and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nClients creater fetch run privilege again.");
}
}
logger.debug("nClients creater coro quit.");
});
// 统计协程
sc.gather([&]()->CoTask {
auto last = totals;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000);
auto time = tk.getSpendTime();
logger.info("nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totals, time, (totals - last) / (time - lastTime));
last = totals;
lastTime = time;
}
});
sc.run();
}
void test_coroutine_udp_random_broadcast(unsigned short broadCastPort, int totalClientBroadcastCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);
srand(time(0));
CoScheduler sc;
int servRun = 0;
int totalsRecvd = 0;
int totalsSendto = 0;
CLTick tk;
std::vector portList(totalClientBroadcastCount);
for (int i = 0; i < totalClientBroadcastCount; i++)portList[i] = broadCastPort + i;
// 服务端生成器
sc.gather([&]()->CoTask {
logger.info("nServers creater coro started.");
int nServer = 0;
for (int i = 0; 1; )
{
if (nServer < totalClientBroadcastCount) {
i++;
logger.info("nServers creater make server task %d.", i);
nServer++;
// 服务端task (广播接收端)
sc.gather([&](int i)->CoTask {
logger.info("nUdp server[%d] coro started bind port = %d...", i, portList[i - 1]);
AsyncReceiver serv(sc, "127.0.0.1", portList[i - 1]);
// recv
std::vector recv(260);
int servId = 0;
int total = 1;
while (true)
{
DWORD nbytesRecv;
logger.debug("nUdp server[%d] recvfrom wait ...", i);
int ret = co_await serv.recvFrom(recv.data(), recv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0) {
CLString().getLastErrorMessageBoxExceptSucceed(WSAGetLastError());
break;
}
logger.debug("nUdp server[%d] recvfrom %dst broadcast %u bytes data, msg = %s", i, total, nbytesRecv, recv.data());
total++;
totalsRecvd++;
}
logger.info("nUdp server[%d] coro quit.%d", i, GetCurrentThreadId());
nServer--;
}, i);
}
else {
logger.debug("nServers creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nServers creater fetch run privilege again.");
}
}
logger.debug("nServers creater coro quit.");
});
// 客户端生成器
sc.gather([&]()->CoTask {
logger.info("nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientBroadcastCount) {
i++;
logger.info("nClients creater make broadcastor client task %d.", i);
nClient++;
// 客户端task (广播发送端)
sc.gather([&](int idx)->CoTask {
AsyncBroadcastor broadcast(sc);
std::vector bufSent(260);
DWORD nbytesSent;
int total = 1;
while (1) {
auto randPort = portList[rand() % totalClientBroadcastCount];
std::snprintf(bufSent.data(), bufSent.size(),
"nHelle server, this is broadcastor %d: %dst randon broadcast to port=%d."
, idx, total, randPort);
logger.debug("nBroadcastor[%d] send wait ...", idx);
auto ret = co_await broadcast.sendTo("127.0.0.1", randPort,
bufSent.data(), std::strlen(bufSent.data()) + 1, &nbytesSent);
if (ret == SOCKET_ERROR || nbytesSent == 0) {
break;
}
logger.debug("nBroadcastor[%d] sendto server msg = %s", idx, bufSent.data());
total++;
totalsSendto++;
}
logger.debug("nBroadcastor[%d] send 0 bytes and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nClients creater fetch run privilege again.");
}
}
logger.debug("nClients creater coro quit.");
});
// 统计协程
sc.gather([&]()->CoTask {
auto last = totalsRecvd + totalsSendto;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000); // 协程休眠3000毫秒
auto time = tk.getSpendTime();
logger.info("nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totalsRecvd + totalsSendto, time, (totalsRecvd + totalsSendto - last) / (time - lastTime));
last = totalsRecvd + totalsSendto;
lastTime = time;
}
});
sc.run();
}
#endif