SIGALRM信号:利用该信号的信号处理函数来处理定时任务
基于升序链表的定时器:
#ifndef LST_TIMER #define LST_TIMER #include <time.h> #include <netinet/in.h> #include <stdio.h> #define BUFFER_SIZE 64 class util_timer; //用户数据 struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; util_timer* timer; }; //定时器类:双向链表的结点 class util_timer { public: util_timer():prev(NULL),next(NULL){} //任务的超时时间 time_t expire; //任务回调函数 void (*cb_func)(client_data*); //回调函数处理的客户数据 client_data* user_data; util_timer* prev; util_timer* next; }; //定时器链表 class sort_time_lst { public: sort_time_lst():head(NULL),tail(NULL){} ~sort_time_lst() { util_timer* tmp = head; while(tmp) { head = tmp -> next; delete tmp; tmp = head; } } void add_timer(util_timer* timer) { if(!timer) { return; } if(!head) { head = tail = timer; return; } if(timer -> expire < head -> expire) { timer->next = head; head -> prev = timer; head = timer; return; } add_timer(timer,head); } //定时器的超时时间延长,向后调整 void adjust_timer(util_timer* timer) { if(!timer) { return; } util_timer* tmp = timer -> next; if(!tmp || (timer -> expire < tmp -> expire)) { return; } if(timer == head) { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer,head); } else { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer,timer->next); } } void del_timer(util_timer* timer) { if(!timer) { return; } if((timer == head)&&(timer == tail)) { delete timer; head = NULL; tail = NULL; return; } if(timer == head) { head = head->next; head->prev = NULL; delete timer; return; } if(timer == tail) { tail = tail->prev; tail->next = NULL; delete timer; return; } timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; } //SIGALRM信号每次被触发就在其信号处理函数中执行一次tick函数,来处理链表上到期的任务 void tick() { if(!head) { return; } printf("timer tick\n"); time_t cur = time(NULL); util_timer* tmp = head; while(tmp) { if(cur < tmp->expire) { break; } tmp->cb_func(tmp->user_data); head = tmp->next; if(head) { head->prev = NULL; } delete tmp; tmp = head; } } private: void add_timer(util_timer* timer,util_timer* lst_head) { util_timer* prev = lst_head; util_timer* tmp = prev->next; while(tmp) { if(timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } if(!tmp) { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; } } util_timer* head; util_timer* tail; }; #endif
利用上述定时器链表来处理非活动连接:
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <stdlib.h> #include <sys/epoll.h> #include "lst_timer.h" #include <fcntl.h> #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2]; static sort_time_lst timer_lst; static int epollfd = 0; //addsig添加SIGALRM信号,sighandler通知主进程,epoll_wait通过管道接收信号,timer_handler处理信号 int setnonblocking(int fd) { int old_option = fcntl(fd,F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd,F_SETFL,new_option); return old_option; } void addfd(int epollfd,int fd) { epoll_event event; event.data.fd = fd; //文件描述符可读,边缘触发 event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event); setnonblocking(fd); } //信号处理函数利用管道通知主循环执行定时器链表上的定时任务 void sig_handler(int sig) { int save_errno = errno; int msg = sig; send(pipefd[1],(char *)&msg,1,0); errno = save_errno; } void addsig(int sig) { struct sigaction sa; memset(&sa,‘\0‘,sizeof(sa)); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); //对sig信号进行处理 assert(sigaction(sig,&sa,NULL) != -1); } void timer_handler() { timer_lst.tick(); //alarm触发SIGALRM信号 alarm(TIMESLOT); } //定时器回调函数,删除非活动连接 void cb_func(client_data* user_data) { epoll_ctl(epollfd,EPOLL_CTL_DEL,user_data->sockfd,0); assert(user_data); close(user_data->sockfd); printf("close fd %d\n",user_data->sockfd); } int main(int argc,char* argv[]) { if(argc <= 2) { printf("usage:%s ip_address port_number\n",basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address,sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET,ip,&address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET,SOCK_STREAM,0); assert(listenfd >= 0); ret = bind(listenfd,(struct sockaddr*)&address,sizeof(address)); assert(ret != -1); ret = listen(listenfd,5); assert(ret != -1); epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd,listenfd); //socketpair创建一对套接字描述符,存储于pipefd ret = socketpair(PF_UNIX,SOCK_STREAM,0,pipefd); assert(ret != -1); //pipefd[1]写 setnonblocking(pipefd[1]); //pipefd[0]读 addfd(epollfd,pipefd[0]); addsig(SIGALRM); addsig(SIGTERM); bool stop_server = false; client_data* users = new client_data[FD_LIMIT]; bool timeout = false; alarm(TIMESLOT); while(!stop_server) { int number = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } for(int i = 0; i < number;i++) { int sockfd = events[i].data.fd; //处理新到的客户端连接 if(sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength); addfd(epollfd,connfd); users[connfd].address = client_address; users[connfd].sockfd = connfd; //初始化定时器结点 util_timer* timer = new util_timer; timer->user_data = &users[connfd]; timer->cb_func = cb_func; time_t cur =time(NULL); timer->expire = cur + 3 * TIMESLOT; users[connfd].timer = timer; //将结点添加到链表中 timer_lst.add_timer(timer); } //处理管道发来的信号 else if((sockfd == pipefd[0])&&(events[i].events & EPOLLIN)) { int sig; char signals[1024]; ret = recv(pipefd[0],signals,sizeof(signals),0); if(ret == -1) { continue; } else if(ret == 0) { continue; } else { for(int i = 0;i < ret; ++i) { switch(signals[i]) { case SIGALRM: { timeout = true; break; } //程序结束信号 case SIGTERM: { stop_server = true; } } } } } //处理客户连接上接收到的数据 else if(events[i].events & EPOLLIN) { memset(users[sockfd].buf,‘\0‘,BUFFER_SIZE); ret = recv(sockfd,users[sockfd].buf,BUFFER_SIZE - 1,0); printf("get %d bytes of client data %s from %d\n",ret,users[sockfd].buf,sockfd); util_timer* timer = users[sockfd].timer; //读错误 if(ret < 0) { if(errno != EAGAIN) { cb_func(&users[sockfd]); if(timer) { timer_lst.del_timer(timer); } } } //没有数据,对方已经关闭连接 else if(ret == 0) { cb_func(&users[sockfd]); if(timer) { timer_lst.del_timer(timer); } } //有数据可读,调整该连接对应的定时器 else { if(timer) { time_t cur = time(NULL); timer->expire = cur + 3*TIMESLOT; printf("adjust timer once\n"); timer_lst.adjust_timer(timer); } } } else { } } //处理定时事件 if(timeout) { timer_handler(); timeout = false; } } close(listenfd); close(pipefd[1]); close(pipefd[0]); delete []users; return 0; }
高性能定时器:
1.时间轮
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,插入操作的效率随着定时器数目的增多而降低,时间轮使用hash表的思想,将定时器散列到不同的链表上,插入操作的效率基本不受定时器数目的影响
#ifndef TIME_WHEEL_TIMER #define TIME_WHEEL_TIMER #include <time.h> #include <netinet/in.h> #include <stdio.h> #define BUFFER_SIZE 64 class tw_timer; //绑定socket和定时器 struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; tw_timer *timer; }; //定时器类 class tw_timer { public: tw_timer(int rot,int ts):next(NULL),prev(NULL),rotation(rot),time_slot(ts){} int rotation;//记录定时器在时间轮转多少圈后生效 int time_slot;//记录定时器属于时间轮上哪个槽 void (*cb_func)(client_data*);//定时器回调函数 client_data *user_data; tw_timer *next; tw_timer *prev; }; class time_wheel { public: time_wheel():cur_slot(0) { for(int i = 0;i < N; i++) { slots[i] = NULL;//初始化所有链表头 } } ~time_wheel() { for(int i = 0;i < N; i++) { tw_timer* tmp = slots[i]; while(tmp) { slots[i] = tmp->next; delete tmp; tmp = slots[i]; } } } //根据定时值timeout创建定时器,并插入合适的链表中 tw_timer* add_timer(int timeout) { if(timeout < 0) { return NULL; } int ticks = 0;//ticks表示多少个心跳时间 if(timeout < SI)//SI表示一个心跳时间 { ticks = 1; } else { ticks = timeout / SI; } int rotation = ticks / N; int ts = (cur_slot + (ticks % N)) % N; tw_timer *timer = new tw_timer(rotation,ts); if( !slots[ts]) { slots[ts] = timer; } else { timer->next = slots[ts]; slots[ts]->prev = timer; slots[ts] = timer; } return timer; } //删除定时器timer void del_timer(tw_timer *timer) { if(!timer) { return; } int ts = timer->time_slot; if(timer == slots[ts]) { slots[ts] = slots[ts]->next; if(slots[ts]) { slots[ts]->prev = NULL; } delete timer; } else { timer->prev->next = timer->next; if(timer->next) { timer->next->prev = timer->prev; } delete timer; } } //SI时间到后,调用该函数,时间轮向前滚动一个槽的间隔 void tick() { tw_timer* tmp = slots[cur_slot]; //首先处理当前链表中到期的定时器 while(tmp) { //该定时器还未到时间 if(tmp->rotation > 0) { tmp->rotation--; tmp = tmp->next; } //该定时器到期 else { tmp->cb_func(tmp->user_data); if(tmp == slots[cur_slot]) { slots[cur_slot] = tmp->next; delete tmp; if(slots[cur_slot]) { slots[cur_slot]->prev = NULL; } tmp = slots[cur_slot]; } else { tmp->prev->next = tmp->next; if(tmp->next) { tmp->next->prev = tmp->prev; } tw_timer *tmp2 = tmp->next; delete tmp; tmp = tmp2; } } } cur_slot = ++cur_slot % N; } private: static const int N = 60;//时间轮上槽的数目 static const int SI = 1;//时间槽间隔 tw_timer *slots[N];//链表头的集合 int cur_slot;//当前槽 }; #endif
2.时间堆
以上方案都是以固定的频率调用心跳函数tick,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。另一种思路是将所有定时器中超时时间最小的的一个定时器的超时值作为心跳间隔,一旦心跳函数tick被调用,超时时间最小的定时器必然到期,就可以在tick函数中处理该定时器,然后,再次从剩余的定时器中找出超时时间最小的一个,并将这段最小时间设置为下一次心跳间隔。
原文:http://www.cnblogs.com/ljygoodgoodstudydaydayup/p/5413400.html