并发(Concurrency)是Linux内核中经常会发生的事情. 并发会引起竞态(Race)和阻塞(Block)的问题.
首先, 我们看一个并发引起竞态示例, 对并发有个大致印象, 看看竞态会引起那些问题.
然后, 我们会详解介绍并发, 竞态, 阻塞与休眠的概念.
接着, 我们会介绍Linux提供的用于解决竞态的机制.
接着, 会介绍Linux提供的阻塞机制.
首先, 来看这样一段设备驱动的代码:
if (!dptr->data[s_pos]) {
dptr->data[s_pos] = kmalloc(quantum, GFP_KERNEL);
if (!dptr->data[s_pos])
goto out;
}
上面这段代码是用一个指针数组去存放memory, 当data[s_pos]为NULL, 说明它没有指向某一块memory, 则分配一块memory给它.
想象这样一种情况, 如果有两个进程(进程A和进程B)同时访问这段代码, 两个进程同时执行到了if里面的判断语句, 然后都得到了同样的结论: data[s_pos]为NULL. 然后A和B都决定分配一块memory, 赋值给data[s_pos].
会发生什么? 最终的结果就是, 谁后完成, 谁就会”win”. 如果A先完成分配, data[s_pos]就会被B分配的memory覆盖. 导致的问题就是内存泄露: A分配的那块内存永远不会被释放.
上面的示例描述了并发(A, B进程同时运行上述代码)导致的竞争问题.
上述的竞争问题会导致内存泄露, 实际上竞态会引起各种各样的问题, 比如系统崩溃, 数据被覆盖, 安全问题等等.
在目前的Linux操作系统中, 有各种各样的情况会导致并发访问, 我们尝试把这些情况列举出来, 但是可能不全面:
? 用户空间运行着多个进程, 由于进程调度的原因, 每个进程运行一段时间之后, 都会被切出, 换另一个进程运行, 这个过程也叫时间片轮转的进程调度方式. 因此这些进程会以各种各样的组合方式访问你编写内核代码.
? 在多处理器系统中(SMP), 你的代码可能会同时运行在多个处理器上.
? 内核是支持抢占的(preemptible), 即使内核不支持时间片轮转的进程调度方式, 你编写的内核代码也可能在任何时候失去处理器, 而那个抢占了CPU的更高优先级的进程, 可能也会运行你的内核代码.
? 设备中断随时可能发生, 有可能你的代码正在修改一个全局变量时, 中断发生了, 你的代码必须去响应这个中断, 中断处理代码可能也要修改这个全局变量.
? 内核提供了各种各样的延迟执行代码的方式, 比如workqueues, tasklets, timers等.
? 内核支持热插拔, 有可能你的驱动正准备操作某个设备时, 它被拔出了, 从内核消失了.
并发访问加上共享资源就会导致竞态.
并发访问的可能情况上面我们已经介绍了
共享资源的形态也是各种各样:
? 一个全局变量
? 任何时候你的代码传递了一个指针到内核的其他部分, 也可能会成为一个共享资源
? 等等…
竞态在内核中是个非常难处理的问题, 即便是内核领域的专家, 在处理竞态问题时也可能会留下bug, 而且一旦出现bug, 将会非常难调试, 因为这些bug是随机发生的, 没什么规律可循.
所以我们在设计内核代码的时候, 要尽量避免竞态的出现. 我们没法禁止并发访问, 因此只能尽量减少共享资源, 例如能用局部变量时, 就不要用全局变量. 任何时候你想共享一个资源给其他代码片段访问的时候, 都要有足够的理由. 这里的代码片段是指上述可能引起并发访问的代码(例如中断处理函数, 与workqueues等相关的异步通知处理函数)
但是, 我们没法完全避免竞态, 解决竞态问题的机制一般是锁, 或者叫互斥锁, 它的核心思想就是在任何时候, 都只有一个代码片段能访问共享资源. 内核提供了各种各样的锁机制, 我们在后面会逐步介绍.
并发访问还会导致另外一个问题, 阻塞.
比如一个进程A可能需要等待另一个进程B完成某件事情之后, 才能继续. 在等待进程B完成某件事情的时候, 进程A就会进入阻塞状态, 进程B完成某件事情之后, 就会通知进程A, 然后进程A就可以继续执行了.
内核也提供了各种各样的阻塞机制, 我们在后面会逐步介绍.
休眠(sleep)在内核中很常见, 很多操作都会导致进程休眠, 这一节我们要详细介绍休眠.
简单的说, 休眠是在一个当前进程等待暂时无法获得的资源或者一个event时, 为了避免当前进程浪费CPU时间, 将自己放入进程等待队列中, 同时让出CPU给别的进程.
休眠就是为了更好地利用CPU.
休眠是针对进程的, 也就是拥有task_struct(task)的独立个体.
一个task拥有多种状态:
TASK_RUNNING代表task在进程调度队列中, 在某个时刻, 会被调度到CPU上运行.
TASK_INTERRUPTIBLE 和 TASK_UNINTERRUPTIBLE则代表task会移出进程调度队列.
进程休眠的思路就是:
? 首先保存当前进程的状态
? 然后将 task->state 设置为 TASK_UNINTERRUPTIBLE 或者 TASK_INTERRUPTIBLE
? 最后调用schedule(), 让出CPU.
由于state不在是TASK_RUNNING, 因此task就不在进程调度列表中了, 也不会被调度到CPU上运行.
而休眠唤醒的思路就是把state重新设置为TASK_RUNNING, 让task重新进入进程调度列表.
上面我们提到进程在休眠时可以被设置为TASK_INTERRUPTIBLE 和 TASK_UNINTERRUPTIBLE这两种状态, 那这两种状态有什么区别呢?
先想想一个进程为什么要休眠? 因为它要等待某个资源(进程A), 当另外一个进程释放这个资源之后(进程B), 就要唤醒进程A. 所谓唤醒就是告诉进程A, 我已经释放资源了, 你可以再次尝试获取它并重新进入调度队列运行, 我们把这个唤醒称为事件唤醒.
休眠的进程除了可以被事件唤醒外, 还可以被信号(signal)唤醒. 信号本质是对中断机制的一种模拟, 软中断. 最常用发送信号(当然还有一些其他方式能产生信号)的系统函数是kill, raise, alarm和setitimer以及sigqueue函数, 这些系统调用会产生SIGKILL(asm/signal.h中定义)等不同的信号.
事件唤醒和信号唤醒的区别就是: 事件唤醒是一次有意义的唤醒, 代表资源可用了; 而信号唤醒跟资源是否有效无关, 它有时候是为了杀死进程(SIGKILL).
TASK_INTERRUPTIBLE : 代表进程休眠后, 可以被事件唤醒, 也可以被信号唤醒.
TASK_UNINTERRUPTIBLE : 代表进程休眠后, 只能被事件唤醒. 如果它需要的资源一直没有被释放, 则它将永远睡眠下去, 不可被杀死, 直到系统重启. 所以尽量不要用这种休眠方式.
那么当某个进程被唤醒之后, 它怎么知道自己是被事件唤醒的还是被信号唤醒的呢?
Linux内核提供了系统调用signal_pending(struct task_struct *p), 当它的返回值为非0值时, 代表进程是被信号而非事件唤醒的.
内核中很多操作都会引起休眠, 有些可能是你意想不到的, 我们尝试在这里列举一些, 但不可能列举出所有的.
不过你也不用担心, 任何休眠操作都是遵循上节描述的休眠操作的思路的. 如果你怀疑你个函数是否会引起休眠, 深挖它的代码, 就能知道.
会引起休眠的操作主要包括:
? 为了解决竞态问题, 内核提供了锁机制, 某些锁会导致休眠.
? 为了解决阻塞问题, 内核提供了阻塞机制, 所以的阻塞机制都会导致休眠.
? 内存分配函数, 例如 kmalloc会导致休眠(没有想到吧!).
? copy_to_user, copy_from_user函数会导致休眠.
? 等等(后面会持续添加)
Note: 下面的内容在编写的时候, 不保证完全正确, 等详细了解了内核的进程调度相关机制之后, 在更新相关内容.
内核中, 有很多情况不能休眠, 也就是说在这些情况下, 不能调用上述会引起休眠的函数.
同样不可能列出所有的情况, 下面列举一些已经的:
? 一个进程一旦休眠, 自己就不会被调度进CPU运行了, 所以进程自己不可能唤醒自己, 也就是说进程不可能睡眠睡到自然醒, 它必须被另外一个进程或者中断唤醒. 所以你打算让一个进程休眠时, 一定要确保在某个地方有人唤醒它.
? 永远不要在原子上下文中进入休眠, 所谓原子上下文就是被自旋锁、seqlock或者RCU锁保护的临界代码区. 本文后面章节会依次介绍这些锁, 并解释不能休眠的原因
? 如果你的进程已经关闭了中断, 也不能休眠
? 终端例程中也不可休眠
? 中断处理的时候, 不能休眠. 因为休眠是针对进程的, 中断处理程序处于中断上下文中, 中断上下文与具体的某个进程无关. 中断上下文中没有进程概念, 没有对应的task_struct, 因此不可在中断上下文中休眠.
前面提到并发会引起竞态和阻塞的问题, 内核提供了多种机制来解决这些问题, 下面来分别介绍它们.
在介绍每种机制的同时, 我们都会介绍它是否会引起休眠, 以及它所保护的临界代码段中能否休眠.
解决竞态问题的机制就是locking, 保证在任何时刻, 只有一方能访问共享资源.
内核提供了多种锁机制, 每种锁的适用场景不完全一样, 下面我们逐一介绍.
它所保护的代码段中能否进入休眠: 可以
Semaphores(信号量)是计算机领域的一个重要概念. 它的核心思想是一个无符号整数+操作这个整数的一系列函数. 这些函数通常叫做P/V操作.
当一个进程想要访问共享资源时: 在信号量上执行P操作, 如果信号量大于0, 则P操作将信号量减一, 并开始访问共享资源; 反之, 如果信号量等于0, 则P操作将会阻塞, 以等待资源有效.
当拥有共享资源的进程使用完毕共享资源, 会执行V操作解锁共享资源, V操作会将信号量加一, 如果有其它进程正在等待该资源, V操作也会唤醒该进程.
在计算机领域, 信号量的初值往往大于1, 代表有多个进程可以同时访问某个资源, 例如信号量为5, 则代表最多有5个进程可以同时访问某个资源.
当信号量用于互斥访问时, 它的初值是1, 代表某个时刻只能有1个进程访问共享资源. 在Linux内核中, 信号量基本上都是用于互斥访问, 所以它的初值经常是1.
头文件: include/linux/semaphore.h
注意该数据结构的内容只能提供内核提供的semaphores API来访问, 不能直接访问
struct semaphore |
Comment |
raw_spinlock_t |
原子锁, 主要目的是保证对信号量的加/减操作是原子的 |
unsigned intcount |
信号量计数值 |
struct list_headwait_list |
当资源不可用时, 信号量会导致调用它的进程休眠. wait_list用于保存进程状态, 以便于在资源可用时, 唤醒该进程 |
Linux内核实现了上述的信号量用于互斥访问的机制.
头文件: include/linux/semaphore.h
实现文件: kernel/locking/semaphore.c
semaphore API |
Comment |
void sema_init(struct semaphore *sem, int val) |
当你手动定义了一个信号量之后, 例如struct semaphore sem; 可以用该API初始化sem, 初值一般赋值为1, 例如sema_init(&sem, 1) |
这是一个宏, 用于定义一个信号量并初始化其初值为1 例如DEFINE_SEMAPHORE(sem) 相当于定义了信号量sem并将其初值赋为1 |
|
void down(struct semaphore *sem) |
相当于P操作 资源不可用时会导致进程休眠 休眠的进程只能被事件唤醒 没有返回值 |
int __must_check down_interruptible(struct semaphore *sem) |
相当于P操作 与down的不同之处在于 休眠的进程可以被事件或者信号唤醒. 事件与信号唤醒的区别在《休眠的本质是什么》一节中有描述 返回0表示资源可用 返回非0值表示进程是被信号唤醒的 |
int __must_check down_killable(struct semaphore *sem) |
相当于P操作 与down_ interruptible的不同之处在于 休眠的进程只能被事件或者SIGKILL信号唤醒 返回非0值表示进程是被SIGKILL信号唤醒的 |
int __must_check down_trylock(struct semaphore *sem) |
相当于P操作 该API不会引起进程休眠 当资源可用时, 该API会把信号量减一, 返回0 当资源不可用时, 该API直接返回1 |
int __must_check down_timeout(struct semaphore *sem, long jiffies) |
相当于P操作 休眠的进程只能被事件唤醒 与down的不同之处在于, 该API可设置超时时间 如果过了jiffies时长后, 资源任然不可用, 则等待资源超时 返回0表示资源可用 返回非0值表示等待超时 |
void up(struct semaphore *sem) |
相当于V操作 一旦调用该API, 表示资源使用完毕, 不在占用该资源 如果有其它进程在等待该信号量, up也会唤醒该进程 如果有多个进程都在等待该信号量, up只会唤醒第一个进程 |
如何正确使用信号量
内核提供了信号量机制, 但是如何正确使用信号量机制就是使用者自己的事情了, 在使用信号量时, 有一些基本注意事项
? 首先, 信号量要先被初始化, 然后才能进行down/up操作, 否则可能引起奇怪的错误
? 在使用信号量的时候, 哪些资源是你希望被保护的, 要自己考虑清楚
? 在尝试访问共享资源时, 我们会先调用down或者down_xxx操作, 除了down没有返回值, 另外几个API都有返回值(见API说明).
例如我们编写了一个字符设备驱动, 在它的write函数中针对某个信号量调用了down_xxx, 如果返回值不为0, 代表被信号唤醒, 或者等待超时, 或者资源不可用, 此时我们可以返回-ERESTARTSYS给内核上层代码(就是调用write函数的内核核心层代码), 例如:
当内核收到- ERESTARTSYS, 核心层有可能再次调用你的代码(例如再次调用write函数), 也可能将错误返回到用户空间.
所以如果你打算返回- ERESTARTSYS, 那你需要在返回之前un-do那些引起变化的操作, 因为系统会认为什么都没发生, 重调你的代码
如果你不能un-do这些变化, 那就应该返回-EINTR
? 如果down或者down_xxx执行成功, 代表你获得了对共享资源的访问权.
接下来, 你将会在共享资源上进行一些操作, 不管这些操作是否会失败, 在结束的时候一定要用up释放信号量. 很多时候我们在操作失败时就直接return了, 忘记释放信号量, 这是一个严重的错误.
Semaphores保护的代码段中能否休眠
是否会引起休眠: 是
它所保护的代码段中能否进入休眠: 可以
Semaphores机制实现了严格的互斥访问, 任何时候, 不管是读还是写操作, 都只有1个能访问共享资源.
但是很多时候, 读操作是可以并发的. 例如多个进程可以同时读取同一个共享资源, 因为读操作不会对共享资源做任何修改. 并发的读操作可以显著提升性能, 一个读操作不用等到另外一个读操作完成之后在去读取.
因此Linux提供了一个rwsem ("reader/writer semaphore")来应对这种情况, 虽然rwsem在驱动开发中用的非常少, 我们还是简单介绍下它.
对于读来说, rwsem允许多个读操作同时访问共享资源.
对于写来说, rwsem只允许一个写操作访问共享资源.
读写是互斥的, 而且写操作具有优先权, 如果写操作和读操作同时想访问共享资源, 写操作优先. 而且假设某一时刻共享资源正在被一个写操作访问, 同时有多个读操作和多个写操作都在等待访问该共享资源, 那么读操作要等到所有的写操作完毕之后才能访问.
因此, rwsem适用于写操作很少, 而且每次写操作的耗时很短的情况.
头文件: include/linux/rwsem.h
结构体: struct rw_semaphore
就不介绍它的细节了, 我们可以用 struct rw_semaphore rwsem;来定义一个信号量
头文件: include/linux/rwsem.h
实现文件: kernel/locking/rwsem.c
Comment |
|
void init_rwsem(struct rw_semaphore *sem); |
当你手动定义了一个信号量之后, 例如struct rw_semaphore sem; 可以用该API初始化sem. |
DECLARE_RWSEM(name) |
这是一个宏, 用于定义一个信号量并初始化. |
void down_read(struct rw_semaphore *sem); |
A call to down_read provides read-only access to the protected resources, possibly concurrently with other readers 可能导致进程休眠, 休眠的进程只能被事件唤醒 |
int down_read_trylock(struct rw_semaphore *sem); |
尝试获取只读权限 不会导致进程休眠 如果权限获取失败(一般代表有写操作正在访问共享资源), 该API返回0 |
void up_read(struct rw_semaphore *sem); |
调了down_read, 或者down_read_trylock返回值为1, 则必须对应的调用up_read |
void down_write(struct rw_semaphore *sem); |
获取对共享资源的写权限 可能会导致进程休眠, 休眠的进程只能被事件唤醒 |
int down_write_trylock(struct rw_semaphore *sem); |
尝试获取写权限 不会导致进程休眠 如果权限获取成功, 该API返回1 如果权限获取失败(一般代表有其它写操作正在访问共享资源), 该API返回0 |
void up_write(struct rw_semaphore *sem); |
释放写权限
|
void downgrade_write(struct rw_semaphore *sem); |
暂时不清, 可在内核中查找其源码分析 |
它所保护的代码段中能否进入休眠: 不可以
spinlocks也叫自旋锁, 是Linux提供的用于解决竞态问题的另一个机制.
spinlocks与semaphore最大的不同在于spinlock不会导致进程休眠. 它属于忙等, 当锁不可用时, 相当于一个while循环, 不停的查询锁是否可用了, 这种忙等其实算是一种CPU资源的浪费.
spinlock的核心概念比较简单, 一个spinlock只有两种状态: “locked” 和 “unlocked”. 一般用一个整数中的1个bit来代表这个状态. 当需要锁住一个锁时, 就set bit; 释放锁的时候就clear bit; 等待锁的时候就在while循环里面不停的test bit.
当然Linux内核spinlock的实现机制比上述的要复杂多了, spinlock的底层实现与体系架构有关, 不同的体系架构上实现不一样.
而且对于单处理器(UP)和多处理器(SMP), spinlock的实现机制也不一样. 后面会详述.
头文件: include/linux/spinlock.h
实际的类型定义在include/linux/spinlock_types.h, spinlock.h会自动include此头文件.
一般用spinlock_t lock; 来定义一个自旋锁.
typedef struct spinlock {
union {
struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
};
#endif
};
} spinlock_t;
spinlock_t中最主要的元素是struct raw_spinlock rlock.
raw_spinlock在不同的体系架构上实现都不一样, 这里就不详述了, 有兴趣可以自行阅读代码.
头文件: include/linux/spinlock.h
实现文件: kernel/locking/spinlock.c
对于每个API都有详细的原理说明, 阅读这些说明, 可以了解到使用自旋锁时的注意事项.
本节没有太多好说的, 写在这里只是为了文档的对称性.
我们在介绍Semaphores时列举了使用信号量的注意事项, 这些注意事项在自旋锁中同样适用. 针对自旋锁的注意事项, 在《spinlock API》一节中都做了详细说明.
是否会引起休眠: 否
它所保护的代码段中能否进入休眠: 不可以
read/write spinlock存在的意义与read/write semaphores一样, 都是为了提高多个并发读操作的性能. 它一样允许同一时刻有多个读操作, 但是只能有1个写操作.
头文件: include/linux/spinlock.h
实际的类型定义是在include/linux/rwlock_types.h; spinlock.h会自动包含头文件spinlock_types.h, spinlock_types.h中会自动包含rwlock_types.h
一般用rwlock_t lock; 来定义一个读写自旋锁.
typedef struct {
arch_rwlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} rwlock_t;
头文件: include/linux/spinlock.h
实际的头文件是include/linux/rwlock.h; spinlock.h中会自动包含此头文件
实现文件: kernel/locking/spinlock.c
RW spinlocks完全遵循spinlock中的一些原则, 比如忙等, 不会导致进程休眠, 在不同的体现结构中有不同的实现机制等, 这里就不再赘述了, 可查看《spinlocks》一节
rw spinlock API |
Comment |
rwlock_init |
这是一个宏, 如果你已经定义了一个rw spinlock, 例如: rwlock_t rwlock; 则可以用此宏进行初始化 |
DEFINE_RWLOCK (name) |
这是一个宏, 用于定义一个rwlock并初始化. |
void read_lock(rwlock_t *lock); |
获取读权限锁 |
void read_unlock(rwlock_t *lock); |
释放读权限锁 |
void read_lock_irq(rwlock_t *lock); |
获取读权限锁, 会禁止中断 |
void read_unlock_irq(rwlock_t *lock); |
释放读权限锁 |
void read_lock_irqsave(rwlock_t *lock, unsigned long flags); |
获取读权限锁, 会禁止中断并保存中断状态 |
void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags); |
释放读权限锁, 恢复中断状态 |
void read_lock_bh(rwlock_t *lock); |
获取读权限锁, 会禁止软件中断 |
void read_unlock_bh(rwlock_t *lock); |
释放读权限锁 |
|
|
void write_lock(rwlock_t *lock); |
获取写权限锁 |
void write_unlock(rwlock_t *lock); |
|
void write_lock_irq(rwlock_t *lock); |
获取写权限锁, 禁止中断 |
void write_unlock_irq(rwlock_t *lock); |
|
void write_lock_irqsave(rwlock_t *lock, unsigned long flags); |
获取写权限锁, 禁止中断并保存中断状态 |
void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags); |
|
void write_lock_bh(rwlock_t *lock); |
获取写权限锁, 禁止软件中断 |
void write_unlock_bh(rwlock_t *lock); |
|
|
|
int write_trylock(rwlock_t *lock); |
尝试获取写权限锁 如果获取成功, 返回非0值 如果获取失败, 返回0 注意没有trylock的read版本哦 |
是否会引起休眠: 否
它所保护的代码段中能否进入休眠: 不可以
前面介绍过一些Read/Write锁, 这些锁的特点是提高并发读操作的性能, 即允许多个读操作, 但是只允许一个写操作, 一旦有写操作, 就不允许读操作了.
Seqlocks的特点是允许多个读操, 但是只允许一个写操作, 而且如果有写操作, 读操作也可以进行.
想象这样一种情况, 有个进程需要读取一个变量, 然后用这个变量进行一系列计算, 最后得到计算结果. 如果在计算的过程中, 这个变量被另外一个进程修改了, 那么前一个进程得到的计算结果就不是我们想要的了.
Seqlocks可以解决这个问题: 它的思路是用一个uint表示共享资源被修改的次数, 在开始进行计算之前, 先获取一次这个计数, 计算完毕之后, 在次获取这个计数, 如果计数值发生变化了, 说明在计算过程中, 共享资源被其他进程修改了, 则重新进行一次计算. 代码示例如下:
unsigned int seq;
do {
seq = read_seqbegin(&the_lock);
/* Do what you need to do */
} while read_seqretry(&the_lock, seq);
从上述描述中可以看出, Seqlocks一般用于保护共享资源比较简单(例如一个变量), 经常会读取, 很少会修改(但是如果要修改, 修改动作非常快)的场景.
注意seqlocks保护的共享资源中不可包含指针, 因为有可能写操作正在修改指针, 导致读操作读到错误的地址.
一般用seqlock_t lock来定义一个seqlock. 细节就不看了, 有兴趣可以自行阅读代码
头文件: include/linux/seqlock.h
实现文件: include/linux/seqlock.h , 没有c文件, 都是在.h里面定义的inline函数.
Comment |
|
seqlock_init |
这是一个宏, 如果你已经定义了一个seqlock, 例如: seqlock_t lock; 则可以用此宏进行初始化 |
DEFINE_SEQLOCK (name) |
这是一个宏, 用于定义一个seqlock并初始化. |
unsigned read_seqbegin(const seqlock_t *sl) |
获取共享资源的修改计数 返回值是此时此刻的计数值 |
unsigned read_seqretry(const seqlock_t *sl, unsigned start) |
比较start所代表的计数值与此时此刻的计数值是否一样. 返回0表示两者相等 返回1表示两者不相等 此API与上一API配合使用, 返回值为1, 则代表读操作要重新获取共享资源(retry) |
unsigned int read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags) |
中断处理函数中, 应该使用这个版本 |
int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags) |
中断处理函数中, 应该使用这个版本 |
|
|
void write_seqlock(seqlock_t *sl) |
seqlock要保证write的互斥性, 同一时刻只能有一个write操作. 所以write这块直接用的spinlock机制, spinlock的所有特性和限制条件, 这里都满足 获取锁, 并开始修改共享资源, 修改完毕之后, 会把修改计数加1 |
void write_sequnlock(seqlock_t *lock) |
释放锁 |
void write_seqlock_irq(seqlock_t *lock) |
获取锁, 并禁止中断 |
void write_sequnlock_irq(seqlock_t *lock) |
释放锁, 并使能中断 |
void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags) |
获取锁, 保存中断状态并禁止中断 |
void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags) |
释放锁, 并恢复中断状态 |
void write_seqlock_bh(seqlock_t *lock) |
获取锁, 并禁止软件中断 |
void write_sequnlock_bh(seqlock_t *lock) |
释放锁, 并使能软件中断 |
RCU(Read-Copy-Update), 它的实现机制比较复杂, 这里不详细分析了, 详见官方网站: http://www.rdrop.com/users/paulmck/rclock/intro/rclock_intro.html
内核中实现的头文件: include/linux/rcupdate.h
? 锁机制应该在设计之初就考虑清楚, 如果你的设计中会出现共享资源, 那么在设计之初就要考虑使用什么锁来保护这个共享资源以及如何使用该锁. 如果你在前面偷懒了, 那在后期调试阶段必定会付出N倍的调试时间.
? 可能在你的代码中, 很多函数都会使用同一个锁, 务必注意一定不能在其中一个函数里调用另外一个函数, 否则就会引起死锁. 更普遍的, 当你的代码已经锁住了一个锁, 在释放该锁之前, 不能调用其他尝试获得该锁的代码, 否则就会死锁.
? 有些情况下, 你可能会设计一个函数来访问共享资源, 在这个函数里面不会加任何锁, 而是假定调用者已经获得锁了. 在这种情况下, 请务必记得加上注释说明, 因为一段时间之后, 你很难记得调用该函数时是否应该先获得锁…; 内核中就有很多这样的函数, 而且它们都有清晰的注释
? 有一些情况下, 你的临界区代码会访问两个共享资源, 这2个资源被不同的锁保护着, 这种情况下, 你必须同时获得这2个锁之后, 才能运行临界区的代码.
想象这样一种情形, 假设有两个锁LOCK1, LOCK2, 你要同时获得这两个锁才能访问临界区: 如果一个进程锁住了LOCK1, 而另外一个进程锁住了LOCK2, 那么这两个进程都没法获得另一个锁, 这两个进程都会死锁.
解决这个问题的办法也很简单, 当代码需要获取多个锁时, 使用同样的顺序依次获得各个锁. 也就是不要在一个地方先获取LOCK1, 再获取LOCK2, 但是在另一个地方却先获取LOCK2, 再获取LOCK1.
? 如果需要同时获得你自己定义的锁, 也要获得内核系统定义的锁, 先尝试获取你自己的锁
? 如果需要同时使用semaphore和spinlock, 先尝试获取信号量, 原因也很简单, spinlock保护的临界区不允许休眠, 而semaphore可能导致休眠
锁的粒度可分为粗粒度锁和细粒度锁.
粗粒度锁使用起来更加方便, 逻辑上也更加简单, 可以减少死锁情况的发生, 比如某个函数里面用到了共享资源, 我们直接在进入函数的第一句话加上一个锁, 退出时释放这个锁.
但是粗粒度的锁可能会影响性能, 相反的细粒度锁对提升性能有帮助, 但是使用起来逻辑上会更加复杂.
用的不好, 锁可能会引起各种bug, 而且非常难调试, 因此在设计之初, 我们应该优先考虑使用粗粒度的锁, 而不要过早的考虑性能问题, 因为到最后, 主要的性能问题可能存在某个别的地方.
如果你怀疑是锁影响了性能, 可以使用工具lockmeter来检查, 当然使用这个工具需要在内核里面添加一个补丁用于测量锁所消耗的时间(available at http://oss.sgi.com/projects/lockmeter/).
解决互斥问题, 可以用我们前面提到的锁机制. 但是锁机制本身来讲不太好把握, 稍有不慎就可能引起死锁, 而且锁机制或多或少会有一点性能影响.
内核提供了一些不用上锁的机制来解决互斥问题.
环形缓冲区适用于生产者/消费者问题, 一方往缓冲区里面写入数据, 另外一方从缓冲区读取数据.
Linux内核实现了环形缓冲区机制(kfifo), 我们在《编程基础》一文中详细分析了环形缓冲区的实现机制.
kfifo的实现非常巧妙, 巧妙之处已在《编程基础》一文中介绍过了.
需要注意的是:
? kfifo允许一个读操作和一个写操作同时进行, 不需要任何锁机制. 即读/写操作不需要互斥.
? 如果有多个读操作, 则调用者自己需要加锁来互斥多个读操作.
? 如果有多个写操作, 则调用者自己也需要加锁来互斥多个写操作.
有时候, 我们必须要用一个全局变量来做为共享资源, 例如定义一个uint n_op来代表设备驱动被多少个进程同时打开. 每当一个新进程打开本驱动时, 则n_op++.
注意, 即使是n_op++这么条简单的语句, 都可能导致竞态问题. 有的处理器可以做到原子自增, 但是你不能指望它, 不是所有的处理器都支持这个功能. 如果用一个锁机制来保护这个变量, 又好像太过了.
所以内核提供了一个叫原子整数类型的东东.
头文件: include/asm/atomic.h
变量类型: atomic_t
atomic_t实际上就是一个int型变量, 注意这个变量最多只能用24 bits, 而不是32 bits哦.
API:
Comment |
|
void atomic_set(atomic_t *v, int i) |
当你用atomic_t定义了一个变量, 可以用此API设置初值 |
ATOMIC_INIT(0) |
可以这样: atomic_t v = ATOMIC_INIT(0) |
int atomic_read(atomic_t *v) |
获取v的值 |
void atomic_add(int i, atomic_t *v) |
v + i 注意返回值是void, 意味着这个API不会反馈给你计算结果. 其实很多时候我们都不需要知道计算结果. |
void atomic_sub(int i, atomic_t *v) |
v - i 返回值是void |
void atomic_inc(atomic_t *v) |
v++ |
void atomic_dec(atomic_t *v) |
v-- 返回值是void |
int atomic_inc_and_test(atomic_t *v) |
v++ , 然后测试++后的值 如果值为0, 则返回true 如果值不为0, 返回false |
int atomic_dec_and_test(atomic_t *v) |
v-- , 然后测试--后的值 如果值不为0, 返回false 注意没有atomic_add_and_test这个API哦 |
int atomic_sub_and_test(int i, atomic_t *v) |
v + i , 然后测试+i后的值 如果值不为0, 返回false |
int atomic_add_negative(int i, atomic_t *v) |
v + i, 然后测试+i后的值 如果值为负数, 则返回true 如果值不为负数, 返回false |
int atomic_add_return(int i, atomic_t *v) |
|
int atomic_sub_return(int i, atomic_t *v) |
v - i, 并返回-i之后的结果 |
int atomic_inc_return(atomic_t *v) |
v++, 并返回++之后的结果 |
int atomic_dec_return(atomic_t *v) |
v--, 并返回--之后的结果 |
注意事项:
? 原子变量只能通过上面的API操作, 而且你也不能把一个原子变量赋值给一个整型变量, 否则会编译报错
? 另外需要注意的一个问题是, 上述API中, 例如atomic_sub(int i , atomic_t *v), 这里的i最好是常量. 假设i是一个变量amount, 而这个amount又恰巧是个共享资源, 如下:
atomic_sub(amount, &first_atomic);
atomic_add(amount, &second_atomic);
有可能在执行第一条语句是amount是一个值, 但是在执行第二条语句时amount变成另外一个值了, 这种情况下, 应该用一个锁来保护上面两条代码, 注意保护的是共享资源amount, 而不是原子变量!
atomic_t可以很好的解决原子整数问题.
但是对于位操作, 我们该怎么办?
Linux内核提供了专门的机制用于原子位操作.
头文件: include/linux/bitops.h
原子位操作的一般形式如: void set_bit(nr, void *addr)
原子位操作不用禁止中断, 但可以保证操作的原子性. 其实现机制在不同的体系架构上不一样, 例如针对arm, 其真实的实现文件一般是arch/arm/include/asm/bitops.h.
数据类型nr在不同的体系架构中也不一样, 大多数地方是int, 少数处理器上是unsigned long
地址addr在不同的体系架构中也不一样, 大多数地方是一个指向unsigned long类型的指针, 少数处理器上是void *
API
Atomic API |
Comment |
void set_bit(nr, void *addr) |
|
void clear_bit(nr, void *addr) |
|
void change_bit(nr, void *addr) |
将addr所指向的数据的第nr位翻转 |
test_bit(nr, void *addr) |
This function is the only bit operation that doesn‘t need to be atomic; it simply returns the current value of the bit |
int test_and_set_bit(nr, void *addr) |
并返回操作之前, bit的状态 |
int test_and_clear_bit(nr, void *addr) |
将addr所指向的数据的第nr位清零 并返回操作之前, bit的状态 |
int test_and_change_bit(nr, void *addr) |
将addr所指向的数据的第nr位翻转 并返回操作之前, bit的状态 |
阻塞的处理思路就是, 当某个进程要阻塞时, 就在那里等着, 条件满足之后, 另外一个进程会通知它.
阻塞基本上都会导致被阻塞的进程进入休眠状态, 阅读本章之前, 建议先回想一下本文《休眠》一章.
内核了提供了如下的阻塞机制.
我们在《休眠》一章中详细介绍了什么是休眠以及进程休眠的本质是什么.
但是进程休眠之后如何唤醒呢? 这个话题在本章讨论最合适.
休眠唤醒的一般做法是:
保存当前进程的状态
让当前进程放弃CPU, 进入休眠
合适时候唤醒之前休眠的那个进程.
为什么要保存进程状态呢? 因为唤醒的时候需要知道该唤醒哪个进程. 一般的做法是把进程状态保存到一个链表头下, 唤醒的时候从这个链表头里面依次取出进程唤醒.
因此这里涉及到2个概念, 一是这个链表头, 二是链表头下的元素(也就是用什么数据结构保存进程状态).
wait就是内核提供的处理上述逻辑的机制, 下面我详细分析一下该机制
头文件: include/linux/wait.h
wait_queue_head_t : 就是我们上述的那个链表头, 一般我们可以通过wait_queue_head_t my_queue来定义这样一个链表头.
struct __wait_queue_head {
spinlock_t lock;
struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;
非常清楚, 也很简单. 一个spinlock保护一个list_head
wait_queue_t : 就是我们上述的用来保存进程状态的结构体.
typedef struct __wait_queue wait_queue_t;
struct __wait_queue {
unsigned int flags;
void *private;
wait_queue_func_t func;
struct list_head task_list;
};
? task_list用于把自己挂载到链表头下的
? void *private, 注意这个指针实际上会指向结构体struct task_struct, 而内核中就是用一个task_struct来描述一个进程的
头文件: include/linux/wait.h
实现文件: kernel/sched/wait.c
一般的, 调用wait_event就可以让一个进程进入阻塞状态, 等待condition变为true. 看上去很简单, 但其实很多细节, 内核API帮我们封装了这些细节, 直接调用就可以了. 但为了更深入内核, 我们有必要了解这些细节.
那么我们就来分析分析wait_event的实现机制.
wait_event
#define wait_event(wq, condition) \
do { \
if (condition) \
break; \
__wait_event(wq, condition); \
} while (0)
? 首先判断condition是否为true, 如果此时condition已经为true了, 那么就不需要wait了. 所以我们在调用此API, 不用事先判断condition状态, API内部会做判断
? 如果需要wait, 则调用__wait_event
__wait_event会直接调用___wait_event, 后者是wait机制的主要实现函数, 前面介绍的各种API都是对它的封装.
___wait_event
#define ___wait_event(wq, condition, state, exclusive, ret, cmd) \
({ \
__label__ __out; \
wait_queue_t __wait; \
long __ret = ret; /* explicit shadow */ \
\
INIT_LIST_HEAD(&__wait.task_list); \
if (exclusive) \
__wait.flags = WQ_FLAG_EXCLUSIVE; \
else \
__wait.flags = 0; \
\
for (;;) { \
long __int = prepare_to_wait_event(&wq, &__wait, state);\
\
if (condition) \
break; \
\
if (___wait_is_interruptible(state) && __int) { \
__ret = __int; \
if (exclusive) { \
abort_exclusive_wait(&wq, &__wait, \
state, NULL); \
goto __out; \
} \
break; \
} \
\
cmd; \
} \
finish_wait(&wq, &__wait); \
__out: __ret; \
})
? 首先定义一个wait_queue_t __wait, 用于保存当前进程的状态
? 如果是exclusive 休眠, 则设置__wait.flags位, 后面要用到此flags
? 紧接着是一个死循环
? 调用prepare_to_wait_event, 这个函数是一个重要函数, 我们会单独分析. 不过prepare阶段要做哪些事情呢? 你应该能猜到了, 大体上就是把当前进程的状态保存在wait_queue_t, 然后把wait_queue_t挂载到head下, 然后设置当前进程的进程状态(TASK_UNINTERRUPTIBLE/TASK_INTERRUPTIBLE)
注意prepare完毕之后, 只是改变了进程状态, 进程还没有换出CPU哦, 因为还未调用schedule
? 然后再次判断condition是否为true. 为什么要再次判断呢?
这里面有门道, 想象一下, 如果你的代码刚刚进入prepare_to_wait_event, 还没来得及把当前进程状态挂载到链表头下, 此时另外一个进程调用了wake_up, 由于你的进程还未挂载到链表头, 所以wake_up就找不到你, 也就相当于这次wake_up被丢失了.
然后prepare_to_wait_event继续执行, 把你的进程挂载到了链表头下. 如果前一次wake_up是唯一一次唤醒, 却被丢失了, 那么将不会有任何人再次唤醒你.
如果这里不判断condition, 直接让当前进程休眠的话, 那么你的进程就可能永远休眠.
所以, 重点在于, 在把当前进程状态挂载到链表头下之后, 一定要再次判断condition是否为true: 如果为true, 则不让进程休眠, 直接调用finish_wait, 如果不为true, 则当前进程就可以放心休眠了.
? 接下来的一个大的if (___wait_is_interruptible(state) && __int)语句暂时不去分析它, 后面再看会更好理解
? 然后就会执行cmd; cmd是这个宏的一个参数, 一般就是schedule()指令. 所以cmd代表让当前进程放弃CPU, 而且进程state在prepare_to_wait_event阶段已经被设置为TASK_UNINTERRUPTIBLE/TASK_INTERRUPTIBLE, 所以进程就会休眠.
注意, schedule之后, 当前进程休眠了, 你不知道进程什么时候会被唤醒, 也就意味着你永远不知道schedule调用何时会返回.
当进程被唤醒之后, 它就会接着schedule的下一条代码继续运行. 上述代码中, 进程被唤醒后, 又会重新执行for循环, 接下来我们分析一下再次执行for循环会发生什么事情
? 再次执行for循环 (意味着进程被唤醒了)
? 首先还是调用prepare_to_wait_event, 不过此时prepare_to_wait_event不会再次将进程状态挂载到链表头, 它最主要的目的是判断当前进程是不是被信号唤醒的(signal_pending, 本文之前介绍过), 如果是被信号唤醒的, 则返回-ERESTARTSYS, 否则返回0.
? 接着, 判断condition是否为true, 为什么这个时候要再次判断呢?
首先, 假设进程是被信号唤醒的, 但是恰巧此时condition有效了, 则当前进程不在休眠, 跳出for循环
其次, 假设进程不是被信号唤醒的, 那就是被事件唤醒的(一般意味着其他进程把condition设为true, 然后调用wake_up), 一次wake_up动作可能唤醒多个进程, 如果其他进程先被唤醒, 并且又把condition设为了false, 那么后续唤醒的进程就只好重新进入休眠.
这个if(condition)就能实现上述逻辑, 当它判断出condition为false时, 会重新调用schedule, 让进程休眠.
? 那么, if (___wait_is_interruptible(state) && __int)这段话是个什么意思呢?
假设当前进程是被信号唤醒的(即 __int不为0), 而且此时condition是false, 并且当前进程的休眠状态是TASK_INTERRUPTIBLE或TASK_KILLABLE, 则不应该让进程再次休眠, 而应该把__int返回给调用者, 让调用者决定是再次休眠还是做别的事情.
? OK, 上面就是___wait_event的逻辑, 里面涉及到两个重要函数prepare_to_wait_event 和 finish_wait, 我们单独分析.
long prepare_to_wait_event(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
if (signal_pending_state(state, current))
return -ERESTARTSYS;
wait->private = current;
wait->func = autoremove_wake_function;
spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list)) {
if (wait->flags & WQ_FLAG_EXCLUSIVE)
__add_wait_queue_tail(q, wait);
else
__add_wait_queue(q, wait);
}
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
return 0;
}
EXPORT_SYMBOL(prepare_to_wait_event);
prepare主要就是做休眠前的准备工作, 另外一个功能时唤醒后判断是否为信号唤醒
? signal_pending_state会调用signal_pending, 判断是否为信号唤醒
? 将当前进程状态保存在wait里面, wait的类型是wait_queue_t
? 把wait挂载到链表头q下面, 如果是exclusive类型的阻塞, 则把wait挂载到链表头尾端
? 设置当前进程的状态为state
finish_wait
void finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
{
unsigned long flags;
__set_current_state(TASK_RUNNING);
/*
* We can check for list emptiness outside the lock
* IFF:
* - we use the "careful" check that verifies both
* the next and prev pointers, so that there cannot
* be any half-pending updates in progress on other
* CPU‘s that we haven‘t seen yet (and that might
* still change the stack area.
* and
* - all other users take the lock (ie we can only
* have _one_ other CPU that looks at or modifies
* the list).
*/
if (!list_empty_careful(&wait->task_list)) {
spin_lock_irqsave(&q->lock, flags);
list_del_init(&wait->task_list);
spin_unlock_irqrestore(&q->lock, flags);
}
}
EXPORT_SYMBOL(finish_wait);
finish_wait相当于prepare_to_wait_event的反函数, 主要是进程被唤醒后做一些清理动作
? 设置当前进程的running state为TASK_RUNNING, 以便进程重新进入处理器调度队列
? 将之前保存的进程状态从链表头中移除
了解了wait机制, 在来看completions机制就非常简单了.
wait机制是用于等待condition, 直到其为true.
completions机制是对wait的封装, 它类似于通知机制, 进程A调用completion阻塞, 进程B在合适的时候唤醒A, 不用去关心condition.
头文件: include/linux/completion.h
一般用struct completion my_completion;来定义一个completion
struct completion {
unsigned int done;
wait_queue_head_t wait;
};
? 为什么completion不用管所谓的condition了呢, 是因为completion内部处理了这个细节, unsigned int done就是所谓的condition
? completion其实就是个链表头, 作用与wait_queue_head_t一样
头文件: include/linux/completion.h
实现文件: kernel/sched/ completion.c
completion API |
Comment |
inline void init_completion(struct completion *x) |
内联函数, 当你用struct completion my_completion 定义了一个completion之后, 可以用此API进行初始化 |
DECLARE_COMPLETION (name) |
|
void wait_for_completion(struct completion *) |
等待completion 该API会让当前进程进入TASK_UNINTERRUPTIBLE的休眠状态 当前进程的状态保存在completion->wait链表头中 没有返回值 |
int wait_for_completion_interruptible(struct completion *x) |
与上类似 不同的是会让进程进入TASK_INTERRUPTIBLE的休眠状态 而且此API有返回值 返回-ERESTARTSYS代表进程是被信号唤醒的 返回0代表进程是正常唤醒的, 已经completed |
int wait_for_completion_killable(struct completion *x) |
与上类似 不同的是会让进程进入TASK_KILLABLE的休眠状态 而且此API有返回值 返回-ERESTARTSYS代表进程是被KILL信号唤醒的 返回0代表进程是正常唤醒的, 已经completed |
unsigned long wait_for_completion_timeout(struct completion *x, unsigned long timeout) |
与上类似 不同的是会设置超时时间timeout (用jiffies表示) 会让进程进入TASK_UNINTERRUPTIBLE的休眠状态 有返回值 返回0代表等待超时 返回 >=1代表未超时且completed, 具体的数值的意义是: 当completed时, remaining jiffies before timeout |
long wait_for_completion_interruptible_timeout(struct completion *x, unsigned long timeout) |
让进程进入TASK_INTERRUPTIBLE的休眠状态并设置超时时间 有返回值 返回-ERESTARTSYS代表进程是被信号唤醒的 返回0代表等待超时 返回>=1代表未超时且completed, 具体的数值的意义是: 当completed时, remaining jiffies before timeout |
long wait_for_completion_killable_timeout(struct completion *x, unsigned long timeout) |
与上类似 让进程进入TASK_KILLABLE的休眠状态并设置超时时间 有返回值 返回-ERESTARTSYS代表进程是被信号唤醒的 返回0代表等待超时 返回>=1代表未超时且completed, 具体的数值的意义是: 当completed时, remaining jiffies before timeout |
|
|
void complete(struct completion *) |
没有返回值 假如有多个进程都在等待这个completion, 则唤醒第一个 |
void complete_all(struct completion *) |
没有返回值 假如有多个进程都在等待这个completion, 则唤醒所有 |
|
|
bool completion_done(struct completion *x) |
判断是否有进程在等待这个completion 返回0代表有进程在等待 返回1代表没有进程在等待 |
总结也没太多好说的, 该说的细节前面都介绍过了.
此文档可作为查阅文档, 哪天你在设计驱动的时候, 需要用到锁机制或者阻塞机制, 可以查阅文档中的API章节, 看看它们是怎么用的.
Add @ 2019.3.28
在内核空间, 原子上下文实质上就是指禁止内核抢占. 根据《进程调度》中的介绍, 当禁止了内核抢占后, 内核代码在执行过程中就不会发生进程切换. 即使进程的时间片用完了, 它也不会马上被换出CPU, 而只是设置下need_resched标记, 然后在调度时机到来(由于内核抢占被禁止, 因此调度时机就只剩一个, 即从内核空间返回用户空间时)时才会进行进程切换.
不过虽然内核抢占被禁止, 中断还没禁, 因此原子上下文是有可能被中断打断的. 如果想把内核抢占与中断都禁止, 可以使用spin_lock_irq这种.
内核的一个基本原则就是:在中断或者说原子上下文中,内核不能访问用户空间,而且内核是不能睡眠的。也就是说在这种情况下,内核是不能调用有可能引起睡眠的任何函数。因为睡眠会引发主动的进程切换, 一旦有进程切换, 就不能称做‘原子’了. 更一步来说, 原子上下中不能调用任何可能导致主动进程切换的代码. (额, 这段话是个人理解, 正确性有待后面考证, 先这样理解把).
原文:https://www.cnblogs.com/jliuxin/p/14129348.html