关键词:fasync_helper、kill_async、sigsuspend、sigaction、fcntl、F_SETOWN_EX、F_SETSIG、select()、poll()、poll_wait()等。
《Linux/UNIX系统编程手册》第63章主要介绍了select()/poll()、信号驱动IO、epoll三方面,以及他们之间异同、优劣点。
这里准备结合项目中遇到的问题,分两个方向进行归纳总结。一是一个IO模型从测试程序、API、内核实现进行纵向分析;二是横向不同IO模型的优缺点对比。
IO多路复用允许进程同时检查多个文件描述符以找出它们中的任何一个是否可执行IO操作。系统调用select()和poll()用来执行IO多路复用。
信号驱动IO是指当有输入或者数据可以写到指定的文件描述符上时,内核向请求数据的进程发送一个信号。进程可以处理其他的任务,当IO操作可执行时通过接收信号来获得通知。当同时检查大量的文件描述符时,信号驱动IO相比select()和poll()有显著的性能提升。
epoll API是Linux专有的特性,首次出现在Linux 2.6中。同IO多路复用API一样,epoll API允许进程同时检查多个文件描述符,看其中任意一个是否能执行IO操作。通信号驱动IO一样,当同时检查大量文件描述符时,epoll能提供更好的性能。
在实际应用中使用那种技术,下面是一些要点:
讨论多种IO机制之前,首先区分两种文件描述符准备就绪的通知模式。
水平触发通知:如果文件描述符上可以非阻塞地执行IO系统调用,此时认为它已经就绪。
边缘触发通知:如果文件描述符自上次状态检查以来有了新的IO活动,此时需要触发通知。
IO模式 | 水平触发 | 边缘触发 |
select()/poll() | √ | |
信号驱动IO | √ | |
epoll | √ | √ |
当采用水平触发通知时,可以在任意时刻检查文件描述符的就绪状态。表示当文件描述符处于就绪态时,就可以对其执行一些IO操作;然后重复检查文件描述符。看看是否仍然处于就绪态,此时可以执行更多的IO。由于水平触发模式允许我们在任意时刻重复检查IO状态,没有必要每次当文件描述符就绪后需要尽可能多地址性IO。
当采用边缘触发时,只有当IO事件发生时才会收到通知,因此:
信号驱动IO中,当文件描述符上可执行IO操作时,进程请求内核为自己发送一个信号。
进程可以执行其他任何任务直到IO就绪位置,此时内核会发送信号给进程。
信号驱动IO的使用遵循一定的步骤:
1.为内核发送通知信号安装一个信号处理例程;通常是SIGIO,但在多线程环境下可以指定自定义的实时信号。
2.设定文件描述符属主,也就是当文件描述符上可执行IO操作时会接收通知信号的进程或进程组。通过fcntl()命令F_SETOWN或者F_SETOWN_EX来制定。
3.通过设定O_NONBLOCK标志是能非阻塞IO。
4.通过O_ASYNC标志是能信号驱动IO。
5.然后进程可以处理其他任务,当有信号到达时对应的信号处理函数就会被调用
6.通常情况下,如果需要等待IO操作,可以通过sigsuspend()进行等待,此时进程会放弃
首先构造内核驱动和用户空间测试程序,然后针对测试程序进行详细的分析。
创建signal_kernel.c作为模组插入内核,signal_user.c作为用户空间测试程序。
创建/dev/signal0设备,通过wirte可以让内核发送信号到用户空间;wirte之后立即进入sigsuspend()等待。
#include <linux/kernel.h> #include <linux/module.h> #include <linux/init.h> #include <linux/cdev.h> #include <linux/device.h> #include <linux/fs.h> #include <linux/uaccess.h> static struct class *signal_class; static int signal_major; #define SIGNAL_NAME "signal" struct timer_list signal_timer; static struct fasync_struct *signal_async; int signal_count = 0; static unsigned char signal_text[30]; static int signal_release(struct inode *inode, struct file *file) { return 0; } static void dummy_timer(unsigned long data) { //printk("Send SIGIO signal.\n"); kill_fasync (&signal_async, SIGIO, POLL_IN);----------------------------------------发送异步信号给fa绑定的进程。 signal_count++; } static int signal_open(struct inode *inode, struct file *file) { return 0; } static ssize_t signal_read(struct file * file, char __user * buf, size_t count, loff_t *ppos) { int size; size = sprintf(signal_text, "%d", signal_count); copy_to_user(buf, signal_text, size); return size; } static ssize_t signal_write(struct file * file, const char __user * buf, size_t count, loff_t *ppos) { char str[30]; copy_from_user(str, buf,count); printk("Receive %s from user.\n",str); dummy_timer(0); return 0; } static int signal_fasync (int fd, struct file *filp, int on) { return fasync_helper(fd, filp, on, &signal_async);----------------------------------创建fasync_struct并插入到当前fasync链表中。 } static const struct file_operations signal_fops = { .owner = THIS_MODULE, .open = signal_open, .release = signal_release, .read = signal_read, .write = signal_write, .fasync = signal_fasync,----------------------------------------------------------当用户空间设置FASYNC的时候调用fasync函数。 }; static int __init signal_test_init(void) { struct device *signal_device; signal_major = register_chrdev(0, SIGNAL_NAME, &signal_fops); if (signal_major < 0) { pr_err("register_chrdev failed\n"); goto err; } signal_class = class_create(THIS_MODULE, SIGNAL_NAME); if (IS_ERR(signal_class)) { pr_err("device class file already in use\n"); goto err_class; } signal_device = device_create(signal_class, NULL, MKDEV(signal_major, 0), NULL, "%s%d", SIGNAL_NAME, 0); if (IS_ERR(signal_device)) { pr_err("failed to create device\n"); goto err_device; } return 0; err_device: class_destroy(signal_class); err_class: unregister_chrdev(signal_major, SIGNAL_NAME); err: return 0; } static void __exit signal_test_exit(void) { del_timer(&signal_timer); device_destroy(signal_class, MKDEV(signal_major, 0)); class_destroy(signal_class); unregister_chrdev(signal_major, SIGNAL_NAME); } module_init(signal_test_init); module_exit(signal_test_exit); MODULE_LICENSE("GPL");
对应的Makefile如下:
CONFIG_MODULE_SIG=n EXTRA_CFLAGS += -D_GNU_SOURCE---------------------------------------------针对F_SETOWN_EX等扩展命令需要定义此宏。 obj-m := signal_kernel.o KERN_DIR := /lib/modules/$(shell uname -r)/build PWD := $(shell pwd) all: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules gcc $(EXTRA_CFLAGS) signal_user.c -o signal_user -pthread clean: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules clean rm signal_user modules_install: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules_install
用户空间测试程序signal_user.c如下。
测试程序创建一个sigio线程专门处理信号,重点是进行信号处理前进行一系列设置;然后调用sigsuspend()进入睡眠,等待内核信号唤醒。
在实际应用中,有可能是中断调用kill_fasync()发送信号。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <signal.h> #include <sys/types.h> #include <unistd.h> #include <fcntl.h> #include <time.h> #include <pthread.h> #include <sys/prctl.h> #include <sys/syscall.h> #include <memory.h> int fd; char *filename = "/dev/signal0"; char signal_response[] = "OK"; unsigned int handle_count = 0; #define LOOP_COUNT 10 #define SIGTEST (SIGRTMIN+1) void sig_handler(int sig, siginfo_t *si, void *uc) { unsigned char signal_count[30]; memset(signal_count, 30, 0); read(fd, &signal_count, 0); handle_count ++; printf(">>>>>PID %ld Receive sig=%d count=%d.\n", syscall(SYS_gettid), sig, handle_count); //Trigger a SIGTEST signal. if(handle_count < LOOP_COUNT) write(fd, signal_response, sizeof(signal_response));------------------重复触发内核发送SIGTEST信号。 //if(atoi(signal_count) != handle_count) //printf("%s: receive_count=%d, handle_count=%d\n", __func__, atoi(signal_count), handle_count); } void *pthread_func(void *arg) { int Oflags; struct timespec time1, time2; struct sigaction sa; sigset_t set, oldset; unsigned long int duration; struct f_owner_ex owner_ex; //Set thread name. prctl(PR_SET_NAME,"sigio"); printf("main=%d, sigio=%ld.\n", getpid(), syscall(SYS_gettid)); //Set SIGTEST actiong. memset(&sa, 0, sizeof(sa)); sa.sa_sigaction = sig_handler; sa.sa_flags |= SA_RESTART | SA_SIGINFO;------------------------------------这里必须使用sa_sigaction和SA_SIGINFO。 sigaction(SIGTEST, &sa, NULL);---------------------------------------------设置SIGTEST信号处理函数。 //Set proc mask. sigemptyset(&set); sigprocmask(SIG_SETMASK, &set, NULL); sigaddset(&set, SIGTEST); fd = open(filename, O_RDWR); if (fd < 0) { printf("Can‘t open %s!\n", filename); } owner_ex.pid = syscall(SYS_gettid); owner_ex.type = F_OWNER_TID; fcntl(fd, F_SETOWN_EX, &owner_ex);------------------------------------------将fd文件句柄和当前线程绑定;如果设置F_SETOWN则是和线程组绑定,这两者区别后续重点介绍。 Oflags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, Oflags | FASYNC);----------------------------------------对应内核驱动file_operations的fasync成员,给当前fd创建一个fasync_struct。 fcntl(fd, F_SETSIG, SIGTEST);-----------------------------------------------设置SIGTEST替代SIGIO作为信号发送。 clock_gettime(CLOCK_REALTIME, &time1); write(fd, signal_response, sizeof(signal_response));------------------------触发内核发送SIGTEST信号。 //pthread_sigmask(SIG_BLOCK, &set, &oldset); sigprocmask(SIG_BLOCK, &set, &oldset); while(handle_count < LOOP_COUNT) { //Will suspend here, and later code will be executed after SIGTEST handler. sigsuspend(&oldset);----------------------------------------------------在此处睡眠,等待信号来唤醒。 } //pthread_sigmask(SIG_UNBLOCK, &set, NULL); sigprocmask(SIG_UNBLOCK, &set, NULL); clock_gettime(CLOCK_REALTIME, &time2); duration =(time2.tv_sec-time1.tv_sec)*1000000000 + (time2.tv_nsec-time1.tv_nsec); printf("End time %ld.%ld\n", duration/1000000000, duration%1000000000); close(fd); pthread_exit(0); } void main(int argc, char **argv) { pthread_t tidp; sigset_t set; sigemptyset(&set); sigaddset(&set, SIGTEST); sigprocmask(SIG_BLOCK, &set, NULL); if(pthread_create(&tidp, NULL, pthread_func, NULL) == -1) { printf("Create pthread error.\n"); return; } if(pthread_join(tidp, NULL)) { printf("Join pthread error.\n"); return; } printf("Main exit.\n"); return; }
将编译的module signal_kernel.ko插入内核,然后用户空间执行sudo ./signal_user结果如下。
main=10485, sigio=10486. >>>>>PID 10486 Receive sig=35 count=1. >>>>>PID 10486 Receive sig=35 count=2. >>>>>PID 10486 Receive sig=35 count=3. >>>>>PID 10486 Receive sig=35 count=4. >>>>>PID 10486 Receive sig=35 count=5. >>>>>PID 10486 Receive sig=35 count=6. >>>>>PID 10486 Receive sig=35 count=7. >>>>>PID 10486 Receive sig=35 count=8. >>>>>PID 10486 Receive sig=35 count=9. >>>>>PID 10486 Receive sig=35 count=10. End time 0.103711 Main exit.
首先整个流程在用户空间配置;然后由内核发起,用户空间处理。
除了signal(),sigaction()是另一种设置信号处置的选择。sigaction()更加复杂,但也根据灵活性。
#include <signal.h> int sigaction(int sig, const struct sigaction *act, struct sigaction *oldact);
sig参数标识想要获取或改变的信号编号,可以使处SIGKILL和SIGSTOP之外任何信号。
act指向描述信号新处置数据结构,oldact用于返回之前信号处置相关信息。两者都可以设置为NULL。
struct sigaction { void (*sa_handler)(int); /* Address of handler */ sigset_t sa_mask; /* Signals blocked during handler invocation */ int sa_flags; /* Flags controlling handler invocation */ void (*sa_restorer)(void); /* Not for application use */ };
sa_handler指向sig对应的处理函数。
sa_mask定义一组新号,不允许它们中断此处理程序的执行。调用信号处理程序之前,将该组信号中其他还没有处于进程掩码之列的信号添加到进程掩码中;在信号处理返回之后再将,之前添加的掩码移除。这就保证,在处理程序执行期间,sa_mask所有信号不会中断此处理程序。
引发处理程序自身的信号将自动添加到进程掩码中,这意味着信号处理程序不允许嵌套。当正在执行处理程序时,如果同一信号第二次抵达,将不会递归中断自己。
由于标准信号(sig < SIGRTMIN)信号处理程序不会被队列化,所以标准信号处理程序执行期间第二次达到的同一信号将会被忽略。
但是事实信号不会存在这种现象。
struct sighand_struct { atomic_t count; struct k_sigaction action[_NSIG];------------------------------------------------一共_NSIG,即64个信号1-31是标准信号,32-64是实时信号。 spinlock_t siglock; wait_queue_head_t signalfd_wqh; }; int do_sigaction(int sig, struct k_sigaction *act, struct k_sigaction *oact) { struct task_struct *p = current, *t; struct k_sigaction *k; sigset_t mask; if (!valid_signal(sig) || sig < 1 || (act && sig_kernel_only(sig))) return -EINVAL; k = &p->sighand->action[sig-1];------------------------------------------------------将k指向当前进程sig对应的k_sigaction结构。 spin_lock_irq(&p->sighand->siglock); if (oact) *oact = *k; sigaction_compat_abi(act, oact); if (act) { sigdelsetmask(&act->sa.sa_mask, sigmask(SIGKILL) | sigmask(SIGSTOP)); *k = *act;-----------------------------------------------------------------------将act和k关联。 ... } spin_unlock_irq(&p->sighand->siglock); return 0; }
F_SETOWN是标准fcntl;F_SETOWN_EX是Linux扩展命令,如果要使用需要加上_D_GNU_SOURCE。
这两个命令都通过fcntl()系统调用,设置文件控制操作。
struct fown_struct是内核中关联文件句柄和其拥有者信息的结构体:
struct fown_struct { rwlock_t lock; /* protects pid, uid, euid fields */ struct pid *pid; /* pid or -pgrp where SIGIO should be sent */----------------------拥有此文件SIGIO信号的进程。 enum pid_type pid_type; /* Kind of process group SIGIO should be sent to */---------进程类型。 kuid_t uid, euid; /* uid/euid of process setting the owner */ int signum; /* posix.1b rt signal to be delivered on IO */----------------------kill_fasync()发送的信号。 };
下面就来分析这两命令的区别,以及他们是如何影响信号处理行为的。
enum pid_type { PIDTYPE_PID, PIDTYPE_PGID, PIDTYPE_SID, PIDTYPE_MAX, /* only valid to __task_pid_nr_ns() */ __PIDTYPE_TGID }; static long do_fcntl(int fd, unsigned int cmd, unsigned long arg, struct file *filp) { long err = -EINVAL; switch (cmd) { ... case F_GETFL:------------------------------------------------获取、设置文件句柄的f_flags。 err = filp->f_flags; break; case F_SETFL: err = setfl(fd, filp, arg); break; ...
case F_GETOWN:-----------------------------------------------获取、设置文件句柄的拥有者进程pid。 err = f_getown(filp); force_successful_syscall_return(); break; case F_SETOWN: f_setown(filp, arg, 1); err = 0; break; case F_GETOWN_EX:--------------------------------------------进阶获取、设置文件句柄拥有者进程pid。 err = f_getown_ex(filp, arg); break; case F_SETOWN_EX: err = f_setown_ex(filp, arg); break; ... case F_GETSIG:-----------------------------------------------设置文件句柄和其拥有者之间异步通信的信号。 err = filp->f_owner.signum; break; case F_SETSIG: if (!valid_signal(arg)) { break; } err = 0; filp->f_owner.signum = arg; break; ... } return err; } pid_t f_getown(struct file *filp) { pid_t pid; read_lock(&filp->f_owner.lock); pid = pid_vnr(filp->f_owner.pid); if (filp->f_owner.pid_type == PIDTYPE_PGID)-----------------如果是进程组返回其负值。 pid = -pid; read_unlock(&filp->f_owner.lock); return pid; } void f_setown(struct file *filp, unsigned long arg, int force) { enum pid_type type; struct pid *pid; int who = arg; type = PIDTYPE_PID; if (who < 0) { type = PIDTYPE_PGID;-----------------------------------默认类型是进程,如果who为负,则是进程组。 who = -who; } rcu_read_lock(); pid = find_vpid(who); __f_setown(filp, pid, type, force); rcu_read_unlock(); } static int f_getown_ex(struct file *filp, unsigned long arg) { struct f_owner_ex __user *owner_p = (void __user *)arg; struct f_owner_ex owner; int ret = 0; read_lock(&filp->f_owner.lock); owner.pid = pid_vnr(filp->f_owner.pid); switch (filp->f_owner.pid_type) {-------------------------相对于F_SETOWN,多了pid_type类型设置。其中PIDTYPE_MAX表示该文件句柄被一个线程所拥有。而不是进程或者进程组。这点对于信号究竟发给谁,有着非常重要的作用。 case PIDTYPE_MAX: owner.type = F_OWNER_TID; break; case PIDTYPE_PID: owner.type = F_OWNER_PID; break; case PIDTYPE_PGID: owner.type = F_OWNER_PGRP; break; default: WARN_ON(1); ret = -EINVAL; break; } read_unlock(&filp->f_owner.lock); if (!ret) { ret = copy_to_user(owner_p, &owner, sizeof(owner)); if (ret) ret = -EFAULT; } return ret; } static int f_setown_ex(struct file *filp, unsigned long arg) { struct f_owner_ex __user *owner_p = (void __user *)arg; struct f_owner_ex owner; struct pid *pid; int type; int ret; ret = copy_from_user(&owner, owner_p, sizeof(owner)); if (ret) return -EFAULT; switch (owner.type) { case F_OWNER_TID: type = PIDTYPE_MAX; break; case F_OWNER_PID: type = PIDTYPE_PID; break; case F_OWNER_PGRP: type = PIDTYPE_PGID; break; default: return -EINVAL; } rcu_read_lock(); pid = find_vpid(owner.pid); if (owner.pid && !pid) ret = -ESRCH; else __f_setown(filp, pid, type, 1); rcu_read_unlock(); return ret; } static void f_modown(struct file *filp, struct pid *pid, enum pid_type type, int force) { write_lock_irq(&filp->f_owner.lock); if (force || !filp->f_owner.pid) { put_pid(filp->f_owner.pid); filp->f_owner.pid = get_pid(pid); filp->f_owner.pid_type = type; if (pid) { const struct cred *cred = current_cred(); filp->f_owner.uid = cred->uid; filp->f_owner.euid = cred->euid; } } write_unlock_irq(&filp->f_owner.lock); } void __f_setown(struct file *filp, struct pid *pid, enum pid_type type, int force)----------------------------------------------------------------无论是F_SETOWN还是F_SETOWN_EX两者的force都是1。 { security_file_set_fowner(filp); f_modown(filp, pid, type, force); }
所以F_SETOWN和F_SETOWN_EX主要区别就在于,F_SETOWN_EX可以设置进程的类型。这对于后续信号的发送,有重要作用。
F_SETOWN_EX更加细致,可以指定只发送给某个线程;而F_SETOWN优先发送给线程。如果线程被阻塞,则选择同一进程中的其他线程接收。
相关问题调试详见:《sigsuspend()阻塞:异步信号SIGIO为什么会被截胡?》。
如果要使用实时信号替代SIGIO作为kill_fasync()作为信号发送,可以设置F_SETSIG。
参照do_fcntl(),也即通过设置fown_struct的signum值。
如果需要在信号处理函数中获取更多信息,还需要在sa.sa_flags增加SA_SIGINFO标志。
struct sigaction { union { void (*sa_handler)(int); void (*sa_sigaction)(int, siginfo_t *, void *); } __sigaction_handler; sigset_t sa_mask; int sa_flags; void (*sa_restorer)(void); };
在定义sa_handler是就需要定义如下类型函数:
void handler(int sig, siginfo_t *siginfo, void *ucontext);
其中第二个参数siginfo包含的字段标识出了在哪个文件描述上发生了事件。
使用F_SETSIG有两个优点:
1.指定不同信号作为信号驱动IO通知信号,解决了在同一进程范围内,多线程使用信号驱动IO冲突问题。因为默认都是发送SIGIO,而信号处理是进程范围的。
2.默认的SIGIO是非排队信号,如果有对个IO时间发送了信号,而SIGIO被阻塞了,除了第一个通知外,其他后续的通知都会丢失。使用实时信号就不会存在这个问题,信号的处理会被排队;除非信号队列溢出。
此表示是通过fcntl进行设置的,所以看一些setfl()。
static int setfl(int fd, struct file * filp, unsigned long arg) { struct inode * inode = file_inode(filp); int error = 0; ... /* * ->fasync() is responsible for setting the FASYNC bit. */ if (((arg ^ filp->f_flags) & FASYNC) && filp->f_op->fasync) {----------------------可以看出主要当前文件具有FASYNC标志位,并且f_op->fasync()定义了就会执行相关函数。 error = filp->f_op->fasync(fd, filp, (arg & FASYNC) != 0); if (error < 0) goto out; if (error > 0) error = 0; } spin_lock(&filp->f_lock); filp->f_flags = (arg & SETFL_MASK) | (filp->f_flags & ~SETFL_MASK); spin_unlock(&filp->f_lock); out: return error; }
我们看到对应的驱动中调用fasync_helper()进行fasync_struct创建,并插入到fasync列表中。
int fasync_helper(int fd, struct file * filp, int on, struct fasync_struct **fapp) { if (!on) return fasync_remove_entry(filp, fapp); return fasync_add_entry(fd, filp, fapp); } static int fasync_add_entry(int fd, struct file *filp, struct fasync_struct **fapp) { struct fasync_struct *new; new = fasync_alloc(); if (!new) return -ENOMEM; if (fasync_insert_entry(fd, filp, fapp, new)) { fasync_free(new); return 0; } return 1; }
当设备IO就需后,通过kill_fasync()发送信号。
void kill_fasync(struct fasync_struct **fp, int sig, int band) { if (*fp) { rcu_read_lock(); kill_fasync_rcu(rcu_dereference(*fp), sig, band); rcu_read_unlock(); } } static void kill_fasync_rcu(struct fasync_struct *fa, int sig, int band) { while (fa) {-------------------------------------------------------------------遍历当前文件句柄的所有fasync列表,并进行信号发送操作。 struct fown_struct *fown; unsigned long flags; if (fa->magic != FASYNC_MAGIC) { printk(KERN_ERR "kill_fasync: bad magic number in " "fasync_struct!\n"); return; } spin_lock_irqsave(&fa->fa_lock, flags); if (fa->fa_file) { fown = &fa->fa_file->f_owner; if (!(sig == SIGURG && fown->signum == 0))-----------------------------只有在signum为0,并sig等于SIGURQ放弃发送信号,因为SIGURQ有自己的特殊处理。 send_sigio(fown, fa->fa_fd, band); } spin_unlock_irqrestore(&fa->fa_lock, flags); fa = rcu_dereference(fa->fa_next); } } void send_sigio(struct fown_struct *fown, int fd, int band) { struct task_struct *p; enum pid_type type; struct pid *pid; int group = 1; read_lock(&fown->lock); type = fown->pid_type; if (type == PIDTYPE_MAX) {----------------------------------------------------这里体现了F_SETOWN_EX由于F_SETOWN的地方,如果设置为PIDTYPE_MAX,那么group则为0,只发送给线程,不会选择其他线程作为替代。 group = 0; type = PIDTYPE_PID; } pid = fown->pid; if (!pid) goto out_unlock_fown; read_lock(&tasklist_lock); do_each_pid_task(pid, type, p) { send_sigio_to_task(p, fown, fd, band, group); } while_each_pid_task(pid, type, p); read_unlock(&tasklist_lock); out_unlock_fown: read_unlock(&fown->lock); } static void send_sigio_to_task(struct task_struct *p, struct fown_struct *fown, int fd, int reason, int group) { /* * F_SETSIG can change ->signum lockless in parallel, make * sure we read it once and use the same value throughout. */ int signum = ACCESS_ONCE(fown->signum);--------------------------------------------------------在使用F_SETSIG设置信号后最好不好改变,否则可能造成发送和信号处理函数不匹配。 if (!sigio_perm(p, fown, signum)) return; switch (signum) { siginfo_t si; default:-----------------------------------------------------------------------------------其他情况发送一个RT信号,提供更加丰富的返回信息。 si.si_signo = signum; si.si_errno = 0; si.si_code = reason; BUG_ON((reason & __SI_MASK) != __SI_POLL); if (reason - POLL_IN >= NSIGPOLL) si.si_band = ~0L; else si.si_band = band_table[reason - POLL_IN]; si.si_fd = fd; if (!do_send_sig_info(signum, &si, p, group))------------------------------------------使用signum代提SIGIO作为信号发送。 break; case 0:------------------------------------------------------------------------------------在没有通过F_SETSIG设置signum的情况下,默认发送SIGIO信号。 do_send_sig_info(SIGIO, SEND_SIG_PRIV, p, group); } } int do_send_sig_info(int sig, struct siginfo *info, struct task_struct *p, bool group) { unsigned long flags; int ret = -ESRCH; if (lock_task_sighand(p, &flags)) { ret = send_signal(sig, info, p, group); unlock_task_sighand(p, &flags); } return ret; } static inline struct sighand_struct *lock_task_sighand(struct task_struct *tsk, unsigned long *flags) { struct sighand_struct *ret; ret = __lock_task_sighand(tsk, flags); (void)__cond_lock(&tsk->sighand->siglock, ret); return ret; } static inline void unlock_task_sighand(struct task_struct *tsk, unsigned long *flags) { spin_unlock_irqrestore(&tsk->sighand->siglock, *flags); }
实时信号相对于标准信号有如下优势:
标准信号和实时信号都通过seng_signal()发送。下面看一下两者是如何被区别对待的。
static int send_signal(int sig, struct siginfo *info, struct task_struct *t, int group) { int from_ancestor_ns = 0; #ifdef CONFIG_PID_NS from_ancestor_ns = si_fromuser(info) && !task_pid_nr_ns(current, task_active_pid_ns(t)); #endif return __send_signal(sig, info, t, group, from_ancestor_ns); } static int __send_signal(int sig, struct siginfo *info, struct task_struct *t, int group, int from_ancestor_ns) { struct sigpending *pending; struct sigqueue *q; int override_rlimit; int ret = 0, result; assert_spin_locked(&t->sighand->siglock); result = TRACE_SIGNAL_IGNORED; if (!prepare_signal(sig, t, from_ancestor_ns || (info == SEND_SIG_FORCED))) goto ret; pending = group ? &t->signal->shared_pending : &t->pending;----------------------------group起作用的地方,当group为0,则将信号放在当前线程私有的pending列表上;如果group为1,则将信号放在线程组共享的shared_pending列表上。 result = TRACE_SIGNAL_ALREADY_PENDING; if (legacy_queue(pending, sig))--------------------------------------------------------如果sig<SIGRTMIN,并且sig已经被处于pending中。也即标准信号sig已经在pending队列中,则返回表示当前sig信号已经放入到队列中。这也说明标准信号没有被队列化。 goto ret; result = TRACE_SIGNAL_DELIVERED; if (info == SEND_SIG_FORCED) goto out_set; if (sig < SIGRTMIN) override_rlimit = (is_si_special(info) || info->si_code >= 0);--------------------标准信号存在override_rlimit为1情况,那么即使当前信号队列达到了RLIMIT_SIGPENDING。仍然可以创建信号队列告诉缓存。 else override_rlimit = 0;--------------------------------------------------------------实时信号是不允许超过RLIMIT_SIGPENDING限制的。 q = __sigqueue_alloc(sig, t, GFP_ATOMIC | __GFP_NOTRACK_FALSE_POSITIVE, override_rlimit);-----------------------------------------------------------------创建一个高速缓存sigqueue_cachep。 if (q) { list_add_tail(&q->list, &pending->list);------------------------------------------将新创建的信号加入到队列中。 switch ((unsigned long) info) { case (unsigned long) SEND_SIG_NOINFO: q->info.si_signo = sig; q->info.si_errno = 0; q->info.si_code = SI_USER; q->info.si_pid = task_tgid_nr_ns(current, task_active_pid_ns(t)); q->info.si_uid = from_kuid_munged(current_user_ns(), current_uid()); break; case (unsigned long) SEND_SIG_PRIV: q->info.si_signo = sig; q->info.si_errno = 0; q->info.si_code = SI_KERNEL; q->info.si_pid = 0; q->info.si_uid = 0; break; default: copy_siginfo(&q->info, info); if (from_ancestor_ns) q->info.si_pid = 0; break; } userns_fixup_signal_uid(&q->info, t); } else if (!is_si_special(info)) { if (sig >= SIGRTMIN && info->si_code != SI_USER) { result = TRACE_SIGNAL_OVERFLOW_FAIL; ret = -EAGAIN; goto ret; } else { result = TRACE_SIGNAL_LOSE_INFO; } } out_set: signalfd_notify(t, sig); sigaddset(&pending->signal, sig); complete_signal(sig, t, group); ret: trace_signal_generate(sig, info, t, group, result); return ret; }
static struct sigqueue *
__sigqueue_alloc(int sig, struct task_struct *t, gfp_t flags, int override_rlimit)
{
struct sigqueue *q = NULL;
struct user_struct *user;
rcu_read_lock();
user = get_uid(__task_cred(t)->user);
atomic_inc(&user->sigpending);
rcu_read_unlock();
if (override_rlimit ||
atomic_read(&user->sigpending) <=
task_rlimit(t, RLIMIT_SIGPENDING)) {----------------------------------这里面说明了override_rlimit为1情况可以超出RLIMIT_SIGPENDING限制。实时信号在队列超过RLIMIT_SIGPENDING后则不允许创建信号队列。
q = kmem_cache_alloc(sigqueue_cachep, flags);
} else {
print_dropped_signal(sig);---------------------------------------------------当出现丢弃信号的情况,打印信息。
}
if (unlikely(q == NULL)) {
atomic_dec(&user->sigpending);
free_uid(user);
} else {
INIT_LIST_HEAD(&q->list);
q->flags = 0;
q->user = user;
}
return q;
}
这里详细解释了信号驱动IO的流程;但信号产生放入pending队列之后,何时被处理呢?可以参考《信号何时被处理》。
IO多路复用允许我们同时检查多个文件描述符,看其中任意一个是否可执行IO操作。可以在普通文件、终端、管道等字符型设备上使用select()/poll()来检查文件描述符。
下面分别对select()/poll()的API、内核流程、测试程序进行分析,然后对两者进行对比。
select()会一直阻塞,直到一个或多个文件描述符集合成为就绪态。
#include <sys/time.h> /* For portability */ #include <sys/select.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);
参数nfds、readfds、writefds、exceptfds指定了select()要检查的文件描述符集合。
参数timeout用来设定select()阻塞的时间上限。
#include <sys/select.h> void FD_ZERO(fd_set *fdset);--------------------将fdset指向的集合初始化为空。 void FD_SET(int fd, fd_set *fdset);-------------将描述符fd添加到fdset所指向的集合中。 void FD_CLR(int fd, fd_set *fdset);-------------将描述符fd从fdset指向的集合中移除。 int FD_ISSET(int fd, fd_set *fdset);------------判断描述符fd是否在fdset结合中设置。
文件描述符集合有一个最大容量限制,由常量FD_SETSIZE来决定。Linux通常为1024。
参数timeout控制着select()阻塞行为,NULL时select()会一直阻塞。或者指向一个timeval结构体。
struct timeval { time_t tv_sec; /* Seconds */ suseconds_t tv_usec; /* Microseconds (long int) */ };
当timeout为NULL或者指向结构体字段非0时,select()阻塞直到下列事件发生:
如果timeout非空,且一个或多个文件描述符就绪返回时,timeout所指向的结构体表示剩余超时时间。
-1表示有错误发生。EBADF表示有一个文件描述符是非法的;EINTR表示该调用被信号处理例程中断了。
0表示在任何文件描述符成为就绪态前select()已经超时。
正整数表示一个或多个文件描述符已达到就绪态。返回值表示就绪态的文件描述符个数。
select系统调用路径是select()->core_sys_select()->do_select(),最终是遍历每个文件句柄的poll成员,根据poll()返回的参数判断当前文件状态。
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp) { struct timespec64 end_time, *to = NULL; struct timeval tv; int ret; if (tvp) { if (copy_from_user(&tv, tvp, sizeof(tv))) return -EFAULT; to = &end_time; if (poll_select_set_timeout(to, tv.tv_sec + (tv.tv_usec / USEC_PER_SEC), (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))------------------------------------从内核拷贝信息并转换成内核数据结构struct timespec64。 return -EINVAL; } ret = core_sys_select(n, inp, outp, exp, to);------------------------------------------------和系统调用类似,只是to已经转变成内核时间。 ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);------------------------------------计算end_time和当前时间的差值,并转化成tvp返回给用户空间。 return ret; } int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec64 *end_time) { fd_set_bits fds; void *bits; int ret, max_fds; size_t size, alloc_size; struct fdtable *fdt; /* Allocate small arguments on the stack to save memory and be faster */ long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];----------------------------------------------SELECT_STACK_ALLOC默认是256,32位系统stack_fds一共64成员。 ret = -EINVAL; if (n < 0) goto out_nofds; /* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds; size = FDS_BYTES(n);------------------------------------------------------------------------一个文件描述符占用一bit,size表示这些fd_set需要用掉多少个字节。 bits = stack_fds; if (size > sizeof(stack_fds) / 6) {---------------------------------------------------------如果stack空间足够当前存放当前strcut fd_set_bits fds的时候,优先使用stack内存,好处是更加快并且节省内存。不然就需要kmalloc去申请内存。 /* Not enough space in on-stack array; must use kmalloc */------------------------------stack_fds大小为256字节,所以要使用stack的最大size=256/6/sizeof(long)=10。 ret = -ENOMEM; if (size > (SIZE_MAX / 6)) goto out_nofds; alloc_size = 6 * size; bits = kmalloc(alloc_size, GFP_KERNEL|__GFP_NOWARN);-------------------------------------通过kmalloc来分配6个size大小的内存;大于一个页面使用vmalloc。 if (!bits && alloc_size > PAGE_SIZE) bits = vmalloc(alloc_size); if (!bits) goto out_nofds; } fds.in = bits;--------------------------------------------------------------------------此时bits指向的内存都已经分配完毕,并且是6个同样size大小的。 fds.out = bits + size; fds.ex = bits + 2*size; fds.res_in = bits + 3*size; fds.res_out = bits + 4*size; fds.res_ex = bits + 5*size; if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex);------------------------------------------------------------------将用户空间传入的fds拷贝到fds中,并清空res_in、res_out、res_ex。 ret = do_select(n, &fds, end_time); if (ret < 0) goto out; if (!ret) { ret = -ERESTARTNOHAND; if (signal_pending(current)) goto out; ret = 0; } if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex))----------------------------------------------------------res_int/res_out/res_ex中包含了状态变化的文件句柄,拷贝到in/out/ex返回给用户空间。 ret = -EFAULT; out: if (bits != stack_fds) kvfree(bits); out_nofds: return ret; } int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time) { ktime_t expire, *to = NULL; struct poll_wqueues table; poll_table *wait; int retval, i, timed_out = 0; u64 slack = 0; unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;---------------------------这个参数影响下面一轮查询所有文件描述符之后,才做busy loop还是睡眠。这个很影响CPU占用率,busy loop不会主动放弃CPU,睡眠则主动放弃CPU。 unsigned long busy_end = 0; rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); if (retval < 0) return retval; n = retval; poll_initwait(&table);-----------------------------------------------------------------------将当前进程放入自己的等待队列table,并将该等待队列加入到测试表wait。 wait = &table.pt; if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {-----------------------------------这也是timeout参数为0时候的特殊情况,直接timed_out置1。 wait->_qproc = NULL; timed_out = 1; } if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); retval = 0; for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0; i < n; ++rinp, ++routp, ++rexp) {--------------------------------------------遍历所有的fd。 unsigned long in, out, ex, all_bits, bit = 1, mask, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; in = *inp++; out = *outp++; ex = *exp++;---------------------------------------------先取出当前循环周期中的32个文件描述符对应的bitmaps。 all_bits = in | out | ex;------------------------------------------------------------in/out/ex三者组合,有的fd可能监测读或写或异常,或者都检测。 if (all_bits == 0) {-----------------------------------------------------------------32个文件描述符不检测任何状态,调到下一个循环。 i += BITS_PER_LONG; continue; } for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { struct fd f; if (i >= n) break; if (!(bit & all_bits))-----------------------------------------------------------bit每次循环左移一位,如果没有监测当前为则跳过进入下一次循环。 continue; f = fdget(i); if (f.file) { const struct file_operations *f_op; f_op = f.file->f_op; mask = DEFAULT_POLLMASK; if (f_op->poll) { wait_key_set(wait, in, out, bit, busy_flag);-----------------------------------------------设置当前fd待检测的事件掩码。 mask = (*f_op->poll)(f.file, wait);-------------------------------------调用每个文件句柄的poll成员,返回文件的状态mask。下面分别检查POLLIN_SET、POLLOUT_SET、POLLEX_SET三种状态。 } fdput(f); if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait->_qproc = NULL; } /* got something, stop busy polling */ if (retval) { can_busy_loop = false; busy_flag = 0; /* * only remember a returned * POLL_BUSY_LOOP if we asked for it */ } else if (busy_flag & mask) can_busy_loop = true; } } if (res_in)--------------------------------------------------------------------------根据poll结果写回到输出位图里。 *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait->_qproc = NULL; if (retval || timed_out || signal_pending(current))-------------------------------------三种返回情况,retval表示有文件句柄满足条件;timed_out表示在有timeout参数的情况下超时;signal_pending()表示被信号中断。 break; if (table.error) { retval = table.error; break; } if (can_busy_loop && !need_resched()) {--------------------------------------------------这里面会一直占用CPU。 if (!busy_end) { busy_end = busy_loop_end_time(); continue; } if (!busy_loop_timeout(busy_end)) continue; } busy_flag = 0; if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack))-------------------------------------------------------------进程进入睡眠等待唤醒,如果poll_schedule_timeout()返回0表示超时,下面timed_out被置1。to是根据end_time计算的超时时间。 timed_out = 1; } poll_freewait(&table); return retval; }
回顾一下select()大致流程如下:
1.进入系统调用。
2.进行时间转换;数据准备,分成in、out、exception三部分。
3.对所有文件句柄进行循环,调用对应文件句柄的poll()函数;分别查询是那种类型文件句柄有状态变化。
4.如果一轮循环下来没有变化,则进入休眠等待,直到超时。
5.如果有数据则唤醒进程,将变化句柄数返回给用户空间。返回值是三种状态的综合,具体哪种状态哪个文件句柄变化,需要查看分别查看返回的文件句柄。
select()测试程序和poll()使用同样的内核驱动,区别就是设置文件句柄和API参数。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 int main(int argc, char **argv) { int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; fd_set rds; memset(polltest_count, 30, 0); fd = open(POLLTEST_NAME, O_RDWR); if (fd < 0) { printf("can‘t open!\n"); } FD_ZERO(&rds); FD_SET(fd, &rds);--------------------------------------------------------------将fd加入到rds句柄集中,在select()中对其进行监控。 while (1) { ret = select(fd+1, &rds, NULL, NULL, NULL); if (ret == 0) { printf("time out\n"); } else { if(FD_ISSET(fd, &rds))-------------------------------------------------判断句柄fd是否有POLLIN事件发生。 { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) break; } } } return 0; }
同样的从认识poll() API开始;然后详细分析poll系统调用内核是如何实现的;最后对poll()测试程序流程结合内核进行详细分析。
poll()提供一列文件描述符,在每个文件描述符上标明感兴趣的事件。
#include <poll.h> int poll(struct pollfd fds[], nfds_t nfds, int timeout);
nfds指定了fds中的元素个数;参数fds列出了需要poll()来检查的文件描述符:
struct pollfd {
int fd; /* File descriptor */
short events; /* Requested events bit mask */-------指定需要为描述符fd做检查的事件。
short revents; /* Returned events bit mask */-------表示fd上实际发生的事件。
};
timeout参数的单位是毫秒。
-1,poll()会一直阻塞直到fds数组中列出的有一个达到就绪态或者捕获到一个信号。
0,poll()不会阻塞,只是执行一次检查看看哪个文件描述符处于就绪态。
>0,poll()至多阻塞timeout毫秒,直到fds列出的文件描述符中有一个达到就绪态,或者捕获到一个信号为止。
-1标识有错误发生,一种可能是被信号中断返回EINTR。
0表示调用在任意一个文件描述符就绪之前就超时了。
正整数表示一个或多个文件描述符处于就绪态。返回值表示数组fds中拥有非零revents字段的pollfd结构体数量。
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds, int, timeout_msecs) { struct timespec64 end_time, *to = NULL; int ret; if (timeout_msecs >= 0) {-------------------------------------------从poll()参数timeout可知小于0表示poll()会一直阻塞;等于0表示只检查一次;大于0表示等待一个超时时间。 to = &end_time; poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC, NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));------------根据timeout_msecs转换成内核超时数据结构to。 } ret = do_sys_poll(ufds, nfds, to); if (ret == -EINTR) { struct restart_block *restart_block; restart_block = ¤t->restart_block; restart_block->fn = do_restart_poll; restart_block->poll.ufds = ufds; restart_block->poll.nfds = nfds; if (timeout_msecs >= 0) { restart_block->poll.tv_sec = end_time.tv_sec; restart_block->poll.tv_nsec = end_time.tv_nsec; restart_block->poll.has_timeout = 1; } else restart_block->poll.has_timeout = 0; ret = -ERESTART_RESTARTBLOCK; } return ret; } int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, struct timespec64 *end_time) { struct poll_wqueues table; int err = -EFAULT, fdcount, len, size; long stack_pps[POLL_STACK_ALLOC/sizeof(long)]; struct poll_list *const head = (struct poll_list *)stack_pps; struct poll_list *walk = head; unsigned long todo = nfds; if (nfds > rlimit(RLIMIT_NOFILE))-----------------------------------nfds不能超过系统对打开文件数目限制。 return -EINVAL; len = min_t(unsigned int, nfds, N_STACK_PPS);-----------------------取nfds和栈能容纳的fds最小值。 for (;;) {----------------------------------------------------------遍历所有的ufds[],分配对应的struct poll_list,并且链接起来;每个struct poll_list里面又包含了若干个struct pollfd,他的大小通过len确定。 walk->next = NULL; walk->len = len; if (!len) break; if (copy_from_user(walk->entries, ufds + nfds-todo, sizeof(struct pollfd) * walk->len))-----------------首先将使用栈提供的空间,将用户空间struct pollfd拷贝到内核空间。 goto out_fds; todo -= walk->len;----------------------------------------------假设len==nfds,todo也等于walk->len;所以此处退出for(;;)。 if (!todo) break; len = min(todo, POLLFD_PER_PAGE); size = sizeof(struct poll_list) + sizeof(struct pollfd) * len; walk = walk->next = kmalloc(size, GFP_KERNEL);------------------如果占空间不够存放struct pollfd,那么kmalloc()申请内存。 if (!walk) { err = -ENOMEM; goto out_fds; } } poll_initwait(&table);----------------------------------------------初始化一个struct poll_wqueues。 fdcount = do_poll(head, &table, end_time); poll_freewait(&table); for (walk = head; walk; walk = walk->next) { struct pollfd *fds = walk->entries; int j; for (j = 0; j < walk->len; j++, ufds++) if (__put_user(fds[j].revents, &ufds->revents))-------------将所有内核处理的revents返回给用户空间。 goto out_fds; } err = fdcount; out_fds: walk = head->next; while (walk) { struct poll_list *pos = walk; walk = walk->next; kfree(pos);-----------------------------------------------------释放申请的内存。 } return err; } static int do_poll(struct poll_list *list, struct poll_wqueues *wait, struct timespec64 *end_time) { poll_table* pt = &wait->pt; ktime_t expire, *to = NULL; int timed_out = 0, count = 0; u64 slack = 0; unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;-----------------------决定下面是循环还是睡眠等待超时。 unsigned long busy_end = 0; /* Optimise the no-wait case */ if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {------------------------------对timeout为0特殊情况的处理,只查询一次就退出。 pt->_qproc = NULL; timed_out = 1; } if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); for (;;) { struct poll_list *walk; bool can_busy_loop = false; for (walk = list; walk != NULL; walk = walk->next) {---------------------------------遍历所有的struct poll_list。 struct pollfd * pfd, * pfd_end; pfd = walk->entries; pfd_end = pfd + walk->len; for (; pfd != pfd_end; pfd++) { if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) { count++; pt->_qproc = NULL; /* found something, stop busy polling */ busy_flag = 0; can_busy_loop = false; } } } pt->_qproc = NULL; if (!count) { count = wait->error; if (signal_pending(current)) count = -EINTR; } if (count || timed_out)-----------------------------------------------------------------count是所有的有状态变化的描述符数量;timed_out表示是否超时。 break; /* only if found POLL_BUSY_LOOP sockets && not out of time */ if (can_busy_loop && !need_resched()) { if (!busy_end) { busy_end = busy_loop_end_time(); continue; } if (!busy_loop_timeout(busy_end)) continue; } busy_flag = 0; if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } return count; } static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait, bool *can_busy_poll, unsigned int busy_flag) { unsigned int mask; int fd; mask = 0; fd = pollfd->fd;------------------------------------------------------------------------------根据struct pollfd找到对应的fd,然后调用所属的poll()函数。 if (fd >= 0) { struct fd f = fdget(fd); mask = POLLNVAL; if (f.file) { mask = DEFAULT_POLLMASK; if (f.file->f_op->poll) { pwait->_key = pollfd->events|POLLERR|POLLHUP; pwait->_key |= busy_flag; mask = f.file->f_op->poll(f.file, pwait);----------------------------------------调用具体文件的poll()函数。 if (mask & busy_flag) *can_busy_poll = true; } /* Mask out unneeded events. */ mask &= pollfd->events | POLLERR | POLLHUP; fdput(f); } } pollfd->revents = mask; return mask; }
回顾一下,poll系统调用大致流程是:
1.进入poll系统调用。
2.进行timeout时间转换;准备每个文件句柄对应的数据,并将这些数据串联起来。
3.对所有的struct pollfd进行循环,调用do_pollfd()查询状态。
4.do_pollfd()调用具体文件的file->f_op->poll()查询状态。
5.一次轮询之后,将当前线程挂起等待超时获唤醒。
6.有数据到达后退出循环,将数据返回给用户空间。
首先创建polltest_kernel.c和Makefile文件。
#define DEBUG #include <linux/kernel.h> #include <linux/module.h> #include <linux/init.h> #include <linux/cdev.h> #include <linux/device.h> #include <linux/fs.h> #include <linux/uaccess.h> #include <linux/poll.h> #define POLL_EXPRIES 1 #define POLLTEST_NAME "polltest" static struct class *polltest_class; static int polltest_major; static struct hrtimer polltest_hrtimer; static volatile int ev_press = 0; int polltest_count = 0; static unsigned char polltest_text[30]; DECLARE_WAIT_QUEUE_HEAD(polltest_waitq); static enum hrtimer_restart hrtimer_func(struct hrtimer *timer) { wake_up_interruptible(&polltest_waitq);--------------------------------------------------调用队列上所有等待项wait_queue_t->func()函数,这个函数主要用来唤醒等待的进程。 printk("%s line=%d\n", __func__, __LINE__); ev_press = 1;----------------------------------------------------------------------------只有ev_press置位,poll()才会返回POLLIN状态。其他情况就会阻塞。 polltest_count++; hrtimer_forward_now(&polltest_hrtimer, ktime_set(0, 10000000)); return HRTIMER_RESTART; } static int polltest_open(struct inode *inode, struct file *file) { hrtimer_init(&polltest_hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL); polltest_hrtimer.function = hrtimer_func; hrtimer_start(&polltest_hrtimer, ktime_set(0, 10000000), HRTIMER_MODE_REL); printk("%s line=%d\n", __func__, __LINE__); return 0; } static int polltest_release(struct inode *inode, struct file *file) { hrtimer_cancel(&polltest_hrtimer); printk("%s line=%d\n", __func__, __LINE__); return 0; } static ssize_t polltest_read(struct file * file, char __user * buf, size_t count, loff_t *ppos) { int size; size = snprintf(polltest_text, 30, "%d\n", polltest_count); copy_to_user(buf, polltest_text, size); return size; } static ssize_t polltest_write(struct file * file, const char __user * buf, size_t count, loff_t *ppos) { char str[30]; copy_from_user(str, buf,count); return 0; } static unsigned int polltest_poll(struct file *file, struct poll_table_struct *wait) { unsigned int mask = 0; poll_wait(file, &polltest_waitq, wait);-------------------------------------------------将当前struct poll_table_struct的wait加入到polltest_waitq队列头中。那么这个wait合适被从polltest_waitq队列头移出呢?在poll_freewait()中。 printk("%s line=%d\n", __func__, __LINE__); if(ev_press)----------------------------------------------------------------------------在hrtimer超时之后,ev_press置位,才会发送POLLIN状态。
{
mask |= POLLIN | POLLRDNORM;--------------------------------------------------------如果返回POLLIN状态,poll系统调用才会返回;否则会进入睡眠状态及poll_schedule_timeout()。
ev_press = 0;-----------------------------------------------------------------------恢复为0,避免下次poll()返回POLLIN。
} return mask; } static const struct file_operations polltest_fops = { .owner = THIS_MODULE, .open = polltest_open, .release = polltest_release, .read = polltest_read, .write = polltest_write, .poll = polltest_poll, }; static int __init polltest_test_init(void) { struct device *polltest_device; polltest_major = register_chrdev(0, POLLTEST_NAME, &polltest_fops); if (polltest_major < 0) { pr_err("register_chrdev failed\n"); goto err; } polltest_class = class_create(THIS_MODULE, POLLTEST_NAME); if (IS_ERR(polltest_class)) { pr_err("device class file already in use\n"); goto err_class; } polltest_device = device_create(polltest_class, NULL, MKDEV(polltest_major, 0), NULL, "%s%d", POLLTEST_NAME, 0); if (IS_ERR(polltest_device)) { pr_err("failed to create device\n"); goto err_device; } return 0; err_device: class_destroy(polltest_class); err_class: unregister_chrdev(polltest_major, POLLTEST_NAME); err: return 0; } static void __exit polltest_test_exit(void) { device_destroy(polltest_class, MKDEV(polltest_major, 0)); class_destroy(polltest_class); unregister_chrdev(polltest_major, POLLTEST_NAME); } module_init(polltest_test_init); module_exit(polltest_test_exit); MODULE_LICENSE("GPL");
Makefile:
CONFIG_MODULE_SIG=n obj-m := polltest_kernel.o KERN_DIR := /lib/modules/$(shell uname -r)/build PWD := $(shell pwd) all: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules gcc polltest_user.c -o polltest_user gcc polltest_select.c -o polltest_select clean: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules clean rm polltest_user rm polltest_select modules_install: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules_install
然后创建用户空间测试程序:
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 int main(int argc, char **argv) { int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; struct pollfd fds[1]; memset(polltest_count, 30, 0); fd = open(POLLTEST_NAME, O_RDWR);--------------------------------打开/dev/polltest0此时启动一个10ms的hrtimer。 if (fd < 0) { printf("can‘t open!\n"); } fds[0].fd = fd; fds[0].events = POLLIN; while (1) { ret = poll(fds, 1, 5000);------------------------------------调用/dev/polltest0的poll()函数。如果返回状态有POLLIN则ret表示满足状态的句柄数,应该为1;如果返回0表示超时。 if (ret == 0) { printf("time out\n"); } else { if(fds[0].revents == POLLIN) { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) break; } } } return 0; }
结果如下:
[ 2366.904469] polltest_open line=41---------------------------------打开/dev/polltest0,启动10ms的hrtimer。 [ 2366.904475] polltest_poll line=78---------------------------------紧接着poll()系统调用,由于ev_press为0,没有满足POLLIN条件;所以进程进入睡眠状态。 [ 2366.914893] hrtimer_func line=29----------------------------------hrtimer 10ms超时,唤醒进程并且ev_press为1。 [ 2366.914951] polltest_poll line=78---------------------------------进程被唤醒之后,重新调用/dev/polltest0的poll()检查状态;因为此时ev_press为1,所以返回POLLIN。满足条件,poll()系统调用退出,用户空间read()且置ev_press为0。 [ 2366.915197] polltest_poll line=78---------------------------------再次调用poll()系统调用,此时ev_press为0,所以进入睡眠状态。 [ 2366.924575] hrtimer_func line=29 [ 2366.924584] polltest_poll line=78 [ 2366.924641] polltest_poll line=78 [ 2366.934632] hrtimer_func line=29 [ 2366.934683] polltest_poll line=78 [ 2366.934980] polltest_release line=49
从上面的分析可以看出select()/poll()的内核实现其实大同小异,最终都是调用具体文见的poll()函数查询状态。
select()和poll()系统调用有如下几个重要的结构体:struct poll_wqueues、struct poll_table_struct、struct poll_table_page、struct poll_table_entry,其中poll()还有两个文件句柄相关结构体:struct poll_list、struct pollfd。
其中struct poll_wqueues是核心,用来统一辅助实现这个进程中所有监测fd的轮训工作。
struct poll_list {-------------------------------------------------一次poll()的poll_list可能有多个,通过next链接起来。一个poll_list里可以有多个pollfd。 struct poll_list *next; int len;-------------------------------------------------------下面entries[]的数目。 struct pollfd entries[0]; }; struct pollfd {---------------------------------------------------和用户空间一致的数据结构,表示一个poll句柄。 int fd; short events; short revents; }; struct poll_wqueues { poll_table pt; struct poll_table_page *table;----------------------------------指向poll_table_page类型页面,多个页面可以互相链接起来。 struct task_struct *polling_task;-------------------------------保存当前调用select()/poll()的进程struct task_struct结构体。 int triggered;--------------------------------------------------当前用户进程被唤醒后置位,以免该进程接着睡眠。 int error;------------------------------------------------------错误码。 int inline_index;-----------------------------------------------数组inline_entries[]的下标。 struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; };
struct poll_table_page {------------------------------------------申请的物理页面都会将起始地址强制转换成该结构体指针。
struct poll_table_page * next;---------------------------------指向下一个申请的物理页面地址。
struct poll_table_entry * entry;-------------------------------指向entries[]中首个待分配poll_table_entry地址。
struct poll_table_entry entries[0];----------------------------该页后面剩余的空间都是待分配的poll_table_entry结构体。
};
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *); typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table; struct poll_table_entry { struct file *filp;--------------------------------------------指向特定fd对应的file结构体。 unsigned long key;--------------------------------------------等待特定fd对应硬件设备的事件掩码,如POLLIN、POLLOUT等。 wait_queue_t wait;--------------------------------------------代表调用select()/poll()的应用程序,等待在fd对应设备的特定时间的等待队列头上的等待队列项。 wait_queue_head_t *wait_address;------------------------------设备驱动中特定时间的等待队列头。 };
一次select()/poll()调用对应一个strcut poll_wqueue结构体;struct poll_table_struct就是一个函数和一个key值。
一次select()/poll()可能包含一个或者多个struct poll_table_page,这个可能根据文件句柄的数量而变化。
一个fd对应一个struct poll_table_entry。
所以fd、struct poll_fd、strcut poll_table_entry是一一对应的;一次select()/poll()和struct poll_wqueues、strcut poll_table_strcut是一一对应的。
这两个结构体都通过poll_initwait()初始化,然后在poll_wait()的时候调用poll_queue_proc函数。
poll_initwait()是select()/poll()中对相关结构体进行初始化的入口;poll_wait()是驱动中实现file_operations成员poll()函数的主要功能,它将此次调用转换成一个wait_queue_t放入到驱动的等待队列中。
然后wake_up_interruptible()是唤醒所有wait_queue_head_t上的等待项,这些等待项调用对应的func()函数;默认是用来唤醒default_wake_function()。
void poll_initwait(struct poll_wqueues *pwq) { init_poll_funcptr(&pwq->pt, __pollwait);-----------------------------------------------------------------初始化struct poll_table,函数是__pollwait。在后面poll_wait()会被调用。 pwq->polling_task = current;-----------------------------------------------------------------------------polling_task指向当前进程结构体。 pwq->triggered = 0; pwq->error = 0; pwq->table = NULL; pwq->inline_index = 0; } static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc) { pt->_qproc = qproc; pt->_key = ~0UL; /* all events enabled */ } static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)-------------wait_address是驱动程序提供的等待队列头,来容纳后需等待该硬件设备就绪的进程对应的等待队列项。p是系统调用传下来的struct poll_table_strcut。 { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p);-----------------------------------------------------------------------调用__pollwait()函数。 } /* Add a new entry */ static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq);-------------------------------------------------------首先从struct poll_wqueues->inline_entries[]中获取poll_table_entry;如果不够用则通过盛情一个页面转换成struct poll_table_page类型指针。 if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake);----------------------------------------------------------等待队列项的操作函数指定为pollwake(),这个函数作用就是唤醒polling_task对应的进程。 entry->wait.private = pwq;----------------------------------------------------------------------------------私有变量指向pwq,在__pollwake()中会使用到pwq->polling_task来唤醒对应进程。 add_wait_queue(wait_address, &entry->wait);-----------------------------------------------------------------将poll_table_entry对应的wait加入到驱动的wait_queue_head_t上。 }
static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p)
{
struct poll_table_page *table = p->table;
if (p->inline_index < N_INLINE_POLL_ENTRIES)
return p->inline_entries + p->inline_index++;
if (!table || POLL_TABLE_FULL(table)) {
struct poll_table_page *new_table;
new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);--------------------------------------申请一个空闲页面,转换成struct poll_table_page类型指针;在poll_freewait()中被释放。
if (!new_table) {
p->error = -ENOMEM;
return NULL;
}
new_table->entry = new_table->entries;
new_table->next = table;
p->table = new_table;
table = new_table;
}
return table->entry++;
}
static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key) { struct poll_table_entry *entry; entry = container_of(wait, struct poll_table_entry, wait);--------------------------------------------------根据wait找到struct poll_table_entry,进而获得关注的事件值key。 if (key && !((unsigned long)key & entry->key)) return 0; return __pollwake(wait, mode, sync, key); } static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key) { struct poll_wqueues *pwq = wait->private;------------------------------------------------------------------在__pollwait()中设置变量,此处使用其polling_task成员。 DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);----------------------------------------------------------定义一个临时的wait_queue_t,给下面的default_wake_functio做参数。 smp_wmb(); pwq->triggered = 1; return default_wake_function(&dummy_wait, mode, sync, key);------------------------------------------------唤醒pwq->polling_task对应的线程。 } int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key) { return try_to_wake_up(curr->private, mode, wake_flags); }
释放poll_wqueues申请的内存,主要是struct poll_table_page对应的页面。
poll_schedule_timeout()实现超时功能。
void poll_freewait(struct poll_wqueues *pwq) { struct poll_table_page * p = pwq->table; int i; for (i = 0; i < pwq->inline_index; i++) free_poll_entry(pwq->inline_entries + i);------------------------------------------这里将poll_table_entry->wait从其所在队列头移出。在poll_wait()中被添加进去。 while (p) { struct poll_table_entry * entry; struct poll_table_page *old; entry = p->entry; do { entry--; free_poll_entry(entry); } while (entry > p->entries); old = p; p = p->next; free_page((unsigned long) old); } }
static void free_poll_entry(struct poll_table_entry *entry)
{
remove_wait_queue(entry->wait_address, &entry->wait);
fput(entry->filp);
}
int poll_schedule_timeout(struct poll_wqueues *pwq, int state, ktime_t *expires, unsigned long slack) { int rc = -EINTR; set_current_state(state); if (!pwq->triggered) rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);-------------------调用schedule_hrtimeout_range()来完成timeout功能。 __set_current_state(TASK_RUNNING); smp_store_mb(pwq->triggered, 0); return rc; }
wake_up_interruptible()用于执行等待队列wake_queue_head_t上的所有wait_queue_t对应func函数,这里对应的就是pollwake(),即唤醒对应的进程。
#define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL) void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __wake_up_common(q, mode, nr_exclusive, 0, key); spin_unlock_irqrestore(&q->lock, flags); } static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key) { wait_queue_t *curr, *next; list_for_each_entry_safe(curr, next, &q->task_list, task_list) {------------------------q->task_list是所有wait_queue_head_t上wait_queue_t对应的进程链表,遍历链表上进程结构体,进而找到对应的wait_queue_t。然后调用wait_queue_t->func函数。 unsigned flags = curr->flags; if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)------------------------------ break; } }
void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)------------------------------将wait_queue_t加入到wait_queue_head_t的时候通过将wait_queue_t->task_list加入到wait_queue_head_t->task_list链表上。
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}
static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
list_add(&new->task_list, &head->task_list);
}
select()和poll()基于同样的内核机制poll_wqueues,不同点在于select()区分读写异常句柄等。
select()和poll()使用了相同的内核poll例程集合,
当select()和poll()用来检查大量文件描述符时,可能会遇到一些问题。
如上种种特性造成select()或poll()在性能延展性上要低于信号驱动IO和epoll()。
epoll API有以下优点:
性能表现上,epoll通信号驱动IO相似。但epoll有一些胜过信号驱动IO:
epoll API由以下三个系统调用组成:
epoll_create()创建一个epoll实例,返回代表该实例的文件描述符。
epoll_ctl()操作同epoll实例相关联的兴趣列表。通过epoll_ctl()可以增加到新的描述符到列表中,将已有的文件描述符从该列表中移除,以及修改代表文件描述符上时间类型的掩码。
epoll_wait()返回与epoll实例相关联的就绪列表中的成员。
#include <sys/epoll.h> int epoll_create(int size);
epoll_create()创建了一个新的epoll实例,其对应的兴趣列表初始化为空。
参数size指定了想要通过epoll实例老检查的文件描述符个数。Linux 2.6.8以来,size参数被忽略不用。
返回值代表新创建的epoll实例的文件描述符,这个文件描述符在其他几个epoll系统调用中用来表示epoll实例。
当文件描述符不再需要时,可以通过close()关闭。
struct epoll_event { uint32_t events; /* epoll events (bit mask) */ epoll_data_t data; /* User data */ }; typedef union epoll_data { void *ptr; /* Pointer to user-defined data */ int fd; /* File descriptor */ uint32_t u32; /* 32-bit integer */ uint64_t u64; /* 64-bit integer */ } epoll_data_t; #include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
参数epfd是epoll_create()创建的文件描述符。
参数fd指明了要修改兴趣列表中的哪一个文件描述符的设定。
参数op用来指定需要执行的操作,可以是:
参数ev为文件描述符fd所做的设置如下:
每个注册到epoll实例上的文件描述符都占用一小段不能被交换的内核交换空间。
内核提供了一个接口用来定义每个用户可以注册到epoll实例上的文件描述符总数,这个上限值可以通过max_user_watches来修改,这个文件在/proc/sys/fs/epoll/max_user_watches。
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
参数evlist指向的结构体数组中返回的是有关就绪态文件描述符的信息。数组evlist的空间由调用者负责申请,所包含的元素个数在参数maxevents中指定。
数组evlist中,每个元素返回的都是单个就绪态文件描述符的信息。events字段返回了在该描述符上已经发生的事件掩码。data字段返回的是使用epoll_ctl()注册感兴趣的事件时在ev.data中所指定的值。
参数timeout用来确定epoll_wait()的阻塞行为:
调用成功后epoll_wait()返回数组evlist中的元素个数,如果在timeout超时间隔内没有任何文件描述符处于就绪态,返回0.
出错时返回-1,并在errno总设定错误码以表示错误原因。
调用epoll_ctl()时可以在ev.events中指定位掩码以及由epoll_wait()返回的evlist[].events中给出。
位掩码 | epoll_ctl()输入 | epoll_wait()返回 | 描述 |
EPOLLIN | √ | √ | 可读取非高优先级的数据 |
EPOLLPRI | √ | √ | 可读取高优先级的数据 |
EPOLLRDHUP | √ | √ | |
EPOLLOUT | √ | √ | 普通数据可写 |
EPOLLET | √ | 采用边缘触发事件通知 | |
EPOLLONESHOT | √ | 在完成事件通知之后禁用检查 | |
EPOLLERR | √ | 有错误发生 | |
EPOLLHUP | √ | 出现挂断 |
一旦通过epoll_ctl()的EPOLL_CTL_ADD将文件描述符添加到epoll实例兴趣列表中,它会保持激活状态,直到显式地通过EPOLL_CTL_DEL操作将其从列表中移除。
如果我们希望在某个特定的文件描述符上只得到一次通知,那么可以在ev.events中指定EPOLLONESHOT标志。
epoll API测试程序内核部分可以和select/poll共用,用户空间测试程序如下。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #include <sys/epoll.h> #include <errno.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 #define MAX_EVENTS 5 void main(void) { int epfd, i; struct epoll_event ev; struct epoll_event evlist[MAX_EVENTS]; int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; fd = open(POLLTEST_NAME, O_RDWR); if (fd < 0) { printf("%s() %s %s\n", __func__, strerror(errno), POLLTEST_NAME); return; } epfd = epoll_create(MAX_EVENTS);---------------------------------------------里面的size其实已经不重要。 if(epfd == -1) { printf("%s() %s\n", __func__, strerror(errno)); return; } ev.data.fd = fd; ev.events = EPOLLIN; if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1)----------------------------增加监控fd文件句柄上EPOLLIN事件。 { printf("%s() %s\n", __func__, strerror(errno)); return; } while(1) { ret = epoll_wait(epfd, evlist, MAX_EVENTS, -1);--------------------------在epfd上等待,直到fd上有事件发生。 if (ret == 0) { printf("time out\n"); } else if(ret == -1) { printf("%s() %s\n", __func__, strerror(errno)); } else { for(i = 0; i < ret; i++) { if(evlist[i].events & EPOLLIN)-----------------------------------检查事件类型,并从fd中读取信息。 { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("epoll key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) return; } } } } }
struct eventpoll { spinlock_t lock; struct mutex mtx; /* Wait queue used by sys_epoll_wait() */ wait_queue_head_t wq;-----------------------------后面epoll_wait()使用的等待队列头。 /* Wait queue used by file->poll() */ wait_queue_head_t poll_wait;----------------------file->poll()使用的等待队列头。 /* List of ready file descriptors */ struct list_head rdllist;-------------------------准备就绪的文件描述符链表,在epoll_wait()中返回给用户空间。 /* RB tree root used to store monitored fd structs */ struct rb_root rbr; struct epitem *ovflist; struct wakeup_source *ws; struct user_struct *user; struct file *file; int visited; struct list_head visited_list_link; }; struct epitem { union { struct rb_node rbn; struct rcu_head rcu; }; /* List header used to link this structure to the eventpoll ready list */ struct list_head rdllink; struct epitem *next; struct epoll_filefd ffd; int nwait; struct list_head pwqlist; struct eventpoll *ep; struct list_head fllink; struct wakeup_source __rcu *ws; /* The structure that describe the interested events and the source fd */ struct epoll_event event; }; struct epoll_filefd { struct file *file; int fd; } __packed;
SYSCALL_DEFINE1(epoll_create1, int, flags) { int error, fd; struct eventpoll *ep = NULL; struct file *file; /* Check the EPOLL_* constant for consistency. */ BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC); if (flags & ~EPOLL_CLOEXEC) return -EINVAL; /* * Create the internal data structure ("struct eventpoll"). */ error = ep_alloc(&ep);----------------------------------------------分配一个struct eventpoll结构体,并且初始化。 if (error < 0) return error; /* * Creates all the items needed to setup an eventpoll file. That is, * a file structure and a free file descriptor. */ fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));-------------在当前进程中,寻找一个空闲的文件描述符并返回;同时对文件设置O_RDWR等标志。 if (fd < 0) { error = fd; goto out_free_ep; } file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));-------------------------创建一个匿名inode文件。 if (IS_ERR(file)) { error = PTR_ERR(file); goto out_free_fd; } ep->file = file;----------------------------------------------------将struct eventpoll和struct file关联起来。 fd_install(fd, file);-----------------------------------------------将fd和file在current->files->fdt中关联起来。 return fd; out_free_fd: put_unused_fd(fd); out_free_ep: ep_free(ep); return error; } SYSCALL_DEFINE1(epoll_create, int, size) { if (size <= 0) return -EINVAL; return sys_epoll_create1(0); } static int ep_alloc(struct eventpoll **pep) { int error; struct user_struct *user; struct eventpoll *ep; user = get_current_user(); error = -ENOMEM; ep = kzalloc(sizeof(*ep), GFP_KERNEL);-----------------------分配一个struct eventpoll结构体。 if (unlikely(!ep)) goto free_uid; spin_lock_init(&ep->lock);-----------------------------------初始化自旋锁、等待队列、红黑树等。 mutex_init(&ep->mtx); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; ep->ovflist = EP_UNACTIVE_PTR; ep->user = user; *pep = ep; return 0; free_uid: free_uid(user); return error; } int get_unused_fd_flags(unsigned flags) { return __alloc_fd(current->files, 0, rlimit(RLIMIT_NOFILE), flags);----------current->files表示当前进程打开文件相关信息,通过RLIMIT_NOFILE可以获取系统最大打开文件数目。 } int __alloc_fd(struct files_struct *files, unsigned start, unsigned end, unsigned flags)------------------------__alloc_fd()在start和end之间,寻找一个合适的fd返回。 { unsigned int fd; int error; struct fdtable *fdt; spin_lock(&files->file_lock); repeat: fdt = files_fdtable(files); fd = start; if (fd < files->next_fd) fd = files->next_fd; if (fd < fdt->max_fds) fd = find_next_fd(fdt, fd); error = -EMFILE; if (fd >= end) goto out; error = expand_files(files, fd); if (error < 0) goto out; if (error) goto repeat; if (start <= files->next_fd) files->next_fd = fd + 1; __set_open_fd(fd, fdt);-----------------------------------------------------将fd设置为打开状态。 if (flags & O_CLOEXEC) __set_close_on_exec(fd, fdt); else __clear_close_on_exec(fd, fdt); error = fd; ... out: spin_unlock(&files->file_lock); return error; }
系统调用epoll_create()就是进程在内核中创建了一个从epoll文件描述符到struct event结构的通道;首先调用ep_alloc()分配一个struct eventpoll数据结构,并初始化;然后寻找一个空闲的文件描述符,并创建一个匿名文件inode;最后将两者在进程task_struct结构体中关联起来。
最终返回打开的文件描述符。
后面的epoll_ctl()和epoll_wait()都可以通过此文件描述符关联到内核中的struct eventpoll结构体。
struct ep_pqueue { poll_table pt; struct epitem *epi; }; typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table; typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; int full_check = 0; struct fd f, tf; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; struct eventpoll *tep = NULL; error = -EFAULT; if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event)))--------------------将用户空间的event拷贝到内核的epds中。 goto error_return; error = -EBADF; f = fdget(epfd);-----------------------------------------------------------------根据epfd找到对应的struct file结构体。 if (!f.file) goto error_return; tf = fdget(fd);------------------------------------------------------------------获取目标文件的struct file结构体。 if (!tf.file) goto error_fput; /* The target file descriptor must support poll */ error = -EPERM; if (!tf.file->f_op->poll) goto error_tgt_fput; /* Check if EPOLLWAKEUP is allowed */ if (ep_op_has_event(op)) ep_take_care_of_epollwakeup(&epds); error = -EINVAL; if (f.file == tf.file || !is_file_epoll(f.file)) goto error_tgt_fput; if (epds.events & EPOLLEXCLUSIVE) { if (op == EPOLL_CTL_MOD) goto error_tgt_fput; if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) || (epds.events & ~EPOLLEXCLUSIVE_OK_BITS))) goto error_tgt_fput; } ep = f.file->private_data; mutex_lock_nested(&ep->mtx, 0); if (op == EPOLL_CTL_ADD) { if (!list_empty(&f.file->f_ep_links) || is_file_epoll(tf.file)) { full_check = 1; mutex_unlock(&ep->mtx); mutex_lock(&epmutex); if (is_file_epoll(tf.file)) { error = -ELOOP; if (ep_loop_check(ep, tf.file) != 0) { clear_tfile_check_list(); goto error_tgt_fput; } } else list_add(&tf.file->f_tfile_llink, &tfile_check_list); mutex_lock_nested(&ep->mtx, 0); if (is_file_epoll(tf.file)) { tep = tf.file->private_data; mutex_lock_nested(&tep->mtx, 1); } } } epi = ep_find(ep, tf.file, fd);-------------------------------------------------在ep->rbr红黑树中查找file对应的文件,如果没有找到返回NULL。 error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) {-----------------------------------------------------------------如果之前ep_find()没有找到对应的epi,那么此处插入到ep->rbr红黑树中。 epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tf.file, fd, full_check); } else error = -EEXIST; if (full_check) clear_tfile_check_list(); break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi);---------------------------------------------在ep_find()找到epi的情况下,将其从ep->rbr中移除。 else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { if (!(epi->event.events & EPOLLEXCLUSIVE)) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds);----------------------------------修改epi的事件类型。 } } else error = -ENOENT; break; } if (tep != NULL) mutex_unlock(&tep->mtx); mutex_unlock(&ep->mtx); ... return error; } static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd, int full_check) { int error, revents, pwake = 0; unsigned long flags; long user_watches; struct epitem *epi; struct ep_pqueue epq; user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))-------------------------从slab中分配缓存struct epitem结构体空间。 return -ENOMEM; /* Item initialization follow here ... */ INIT_LIST_HEAD(&epi->rdllink);------------------------------------------------初始化epi数据结构。 INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; if (epi->event.events & EPOLLWAKEUP) { error = ep_create_wakeup_source(epi); if (error) goto error_create_wakeup_source; } else { RCU_INIT_POINTER(epi->ws, NULL); } epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);-----------------------------将epq.pt->_qproc指向ep_ptable_queue_proc()。设置后面poll()等待队列被唤醒的时候,将要调用到的函数。 revents = ep_item_poll(epi, &epq.pt);-----------------------------------------调用epi对应文件的poll()函数,插入到poll()的等待队列中。;返回的事件类型时用户空间关心事件类型的交集。 error = -ENOMEM; if (epi->nwait < 0) goto error_unregister; spin_lock(&tfile->f_lock); list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_lock); ep_rbtree_insert(ep, epi);----------------------------------------------------将新的epi插入到ep->rbr的红黑树中。 ... if (pwake) ep_poll_safewake(&ep->poll_wait); return 0; ... return error; } static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq;---------------------------------------------------------struct epoll_entry主要完成struct epitem和和epitem事件发生时的callback函数之间的关联。 if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {---------申请一个struct epoll_entry结构体缓存。 init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);----------------------初始化等待队列函数的入口,也就是poll醒来时要调用的回调函数。 pwq->whead = whead; pwq->base = epi; if (epi->event.events & EPOLLEXCLUSIVE) add_wait_queue_exclusive(whead, &pwq->wait); else add_wait_queue(whead, &pwq->wait);---------------------------------------将pwq->wait加入到等待队列中。 list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { /* We have to signal that an error occurred */ epi->nwait = -1; } } static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)---主要功能试在被监视的文件等待时间就绪时,将文件对应的epitem实例添加到就绪列表中,当用户调用epoll_wait()时,内核会将就绪队列中的时间报告给用户。 { int pwake = 0; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); struct eventpoll *ep = epi->ep; int ewake = 0; if ((unsigned long)key & POLLFREE) { ep_pwq_from_wait(wait)->whead = NULL; list_del_init(&wait->task_list); } spin_lock_irqsave(&ep->lock, flags); if (!(epi->event.events & ~EP_PRIVATE_BITS)) goto out_unlock; if (key && !((unsigned long) key & epi->event.events)) goto out_unlock; if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) { if (epi->next == EP_UNACTIVE_PTR) { epi->next = ep->ovflist; ep->ovflist = epi; if (epi->ws) { __pm_stay_awake(ep->ws); } } goto out_unlock; } if (!ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake_rcu(epi); } if (waitqueue_active(&ep->wq)) { if ((epi->event.events & EPOLLEXCLUSIVE) && !((unsigned long)key & POLLFREE)) { switch ((unsigned long)key & EPOLLINOUT_BITS) { case POLLIN: if (epi->event.events & POLLIN) ewake = 1; break; case POLLOUT: if (epi->event.events & POLLOUT) ewake = 1; break; case 0: ewake = 1; break; } } wake_up_locked(&ep->wq); } if (waitqueue_active(&ep->poll_wait)) pwake++; ... return 1; } static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event) { int pwake = 0; unsigned int revents; poll_table pt; init_poll_funcptr(&pt, NULL); epi->event.events = event->events; /* need barrier below */ epi->event.data = event->data; /* protected by mtx */ if (epi->event.events & EPOLLWAKEUP) { if (!ep_has_wakeup_source(epi)) ep_create_wakeup_source(epi); } else if (ep_has_wakeup_source(epi)) { ep_destroy_wakeup_source(epi); } smp_mb(); revents = ep_item_poll(epi, &pt); if (revents & event->events) { spin_lock_irq(&ep->lock); if (!ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); /* Notify waiting tasks that events are available */ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irq(&ep->lock); } /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&ep->poll_wait); return 0; } static int ep_remove(struct eventpoll *ep, struct epitem *epi)-------------------------------------将epi从当前ep中移除。 { unsigned long flags; struct file *file = epi->ffd.file; ep_unregister_pollwait(ep, epi); spin_lock(&file->f_lock); list_del_rcu(&epi->fllink); spin_unlock(&file->f_lock); rb_erase(&epi->rbn, &ep->rbr); spin_lock_irqsave(&ep->lock, flags); if (ep_is_linked(&epi->rdllink)) list_del_init(&epi->rdllink); spin_unlock_irqrestore(&ep->lock, flags); wakeup_source_unregister(ep_wakeup_source(epi)); call_rcu(&epi->rcu, epi_rcu_free); atomic_long_dec(&ep->user->epoll_watches); return 0; }
epoll_ctl() 函数首先就分配空间, 将结构从用户空间复制到内核空间中, 在进行方法(op)判断之前, 先采用ep_find函数进行查找, 以确保该数据已经设置好回调函数了, 然后使用fget函数获取该epoll的匿名文件的文件描述符, 最后进行方法(op)判断, 确定是EPOLL_CTL_ADD, EPOLL_CTL_MOD还是 EPOLL_CTL_DEL。
这里主要讲的是EPOLL_CTL_ADD, 所以当是选择加入时, 就调用ep_insert函数, 将回调函数设置为ep_ptable_queue_proc函数, 也就是将消息到达后, 需要自动启动ep_ptable_proc函数, 进而调用ep_poll_callback函数, 该函数就是把来的消息所对应的结构和文件信息加入到就绪链表中, 以便之后调用 epoll_wait 可以直接从就绪队列链表中夺得就绪的文件. 也正是这样, epoll的回调函数使epoll不用每次都轮询遍历数据, 而是自动唤醒回调, 更加的高效. 并且回调函数也只是在进程加入的时侯才设置, 而且只设置一次.
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout) { int error; struct fd f; struct eventpoll *ep; if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)----------------------------------判断maxevents合法性。 return -EINVAL; if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) return -EFAULT; f = fdget(epfd);------------------------------------------------------------------获取epoll_create()函数创建的匿名文件描述符。 if (!f.file) return -EBADF; error = -EINVAL; if (!is_file_epoll(f.file))-------------------------------------------------------通过判断f.file->f_op是否为eventpoll_fops来确定是否为epoll文件。 goto error_fput; ep = f.file->private_data;--------------------------------------------------------私有数据指向struct eventpoll数据结构。 error = ep_poll(ep, events, maxevents, timeout);----------------------------------等待消息到来;没有消息到来就阻塞自己。 error_fput: fdput(f); return error; } static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { int res = 0, eavail, timed_out = 0; unsigned long flags; u64 slack = 0; wait_queue_t wait; ktime_t expires, *to = NULL; if (timeout > 0) {----------------------------------------------------------------将用户空间传入的timeout事件转换成内核超时时间。 struct timespec64 end_time = ep_set_mstimeout(timeout); slack = select_estimate_accuracy(&end_time); to = &expires; *to = timespec64_to_ktime(end_time); } else if (timeout == 0) { timed_out = 1; spin_lock_irqsave(&ep->lock, flags); goto check_events; } fetch_events: spin_lock_irqsave(&ep->lock, flags); if (!ep_events_available(ep)) { init_waitqueue_entry(&wait, current);-----------------------------------------初始化一个等待队列入口,并将其添加到等待队列上。 __add_wait_queue_exclusive(&ep->wq, &wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE);------------------------------------进程设置为可中断的。 if (ep_events_available(ep) || timed_out)---------------------------------退出的条件是是否有ready事件、是否超时、是否有信号中断,三者任一则退出循环。 break; if (signal_pending(current)) { res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))---------------时间达到之前,让出cpu调用其他进程;超时后,重新通过中断回调该进程。 timed_out = 1; spin_lock_irqsave(&ep->lock, flags); } __remove_wait_queue(&ep->wq, &wait); __set_current_state(TASK_RUNNING); } check_events: eavail = ep_events_available(ep); spin_unlock_irqrestore(&ep->lock, flags); if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && !timed_out)-----------------如果中间没有被信号中断,并且ep->rdlist不为空,则调用ep_send_events()给用户空间发送消息。 goto fetch_events; return res; } static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents) { struct ep_send_events_data esed; esed.maxevents = maxevents; esed.events = events; return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false); } static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) { struct ep_send_events_data *esed = priv; int eventcnt; unsigned int revents; struct epitem *epi; struct epoll_event __user *uevent; struct wakeup_source *ws; poll_table pt; init_poll_funcptr(&pt, NULL); for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) {---------------------------从就绪队列去除一个个epi,进行处理;并将相关事件返回给用户空间。 epi = list_first_entry(head, struct epitem, rdllink);--------------------------取出第一个消息数据对应的struct epitem结构,然后从就绪列表上移除。 ws = ep_wakeup_source(epi); if (ws) { if (ws->active) __pm_stay_awake(ep->ws); __pm_relax(ws); } list_del_init(&epi->rdllink); revents = ep_item_poll(epi, &pt);----------------------------------------------调用epi对应文件描述符的poll()函数,返回值是。 if (revents) { if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {--------------------------将revents和event.data发送到用户空间。 list_add(&epi->rdllink, head); ep_pm_stay_awake(epi); return eventcnt ? eventcnt : -EFAULT; } eventcnt++;----------------------------------------------------------------返回给用户空间的参数eventcnt递增,uevent也递增。 uevent++; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS; else if (!(epi->event.events & EPOLLET)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); } } } return eventcnt; }
获得匿名文件的文件指针, 在通过调用ep_poll函数, 进行时间片的设置, 在时间片结束后就绪队列为空, 就退出等待; 如果时间设置的是负数, ep_poll函数会调用schedule_timeout, 执行进程调度, 当设置的时间片结束后又继续回到ep_poll进程 查看就绪队列是否为空, 为空的话就继续cinching调度, 此时wait又变成阻塞态; 当就绪队列准备好后, 就退出进程调度, 执行ep_send_events函数, 主要是为了将就绪队列的链表从内核空间发送给用户空间。
ep_send_events函数, 先将就绪链表rdllist复制到另一个新的链表, 重新将就绪链表清零, 然后程序调用__put_user将新链表的数据发送给用户空间, 同时, 发送的个数吉拉路下来, 以便函数的返回 , 但是, 如果在发送的时候又有就绪信号到来, 就将来的就绪信号保存在ovflist链表中, 最后又重新数据拷贝给rdllist中, 再重复执行ep_send_events函数。
从对epoll的分析也可以看出,为什么性能要优于select()/poll()。
参考文档:
《select(poll)系统调用实现解析(一)》《select(poll)系统调用实现解析(二)》《select(poll)系统调用实现解析(三)》《epoll源码分析(一)》《epoll源码分析(二)》《epoll源码分析(三)》《Linux epoll模型详解及源码分析》《epoll源码实现分析[整理]》《Linux下的I/O复用与epoll详解》《epoll源码分析(全)》《epoll内核源码详解+自己总结的流程》
《Linux/UNIX系统编程手册》第63章 IO多路复用、信号驱动IO以及epoll
原文:https://www.cnblogs.com/arnoldlu/p/10264350.html