- 主题:boost/asio的协程调试非常麻烦,有没什么法子
它里边gets啥的,是异步的?把底层的IO都异步化了?
要求在gets而不得的时候,yield。
【 在 Algoquant 的大作中提到: 】
: 使用asio 代码清爽多了,linux裸代码 一堆fd操作 看着就头大。
: #include <iostream>
: #include <memory>
: ...................
--
修改:ylh1969 FROM 221.218.60.*
FROM 221.218.60.*
没看到poll_wait,可能运行不了。
也没看到read里会有异步。
【 在 Algoquant 的大作中提到: 】
: 以下由ds生成:
: #include <iostream>
: #include <thread>
: ...................
--
FROM 221.218.60.*
以下是对应构造函数的部分,大差不差。
【 在 Algoquant 的大作中提到: 】
: 以下由ds生成:
: #include <iostream>
: #include <thread>
: ...................
/***************************************************************
* 线程池模型的入口函数,主线程主要负责资源管理
*应用插件的服务函数在
* extern srvfunc Function[];
*/
int TPOOL_srv(void (*conn_init)(T_Connect *,T_NetHead *),void (*quit)(int),void (*poolchk)(void),int sizeof_gda)
{
int ret,i;
int s;
struct sockaddr_in sin,cin;
struct servent *sp;
char *p;
struct timeval tm;
fd_set efds;
socklen_t leng=1;
int sock=-1;
srvfunc *fp;
TCB *task;//Task Control Block
struct linger so_linger;
signal(SIGPIPE,SIG_IGN);
signal(SIGHUP,SIG_IGN);
signal(SIGINT ,SIG_IGN);
signal(SIGPWR ,quit);
signal(SIGTERM,quit);
p=getenv("SERVICE");
if(!p || !*p) {
ShowLog(1,"缺少环境变量 SERVICE ,不知守候哪个端口!");
quit(3);
}
//测试端口是否被占用
sock=tcpopen("localhost",p);
if(sock>-1) {
ShowLog(1,"端口 %s 已经被占用",p);
close(sock);
sock=-1;
quit(255);
}
ret=tpool_init(sizeof_gda);//先启动线程池,同时初始化TCB,gda是全局数据区,实际上是这个线程的独享数据区。尺寸。
if(ret) return(ret);
for(fp=Function;fp->funcaddr!=0;fp++) rpool.svc_num++;//这个服务器里边有多少个应用函数
other_yield=set_yield(do_yield);//设置异步IO
bzero(&sin,sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
if(isdigit(*p)){
sin.sin_port=htons((u_short)atoi(p));
} else {
if((sp=getservbyname(p,"tcp"))==NULL){
ShowLog(1,"getsrvbyname %s error",p);
quit(3);
}
sin.sin_port=(u_short)sp->s_port;
}
sock=socket(AF_INET,SOCK_STREAM,0);
if(sock < 0) {
ShowLog(1,"open socket error=%d,%s",errno,
strerror(errno));
quit(3);
}
bind(sock,(struct sockaddr *)&sin,sizeof(sin));
leng=1;
setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&leng,sizeof(leng));
//避免 TIME_WAIT
so_linger.l_onoff=1;
so_linger.l_linger=0;
ret=setsockopt(sock, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger);
if(ret) ShowLog(1,"set SO_LINGER err=%d,%s",errno,strerror(errno));
listen(sock,client_q.max_client);//有多少task就有多少侦听队列,防ddos
ShowLog(0,"main start tid=%lx sock=%d",pthread_self(),sock);
--
修改:ylh1969 FROM 221.218.60.*
FROM 221.218.60.*
这是后边 运行 的部分。
【 在 ylh1969 的大作中提到: 】
: 以下是对应构造函数的部分,大差不差。
: /***************************************************************
: * 线程池模型的入口函数,主线程主要负责资源管理
: ...................
int repeat=0;
leng=sizeof(cin);
while(1) {
do {
FD_ZERO(&efds);
FD_SET(sock, &efds);
//健康检查周期
tm.tv_sec=15;//可以改
tm.tv_usec=0;
ret=select(sock+1,&efds,NULL,&efds,&tm);
if(ret==-1) {
ShowLog(1,"select error %s",strerror(errno));
close(sock);
quit(3);
}
if(ret==0) {
check_TCB_timeout();//体检
if(poolchk) poolchk();
}
} while(ret<=0);
i=get_task_no();//取task号,不得不休,死磕。
task=&client_q.pool[i];
s=accept(sock,(struct sockaddr *)&cin,&leng);
if(s<0) {
ShowLog(1,"%s:accept err=%d,%s",__FUNCTION__,errno,strerror(errno));
client_del(task);
switch(errno) {
case EMFILE://fd用完了,其他线程还要继续工作,主线程休息一下。
case ENFILE:
usleep(30000000);
continue;
default:break;
}
usleep(15000000);
if(++repeat < 20) continue;
ShowLog(1,"%s:network fail! err=%s",__FUNCTION__,strerror(errno));
close(sock);
quit(5);
}
repeat=0;
task->fd=task->conn.Socket=s;
task->timestamp=now_usec();
task->timeout=60;
task->status=-1;
task->conn.only_do=(int (*)())conn_init;
ret=do_epoll(task,EPOLL_CTL_ADD,0);//任务交给其他线程做
}
close(sock);
}
--
修改:ylh1969 FROM 221.218.60.*
FROM 221.218.60.*
asio 还是难用啊。看看人家 python 和 go 写的网络通信代码。
【 在 Algoquant 的大作中提到: 】
: 使用asio 代码清爽多了,linux裸代码 一堆fd操作 看着就头大。
: #include <iostream>
: #include <memory>
: ...................
--
FROM 59.61.198.*
取context,下标,不得不休,是抵抗DDOS重要一环。
【 在 ylh1969 的大作中提到: 】
: int repeat=0;
: leng=sizeof(cin);
: while(1) {
: ...................
static int get_task_no()
{
int i,ret;
struct timespec abstime;
abstime.tv_sec=0;
pthread_mutex_lock(&client_q.mut);
while(0>(i=TCB_get(&client_q.free_q))) {
if(abstime.tv_sec==0) ShowLog(1,"%s:超过最大连接数!",__FUNCTION__);
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec+=5;
ret=pthread_cond_timedwait(&client_q.cond,&client_q.mut,&abstime);
if(ret==ETIMEDOUT) {
pthread_mutex_unlock(&client_q.mut);
check_TCB_timeout();
pthread_mutex_lock(&client_q.mut);
}
}
pthread_mutex_unlock(&client_q.mut);
return i;
}
--
FROM 221.218.60.*
【 在 ylh1969 的大作中提到: 】
: 取context,下标,不得不休,是抵抗DDOS重要一环。
: static int get_task_no()
: {
: ...................
抵抗DDOS的依据是,攻击链接要么不发信息,要么发的信息不符合要求,被判定为攻击包,这立刻就会被踢。
不发包的,超时被踢,这样就归还了context。
这部分工作由主线程完成。成功的连接在epoll里,被多个线程wait。M个连接被N个线程守护。ddos攻击包对线程池影响很小,正常已经通过认证的连接,其工作由工作线程承担,基本不受影响。
--
修改:ylh1969 FROM 221.218.60.*
FROM 221.218.60.*
【 在 Algoquant 的大作中提到: 】
: 以下由ds生成:
: #include <iostream>
: #include <thread>
: ...................
// 将 context 投入线程池
pool.enqueue([this, ctx] { handleConnection(ctx); });
没必要嘛。线程epoll_wait就行了呀,得到了被激活的fd,就得到了相应的context。
而且,不能accept一个就投入一个,10000个accept,就投进去10000个,你那几个线程,吃得了那么多吗?
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
这个condition.wait,似乎应该改成epoll_wait.,然后从激活的event里提取context。每次处理一个context。不需要每个线程的queue。
M个context,accept一个,丢到epoll里一个。N个线程一起epoll_wait,谁逮到哪个context处理哪个。
context在各个线程间游走。
我管这个叫做:抛绣球模型,context就是绣球,线程是工人,epoll是流水线,工人接到一个绣球就处理,处理到需要IO就抛进流水线。所以,context里有一个callback,用于支持抛绣球模型。同时里边也有一个ucontext用于支持协程。线程得到context后先看有没有callback,有就调用。否则看看是否异步IO,是就resume。再看是否新请求,如果是新请求,就建立协程然后在协程里进入工作函数。啥都不是就直接进入工作函数。
你用ds给的程序没看到协程。
--
修改:ylh1969 FROM 221.218.60.*
FROM 221.218.60.*
【 在 ylh1969 的大作中提到: 】
:
: // 将 context 投入线程池
: pool.enqueue([this, ctx] { handleConnection(ctx); });
: ...................
每个线程完成初始化后进入死循环。
while(1) {
//从就绪队列取一个任务,解决非IO事件,如获取连接池,获取数据库句柄等等。
pthread_mutex_lock(&rpool.mut);
while(!(task=rdy_get())) {//留1个线程在此即可,其余去epoll_wait
if(rpool.flg >= tpool.rdy_num) break;
rpool.flg++;
ret=pthread_cond_wait(&rpool.cond,&rpool.mut); //没有任务,等待
rpool.flg--;
}
pthread_mutex_unlock(&rpool.mut);
if(task) {
if(!task->AIO_flg && !task->call_back) {
task->fd=task->conn.Socket;
ShowLog(5,"%s:tid=%lx,TCB_no=%d from rdy_queue",__FUNCTION__,
pthread_self(),task->sv.TCB_no);
if(task->fd>=0) {
do_epoll(task,0,0);
}
continue;
}
} else {
fds = epoll_wait(g_epoll_fd, &event, 1 , -1);
if(fds < 0){
ShowLog(1,"%s:epoll_wait err=%d,%s",__FUNCTION__,errno,strerror(errno));
usleep(30000000);
continue;
}
task = (TCB *)event.data.ptr;
if(task->events) {
ShowLog(1,"%s:tid=%lx,TCB_no=%d,task->events=%08X,conflict!",__FUNCTION__,
pthread_self(),task->sv.TCB_no,task->events);//发现惊群
task=NULL;
continue;//丢掉它
}
task->events=event.events;
}
rs->timestamp=now_usec();
--
修改:ylh1969 FROM 221.218.60.*
FROM 221.218.60.*
accept后fd没必要设置nonblock。
认证里的recv,怎么处理这个nonblock的fd?要是对方不发信息呢?
协程在哪里?异步在哪里?
【 在 Algoquant 的大作中提到: 】
: 以下由ds生成:
: #include <iostream>
: #include <thread>
: ...................
--
修改:ylh1969 FROM 221.218.60.*
FROM 221.218.60.*