Linux内核的并发控制
现代Linux系统中存在大量的并发来源,导致可能的竞态,竞态通常作为对资源的共享访问结果而产生。访问管理的常见技术称为“锁定”或者“互斥”——确保一次只有一个执行单元可操作共享资源。在Linux内核中,主要的竞态发生在如下几种情况:对称多处理器(SMP)的多个CPU;单CPU内进程与抢占它的进程;中断(硬中断、软中断、Tasklet、底半部)与进程之间。
另外有两种可能的原因造成程序出错,一种可能性是编译乱序,另一种可能性是执行乱序。现代的高性能编译器在目标代码优化上都具备对指令进行乱序优化的能力,编译器对访存的指令进行乱序,减少逻辑上不必要的访存,以及尽量提高Cache命中率和CPU的Load/Store单元的工作效率。解决编译乱序问题,通过barrier()编译屏障,#define barrier() _asm__volatile_("":::"memory")。高级的CPU可以根据自己缓存的特性,将访存指令重新排序执行,解决乱序执行,需要内存屏障指令,在Linux内核中,定义了读写屏障mb(),读屏障rmb(),写屏障wmb(),以及作用寄存器读写_iormb()、_iowmb()的屏障。
CPU一般都具备屏蔽中断和打开中断的能力,保证正在执行的内核执行路径不被中断处理程序抢占,防止竞态条件的发生。中断屏蔽的使用方法:local_irq_disable(),local_irq_enable(),禁止中断的底半部:local_bh_disable(),local_bh_enable();
1,信号量
头文件<asm/semaphore.h>,类型struct semaphore.
void sema_init(struct semaphore *sem, int val)
静态互斥模式的信号量的声明及初始化:
DECLARE_MUTEX(name) 声明一个name的信号量,初始化为1
DECLARE_MUTEX_LOCKED(name) 声明一个name的信号量,初始化为0
动态分配互斥体:
void init_MUTEX(struct semaphore *sem);
void init_MUTEX_LOCKED(struct semaphore *sem)
在Linux的世界中,P函数被称为down或这个名字的其他变种,down指的是该函数减少了信号量的值,也许会将调用者置于休眠状态,然后等待信号量变得可用,之后授予调用者对被保护的资源的访问。
void down(struct semaphore *sem) 减小信号量的值,必要时一直等待,不可中断,不可杀进程。
int down_interruptible(struct semaphore *sem) 可用户中断,返回非零值,调用者不拥有信号量
int down_trylock(struct semaphore *sem) 永远不会休眠,不可获得时立即返回非零值。
在Linux中等价于V的函数是up
void up(struct semaphore *sem)
2,读写信号量
信号量对所有的调用者执行互斥,而不管每个线程到底想做什么。许多任务可以划分两种不同的工作类型,一些任务只需要读取受保护的数据结构,而其他的必须修改。允许多个并发的读取者是可能的,只读任务并行完成工作,不需要等待其他读取者退出临界区,可以大大提高性能。
头文件<linux/rwsem.h> 类型 struct rw_semaphore,一个rwsem对象必须在运行时通过下面的函数显式地初始化:
void init_rwsem(struct rw_semaphore *sem)
只读访问:
void down_read(struct rw_semaphore *sem)
int down_read_trylock(struct rw_semaphore *sem)
void up_read(struct rw_semaphore *sem)
写入者访问:
void down_write(struct rw_semaphore *sem)
int dow_write_trylock(struct rw_semaphore *sem)
void up_write(struct rw_semaphore *sem)
void downgrade_write(struct rw_semaphore *sem)
一个rwsem可允许一个写入者或无限多个读取者拥有该信号量,写入者拥有更高的优先级,当某个写入者试图进入临界区时,在所有写入者完成其工作之前,不会允许读取者获得访问,如果有大量的写入者竞争该信号量,则这种实现会导致读取者“饿死”。
3,完成量
completion是一种轻量级的机制,它允许一个线程告诉另一个线程某个工作已经完成。
头文件<linux/completion.h> 类型 struct completion
静态声明:
DECLARE_COMPLETION(my_completion)
动态创建和初始化:
struct completion my_completion
init_completion(&my_completion)
等待completion:
void wait_for_completion(struct completion *c) 不可中断的等待,产生一个不杀的进程。
触发完成事件:
void complete(struct completion *c) 唤醒一个等待线程
void complete_all(struct completion *c) 唤醒所有等待线程
一个completion通知是一个单次(one-shot)设备,使用一次后被丢弃。如果没有使用complete_all,则我们可以重复使用一个completion结构,只要那个将要触发的事件是明确而不含糊的,就不会带来任何问题。如果使用了complete_all,则必须在重复使用之前重新初始化它:
INIT_COMPLETION(struct completion c)
4,自旋锁
内核中大多数锁定是通过称为“自旋锁(spinlock)”的机制实现。自旋锁可在不能休眠的代码中使用,比如中断处理程序。在正确使用的情况下,自旋锁通常可以提供比信号量更高的性能。在概念上,自旋锁非常简单,一个自旋锁是一个互斥设备,它只能有两个值:“锁定”和“解锁”。它通常实现为某个整数值的单个位。希望获得特定锁的代码测试相关的位,如果锁可用,则“锁定”位被设置,代码继续进入临界区,相反,如果锁被其他人获得,则代码进入忙循环并重复检查这个锁,直到该锁可用为此。这个循环就是自旋锁的“自旋”部分。
头文件<linux/spinlock.h> 类型spinlock_t
静态初始化:
spinlock_t my_lock = SPIN_LOCK_UNLOCKED;
运行时初始化:
void spin_lock_init(spinlock_t *lock)
获取锁:
void spin_lock(spinlock_t *lock) 所有的自旋锁本质上是不可中断的,一旦调用spin_lock,在获取锁之前将一直处于自旋状态。
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);获取自旋锁前禁止本地处理器上的中断,将中断状态保存在flags变量中。
void spin_lock_bh(spinlock_t *lock) 在获取锁之前禁止软件中断,让硬件中断保持打开。
释放锁:
void spin_unlock(spinlock_t *lock)
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)
void spin_unlock_irq(spinlock_t *lock)
void spin_unlock_bh(spinlock_t *lock)
非阻塞的操作:
int spin_trylock(spinlock_t *lock)
int spin_trylock_bh(spinlock_t *lock)
注意:任何拥有自旋锁的代码必须是原子的,不可休眠的。禁止中断!禁止抢占!拥有锁的时间越短越好!
5,读取者/写入者自旋锁
头文件<linux/spinlock.h> 类型 rwlock_t
声明并初始化:
rwlock_t my_lock = RW_LOCK_UNLOCKED;
rwlock_t my_rwlock;
rwlock_init(&my_rwlock);
读取者相关操作:
void read_lock(rwlock_t *lock)
void read_lock_irqsave(rwlock_t *lock, unsigned long flags)
void read_lock_irq(rwlock_t *lock)
void read_lock_bh(rwlock_t *lock)
void read_unlock(rwlock_t *lock)
void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags)
void read_unlock_irq(rwlock_t *lock)
void read_unlock_bh(rwlock_t *lock)
写入者相关操作:
void write_lock(rwlock_t *lock)
void write_lock_irqsave(rwlock_t *lock, unsigned long flags)
void write_lock_irq(rwlock_t *lock)
void write_lock_bh(rwlock_t *lock)
void write_unlock(rwlock_t *lock)
void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags)
void write_unlock_irq(rwlock_t *lock)
void write_unlock_bh(rwlock_t *lock)
6,原子变量
头文件<asm/atomic.h> 类型atomic_t,不能记录大于24位的整数。
void atomic_set(atomic_t *v, int i)
atomic_t v = ATOMIC_INIT(0);
int atomic_read(atomic_t *v)
void atomic_add(int i, atomic_t *v)
void atomic_sub(init i, atomic_t *v)
void atomic_dec(atomic_t *v)
void atomic_inc(atomic_t *v)
执行操作并测试,如果操作结束后,原子值为0,返回值为true,否则返回false
int atomic_inc_and_test(atomic_t *v);
int atomic_dec_and_test(atomic_t *v);
int atomic_sub_and_test(atomic_t *v);
将整数i加到v,返回值在结果为负时为true,否则false
int atomic_add_negative(int i, atomic_t *v)
执行操作并将新值返回给调用者
int atomic_add_return(int i, atomic_t *v)
int atomic_sub_return(int i, atomic_t *v)
int atomic_inc_return(atomic_t *v)
int atomic_dec_return(atomic_t *v)
7,位操作
nr参数用来描述要操作的位,通常定义为int或unsigned long,addr通常指向unsigned long的指针,但在某些架构上却使用void *来代替。
可用的位操作:
void set_bit(nr, void* addr)
void clear_bit(nr, void* addr)
void change_bit(nr, void* addr)
test_bit(nr, void* addr)返回指定位当前的值
int test_and_set_bit(nr, void* addr) 返回先前值
int test_and_clear_bit(nr, void* addr)
int test_and_change_bit(nr, void* addr)
8,顺序锁
提供对资源的快速、免锁访问。当要保护的资源很小,很简单,会频繁被访问而且写入访问很少发生且必须快速时,可以使用seqlock。seqlock允许读取者对资源的自由访问,但需要读取者检查是否和写入者发生冲突,当这种冲突发生时,就需要重试对资源的访问,seqlock通常不能用于保护包含指针的数据结构,因为写入者在修改数据结构的同时,读取者可能会追随一个无效的指针。
头文件<linux/seqlock.h>类型seqlock_t
seqlock_t lock = SEQLOCK_UNLOCKED;
seqlock_t lock1;
seqlock_init(&lock1);
读取访问通过获得一个(无符号的)整数顺序值而进入临界区,在退出时,该顺序值会和当前值比较,如果不相等,则必须重试读取访问,其结果是读取者代码会如下编写:
unsigned int seq;
do{
seq = read_seqbegin(&the_lock);
/*完成需要做的工作*/
}while read_reqretry(&the_lock, seq);
在中断处理程序中使用IRQ安全版本的seqlock:
unsigned int read_beginirqsave(seqlock_t *lock, unsigned long flags);
int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags);
写入者必须在进入由seqlock保护的临界区时获取一个互斥锁:
void write_seqlock(seqlock_t *lock)
写入者使用自旋锁实现,因此自旋锁的限制也适用于写入者顺序锁。
释放锁:
void write_sequnlock(seqlock_t *lock)
常见的变种:
void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags)
void write_seqlock_irq(seqlock_t *lock)
void write_seqlock_bh(seqlock_t *lock)
void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags)
void write_sequnlock_irq(seqlock_t *lock)
void write_sequnlock_bh(seqlock_t *lock)
9,RCU
读取-复制-更新是一种高级的互斥机制。它针对经常发生读取而很少写入的情形做了优化。被保护的资源应该通过指针来访问,而对这些资源的引用必须仅由原子代码拥有。在需要修改数据时,写入时线程首先复制,然后修改副本,之后用新的版本替代相关的指针,这也是算法名称的由来,当内核确信老版本上没有其他引用时,就可释放老的版本。
头文件<linux/rcupdate.h>在读取端代码在使用受RCU保护的数据结构时,必须将引用的数据结构的代码在rcu_read_lock和rcu_read_unlock调用之间。
struct my_stuff *stuff;
rcu_read_lock();
stuff = find_the_stuff(args...);
do_something_with(stuff);
rcu_read_unlock();
rcu_read_lock调用非常快,它会禁止内核抢占,但不会等待任何东西,用来检验读取锁的代码必须是原子的,在调用rcu_read_unlock之后就不应该存在对受保护结构的任何引用。用来修改受保护结构的代码必须在一个步骤中完成,分配一个新的结构,如果必要从老的结构中复制数据,然后将读取代码看到的指针替换掉。剩下的工作就是释放老的数据结构,一旦每个处理器至少调度一次,所有的引用都会消失,因此RCU所做的就是设置一个回调函数并等待所有的处理器被调度,之后由回调函数执行清除工作。
修改受RCU保护的数据结构的代码必须分配一个struct rcu_head结构来获得清除用的回调函数,但并不需要用什么方式初始化这个结构,这个结构内嵌在由RCU保护的大资源中,在修改完资源后,应该做如下的调用,void call_rcu(struct rcu_head *head, void (*func)(void *arg), void *arg);在可安全释放该资源时,给定的func会被调用,通知func要做的唯一工作就是调用kfree使用旧版本的结构。