转自:http://kernel.meizu.com/linux-workqueue.html
Workqueue 是内核里面很重要的一个机制,特别是内核驱动,一般的小型任务 (work) 都不会自己起一个线程来处理,而是扔到 Workqueue 中处理。Workqueue 的主要工作就是用进程上下文来处理内核中大量的小任务。
所以 Workqueue 的主要设计思想:一个是并行,多个 work 不要相互阻塞;另外一个是节省资源,多个 work 尽量共享资源 ( 进程、调度、内存 ),不要造成系统过多的资源浪费。
为了实现的设计思想,workqueue 的设计实现也更新了很多版本。最新的 workqueue 实现叫做 CMWQ(Concurrency Managed Workqueue),也就是用更加智能的算法来实现“并行和节省”。新版本的 workqueue 创建函数改成 alloc_workqueue()
,旧版本的函数 create_workqueue()
逐渐会被被废弃。
本文的代码分析基于 Linux kernel 3.18.22,最好的学习方法还是 “read the fucking source code”
关于 workqueue 中几个概念都是 work 相关的数据结构非常容易混淆,大概可以这样来理解:
work_thread()
内核线程。最终的目的还是把 work( 工作 ) 传递给 worker( 工人 ) 去执行,中间的数据结构和各种关系目的是把这件事组织的更加清晰高效。
每个执行 work 的线程叫做 worker,一组 worker 的集合叫做 worker_pool。CMWQ 的精髓就在 worker_pool 里面 worker 的动态增减管理上 manage_workers()
。
CMWQ 对 worker_pool 分成两类:
默认 work 是在 normal worker_pool 中处理的。系统的规划是每个 CPU 创建两个 normal worker_pool:一个 normal 优先级 (nice=0)、一个高优先级 (nice=HIGHPRI_NICE_LEVEL),对应创建出来的 worker 的进程 nice 不一样。
每个 worker 对应一个 worker_thread()
内核线程,一个 worker_pool 包含一个或者多个 worker,worker_pool 中 worker 的数量是根据 worker_pool 中 work 的负载来动态增减的。
我们可以通过 ps | grep kworker
命令来查看所有 worker 对应的内核线程,normal worker_pool 对应内核线程 (worker_thread()
) 的命名规则是这样的:
snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
pool->attrs->nice < 0 ? "H" : "");
worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
"kworker/%s", id_buf);
so 类似名字是 normal worker_pool:
shell@PRO5:/ $ ps | grep "kworker"
root 14 2 0 0 worker_thr 0000000000 S kworker/1:0H // cpu1 高优先级 worker_pool 的第 0 个 worker 进程
root 17 2 0 0 worker_thr 0000000000 S kworker/2:0 // cpu2 低优先级 worker_pool 的第 0 个 worker 进程
root 18 2 0 0 worker_thr 0000000000 S kworker/2:0H // cpu2 高优先级 worker_pool 的第 0 个 worker 进程
root 23699 2 0 0 worker_thr 0000000000 S kworker/0:1 // cpu0 低优先级 worker_pool 的第 1 个 worker 进程
对应的拓扑图如下:
以下是 normal worker_pool 详细的创建过程代码分析:
init_workqueues()
-> init_worker_pool()
/create_worker()
static int __init init_workqueues(void)
{
int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
int i, cpu;
// (1) 给每个 cpu 创建对应的 worker_pool
/* initialize CPU pools */
for_each_possible_cpu(cpu) {
struct worker_pool *pool;
i = 0;
for_each_cpu_worker_pool(pool, cpu) {
BUG_ON(init_worker_pool(pool));
// 指定 cpu
pool->cpu = cpu;
cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
// 指定进程优先级 nice
pool->attrs->nice = std_nice[i++];
pool->node = cpu_to_node(cpu);
/* alloc pool ID */
mutex_lock(&wq_pool_mutex);
BUG_ON(worker_pool_assign_id(pool));
mutex_unlock(&wq_pool_mutex);
}
}
// (2) 给每个 worker_pool 创建第一个 worker
/* create the initial worker */
for_each_online_cpu(cpu) {
struct worker_pool *pool;
for_each_cpu_worker_pool(pool, cpu) {
pool->flags &= ~POOL_DISASSOCIATED;
BUG_ON(!create_worker(pool));
}
}
}
| →
static int init_worker_pool(struct worker_pool *pool)
{
spin_lock_init(&pool->lock);
pool->id = -1;
pool->cpu = -1;
pool->node = NUMA_NO_NODE;
pool->flags |= POOL_DISASSOCIATED;
// (1.1) worker_pool 的 work list,各个 workqueue 把 work 挂载到这个链表上,
// 让 worker_pool 对应的多个 worker 来执行
INIT_LIST_HEAD(&pool->worklist);
// (1.2) worker_pool 的 idle worker list,
// worker 没有活干时,不会马上销毁,先进入 idle 状态备选
INIT_LIST_HEAD(&pool->idle_list);
// (1.3) worker_pool 的 busy worker list,
// worker 正在干活,在执行 work
hash_init(pool->busy_hash);
// (1.4) 检查 idle 状态 worker 是否需要 destroy 的 timer
init_timer_deferrable(&pool->idle_timer);
pool->idle_timer.function = idle_worker_timeout;
pool->idle_timer.data = (unsigned long)pool