Linux内核学习笔记(二)
x86平台中,一个进程调用C库函数读取文件内容,其经历的处理步骤可能如下:
- 在用户空间,进程调用fread()库函数
- 库函数触发read()系统调用
- 0x80软中断被触发,CR0寄存器的PE位被置0,进入内核态
- read()系统调用对VFS层发起调用file_operations.read(),VFS则调用实际的文件系统的函数实现
- 文件系统向块I/O层发起请求,当前进程在内核中睡眠
- I/O调度程序向硬盘发送悬挂的请求
- 硬盘寻道,读取数据完毕,触发硬件中断,内核在中断上下文中处理
- 中断处理程序拷贝数据到指定的缓冲区,并唤醒等待队列上的进程
- 时钟中断发生,进程被调度,从内核态醒来
- 系统调用返回,CR0寄存器PE位被置3,进入用户态
- fread()函数返回
Intel 的 x86 架构的 CPU 提供了 0 到 3 四个特权级,数字越小,特权越高,Linux 操作系统中主要采用了 0 和 3 两个特权级,分别对应的就是内核态和用户态。
运行于用户态的进程可以执行的操作和访问的资源都会受到极大的限制,而运行在内核态的进程则可以执行任何操作并且在资源的使用上没有限制。很多程序开始时运行于用户态,但在执行的过程中,一些操作需要在内核权限下才能执行,这就涉及到一个从用户态切换到内核态的过程。比如 C 函数库中的内存分配函数 malloc(),它使用 sbrk() 系统调用来分配内存,当 malloc 调用 sbrk() 的时候就涉及一次从用户态到内核态的切换,类似的函数还有 printf(),调用的是 wirte() 系统调用来输出字符串,非常普遍。
用户进程切换到内核态的方式主要有三种:
- 系统调用:用户进程主动发起的操作。用户态进程发起系统调用主动要求切换到内核态,陷入内核之后,由操作系统来操作系统资源,完成之后再返回到进程
- 异常:被动的操作,且用户进程无法无法预测其发生的时机。当用户进程在运行期间发生了异常(比如某条指令出了问题),这时会触发由当前运行进程切换到处理此异常的内核相关进程中,也即是切换到了内核态。异常包括程序运算引起的各种错误如除 0、缓冲区溢出、缺页等
- 中断:当外围设备完成用户请求的操作后,会向 CPU 发出相应的中断信号,这时 CPU 会暂停执行下一条即将要执行的指令而转到与中断信号对应的处理程序去执行,如果前面执行的指令是用户态下的程序,那么转换的过程自然就会是从用户态到内核态的切换。中断包括 I/O 中断、外部信号中断、各种定时器引起的时钟中断等。中断和异常类似,都是通过中断向量表来找到相应的处理程序进行处理。区别在于,中断来自处理器外部,不是由任何一条专门的指令造成,而异常是执行当前指令的结果
为了便于用户进程和内核交互,现代OS都提供了一组接口,让应用能够受限的访问硬件、创建新进程、和既有进程通信,以及申请其它OS资源。这组接口一般称为系统调用,它的主要出发点是保证系统的稳定性,防止应用破坏系统。
系统调用在用户进程和硬件设备之间设立中间层,目的有三:
- 为用户空间提供硬件的抽象接口,而不顾硬件的规格(例如磁盘介质类型)
- 保证系统的稳定性和安全性,内核可以基于用户类型、权限等规则对访问进行控制
- 让每个进程能运行在独立的虚拟系统中,以实现虚拟内存和多任务
系统调用是用户空间访问内核的唯一手段,除了异常(exceptions)和陷入(traps),它是进入内核的唯一合法入口。
要执行系统调用(Linux称为syscall),一般通过C库中定义的函数来进行,这些系统调用一般具有0-N个入参,一个long型的返回值,并且对系统具有副作用(side effects)。返回非零值往往意味着错误,C库会把错误码写入 errno 全局变量,应用程序可以调用 perror() 将errno解析为有意义的字符串。
系统调用通过SYSCALL_DEFINE系列宏来定义,例如getpid系统调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//asmlinkage是一个编译指令,提示编译器仅从栈中提取函数入参 //long型返回值保证32/64位系统兼容 #define SYSCALL_DEFINE0(name) asmlinkage long sys_##name(void) //SYSCALL_DEFINE0表示无入参系统调用 SYSCALL_DEFINE0( getpid ) { return task_tgid_vnr( current ); } //展开后的形式为: asmlinkage long sys_getpid(void) { //sys_是所有系统调用命名的统一前缀 return task_tgid_vnr( current ) } |
Linux中的每个系统调用被分配一个唯一的系统调用号,即使在后续的内核版本中,某个系统调用被删除(尽管很罕见),该调用号也不能重用,而是关联到 sys_ni_call() ,该函数仅仅返回一个 -ENOSYS
进程不会提及系统调用的名称,而只是使用系统调用号来执行系统调用。
内核记录所有已注册的系统调用的列表,存放在 sys_call_table ,该表和体系结构相关,对于x86_64,它定义在/arch/x86/kernel/syscall_64.c中,该表为每个有效的系统调用分配调用号:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
// /arch/x86/kernel/syscall_64.c #define __SYSCALL(nr, sym) [nr] = sym, const sys_call_ptr_t sys_call_table[__NR_syscall_max+1] = { /* *Smells like a like a compiler bug -- it doesn't work *when the & below is removed. */ [0 ... __NR_syscall_max] = &sys_ni_syscall, #include <asm/unistd_64.h> }; // /arch/x86/include/asm/unistd_64.h #define __NR_read 0 __SYSCALL(__NR_read, sys_read) #define __NR_write 1 __SYSCALL(__NR_write, sys_write) #define __NR_open 2 __SYSCALL(__NR_open, sys_open) …… |
相比起其它OS,Linux的系统调用执行速度更快,原因是:
- Linux上下文切换消耗的时间很短,进出内核都被优化的很高效
- 系统调用和系统调用处理程序本身被设计的很简洁
内核运行在受保护的地址空间上,因此用户空间的程序不能直接调用系统调用(内核代码),否则系统安全性和稳定性被破坏。因此,需要一种合理的方式通知内核需要执行一个系统调用,并由内核自己去执行。
通知内核的机制依靠软中断来实现:通过触发一个异常,促使系统切换到内核态去执行异常处理程序。在x86体系结构中,预定义的软中断是中断号128,通过 int $0x80 指令可以触发该中断,该指令会触发一个异常,导致切换到内核态并执行第128号异常处理程序——该程序恰恰就是系统调用处理程序。系统调用出来程序的名称为 system_call() ,其实现和体系结构密切相关。
新型的x86处理器添加了一条 sysenter 指令,可以更快的陷入内核执行系统调用,Linux已经支持该指令。
由于所有系统调用陷入内核的方式都一样,因此需要通过某种方式把系统调用号传递给内核。在x86上,系统调用号通过eax寄存器传递。在陷入内核前,用户空间已经负责把系统调用号存入eax。内核获取调用号后,在sys_call_table上查找,并找到相应的系统调用实现。
除了调用号,大部分系统调用还需要一些入参,这些入参也需要传递给内核。最简单的方式是同调用号一样,通过寄存器传递,在x86系统中,ebx、ecx、edx、esi、edi可以存放前五个参数。超过五个参数的系统调用比较少见,此时,使用单个寄存器来存放所有入参在用户空间地址的指针。
系统调用的返回值,需要传递给用户空间,在x86上返回值存放在eax寄存器中。
下面是系统调用执行过程的示意图:
如果要实现一个新的系统调用,那么首先要明确其功能,Linux提倡系统调用遵循单一职责的原则。然后,需要明确参数、返回值和错误码,参数应该尽可能少,尽量着眼未来,避免不必要的限制(通用化),另外不要做错误的假设(机器字长、字节序…)
应当避免实现新的系统调用,因为系统调用号由官方分配,而且内核稳定后即固化。
必须对系统调用的参数进行严格的验证:
- 参数有效、合法性:例如I/O操作应当检测文件描述符有效性,进程操作应当检测PID有效性
- 指针检查:必须严格检查进程传入的指针:
- 指针指向的内存区域必须属于用户空间,防止进程诱骗内核读取内核空间的数据
- 指针指向的内存区域必须在进程的地址空间中,防止进程窃取其它进程的数据
- 如果是读操作,指针目标内存必须可读;写操作、执行操作类似。防止进程绕过内存访问限制
内核提供了两个函数,用于交换用户空间和内核空间的数据,同时完成必要的检查:
函数 | 说明 |
copy_to_user() |
将内核中的数据复制到用户空间,包含三个参数:dst进程地址空间中的目标地址;src内核空间中的源地址;size需要拷贝的字节数。如果执行失败,返回没能完成拷贝的字节数,如果成功返回0 注意:该函数可能引起阻塞,因为包含用户数据的页可能被交换在磁盘上,此时当前进程将会休眠,直到缺页处理程序将页换入内存 |
copy_from_user() | 将用户空间的数据拷贝到内核。类似上面 |
下面这个简单的系统调用说明了这两个函数的用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//使用内核中的一个long作为中转,将src拷贝到dst SYSCALL_DEFINE3( silly_copy, unsigned long *, src, unsigned long *, dst, unsigned long len ) { unsigned long buf; //内核中的缓冲区 if ( copy_from_user( &buf, src, len ) ) //复制到内核 return -EFAULT; if ( copy_to_user( dst, &buf, len ) ) //复制到用户 return -EFAULT; return len; } |
必须检查进程的用户是否有足够的权限来执行系统调用。老版本的内核提供一个粗粒度的函数 suser() 来判断是否具有超级用户权限。现在则可以进行细粒度的权限控制, capable() 函数可以用来检查用户是否有权对特定的资源进行操作,例如 capable(CAP_SYS_NICE) 用来检查当前用户是否有资格调整其它进程的nice值。
默认情况下,超级用户具有所有权限,其它用户没有任何权限。下面的例子展示了reboot系统调用如何进行权限检查:
1 2 3 4 5 6 7 8 9 |
SYSCALL_DEFINE4(reboot, int, magic1, int, magic2, unsigned int, cmd, void __user *, arg) { char buffer[256]; int ret = 0; /* We only trust the superuser with rebooting the system. */ if (!capable(CAP_SYS_BOOT)) return -EPERM; |
/include/linux/capability.h包含了所有权限定义的列表。
一个系统调用编写完后,需要注册:
- 在系统调用表中,将其注册为最后一个表项。对于每个需要支持的体系结构的系统调用表,都需要做这样的工作。对于大部分体系结构,系统定义表位于 entry.s 中
- 对于每个需要支持的体系结构,必须把系统调用号定义在 asm/unistd.h
- 系统调用必须编译进内核映像,而不是内核模块,这意味着系统调用代码必须存放在 /kernel 目录下的某个源文件中,例如 sys.c ,该文件包含大量系统调用
一般系统调用靠C库来支持,用户程序通过包含头文件,并与C库链接,即可使用系统调用。对于自定义的系统调用,可以通过Linux提供的一组宏来直接调用,而不需要C库的支持。这些宏命名为 _syscalln() ,下面以open系统调用为例说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
//open系统调用的签名: long open( const char *filename, int flags, int mode ); //不依赖库的系统调用直接使用形式 /** * 下面的这个宏定义在asm/unistd.h中,对应了系统调用号 */ #define __NR_open 5 /** * 对于每个宏,具有2 + 2 * n个参数 * 第一个参数为系统调用返回值类型 * 第二个参数为系统调用的名称 * 后面的2 * n个参数为系统调用入参类型、名称对 * * 这些宏会被扩展为内联汇编的C函数,由汇编语言负责把系统调用号、入参压入寄存器,并触发软中断导致陷入内核 */ _syscall3(long, open, const char *, filename, int, flags, int, mode) |
内核执行系统调用时,处于进程上下文——current指针指向引发系统调用的那个进程。在进程上下文中内核可以休眠或者被抢占:休眠能力为内核编程带来很大便利;可抢占性意味着和用户空间内的进程一样,处于内核中的当前进程同样可以被抢占。由于获得CPU的新进程可以执行同一系统调用,这要求系统调用必须是可重入的。
与之相反的,当内核在执行中断处理程序时,则不能休眠,因此中断处理程序相对于系统调用,能够进行的操作受到很大限制。
当系统调用返回时,控制权仍然在 system_call() 手中,它负责切换到用户空间,并让用户进程继续执行下去。
对连接到计算的设备(硬盘、键盘、显卡、网卡……)进行有效管理是任何操作系统的核心任务之一。
相比起CPU,外围硬件的速度非常慢,因此,如果让CPU向硬件发送请求并等待响应,显然是性能低劣的做法。既然如此,就应该在硬件处理请求期间,让内核去处理其它事务,等硬件真正完成处理后,再去处理响应。
那么,CPU怎么知道硬件完成处理了呢?轮询(Polling)是一种可能的方法,但是会让内核做很多无用功,更好的办法是允许硬件完成处理后,主动向内核推送一个信号,这就是中断机制。
从硬件角度来说,中断使得硬件能够发送通知给处理器,例如,当键盘击键时,键盘控制器会发送一个中断,通知处理器有按键被按下。处理器接收到信号后,马上向操作系统反馈,操作系统就会立即处理此中断。硬件设备产生中断时,不会考虑与处理器时钟的同步,也就是说,硬件中断随时都可能发生——内核随时可能被新到来的中断打断。
从物理学角度来说,中断的本质是一种特殊的电信号,由硬件设备生成,并直接送入中断控制器的输入引脚中。中断控制器是一个简单的芯片,其作用是汇集多路中断管线,采用复用技术,将它们连接到单个处理器管线。
不同硬件设备的中断不同,它们具有唯一的数字标识,因此操作系统能够识别中断来自硬盘还是键盘,进而调用不同的中断处理程序。这些中断值通常被称为中断请求(IRQ)线。在经典的PC上,IRQ 0是时钟中断,IRQ1是键盘中断,但是某些中断号是动态分配的,例如连接在PCI总线上的设备。不过中断号的数值本身并不重要,重要的是内核知道中断号和哪个设备关联。
讨论中断时,往往提及一个相关的概念——异常。与中断不同,异常产生时必须与处理器时钟同步,异常经常被称为同步中断。当处理器执行到错误指令(比如除零)或者特殊情况(如缺页),必须依靠内核处理时,处理器就会产生一个异常。异常发生时,内核代表产生异常的进程执行。
由于很多体系结构下,异常(处理器本身产生的同步中断)和中断(外围硬件产生的异步中断)的处理方式相同,因而很多知识是通用的。
x86的系统调用就是一种异常,通过软中断指令,导致陷入内核,进而引起一种特殊的异常——系统调用处理程序异常。
中断一旦发生,硬件一般会一直等待,直到CPU应答它为止,因此CPU必须尽快的处理中断并先响应硬件。响应一个特定中断的时候,内核会执行一个称为“中断处理程序(Interrupt service routine)”的函数,不同中断(例如系统时钟中断和键盘中断)的函数亦不同。设备的中断处理程序是它的设备驱动程序(Driver,用于对设备进行管理的内核代码)的一部分,某个设备发起多种中断也是可能的。
在Linux中,中断处理程序是遵循特定参数格式的C函数。这些函数与内核其它函数的本质区别是,中断处理程序运行于特殊的上下文——中断上下文中。中断上下文有时也被称为“原子上下文”,因为它不可阻塞、不可被抢占(但是可能被其它中断处理程序中断)。这一原子特性也要求中断处理程序必须在尽可能短的时间内完成。
中断处理程序至少需要通知设备,中断已经收到。实际场景中,它往往需要完成复杂的工作,以网卡的中断处理程序为例,它需要:
- 给网卡以中断应答
- 将网卡中的网络数据包拷贝到内存
- 将数据包转交给协议栈或者应用程序处理
中断处理程序的执行速度与其工作量是一对此消彼长的矛盾需求,因此,一般把中断处理切分为两部分:
- 上半部(Top half):中断处理程序属于上半部,接收到中断就立刻执行,但只执行具有严格时限的工作,例如应答或者复位硬件,这些工作是在中断被禁止的情况下完成的
- 下半部(Bottom half):其它能够允许稍后执行的工作,推迟到下半部中,在适当的时机,下半部会被执行,其时所有中断被启用
还是以网卡中断处理为例,当网卡接收到来自网络的数据包时,需要通过硬件中断通知内核,为了优化网络吞吐量和传输周期,网卡需要立即发出通知。内核也立即开始执行中断处理:
- 上半部处理:通知网卡中断已收到,拷贝网络数据包到内存,并读取更多网络数据包,这些都属于紧迫而硬件相关的工作,因为网卡的缓冲区很小而又大小固定,拷贝数据的动作如果延迟到下半部,很可能导致网卡缓冲区溢出,而导致后续到达的数据被丢弃
- 结束中断处理程序,控制器回归先前被中断的程序
- 下半部处理:处理数据包,由协议栈和应用程序后续完成
中断处理程序是设备驱动的组成部分,因此,驱动负责将其注册到内核。使用下面的函数可以注册中断处理程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
/** * 注册中断处理程序,成功返回0,返回-EBUSY表示给定的中断线已经被使用 * 注意:该函数可能会休眠(调用了kmalloc),因此不能在中断上下文或者其它不允许阻塞的代码中调用该函数 */ extern int request_irq( /** * 需要分配的中断号(中断线的号码) * 某些设备是固定的,例如PC系统时钟和键盘;大部分其它设备可以通过探测获取或者编程动态确定 */ unsigned int irq, irq_handler_t handler, //指向实际中断处理函数的指针 unsigned long flags, //中断处理程序标志 const char *name, //与中断相关的设备的ASCII文本表示,例如PC键盘是“keyboard” /** * 用于共享中断线,当中断处理程序需要释放时dev提供一个唯一标识(cookie), * 以便从共享中断线的诸多处理程序中定位并删除当前程序 * * 内核每次调用中断处理程序时,都会传递该参数,实践中往往使用该参数传递驱动程序的设备结构 */ void *dev ); //下面是一个例子: if (request_irq(irqn, my_interrupt, IRQF_SHARED, "my_device", my_dev)) { printk(KERN_ERR "my_device: cannot register IRQ %d\n", irqn); return -EIO; } |
request_irq() 函数的第三个参数flags可以是0,或者是包含(但不限,仅列出重要的)下表中标志的掩码:
标志 | 说明 |
IRQF_DISABLED | 指示内核在处理该中断处理程序时,要禁用所有其它中断。一般不用设置,除非是轻量级、需要快速执行的中断 |
IRQF_SAMPLE_RANDOM | 指示该设备产生的中断可以对内核entropy pool有贡献,从而利于产生真正的随机数,如果设备产生中断的频率不可预知,则可以设置该位 |
IRQF_TIMER | 特地为系统定时器的中断处理准备 |
IRQF_SHARED | 指示可以在多个中断处理程序之间共享中断线,在同一个中断线上注册的每个中断处理程序必须设置该标志,否则每条线上只能有一个中断处理程序 |
当卸载驱动程序时,需要注销相关的中断处理程序,并释放中断线:
1 2 3 4 5 6 7 8 |
/** * 释放中断处理程序 * 如果中断线不是共享的,那么删除该程序的同时会禁用中断线 * 否则,只删除dev指向的中断处理程序。仅当中断线上所有程序被删除,中断线才会被禁用 * @param irq 中断号 * @param dev 当前中断处理程序的标识 */ void free_irq(unsigned int irq, void *dev); |
中断处理程序的签名如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
enum irqreturn { IRQ_NONE, //中断处理程序检测到一个中断,但是产生该中断的设备并不是注册该程序时指定的设备 IRQ_HANDLED, //中断处理程序被正确调用,中断来自指定的源 IRQ_WAKE_THREAD, }; typedef enum irqreturn irqreturn_t; /** * 中断处理函数 * @params int irq 中断号 * @params void *dev 设备标识(通常为包含设备特有信息的结构体),2.0-没有该参数,导致使用同一驱动的不同设备必须依赖irq去区分 * @return irqreturn_t 处理结构枚举值,其实就是int,之所以用typedef是为了和老版本内核兼容,老版本返回值是void */ typedef irqreturn_t (*irq_handler_t)( int irq, void *dev ); |
中断处理程序包含的逻辑,取决于产生中断的设备,以及产生中断的原因。最简单的程序可能仅仅告知设备中断已经收到,复杂一些的程序可能需要在中断处理程序中收发数据或者其它一些扩充性的工作。扩充性的工作应当尽量推迟到下半部而不是在中断处理程序中。
在Linux中,中断处理程序是无需支持重入的: 因为当一个中断处理程序正在执行时,相应中断线(其它中断线默认不受影响)在所有处理器上都被屏蔽,这就防止了在此期间出现新的中断。不可重入性简化了中断处理程序的编程复杂性。
比起非共享的中断处理程序,共享版本具有以下三个特点:
- 注册时必须设置IRQF_SHARED标志
- 注册时dev参数必须具有唯一性
- 中断处理程序必须能够识别它的设备是否真的产生了中断,这需要硬件的配合。内核接收到中断后,会依次调用目标中断线上注册的所有中断处理程序,因此,中断处理程序必须知道自己是否应该为某个中断负责,如果它相关的设备没有产生这个中断,那么中断处理程序应该立即退出(返回IRQ_NONE)。硬件必须提供必要的支持,例如包含一个状态寄存器,大部分硬件都有类似机制
所有共享中断线的驱动程序必须满足上面三点,否则中断线就无法共享。
下面以一个实际的中断处理程序为例,realtime Clock(RTC)驱动程序,该驱动被PC等体系结构支持。该程序用于设置系统时钟、提供报警器(Alarm)或周期性定时器的支持,第一个功能通过直接写寄存器或者内存地址即可实现,但是后两个必须依赖于中断机制。
RTC驱动加载时,它会注册中断处理程序:
1 2 3 4 5 6 7 8 9 |
static int __init rtc_init(void) { //rtc_irq是中断号,在PC上为8 if (request_irq(rtc_irq, rtc_interrupt, IRQF_SHARED, "rtc", (void *)&rtc_port)) { rtc_has_irq = 0; printk(KERN_ERR "rtc: cannot register IRQ %d\n", rtc_irq); return -EIO; } |
处理程序的内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
/** * 只要内核接收到一次RTC中断,就会调用此函数 */ static irqreturn_t rtc_interrupt(int irq, void *dev_id) { //该自旋锁定防止rtc_irq_data被SMP上其它CPU并发访问 spin_lock(&rtc_lock); //rtc_irq_data是unsigned long,存放RTC相关信息,每次中断都会更新 rtc_irq_data += 0x100; rtc_irq_data &= ~0xff; if (is_hpet_enabled()) { rtc_irq_data |= (unsigned long)irq & 0xF0; } else { rtc_irq_data |= (CMOS_READ(RTC_INTR_FLAGS) & 0xF0); } //如果设置了RTC周期性定时器 if (rtc_status & RTC_TIMER_ON) mod_timer(&rtc_irq_timer, jiffies + HZ/rtc_freq + 2*HZ/100); spin_unlock(&rtc_lock); //该自旋锁定防止rtc_callback被SMP上其它CPU并发访问 spin_lock(&rtc_task_lock); if (rtc_callback) //执行预先设置好的回调函数 rtc_callback->func(rtc_callback->private_data); spin_unlock(&rtc_task_lock); wake_up_interruptible(&rtc_wait); kill_fasync(&rtc_async_queue, SIGIO, POLL_IN); //示意已经完成处理,因为该中断非共享,因此不需要测试虚假中断 return IRQ_HANDLED; } |
当执行中断处理程序时,内核处于中断上下文(Interrupt context)。与之对应的是进程上下文,它是内核所处的另外一种操作模式,此时内核代表进程执行——例如执行系统调用或者运行内核线程,在进程上下文中, current 宏关联当前进程。在进程上下文中内核可以休眠,也可以执行调度。
和进程上下文不同,中断上下文和任何进程无关(尽管current指向被中断的进程),由于不存在对应的进程,因而也就不能睡眠(睡眠是进程的概念)、不能被调度。在中断上下文中,不能调用任何可能导致休眠的函数。
中断上下文具有严格的时间限制,因为其打断了其它代码(甚至是不同中断线上的中断处理程序)的执行。中断上下文中的代码应当迅速、简洁,避免循环处理。
中断处理程序的栈是一个配置选项,以前中断处理程序没有自己的栈,而是借用被中断进程的内核栈。内核栈的大小为2页(32位8KB,64位16KB)。在2.6+版本,增加了一个选项,可以将内核栈减小为1页,为了应对栈的减小,中断处理程序开始拥有自己的栈了,每个处理器一个,大小1页,称为中断栈。
中断处理机制的实现非常依赖于具体的体系结构。下图是中断从硬件到内核的处理路由:
- 设备产生中断,并通过总线把电信号传递给中断控制器
- 如果中断线是激活的,则中断控制器将中断发往处理器。在大部分体系结构上,通过电信号向CPU特定管脚发送一个信号实现
- 如果处理器上没有禁用该中断,则处理器立即停止当前工作,关闭中断系统,并跳转到内存中预定义的位置开始执行那里的代码,此预定义位置由内核设置,是中断处理程序的入口点(类似于系统调用通过预定义的Exception handler进入内核)
- 对于每一条中断线,处理器都会跳转到一个唯一的内存位置,内核因此知晓中断号。初始的入口点代码仅仅是简单的保存中断号和寄存器值(属于被中断的进程)到栈上,然后调用 do_IRQ() 。尽管还是和体系结构相关,从这里开始,代码基本上使用C编写
- do_IRQ() 的签名为 unsigned int do_IRQ(struct pt_regs regs) ,因为C语言的调用惯例是把函数入参存放在栈顶,因此 pt_regs 包含了原始寄存器的值和中断号,do_IRQ将中断号提取出来,并禁止这条中断线,在PC上,这一逻辑由 mask_and_ack_8259() 完成
- 然后,
do_IRQ() 判断是否该中断线上具有有效的处理程序,并且当前没有被执行,如果为真,则调用
handle_IRQ_event() 来运行模板中断处理程序:
123456789101112131415161718192021222324252627282930313233343536373839404142434445irqreturn_t handle_IRQ_event(unsigned int irq, struct irqaction *action){irqreturn_t ret, retval = IRQ_NONE;unsigned int status = 0;//如果没有指定“禁止其它所有中断”,则恢复当前CPU的硬件中断if (!(action->flags & IRQF_DISABLED))local_irq_enable_in_hardirq();do {//遍历所有潜在的处理程序trace_irq_handler_entry(irq, action);ret = action->handler(irq, action->dev_id); //调用一个处理程序trace_irq_handler_exit(irq, action, ret);switch (ret) {case IRQ_WAKE_THREAD: //唤醒该处理程序的执行线程(handler thread)ret = IRQ_HANDLED;if (unlikely(!action->thread_fn)) {warn_no_thread(irq, action);break;}if (likely(!test_bit(IRQTF_DIED,&action->thread_flags))) {set_bit(IRQTF_RUNTHREAD, &action->thread_flags);wake_up_process(action->thread);}case IRQ_HANDLED: //如果该处理程序完成了中断的处理,则退出status |= action->flags;break;default:break;}retval |= ret;action = action->next;} while (action);if (status & IRQF_SAMPLE_RANDOM)add_interrupt_randomness(irq); //处理entropy poollocal_irq_disable(); //再次禁止中断return retval;} - 返回到 do_IRQ() ,执行一些清理工作,并返回到初始入口点,并跳转到 ret_from_intr()
-
ret_from_intr() 类似于初始入口点,是用汇编编写的,它检查是否存在挂起的重新调度(need_resched),如果是,并且:
- 内核正在返回用户空间(即当初被中断的是用户进程),则调用 schedule()
- 内核正在返回内核空间(即当初被中断的内核代码),则判断 preempt_count 是否为0,是则 schedule() ,否则说明抢占内核是不安全的,不得进行重新调度
对于x86_64,初始的汇编例程位于 arch/x86/kernel/entry_64.S ,相关的C方法位于 arch/x86/kernel/irq.c 。其它体系结构类似。
Procfs是一个仅存在于内核内存的虚拟文件系统,通常挂载于 /proc ,读取/写入Procfs文件系统会导致相关内核函数的调用。
读取/proc/interrupts文件,可以获得系统关于中断的统计性信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
root@gmem:~# cat /proc/interrupts CPU0 CPU1 16: 18473588 0 xen-percpu-virq timer0 17: 158 0 xen-percpu-ipi spinlock0 18: 3935398 0 xen-percpu-ipi resched0 19: 0 0 xen-percpu-ipi callfunc0 20: 0 0 xen-percpu-virq debug0 21: 8117 0 xen-percpu-ipi callfuncsingle0 22: 1 0 xen-percpu-ipi irqwork0 23: 0 16477170 xen-percpu-virq timer1 #第一列:中断线(中断号) #第2-N列:各CPU上,该中断发生的次数 #倒数第2列:处理此中断线的中断控制器 #最后一列:与该中断相关的设备的名称,该名就是request_irq()的参数*name |
内核提供了一组接口(定义在asm/system.h、<asm/irq.h),用于操控计算机的中断状态。通过这些接口可以:
- 禁止当前CPU的中断系统
- 屏蔽整台计算机的某条中断线
之所有要对中断功能进行控制,归根到底是需要提供同步。通过禁止中断,可以确保某个中断处理程序不会抢占当前的代码,禁止中断还可以禁止内核抢占。
禁止中断并没有提供保护机制来防止其它CPU的并发访问(内核数据结构),因此内核代码常常需要获得某种锁。
总结一下:
- 禁止中断提供保护机制,防止来自其它中断处理程序的并发访问(共享数据)
- 锁提供保护机制,防止来自其它CPU的并发访问(共享数据)
下面两个函数用于禁止和激活当前处理器上的中断:
1 2 3 4 |
//禁用当前处理器的中断 local_irq_disable(); //启用当前处理器的中断 local_irq_enable(); |
这两个函数往往使用单条汇编指令实现,例如在x86上,分别对应了 cli 和 sti 指令(即CLearInterrupt、SeTInterrupt),这些指令将禁用/启用中的关的传递。
这连个函数强制性的设置中断为禁止或启用状态,可能招致危险——代码执行路径的复杂性导致判断先前是禁用还是启用变得困难。因此,保存/恢复中断状态是更好的选择:
1 2 3 4 |
unsigned long flags; local_irq_save( flags ); /* 导致中断被禁用 */ local_irq_restore( flags ); /* 导致中断被恢复到原先的状态 */ //注意:flags不得传递给另外一个函数,即,必须在当前栈帧上使用flags,上面的函数对只能在一个函数内部使用 |
上面四个函数既可以在中断上下文中调用,也可以在进程上下文中调用。
从2.5+开始,Linux禁止了全局的cli()、sti()调用,这两个调用可以在全局(所有处理器)返回禁止或者启用中断。 为了确保对共享数据的互斥访问,以前可以通过全局禁止中断达到互斥,限制必须通过本地中断控制+自旋锁。
某些时候,只需要屏蔽(masking out)系统中的某条中断线就足够:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
/** * 禁用指定中断线上的中断,该函数只有当前所有中断处理程序完成后才返回 */ void disable_irq( unsigned int irq ); /** * 类似上面,立即返回 */ void disable_irq_nosync( unsigned int irq ); /** * 启用中断线上的中断,对于前两个函数的每一次调用,必须有相应的该函数调用,才能最终启用中断 */ void enable_irq( unsigned int irq ); /** * 等待中断线上正在执行的处理程序完毕 */ void synchronize_irq( unsigned int irq ); |
禁止共享中断线是不合适的,因为其它设备的中断也无法传递,因此新的设备驱动程序应避免使用这些接口。
下表中的函数用于查询中断系统的状态:
函数/宏 | 说明 |
irqs_disabled() | 定义在asm/system.h,如果当前处理器的中断系统被禁用,返回非0 |
in_interrupt() | 定义在linux/hardirq.h,如果内核处理任何类型的中断处理(内核要么在执行中断处理程序,要么在执行下半部处理)中,返回非0。如果返回0,则内核处于进程上下文 |
in_irq() | 定义在linux/hardirq.h,如果内核正在执行中断处理程序,返回非0 |
由于下列原因,中断处理程序只能完成中断处理的上半部分:
- 中断处理程序以异步方式执行,并且可能打断其它重要代码(包括其它中断处理程序),因此,为了避免被打断代码停止时间过长,中断处理程序应该尽快的执行
- 如果一个中断处理程序正在运行:当IRQF_DISABLED被没有被设置,这是最好的情况,相同中断线上其它中断会被屏蔽;当IRQF_DISABLED被设置则是最坏的情况,当前处理器上所有其它中断被屏蔽。由于禁用中断后,硬件无法与OS通信,这也要求中断处理程序执行的越快越好
- 中断处理程序往往需要对硬件进行操作,因此时效性要求很高
- 中断处理程序不在进程上下文中运行,因此不能阻塞,这限制了它们可以做的事情
综上,必须快速、异步、简单的对硬件中断做出响应并完成对时间要求很严格的操作,对于其它的、时间要求相对宽松的操作,应该推后到中断被激活之后执行。
上一段内容已经很好的讨论了上半部,本段主要介绍下半部的工作。
下半部的任务就是处理和中断处理密切相关,但中断处理程序本身不执行的工作。为了让中断处理程序最快,理想情况下是所有工作都由下半部执行,到底哪些工作由下半部处理并无一致性的规定,下面几条建议供借鉴:
- 如果一项任务对时间非常敏感,则放在中断处理程序中
- 如果一项任务和硬件相关,则放在中断处理程序中
- 如果一项任务要保证不被其它中断(特别是同一中断)打断,则放在中断处理程序中
- 其它任务,考虑放在下半部
那么下半部到底要推迟到什么时候呢?精确的时间并不需要,通常下半部在中断处理程序返回后就会立马执行,关键是,下半部运行的时候,中断机制已经恢复,不会因为下半部导致时效性问题。
与上半部(必须通过中断处理程序)不同,下半部可以有多种实现机制,这些机制在Linux发展过程中相继出现(有的已经废弃不用),由不同的接口和子系统组成:
下半部机制 | 说明 |
BH |
最初的下半部机制就称为“下半部”,因为它是推迟中断处理工作的唯一方式。这里称其为BH以便和术语下半部区分。 BH提供一个静态创建的、由32个bottom halves组成的链表,上半部通过32位整数中的一位来标明哪个bottom half可以执行。每个BH都是全局范围同步(即使中断分属不同CPU也不允许任何两个BH同时执行),该机制简单但是有性能瓶颈 在2.5+,BH被完全废弃 |
任务队列 | 任务队列(Task Queue)是不久出现的BH代替。内核定义一组队列,每个队列包含一个由等待被调用函数组成的链表。驱动程序会把自己的下半部注册到适当的队列上去,该机制离不开BH。对性能要求高的子系统,例如网络,该机制依然难以胜任 |
软中断和Tasklet |
从2.3+,内核引入了软中断(softirqs,和系统调用的软中断,确切的说软件中断,不是一个概念)和Tasklet,如果不考虑兼容性,可以完全代替BH 软中断是一组静态定义(编译期)的下半部接口,共32个,支持在所有CPU上同时执行——即使两个类型相同。软中断在执行期间不能睡眠,也不会被其它软中断打断,它只能被硬件中断打断(因为执行期间不禁止中断) Tasklet是基于软中断实现的灵活性强、动态创建(注册)的下半部实现机制(和进程没有任何关系,尽管叫task),软中断的限制性特征同样适用于Tasklet Tasklet是性能和易用性的折中,不同类型的Tasklet可以在不同处理器上同时执行,相同类型的,则不可 除非是性能要求非常高的情况,例如网络,才需要使用软中断,否则Tasklet就足够了 |
工作队列 |
在2.5+,任务队列被工作队列代替。工作队列简单有效:它们先对推后执行的工作排队,稍后在进程上下文中执行它们 |
内核定时器 |
用于把某个操作推迟到确定的时间段之后执行 |
综上,在2.6+,软中断、Tasklet、工作队列等是可用的几种不同形式的下半部机制。
软中断在实践中运用较少,但是它是Tasklet的基础,其代码位于kernel/softirq.c。软中断在编译期静态分配,由下面的结构表示:
1 2 3 4 |
struct softirq_action { void (*action)(struct softirq_action *); //把整个结构体传递给软中断处理程序,是为了方便未来扩展新字段 }; |
32个该结构体的数组定义如下:
1 |
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp; |
每个注册的软中断,都会占据上述数组的一个元素,因此,最多可能有32个软中断。在2.6版本的内核,有9个元素已经被占用。
软中断处理程序(软中断action字段)的原型为: void softirq_handler(struct softirq_action *) ,当内核运行一个软中断处理程序时,就会执行该action函数,唯一的入参指向softirq_vec数组的元素,对于元素 my_softirq ,内核调用 my_softirq ->action(my_softirq) 。
一个软中断不会抢占另外一个,实际上,只有中断处理程序才能抢占软中断。但是其它软中断(包括同类型的)可以在别的处理器上运行。
一个注册的软中断必须在被标记后才会执行,这一标记动作称为软中断触发(raising the softirq)。通常,中断处理程序会在返回前标记它的软中断,使其在稍后被执行。在以下地方待处理的软中断会被检查和执行:
- 硬件中断代码路径的返回点
- 内核线程ksoftirqd
- 显式调用检查、执行待处理软中断的任何代码,例如网络子系统
不管使用什么方式触发,软中断都在下面的函数中执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
asmlinkage void do_softirq( void ) { __u32 pending; unsigned long flags; if ( in_interrupt() ) return; local_irq_save( flags ); //保存当前CPU中断状态,并禁用中断 //32位图,如果第n位设置为1,则意味着第n个软中断等待处理 pending = local_softirq_pending(); if ( pending ) __do_softirq(); local_irq_restore( flags ); //恢复当前CPU的中断 } |
如果当前有等待处理的软中断,就会调用__do_softirq(),其核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
asmlinkage void __do_softirq( void ) { struct softirq_action *h; pending = local_softirq_pending(); restart : //这里,本地中断是被禁止的,而下面一句必须在禁止的情况下执行 //否则,可能导致新出现的、未处理的软中断被置零 set_softirq_pending( 0 ); //重置位图,因为当前待处理软中断已经保存 local_irq_enable(); //软中断程序执行期间允许响应中断 h = softirq_vec; do { //循环遍历所有软中断 if ( pending & 1 ) { h->action( h ); //调用软中断处理程序 } h++; pending >>= 1; } while ( pending ); local_irq_disable(); //检查是否在循环过程中又有新的待处理软中断出现,如果是 pending = local_softirq_pending(); if ( pending && --max_restart ) //并且重复最大次数未到达,则再次循环执行 goto restart; if ( pending ) wakeup_softirqd(); //否则标记软中断处理,等待下一轮执行 } |
软中断保留给系统中时效性要求最高、最重要的下半部使用。 在2.6中只有网络、SCSI两个子系统直接使用软中断。此外,内核定时器和Tasklet都是基于软中断的,因此应当优先考虑Tasklet。
分配索引
在编译期间,必须在下面的枚举中静态的声明软中断,根据优先级需要,插入到相应位置:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//高优先级的软中断应当声明在前面,默认的9项,优先级分别0-8 enum { HI_SOFTIRQ=0, //高优先级Tasklet,一般作为第一项 TIMER_SOFTIRQ, //定时器的下半部 NET_TX_SOFTIRQ, //发送网络数据报 NET_RX_SOFTIRQ, //接收网络数据报 BLOCK_SOFTIRQ, //块设备 Tasklet_SOFTIRQ, //正常优先级的Tasklet SCHED_SOFTIRQ, //进程调度器 HRTIMER_SOFTIRQ, //高分辨率定时器 RCU_SOFTIRQ //RCU锁 }; |
注册软中断处理程序
分配完索引后,需要调用 open_softirq() 注册软中断处理程序,该函数的两个参数分别是软中断索引、处理函数指针,例如网络子系统这样注册软中断处理程序:
1 2 3 4 5 |
static int __init net_dev_init(void) { //... open_softirq(NET_TX_SOFTIRQ, net_tx_action); open_softirq(NET_RX_SOFTIRQ, net_rx_action); |
软中断处理程序执行时:
- 允许当前CPU响应中断,但是软中断处理程序本身不能休眠
- 禁止当前CPU上的软中断,但是其它CPU可以执行软中断,包括同一类型的软中断。这意味着任何共享变量(哪怕仅仅是某个软中断处理程序内部使用的全局变量)都需要严格的锁保护。如果使用互斥锁,则软中断就失去价值了,因此大部分软中断处理程序使用单处理器数据(属于单个CPU的私有数据),免去了加锁的开销,提高了性能
触发软中断
调用 raise_softirq() 可以标记一个软中断为挂起中断,这样下一次 do_softirq() 时它就会被执行。
在中断处理程序中触发软中断,是最常见的形式。内核执行完中断处理程序后,马上就会调用 do_softirq() ,这让上半部/下半部的分界一目了然。
每个处理器包括辅助处理软中断(包括Tasklet)的内核线程,当内核中出现大量软中断的时候,这些线程就会辅助处理之。
在中断处理程序返回时,处理软中断是最常见的情况。但是,软中断可能被高频的触发(例如网络子系统),甚至软中断处理函数还会自行重复触发(网络子系统会这样做)——当软中断正在处理时,触发自己以便再次执行。这种高频触发和自我重新触发的能力会导致用户空间进程无法获得足够的CPU时间而处于饥饿状态。
为了防止用户进程饥饿(同时也避免软中断饥饿),软中断设计者做了一些折中:当大量软中断出现时,一组最低优先级(nice=19)的内核线程被唤醒,以处理软中断。低优先级是防止其与其它重要任务抢夺资源,同时能保证软中断最终会被执行而不致饥饿。
处理软中断的内核线程被命名为ksoftirqd/n,其中n为CPU的编号,每核CPU对应一个这样的线程。一旦ksoftirqd线程被初始化,就会执行类似下面的死循环:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
for ( ;; ) { if ( !softirq_pending( cpu ) ) //如果没有未决的软中断,则执行调度,放弃CPU schedule(); set_current_state( TASK_RUNNING ); //否则,加入运行红黑树 while ( softirq_pending( cpu ) ) //循环处理未决软中断,每次执行后都判断是否需要重新调度 { do_softirq(); if ( need_resched() ) //如需则重新调度 schedule(); } set_current_state( TASK_INTERRUPTIBLE ); //此设置会在下一次循环时生效,决定线程是否被加入睡眠队列 } |
Tasklet是一种软中断(HI_SOFTIRQ、Tasklet_SOFTIRQ),但是接口更简单、锁要求更低(同一Tasklet不会在多个CPU上并发执行),应当优先使用Tasklet而不是软中断,它们非常易用而且大部分情况下性能不错。
Tasklet使用下面的结构体表示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
struct Tasklet_struct { struct Tasklet_struct *next; //链表中下一个Tasklet /** * 状态: * 0 * Tasklet_STATE_SCHED 已经调度待执行(等价于软中断触发) * Tasklet_STATE_RUN 正在执行,只对多处理器有意义,单处理器总是知道当前是不是在运行Tasklet */ unsigned long state; atomic_t count; //引用计数器,只有为0的时候,Tasklet才有资格运行 void (*func)( unsigned long ); //Tasklet处理函数,类似于软中断的action unsigned long data; //给Tasklet处理函数的唯一参数 }; |
已调度(即被触发的软中断)的Tasklet,存放在两个单处理器结构中: Tasklet_vec 存放普通Tasklet; Tasklet_hi_vec 存放高优先级Tasklet,它们都是 Tasklet_struct 的链表。
函数 Tasklet_schedule() 和 Tasklet_hi_schedule() 用来执行Tasklet的调度,它们都接受 Tasklet_struct* 作为入参,调度过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
/* /include/linux/interrupt.h */ static inline void Tasklet_schedule( struct Tasklet_struct *t ) { //判断Tasklet状态是否为Tasklet_STATE_SCHED,如果是,说明其已经被调度,立即返回 if ( !test_and_set_bit( Tasklet_STATE_SCHED, &t->state ) ) __Tasklet_schedule( t ); } /* /kernel/softirq.c */ void __Tasklet_schedule( struct Tasklet_struct *t ) { unsigned long flags; //保存当前CPU中断状态并禁止中断,防止新到达的中断破坏数据一致性 local_irq_save( flags ); t->next = NULL; //把需要调度的Tasklet存放到变量Tasklet_vec或Tasklet_hi_vec的头部,这两个变量每个CPU都具有独立的副本 *__get_cpu_var( Tasklet_vec ).tail = t; __get_cpu_var( Tasklet_vec ).tail = & ( t->next ); //触发软中断Tasklet_SOFTIRQ/HI_SOFTIRQ,这导致do_softirq()在下一轮执行被调度的Tasklet raise_softirq_irqoff( Tasklet_SOFTIRQ ); local_irq_restore( flags ); //恢复中断状态 } |
当 do_softirq() 执行时,它会调用 Tasklet_action() 和 Tasklet_hi_action() ,后者会依次执行被调度的Tasklet:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
static void Tasklet_action( struct softirq_action *a ) { struct Tasklet_struct *list; //禁止中断,没必要保存中断状态,因为该例程作为软中断一部分调用,此时中断必然是启用的 local_irq_disable(); list = __get_cpu_var( Tasklet_vec ).head; //拷贝链表数据 //在当前处理器上清空链表 __get_cpu_var( Tasklet_vec ).head = NULL; __get_cpu_var( Tasklet_vec ).tail = &__get_cpu_var( Tasklet_vec ).head; //启用中断 local_irq_enable(); while ( list ) //循环遍历链表上每一个待处理的Tasklet { struct Tasklet_struct *t = list; list = list->next; if ( Tasklet_trylock( t ) ) //尝试锁定 { if ( !atomic_read( &t->count ) ) //确认Tasklet没有被禁止 { if ( !test_and_clear_bit( Tasklet_STATE_SCHED, &t->state ) ) BUG(); t->func( t->data ); //执行Tasklet处理函数 Tasklet_unlock( t ); //解锁Tasklet continue; } Tasklet_unlock( t ); } local_irq_disable(); t->next = NULL; *__get_cpu_var( Tasklet_vec ).tail = t; __get_cpu_var( Tasklet_vec ).tail = & ( t->next ); __raise_softirq_irqoff( Tasklet_SOFTIRQ ); local_irq_enable(); } } /** * 对于多处理器系统,这里需要检查Tasklet是否正在其它CPU上执行 * 如果正在其它CPU上执行,则跳过,因为同一Tasklet不能并发执行 */ #ifdef CONFIG_SMP static inline int Tasklet_trylock(struct Tasklet_struct *t) { return !test_and_set_bit(Tasklet_STATE_RUN, &(t)->state); } #endif |
可以静态或者动态的创建Tasklet:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
//静态创建一个名为name的task,处理函数为func,传递的参数为data DECLARE_TASKLET( name, func, data ); //类似,但是设置count=1,导致Tasklet被禁止 DECLARE_TASKLET_DISABLED( name, func, data ); //例如: DECLARE_TASKLET( my_tasklet, my_tasklet_handler, dev ); //展开为: struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT( 0 ), my_tasklet_handler, dev }; //动态创建,结构体由指针指定 void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data); |
创建完Tasklet后,需要编写Tasklet处理函数:
1 2 3 4 5 6 |
/** * 由于Tasklet基于软中断实现,因此不得睡眠,这意味着不能调用信号量之类的阻塞性函数 * Tasklet运行期间运行CPU响应中断,这意味着如果如果Tasklet和中断处理程序共享数据,需要进行防护(例如屏蔽中断,然后回去锁) * Tasklet实例不会在不同CPU上并发执行,这意味着仅仅是Tasklet使用的全局变量不需要保护 */ void tasklet_handler(unsigned long data); |
在合适的时候(例如在中断处理程序返回前),需要调度Tasklet:
1 |
static inline void tasklet_schedule(struct tasklet_struct *t) |
Tasklet被调度后,会尽可能早的执行,如果在执行前,同一个Tasklet再一次被调度,它仍然只会执行一次。 如果当前Tasklet已经在运行,则会被重新调度并再次执行。
作为一种优化措施,Tasklet只会在调度它的CPU上执行,以更好的利用CPU高速缓存。
调用下面的函数可以禁用或者激活Tasklet:
1 2 3 4 5 6 |
//禁用Tasklet,如果Tasklet正在运行,该函数直到它运行完毕才返回 static inline void tasklet_disable(struct tasklet_struct *t); //类似上面,但是立即返回不同步 static inline void tasklet_disable_nosync(struct tasklet_struct *t); //激活 static inline void tasklet_disable_nosync(struct tasklet_struct *t); |
下面的函数用于从调度队列中移除Tasklet:
1 2 3 4 5 6 7 |
/** * 从挂起(等待执行)队列中去除一个Tasklet * 该函数会等待目标Tasklet执行完毕,然后移除 * * 该函数可能引起休眠,因此不能在中断上下文中使用 */ void tasklet_kill(struct tasklet_struct *t); |
与其它机制不同,工作队列(Work queue)总是在进程上下文中执行,因而它支持重新调度甚至睡眠(信号量、阻塞式I/O)。工作队列将工作推后,由一个内核线程负责去执行,它是唯一能在进程上下文中运行的下半部机制。
工作队列是一个用于创建内核线程的接口,其创建的线程负责执行由内核其它部分排到队里的任务,这些内核线程称为工作者线程(Worker thread)。
尽管可以自己创建新的内核线程,但是工作队列提供了一个缺省的工作者线程来处理任务队列,其命名为events/n,其中n为处理器编号(每个处理器一个线程)。一般情况下,应当尽量使用缺省工作者线程,除非你需要处理CPU密集型、性能要求严格的任务,并需要减轻缺省工作者线程的负担、防止其它任务饥饿。
工作者线程使用下面的结构表示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
/** * 外部可见的工作队列抽象,是由每CPU一个的内部工作队列组串的数组 */ struct workqueue_struct { //cpu_workqueue_struct表示每一个CPU的工作者线程 struct cpu_workqueue_struct cpu_wq[NR_CPUS]; struct list_head list; const char *name; // int singlethread; int freezeable; int rt; }; struct cpu_workqueue_struct { spinlock_t lock; //保护此结构的自旋锁 struct list_head worklist; //工作任务链表 wait_queue_head_t more_work; struct work_struct *current_struct; //当前工作 struct workqueue_struct *wq; //关联工作队列结构 task_t *thread; //关联线程 }; |
而工作,则使用下面的结构来抽象:
1 2 3 4 5 6 |
struct work_struct { atomic_long_t data; //入参 struct list_head entry; //链表结构 work_func_t func; //执行的工作处理函数 }; |
所有工作者线程都是普通的内核线程,它们都会执行 worker_thread() 函数,并陷入死循环睡眠,当有任务被插入队列时,它们则被唤醒并执行,然后继续休眠:
1 2 3 4 5 6 7 8 9 10 11 12 |
int main( int argc, char **argv ) { for ( ;; ) { //设置自己为休眠状态,在队列more_work上等待 prepare_to_wait( &cwq->more_work, &wait, TASK_INTERRUPTIBLE ); if ( list_empty( &cwq->worklist ) ) //如果队列为空,则放弃CPU schedule(); finish_wait( &cwq->more_work, &wait ); //移出休眠队列,可运行 run_workqueue( cwq );//执行工作队列中的任务 } } |
run_workqueue() 函数是处理任务的核心逻辑所在:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
int main( int argc, char **argv ) { //循环遍历工作列表 while ( !list_empty( &cwq->worklist ) ) { struct work_struct *work; //获取函数和参数 work_func_t f; void *data; work = list_entry(cwq->worklist.next, struct work_struct, entry); f = work->func; list_del_init( cwq->worklist.next );//从链表中解除当前项目 work_clear_pending( work ); //清零当前任务的待处理标记 f( work ); //执行调用 } } |
首先,使用下面的宏创建需要延迟执行的任务:
1 2 3 4 |
//静态声明 DECLARE_WORK( name, void (*func)( void * ), void *data ); //动态创建,结构由指针提供 INIT_WORK( struct work_struct *work, void (*func)( void * ), void *data ); |
其中func参数就是工作处理函数,其原型为: void work_handler(void *data) ,该函数总是在进程上下文中执行,默认的,该函数执行时中断处于启用状态、不进行任何锁定。注意,由于内核线程没有关联任何用户空间内存映射,因此func不能访问用户空间(只有在内核代表进程执行,例如系统调用时,内核才能访问用户空间)。
创建好任务后,需要对其进行调度:
1 2 3 4 |
//调度任务,工作者线程一旦被唤醒,即立刻执行该任务 schedule_work(&work); //调度任务,延迟至少若干时钟节拍(timer ticks)后执行 schedule_delayed_work(&work, delay); |
有些时候(例如模块卸载),在进行下一步工作前,需要确保一批次的任务已经完成:
1 2 |
//休眠,直到队列中所有任务都执行完毕 void flush_scheduled_work( void ); |
注意上述函数不会取消任何延迟执行的任务,下面的函数可以取消之:
1 |
int cancel_delayed_work( struct work_struct *work ); |
如果默认的工作队列不满足需求,可以创建新的工作队列(包括关联的内核线程):
1 2 |
//该函数会为每个CPU创建一个工作线程 struct workqueue_struct *create_workqueue( const char *name ); //name表示工作线程的名称 |
对于自己创建的工作队列,可以使用下面的函数进行调度、刷空:
1 2 3 |
int queue_work( struct workqueue_struct *wq, struct work_struct *work ); int queue_delayed_work( struct workqueue_struct *wq, struct work_struct *work, unsigned long delay ); flush_workqueue( struct workqueue_struct *wq ); |
下表简单的比较了几种下半部机制:
下半部 | 上下文 | 固有串行化特征 | 优势 |
软中断 | 中断 | 无 | 性能最好,因为序列化限制最少 |
Tasklet | 中断 | 相同Tasklet不得同时执行 | 接口简单,不必考虑同一Tasklet重入并发 |
工作队列 | 进程 | 无(作为进程上下文调度) | 进程上下文,可以阻塞、休眠,可以推迟到指定的时间之后执行下半部 |
由于Tasklet不可重入,因此不需要考虑Tasklet私有全局变量的同步。
如果某个进程上下文和下半部共享数据,那么,进程上下文应当禁用中断、得到锁,然后再访问共享数据。
任何工作队列使用到的共享数据,和一般的内核代码的锁考虑没有区别。
为了保证共享数据的安全,可能需要得到锁,然后禁止下半部的处理,驱动程序中经常使用到这种技巧。
下面两个函数可以禁止所有的下半部(软中断和Tasklet),注意,调用disable多少次,就要相应调用enable多少次,才能实现启用:
1 2 3 4 |
//禁止当前处理器的软中断、Tasklet处理 void local_bh_disable(); //恢复当前处理器的软中断、Tasklet处理 void local_bh_enable(); |
这两个函数使用了每任务一个的 preempt_count 计数器(与内核抢占计数器同名),禁用时递增,启用时递减,为0时下半部机制可用。
在以前的Linux内核版本中,只有发生中断或者内核明确的请求重新调度时,才会存在数据并发访问的问题。从2.0+开始,内核支持对称多处理器(SMP,一种硬件架构,每个CPU地位平等,对资源具有相同、共享的使用权限,并且由单个OS内核控制,现代多数多处理器系统都使用该架构),支持SMP意味着内核代码可以同时运行在多个CPU上,不加保护的内核代码完全可能并发访问共享数据。
从2.6+,Linux发展成抢占式内核,这意味着调度程序可以随时抢占运行中的内核代码。内核中的并发来源现在包括三类:
- SMP:相同的内核代码片段可能在多个CPU上并发运行
- 内核抢占(Kernel Preemption):即使对于单CPU系统,只要允许内核抢占,内核仍然是并发性的。例如运行中的系统调用可能被其它高优先级的进程抢占,后者可能访问共享数据
- 中断处理程序(Interrupt Handler):即使单CPU系统上只有一个进程(不存在SMP、内核抢占问题),内核仍然是并发性的。除非中断被禁止,任何硬件触发的中断会传递给中断处理程序,后者会立即抢占正在运行的进程,中断处理程序可能和此进程共享某些数据
Linux内核是可重入的,这意味着多个控制路径可以同时在内核下运行。当然,在单处理器下,同一时刻只能有一个进程在运行,在SMP上则会出现真正的同时运行。
内核函数可以是可重入的,即同一时刻有多个进程同时访问函数;也可以是非可重入的,这时需要使用锁机制来保护,确保同一时刻只有一个进程执行函数(中的非可重入代码,以及临界区)。任意一个函数,如果它只能修改局部变量,不能修改任何全局变量,那么它必然是可重入的。
内核的可重入性对内核的运作有重要影响,为便于表述,我们称内核处理系统调用、异常、中断时所执行的指令序列为:内核控制路径(Kernel control path)。在最简单的情况下,CPU从第一条指令开始,顺序、不间断的执行内核控制路径,一直到最后一条指令。但是下列事件,会导致CPU交错执行多条内核控制路径:
- 用户态下的进程发起系统调用,陷入内核,内核控制路径发现请求无法立刻得到满足。这是内核控制路径调用Scheduler,选择一个新进程运行。这样,一个内核控制路径尚未完成,另外一个又开始执行。这种情况下,两个路径代表不同的进程执行
- 一个内核路径正在执行时,CPU检测到一个异常(例如访问一个不在RAM中的页)。这样,第一个路径被挂起,CPU转而执行合适的例程。在这个例子中,该例程会负责分配一个新页,并从磁盘读取它的内容,完毕后,第一个控制路径可以恢复执行。在这种情况下,两个控制路径代表同一个进程执行
- 当一个启用了中断的内核路径正在执行时,硬件中断发生,CPU转而执行另外一个路径,处理中断,完毕后,恢复第一个路径的执行。这种情况下,第二个路径不代表任何进程,但是其所消耗的CPU时间,将强行的算在被中断的进程头上
- 在支持抢占式调度的内核中,一个内核路径正在执行,这时一个高优先级的进程加入就绪队列,时钟中断发生后,CPU将代表高优先级进程执行新的路径
所有这些出现交错执行的情况,只要存在共享的数据结构,都牵涉到内核同步。在多CPU的SMP机器上,并发执行的情况更加复杂。
所谓临界区(Critical Regions)是指访问和操控共享数据的代码路径,并发修改共享数据一般是不安全的,可能破坏数据结构。要防止临界区中的并发修改,必须保证代码以原子性的方式来操作——操作一旦开始就会不被打断的执行完毕,就好像是单条CPU指令一样。注意这里的“不被打断”不是指不能被调度程序切换出去,而是指临界区执行完毕之前,任何其它执行序列均不得访问此临界区。
如果多个线程同时访问临界区,一般是一个BUG,这种情况称为竞态条件(Race condition),确保竞态条件不发生称为同步(synchronization)。
即使 i++ 这样简单的操作也会对共享变量i引入竞态条件,完成该操作需要三个指令序列:
- 得到当前(内存)变量i的值,并拷贝到寄存器
- 在寄存器中加法运算
- 将运算结果写回到变量
很多CPU提供了原子的方式来读取、增加并修改变量,可以解决这种简单的并发问题。更复杂的并发场景,必须使用加锁来管理。
伪并发,这种并发不是真正的“同时发生”,而是交叉的执行:
- 抢占和重新调度:用户程序很可能在正处于临界区时非自愿的被抢占,后续获得CPU的进程可能随后进入临界区
- 信号处理:信号处理是异步发生的,其可能和应用程序本身形成竞争
真并发,在启用SMP的情况下,两个进程可能真正的在临界区内同时执行(使用不同的CPU)。尽管和伪并发的原因和含义不同的,但是需要同样的保护。
内核中可能造成并发的原因有:
- 中断:中断几乎可以在任何时刻异步的发生,也就可能随时打断当前正在执行的代码
- 软中断和Tasklet:内核可能在任何时刻唤醒或者调度软中断,从而打断当前正在执行的代码
- 内核抢占:由于内核具有抢占性,所以内核中任何一个任务都可能被另外一个任务抢占
- 睡眠:在内核中运行的进程可能会睡眠,导致唤醒调度程序,使另外一个进程运行
- 对称多处理:两个或者更多CPU同时执行内核代码
- 中断安全(interrupt-safe)代码:在中断处理程序中能够避免并发访问的安全代码
- SMP安全(smp-safe)代码:在对称对处理机器中能够避免并发访问的安全代码
- 内核抢占安全(preempt-safe)代码:在内核抢占时能避免并发访问的安全代码
锁可以用来防止两个线程并发访问临界区,当一个线程对临界区进行标记时,后续到达的线程就无法访问。锁有多种不同的形式,并且加锁的粒度和范围也不相同。Linux实现了几种不同的锁机制,它们的区别主要是当锁不可用时的行为,某些锁简单的进行忙等待(循环轮询锁可用性),其它锁则睡眠直到锁可用为止。
加锁/解锁操作是使用原子操作来实现的,几乎所有的处理器都实现了原子的测试和设置指令,该指令测试一个整数值,如果它为0(代表锁打开)则设置为新值。
锁的使用并不困难,真正的挑战在于发现需要共享的数据和相应的临界区,应当在设计初期就考虑加入锁,而不是发现问题后才想到。
任何可能并发访问的变量都需要保护,下面的代码不会并并发执行:
- 局部自动变量:仅存在于栈中的变量不需要任何保护,因为它只能被当前函数访问
- 只被一个线程访问的全局变量
编写内核代码时,应当思考以下问题:
- 这个数据是否全局的?除了当前线程外,其它线程能不能访问它?
- 这个数据会不会在进程上下文和中断上下文中共享?是不是要在两个中断处理程序中共享?
- 进程在访问数据时可不可能被抢占?被调度的新程序会不会访问同一数据?
- 进程会不会睡眠(阻塞)在某些资源上,如果是,它会让共享数据处于何种状态?
- 如果函数又在另外一个CPU上被调度执行,会发生什么?
死锁的产生需要一定的条件:一个或者更多的执行线程+一个或者多个资源。如果每个线程都在等待其中一个资源,但是所有资源都被占用了,就会导致无限期的等待,从而导致死锁。死锁最简单的例子是自死锁:一个进程尝试获取已经被自己持有的锁。
要避免死锁,应当遵守一些简单的规则:
- 按顺序加锁:使用嵌套锁时,不同程序必须保证以相同的顺序获取锁,这一点非常重要
- 防止发生饥饿:代码的执行是否一定会结束,从而是否锁
- 不要重复请求同一个锁
- 设计尽可能简单的加锁方案
尽管锁的释放顺序和死锁无关,最好是按照获取的反序来释放锁。
术语锁争用(lock contention),用来描述当前正被持有而另外一个线程又尝试获取的锁,如果很多线程在尝试获取锁,则称该锁高度争用,其原因可能是:
- 频繁获取锁
- 长时间持有锁
由于锁的作用是使程序以串行方式访问共享资源,因此其必然会降低系统性能,高度争用的锁会成为系统的性能瓶颈。
可扩容性是对系统可通过硬件扩容的能力的一种度量,对于OS,扩容意味着增加CPU、内存以实现更大的进程并发度,理想情况下,CPU数量翻倍可以使性能翻倍,实际上则不可能达到,扩容后增加的锁开销是一个原因。
2.0+以后内核引入SMP支持,Linux对集群处理器的扩展性大大提高,在2.x早期版本中,一个时刻只能有一个任务在内核中运行,2.2开始取消了这一限制,锁的粒度也渐渐变细,到了2.6+内核加的锁是非常细粒度的,可扩容性很好。
粗粒度的锁可以锁住大块数据,编程简单,锁争用严重时性能差;反之,细粒度的锁锁住小块数据,编程难度大,锁争用严重时性能好(锁争用少见时则浪费资源,加大系统开销)。很多锁在设计初期是粗粒度的,后续因为性能原因而细化——初期的加锁方案应当力求简单。
原子曾被认为是不可再分的粒子,计算机科学借用它表示不可再分的操作。这里的不可再分,是指操作不能被中途打断(调度程序、中断系统导致的打断和这里的打断不是一个含义):两个原子操作不可能同时访问共享资源,原子操作的所有中间结果都符合预期。
内核提供了两组原子操作接口,分别用于对整数、位进行操作。大部分体系结构提供了支持原子操作的简单算术指令,其它体系结构则提供了锁内存总线的指令来支持原子操作(确保其它改变内存的操作不能同时发生)。
能使用原子操作的时候,就不应该使用更复杂的锁机制,因为原子操作的系统开销较小、对cache-line的冲击也更小。
原子性(Atomicity)和顺序性(Ordering)是两个概念,原子性仅仅保证操作不可再分,顺序性要求操作依据预期的顺序发生。顺序性可以通过屏障来保证。
Linux支持原子操作的整数使用类型:
1 2 3 4 |
typedef struct { volatile int counter; } atomic_t; //支持32位整数,由于可移植性的考虑,即使是64bit体系结构,该类型仍然只能是32位 |
原子整数最常见的用途是实现计数器,其相关的操作定义在对应体系结构的 /arch/x86/include/asm/atomic.h 头文件中。这些函数基本都是内联函数,如果函数本身就是原子的(例如大部分体系结构上,读取一个字本身就是原子操作,其执行期间不可能对该字进行写入)则会定义为宏。下面是操作的列表:
函数/宏 | 说明 | ||
ATOMIC_INIT(int i) |
在声明时初始化原子整数为i,举例:
|
||
int atomic_read(atomic_t *v) |
原子的读取整数值,其代码实现:
|
||
void atomic_set(atomic_t *v, int i) | 原子的设置整数值 | ||
void atomic_add(int i, atomic_t *v) | 原子的进行加法运算 | ||
void atomic_sub(int i, atomic_t *v) | 原子的进行减法运算 | ||
void atomic_inc(atomic_t *v) | 原子的进行递增运算 | ||
void atomic_dec(atomic_t *v) | 原子的进行递减运算 | ||
int atomic_sub_and_test(int i, atomic_t *v) | 原子的进行减法运算(v - i)并测试结果是否为0,如果为0则返回true | ||
int atomic_add_negative(int i, atomic_t *v) | 原子的进行加法运算并测试结果是否负数,如果是负数则返回true | ||
int atomic_add_return(int i, atomic_t *v) | 原子操作,并返回结果 | ||
int atomic_sub_return(int i, atomic_t *v) | |||
int atomic_inc_return(int i, atomic_t *v) | |||
int atomic_dec_return(int i, atomic_t *v) | |||
int atomic_dec_and_test(atomic_t *v) | 原子的递减并测试结果是否为0,如果是返回true | ||
int atomic_inc_and_test(atomic_t *v) | 原子的递增并测试结果是否为0,如果是返回true |
由于64位体系结构的越发普及,内核引入了64位整数原子操作的支持:
1 2 3 4 5 |
#ifdef CONFIG_64BIT typedef struct { volatile long counter; } atomic64_t; #endif |
该类型的用法和32位原子整数完全一样。大部分32位体系结构不支持该类型,x86_32除外。
可以按位进行原子操作,相关函数定义在体系结构的 /arch/x86/include/asm/bitops.h 文件中,下面是简单用例:
1 2 3 4 5 6 |
unsigned long word = 0; set_bit( 0, &word ); /* 最低位被原子的设置为0 */ set_bit( 1, &word ); /* 第二位被原子的设置为1 */ printk( "%ul\n", word ); /* 打印3 */ clear_bit( 1, &word ); /* 原子的清零第二位 */ change_bit( 0, &word ); /*原子的翻转第一位*/ |
自旋锁(Spin lock)是内核中最常见的锁,它是一个快速简单的锁实现。自旋锁同时只能被一个执行线程持有,如果出现争用,后续线程会一直忙循环,就好像不停旋转的电机一样;如果没有争用,则会立即获得自旋锁。
自旋锁特别浪费CPU时间,因此不适合长时间持有。自旋锁设计的初衷是用于预期会很快被释放的锁,空转的开销相比起上下文切换更小。自旋锁适用于加锁时间不长且不会睡眠的场景(例如中断处理程序、软中断)中。
对于Per-cpu数据,不需要使用自旋锁来保护,因为其它CPU上运行的内核代码无法访问之。但这类数据的安全性仍然收到内核抢占、中断的影响。
在多处理器机器中,自旋锁在同一时刻只能被一个执行线程持有,因此只有一个线程可能处于临界区内,这为防止并发提供了有效的保护机制。在单处理器机器上,编译的时候并不会加入自旋锁(没有意义),如果禁止内核抢占,那么编译的时候会完全剔除自旋锁。
需要注意:自旋锁是不可重入的,如果尝试获得已经被自己持有的锁,会导致死锁。
自旋锁的实现和体系结构密切相关,头文件为 /include/linux/spinlock.h ,函数定义位于体系结构的 asm/spinlock.c ,下面的代码示例了自旋锁基本的使用方法:
1 2 3 4 |
DEFINE_SPINLOCK( mr_lock ); spin_lock( &mr_lock ); /* critical region ... */ spin_unlock( &mr_lock ); |
自旋锁可以用于中断处理程序中(此处不能使用可能导致休眠的信号量)。如果某个自旋锁会被中断处理程序使用,则任何使用该锁的程序都应该:
- 首先禁用本地(当前CPU)的中断处理
- 然后获取自旋锁
如果不禁用本地中断,当前持有自旋锁的内核代码(例如下半部)可能被中断打断,中断处理程序尝试获取自旋锁时必然自旋(中断处理程序不可睡眠性导致无限期占用CPU自旋是问题的本质所在),但是锁的持有者在中断处理程序执行完毕前不可能运行,这就导致锁永远无法释放,导致死锁。其它处理器上的中断不会妨碍当前处理器上的内核代码释放锁,因此不必禁用。
内核提供了同时操控自旋锁和中断控制的函数:
1 2 3 4 5 |
DEFINE_SPINLOCK( mr_lock ); unsigned long flags; //貌似传值,实质是宏方式使用 spin_lock_irqsave( &mr_lock, flags ); //保存当前中断状态,禁用中断,并获得自旋锁 /* critical region ... */ spin_unlock_irqrestore( &mr_lock, flags ); //是否自旋锁,并恢复中断状态 |
配置选项 CONFIG_DEBUG_SPINLOCK 用于方便自旋锁的调试,激活该选项后内核会检查未初始化的锁、未加锁而尝试解锁。 CONFIG_DEBUG_LOCK_ALLOC 可以进一步全称调试锁。
自旋锁相关的操作如下表:
函数/宏 | 说明 |
spin_lock() | 获取自旋锁 |
spin_lock_irq() | 禁用本地中断并获取自旋锁 |
spin_lock_irqsave() | 保存并禁用本地中断并获取自旋锁 |
spin_unlock() | 释放自旋锁 |
spin_unlock_irq() | 释放自旋锁并启用本地中断处理 |
spin_unlock_irqrestore() | 释放自旋锁并恢复本地中断处理状态 |
spin_lock_init() | 动态初始化自旋锁 |
spin_trylock() | 尝试获取自旋锁,如果失败会立即返回非0而不自旋 |
spin_is_locked() | 如果自旋锁被锁定,则返回非0 |
和下半部配合使用时,必须小心的使用锁机制。函数 spin_lock_bh() 用于获得自旋锁并禁用下半部, spin_unlock_bh() 则进行相反的操作。
由于下半部可以抢占进程上下文(因为下半部可能立即随着中断处理程序执行,而后者必然意味着中断抢占),因此当下半部和进程上下文共享数据时,必须对进程上下文中的共享数据进行保护——加自旋锁的同时需要禁止下半部执行。
由于中断处理程序可以抢占下半部,因此当下半部和中断处理程序共享数据时,必须在加自旋锁的同时禁止中断。
有时候锁的用途可以明确的分为读取、写入两个场景,只要没有写操作,并发的读操作是安全的,写操作则需要完全的互斥。内核专门提供了读写自旋锁,它允许多个执行线程共享的获取读锁而不需自旋。读写自旋锁的用法类似于自旋锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
//静态声明 DEFINE_RWLOCK( mr_rwlock ); //动态创建 struct rwlock_t mr_rwlock; rwlock_init( &mr_rwlock ); //读线程 read_lock( &mr_rwlock ); read_unlock( &mr_rwlock ); //写线程 write_lock( &mr_rwlock ); write_unlock( &mr_lock ); //下面的代码会导致死锁,读写自旋锁不支持读锁向写锁“升级” read_lock( &mr_rwlock ); write_lock( &mr_rwlock ); //写锁不断自旋,等待所有锁持有者(包括自己持有的读锁)释放锁 |
尽管不支持向写锁的升级,但是一个执行线程递归的获取一个读锁是安全的, 该特性经常被用作一种优化手段:如果在中断处理程序中只有读操作,可以使用 read_lock() 而不是 read_lock_irqsave() 进行读保护。但是如果有任何以写的方式使用共享数据的中断,就必须禁止中断,否则可能出现死锁。
需要注意,读写锁对读操作更加照顾,因此大量读操作可能导致写线程处于饥饿状态。
注意这里的信号量和用户空间的Unix System V信号量类似,但是并不等同,根本区别是本节的信号量专用于内核编程。
信号量是一种睡眠锁,如果一个任务尝试获取信号量而不可用,则进入等待队列睡眠而放弃CPU。比起自旋锁,信号量具有更大的开销和更好的CPU利用率。信号量的特点如下:
- 适用于锁被长时间持有的情况:当出现争用时,长时间的自旋不合适,让等待线程睡眠更节约CPU;相反,如果锁持有时间短,那么睡眠、维护等待队列即唤醒的开销可能比锁被占用的全部时间更长
- 只能在进程上下文中使用:中断上下文不允许睡眠
- 持有信号量的同时,可以继续睡眠:其它线程请求信号量时会自动睡眠,不会死锁
- 当请求信号量时,不能持有自旋锁:请求信号量可能会导致睡眠,而持有自旋锁时不能睡眠
与自旋锁不同,信号量允许多个任务持有,支持的使用者数量(usage count)在声明信号量时指定:如果使用者数量为1,称为二值信号量或者互斥信号量;否则称为计数信号量。
类型 struct semaphore 代表信号量,定义在头文件 asm/semaphore.h 中,信号量的实现和体系结构相关。通过下面的方式声明和初始化信号量:
1 2 |
struct semaphore name; sema_init(&name, count); //count为使用者数量 |
使用 static DECLARE_MUTEX(name); 或者 init_MUTEX(sem); 可以快捷的创建/初始化一个互斥信号量。
调用 down_interruptible() 可以尝试请求信号量,如果不可用,当前线程进入 TASK_INTERRUPTIBLE 睡眠状态,如果睡眠期间进程被信号唤醒,该函数会返回 -EINTR 。类似的函数 down() 则导致进程进入 TASK_UNINTERRUPTIBLE 状态。一般前者更合适。如果信号量可用,则可用计数递减。
调用 down_trylock() 可以非阻塞的获取信号量,如果不可用,立即返回非0,否则返回0并获取信号量。
调用 up() 则可以释放一个信号量。
下面的代码示例了如何使用信号量:
1 2 3 4 5 6 7 8 9 10 |
/* 声明使用者数量为1的信号量(互斥信号量) */ static DECLARE_MUTEX( mr_sem ); /* 尝试获取信号量 */ if ( down_interruptible( &mr_sem ) ) { /* 接收到信号,没有获取到信号量 */ } /* 获取到信号量,这里是临界区 */ /* 释放信号量 */ up( &mr_sem ); |
读写信号量的功能类似于读写自旋锁,它声明在 linux/rwsem.h 中,使用 struct rw_semaphore 表示。可以通过下面的方式来创建:
1 2 3 4 |
//静态声明 static DECLARE_RWSEM(name); //动态创建 init_rwsem(struct rw_semaphore *sem); |
读写信号量都是互斥的,也就是说使用者数量必须是1。任何数量的读请求者可以同时持有信号量,而一个写请求者则必须独占信号量:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
static DECLARE_RWSEM( mr_rwsem ); /* 尝试以读方式获取信号量 */ down_read( &mr_rwsem ); /* 临界区(只读操作)*/ /* 释放信号量 */ up_read( &mr_rwsem ); /* 尝试以写方式获取信号量 */ down_write( &mr_rwsem ); /* 临界区(读写操作).. */ /* 释放信号量 */ up_write( &mr_rwsem ); |
读写信号量支持降级: downgrade_write() 可以把写锁变为读锁。
为了使用更加简单的睡眠锁,互斥体(Mutex)被引入内核,其行为与计数为1的信号量类似,但是更简单、高效,也具有更多的限制:
- 任何时候只有一个任务能够持有互斥体
- 互斥体的上锁者必须负责解锁。不能在一个上下文中加锁,而在另外一个上下文中解锁,因此互斥体不适合内核同用户空间复杂的同步场景
- 互斥体不可重入,递归的获取和释放锁是不允许的
- 当持有互斥体时,进程不可以退出
- 互斥体不能被中断处理程序或者下半部获取,即使 mutex_trylock() 也不行
CONFIG_DEBUG_MUTEXES 配置性可以开启互斥体调试,以监控非法使用互斥体的代码。
互斥体相关函数包括:
函数 | 说明 |
mutex_lock(struct mutex *) | 锁定互斥体,如果不可用则睡眠 |
mutex_unlock(struct mutex *) | 解锁互斥体 |
mutex_trylock(struct mutex *) | 尝试锁定,如果不可用返回0,否则返回1 |
mutex_is_locked (struct mutex *) | 判断是否被锁定,如果是返回1 |
如果内核中一个任务需要通知另外一个任务发生了某个特定事件,完成变量(completion variables)是一个简单的方法。一个任务可以在完成变量上等待并睡眠,另外一个任务则在其睡眠期间执行一些工作,这些工作完成后,第二个睡眠唤醒等待中的任务。
完成变量和信号量很类似,它提供后者的简单替代。例如vfork()系统调用在子进程执行或者退出时,使用完成变量来唤醒父进程。
完成变量使用声明在 linux/completion.h 的类型 struct completion 表示,可以用 DECLARE_COMPLETION(mr_comp); 静态的声明。与之相关的函数包括:
函数 | 说明 |
init_completion(struct completion *) | 初始化指定的动态创建的完成变量 |
wait_for_completion(struct completion *) | 等待完成信号的出现 |
complete(struct completion *) | 唤醒任何等待的任务 |
BKL是Linux引入SMP支持后,向全面细粒度加锁机制过渡时期的产物。BKL是一种全局自旋锁。它具有以下特性:
- 持有BKL的任务可以睡眠,但是任务不可被调度时,其持有的锁自动释放;当它恢复调度时,又自动获得
- BKL支持重入,进程可以多次请求一个锁,但是需要相应次数的释放
- BKL只能用于进程上下文中
- BKL会导致整个内核串行化执行
BKL现在已经过时,但是内核代码中还有其踪迹。与之相关的函数包括:
函数 | 说明 |
lock_kernel () | 获取大内核锁 |
unlock_kernel() | 释放大内核锁 |
kernel_locked() | 如果大内核锁当前被持有,返回非0 |
顺序锁是2.6+引入的一种锁机制,它提供一种简单的读写共享数据的机制。顺序锁维护一个序列计数器,当执行写操作时计数器会增加并得到一个锁,在读取数据前、后,分别得到计数器值,如果:
- 计数器值没有变化:说明在读取数据期间,没有启动过写操作
- 如果计数器值为偶数:说明在此刻没有进行中的写操作(因为初值0,获取写锁导致值为奇数,释放则变为偶数)
- 如果计数器值为奇数:说明当前正在进行写操作
可以使用 seqlock_t mr_seq_lock = DEFINE_SEQLOCK( mr_seq_lock ) 定义一个顺序锁,执行写的代码很简单:
1 2 3 4 |
//写代码路径 write_seqlock( &mr_seq_lock ); //获取写锁,增加计数,行为类似于自旋锁 /* 临界区 */ write_sequnlock( &mr_seq_lock ); //释放写锁,增加计数 |
执行读的代码则很不一样:
1 2 3 4 5 6 7 8 9 |
//读代码路径 unsigned long seq; do { seq = read_seqbegin( &mr_seq_lock ); //循环检测,直到seq不是奇数(即当前没有写操作) /* 在这里读取数据 */ } //每次读取数据完毕,需要进行乐观并发控制:检查释放读取了无效数据,即在读取数据期间,数据释放被修改 while ( read_seqretry( &mr_seq_lock, seq ) ); //判断当前计数是否和读操作开始前的seq一致,即没发生修改 |
多个读线程和少数写线程共享一个顺序锁时,它更倾向与写线程,因为只要没有其它任务正在写,写锁总是能成功获得。未决的写操作会导致读线程循环,直到写锁释放。
顺序锁适合这样的场景:
- 共享数据具有多个读请求者,很少的写请求者
- 虽然写请求者很少,还是倾向与写请求,从不让读请求致使写请求饥饿
- 共享数据结构简单,例如是一个简单的结构体甚至整数
内核中使用顺序锁典型的例子是jiffies,该变量存储Linux机器启动到当前时间依赖,流逝的时钟节拍累加数。读写jiffies的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
//读取当前jiffies u64 get_jiffies_64( void ) { unsigned long seq; u64 ret; do { seq = read_seqbegin( &xtime_lock ); ret = jiffies_64; } while ( read_seqretry( &xtime_lock, seq ) ); return ret; } //时钟中断负责写jiffies write_seqlock(&xtime_lock); jiffies_64 += 1; write_sequnlock(&xtime_lock); |
由于内核是抢占性的,内核中的进程在任何时候都可能被其它进程抢占。内核抢占代码使用自旋锁作为禁止抢占区域的标记——如果当前内核代码持有自旋锁,那么对应CPU上的抢占是关闭的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
static inline void spin_lock(spinlock_t *lock) { raw_spin_lock(&lock->rlock); } #define raw_spin_lock(lock) _raw_spin_lock(lock) #define _raw_spin_lock(lock) __LOCK(lock) #define __LOCK(lock) \ do { preempt_disable(); __acquire(lock); (void)(lock); } while (0) //禁用抢占,然后获得锁 static inline void spin_unlock(spinlock_t *lock) { raw_spin_unlock(&lock->rlock); } #define raw_spin_unlock(lock) _raw_spin_unlock(lock) #define _raw_spin_unlock(lock) __UNLOCK(lock) #define __UNLOCK(lock) \ do { preempt_enable(); __release(lock); (void)(lock); } while (0) //启用抢占,然后再释放锁 |
如果自旋锁不具有禁止抢占的特性,会发生什么呢?考虑下面的事件序列(A/B执行路径一致):
- 线程A获得自旋锁,正在CPU0上操控共享数据
- 发生内核抢占,A丢失CPU0
- 线程B获得CPU0,尝试获得自旋锁,它开始自旋,它获得自旋锁之前的时间一直是在浪费CPU
这样,A因为被抢占,其持有自旋锁的总计时间必然延长,导致性能变差。因此个人觉得自旋锁附带禁止抢占,是出于性能优化的考虑。此外,对于单CPU系统,只需要优化掉自旋部分,保留禁用/启用抢占的代码,恰恰可以满足“锁定”需求——禁止抢占导致自旋锁总是可用的,因为不同线程必须串行化执行,没必要自旋。
某些情况下并不需要自旋锁,但仍然需要关闭内核抢占。例如对于Per-cpu变量,如果没有自旋锁保护、同时内核是抢占式的,那么新调度的任务就可能访问同一个变量,这是一种伪并发。同样的,对于单CPU系统,不管是不是Per-cpu变量,前述伪并发问题都存在,区别就是Per-cpu变量不需要自旋锁。此时可以直接调用下面的API来禁止抢占:
1 2 3 4 5 6 7 8 |
preempt_disable(); //增加抢占计数器,从而禁止抢占 /* 抢占被禁止 */ preempt_enable(); //减少抢占计数器,如果为0,则启用抢占,并执行被挂起需要调度的任务 //注意:对于单CPU的自旋锁,加锁、解锁代码已经优化为上面两条语句,因此该代码可专用于保护Per-cpu数据而不必使用更重的自旋锁 //其它函数 preempt_enable_no_resched(); //与上面类似,但是不检查任何挂起的调度 preempt_count(); //返回抢占计数器 |
需要注意的是, preempt_disable() 和 preempt_enable() 调用次数必须一致,才能最终启用抢占。
更简洁的保护Per-cpu数据的方式是使用:
1 2 3 4 |
int cpu; cpu = get_cpu(); //禁用内核抢占,并获得当前CPU /* 操控Per-cpu数据 */ put_cpu(); //恢复内核抢占 |
有时候指令执行顺序问题很重要:
- 当在多处理器之间、硬件设备之间进行同步时,需要明确按代码中指定的顺序进行读内存、写内存操作
- 和硬件交互时,常需要确保一个读操作发生在其它读写之前
- 在多处理器上,可能需要按照写顺序来读取数据
但是,编译器和处理器为了提高效率,可能对读写指令进行重新排序,这是问题复杂化了。幸好,执行指令重排的处理器都提供了机器指令来保障顺序要求,我们也可以指示编译器不在给定点周围的指令序列进行重新排序。这些确保顺序的指令称为屏障(Barriers)。
考虑代码: a = 1; b = 2; ,在某些处理器上,对b的赋值可能发生在对a赋值之前(Happens-bafore)。这是由于编译器、CPU都以为a和b没有任何关系,因此编译器可能在编译期间静态的重新排序,生成和编码时不一样的目标码;CPU也可能在执行期间动态的重新排序,抓取和分发貌似无关的指令以达到最高执行效率。对于 a = 1; b = a; 这样的代码, 重新排序就不会发生,因为这两条指令显然是存在依赖关系的,改变顺序语义就不同。
不管是编译器还是CPU,都不知道其它上下文(进程)中的相关代码,特定条件下指令重排可能导致严重的逻辑错误(例如双重检查锁定问题,其本质就是写地址和写对象操作的指令重排)。
Linux提供了若干内存和编译器屏障函数:
屏障 | 说明 |
rmb() | 读内存屏障,阻止载入(load,即读)操作跨越此屏障重排序,也就是说,此屏障之前的任何load操作都不会重排到屏障后面 |
wmb() | 写内存屏障,阻止存储(store,即写)操作跨越此屏障重排序,也就是说,此屏障之前的任何store操作都不会重排到屏障后面 |
mb() | 同时提供读写内存屏障 |
read_barrier_depends() | 与读内存屏障类似,但是仅仅针对此屏障后面读操作依赖的那些载入,不依赖的load操作仍然可能重排序 |
smp_rmb() | 这几个宏对屏障进行了优化:对于SMP内核,等价于上面的屏障;对于单处理器内核,优化为编译器屏障 |
smp_read_barrier_depends() | |
smp_wmb() | |
smp_mb() | |
barrier() | 防止编译器跨越此屏障进行load、store操作的重排序。前面讨论的内存屏障可以完成barrier()的功能,但是barrier()更加轻量 |
对于不同的体系结构,屏障的实际效果差别很大,例如对于不会打乱存储顺序的体系结构(例如X86), wmb() 不做任何事情。
下面是 mb()和 wmb()的一个示例,假设共享变量初始值 a = 1; b = 2; ,则下面的指令序列:
1 2 3 4 5 6 7 |
/* 线程1 线程2*/ a = 3; mb(); b = 4; c = b; rmb(); d = a; //wmb()、rmb()相当于提示CPU,在继续执行之前,立即提交尚未处理的存储、载入指令 |
如果线程1不使用屏障,那么c可能为4而d却为1(c接受b的新值,而d却接受初值),其原因是,没有内存屏障的情况下,线程1对a和b的赋值顺序可能颠倒。而使用写屏障后,如果b为4则势必a为3。
同样,如果线程2不使用读屏障,那么可能在读取b之前读a,这就无法在c为4的情况下,断定a必然为3。因为重排后a先读取,线程1可能刚执行到第1行;而b被读取时,线程1已经执行完第3行,导致c的值为4。
有些时候可以使用 read_barrier_depends() 代替读屏障:
1 2 3 |
pp = p; read_barrier_depends(); b = *pp; //由于载入*pp依赖于载入p,因此该屏障保证顺序性 |
对于内核来说时间管理非常重要:
- 内核中大量函数是时间驱动的——这些函数要么延迟一段时间执行(推迟的磁盘I/O),要么周期性执行(红黑树平衡性调整)
- 内核需要管理系统的运行时间
- 内核需要管理当前日期时间
内核关心两种时间概念:绝对时间、相对时间,所谓相对时间是指相对于当前时间的延迟。不同场景下可能分别用到这两个时间概念。
系统定时器是一种可编程硬件芯片,它以固定频率产生中断(例如10ms),该中断称为定时器中断。该中断对应的时钟中断程序负责更新系统时间、驱动周期性任务的运行。系统定时器和时钟中断程序是Linux内核管理机制中的中枢。
内核需要硬件配合才能计算和管理时间,这个硬件就是系统定时器。系统定时器以特定频率定期自行触发时钟中断,此频率称为节拍率(Tick rate),可以通过编程指定。
由于节拍率对于内核可知,因此内核就可以根据时钟中断发生的次数来推断系统已运行的时间,并推断墙上时间(Wall time)。
利用时间中断周期性执行的工作包括:
- 更新系统运行时间
- 更新墙上时间
- 在SMP系统中,均衡调度程序中各处理器上的运行队列,维持负载均衡
- 检查当前进程是否用完时间片,如果用完,则重新调度
- 运行timout的动态定时器
- 更新资源消耗和处理器时间的统计值
这些工作中,有的是在每次时钟中断时都执行,有些则是每隔n次时钟中断执行一次。
节拍率通常是静态预处理定义的,对应宏为HZ,内核启动时会根据这一值对硬件进行设置。不同体系结构HZ默认值不一样,x86为100,即每秒100此时钟中断。
调整HZ对系统又重大的影响,高HZ的好处是:
- 更高的时钟中断频率(解析度)可以提高时间驱动事件的解析度。对于100HZ,系统中事件周期最快只能有10ms,不能更短,而调整为1KHZ时则能够短至1ms
- 提高事件驱动事件的准确度。定时任务可能在任何一个时刻超时,而只有在时钟中断发生时,才能执行任务,这意味着最大可能导致1/HZ秒的误差,平均误差则是2/HZ——对于100HZ为±5ms,对于1KHZ则小于1ms
从更细节的角度来说,高HZ的优势包括:
- 内核定时器以更高频率和准确度运行,这将带来大量好处
- 依赖定时值执行的系统调用,例如poll()和select(),可以更精确的运行。这将大幅提高系统性能,因为频繁调用这两个系统调用的应用程序将在等待时钟等待上浪费大量时间
- 对资源消耗、系统运行时间的度量更精确
- 提高进程抢占的精确度,时钟中断程序负责减少当前进程的时间片计数,如果跌至0的同时need_resched被设置,内核就会立刻重新运行调度程序,假设当前进程只剩1ms的时间,而时钟中断在5ms后才发生,那么进程可能多运行5ms。这个问题一般情况下不严重,因为这种误差是针对所有进程的。但是,对于时间非常敏感的“实时”应用,则不能容忍
高HZ获得益处的同时,也要付出相应的代价——HZ越高,中断处理程序占用的CPU时间越多,这导致应用程序可用CPU时间减少,并且频繁的打乱CPU高速缓存。在现代硬件中,高达1000HZ的节拍率不会造成难以接收的负担。
Linux支持动态节拍,当编译内核时指定CONFIG_HZ选项, 那么系统会根据需要动态调整HZ值,这可以减少空闲系统的电量消耗。
全局变量jiffies用来记录自系统启动以来,产生的节拍的总数。该变量用来计算已经启动的时间:
1 2 3 4 5 6 |
//volatile提示不适用CPU缓存,每次都到主存获取此变量的值 extern unsigned long volatile jiffies; //类型必须明确为unsigned long,使用任何其它类型存储均是错误的 //将秒数转换为jffies (seconds * HZ) //将jiffies转换为秒树 (jiffies / HZ) |
将jiffies转换为秒,往往在与用户空间通信时才需要,内核很少关心绝对时间。
在内核jiffies被表示为无符号长整型,这意味着在32位体系结构上它是32位,64位系统上则是64位。在100HZ设置下,32位系统497天后此变量就溢出;在1000HZ下仅49天后就会溢出。如果使用64位jiffes,则不会溢出。
出于兼容性的考虑,内核使用 extern u64 jiffies_64 存储64位jiffies,而变量jiffies和它存放到一个区域,占用它的低32位。这样,老的32位代码就可以和以前一样访问jiffies(统计流逝的时间),而时间管理代码则使用jiffies_64,避免溢出。访问jiffies的代码只能获取低32位,函数 get_jiffies_64() 可以用来读取整个64位的值。在64位体系结构上jiffies_64和jiffies完全等同。
对于32位系统,存在回绕问题,即jiffies(jiffies_64低32位)达到2^32-1后,其值会回复到0 的问题,这会导致计算时间差的代码出现错误:
1 2 3 4 5 6 7 |
unsigned long timeout = jiffies + HZ / 2; /* 半秒后超时 */ /* 完成一些工作 */ /* 判断工作消耗时间是否过大 */ if (timeout > jiffies) { /* 如果jiffies回绕,可能导致判断错误,基本上timeout必然大于jiffies */ } |
因此,直接使用jiffies进行算术运算是不安全的,应当使用下面的宏:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//当a大于b时返回真 #define time_after(a,b) \ (typec heck(unsigned long, a) && \ typecheck(unsigned long, b) && \ ((long)(b) - (long)(a) < 0)) //这里把无符号数作为有符号数看待,回绕前的大数最高位必然是1,认为是负数,回绕后最高为0,认为是正数 //因此回绕后的值总被认为是大的数,也就是after #define time_before(a,b) time_after(b,a) #define time_after_eq(a,b) \ (typecheck(unsigned long, a) && \ typecheck(unsigned long, b) && \ ((long)(a) - (long)(b) >= 0)) #define time_before_eq(a,b) time_after_eq(b,a) |
很多应用程序依赖于将HZ看做是100,则就导致当内核改变HZ时,用户空间的应用程序出现问题。为此,内核定义了 USER_HZ 来表示用户空间“看到”的节拍率,在x86上它就是100。内核使用函数在HZ和USER_HZ表示的节拍计数之间进行转换:
1 2 3 4 |
//把基于HZ计数的jiffies转换为基于USER_HZ技术的用户空间节拍数 clock_t jiffies_to_clock_t(long x); //类似上面,转换64位jiffies u64 jiffies_64_to_clock_t(u64 x); |
系统定时器是内核定时机制中最重要的角色,其硬件实现有数种,x86主要使用可编程中断时钟(PIT),可以在内核启动时对其进行编程初始化。
除了前面提到的系统定时器以外,系统结构还提供了另外一种方式进行即时——实时时钟(RTC)。RTC可以用来持久化的存放系统时间,即时关机后仍然可以依靠主板上的电池持续计时。RTC最重要的作用是用来初始化 xtime 变量,尽管x86等体系会定期把当前时间回写到RTC
时钟中断的功能可以分为体系结构相关、无关两部分。前者作为系统定时器的中断处理程序注册到内核,它至少需要完成下面的工作:
- 获得 xtime_lock 锁,以便访问 jiffies_64 ,并对墙上时间xtime进行保护
- 必要时,应答或者设置系统时间
- 周期性的使用墙上时间更新实时时钟
- 调用体系结构无关的时钟例程: tick_periodic()
tick_periodic() 是体系结构无关的部分,它完成:
- 递增 jiffies_64 变量
- 更新资源消耗的统计值,必然当前进程的系统时间和用户时间
- 执行已经到期的动态定时器: run_local_timers
- 调用 scheduler_tick() ,该函数负责更新剩余时间片等
- 更新存储在 xtime 变量的墙上时钟
- 计算平均负载值
该函数的代码分析如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
static void tick_periodic( int cpu ) { if ( tick_do_timer_cpu == cpu ) { write_seqlock( &xtime_lock ); //使用序列锁定 /* 跟踪下一个节拍事件 */ tick_next_period = ktime_add( tick_next_period, tick_period ); do_timer( 1 ); write_sequnlock( &xtime_lock ); } /** * user_mode通过读取当前寄存器来判断处于用户空间还是内核空间 * 这并不准确,因为: * 1、在一个节拍内,进程可以多次出入内核 * 2、在一个节拍内,该进程不一定真正完全占有CPU * 不幸的是,内核只能支持到这一程度 */ update_process_times( user_mode( get_irq_regs() ) ); profile_tick( CPU_PROFILING ); } void do_timer( unsigned long ticks ) { jiffies_64 += ticks; //更新jiffies update_wall_time(); //更新墙上时间 calc_global_load(); //计算平均负载统计值 } //更新各种处理时间 void update_process_times( int user_tick ) { struct task_struct *p = current; int cpu = smp_processor_id(); account_process_tick( p, user_tick ); run_local_timers(); //运行本地的动态定时器 rcu_check_callbacks( cpu, user_tick ); printk_tick(); perf_event_do_pending(); scheduler_tick(); run_posix_cpu_timers( p ); } /* * 计算一个节拍的CPU时间 * @p: CPU时间统计到哪个进程头上 * @user_tick: 只是这是用户节拍还是系统节拍 */ void account_process_tick(struct task_struct *p, int user_tick) { cputime_t one_jiffy_scaled = cputime_to_scaled(cputime_one_jiffy); struct rq *rq = this_rq(); if (user_tick) //计算为用户空间的时间 account_user_time(p, cputime_one_jiffy, one_jiffy_scaled); else if ((p != rq->idle) || (irq_count() != HARDIRQ_OFFSET)) //计算为内核空间的时间 account_system_time(p, HARDIRQ_OFFSET, cputime_one_jiffy, one_jiffy_scaled); else //位于内核空间,运行idle线程,计算为系统空闲时间 account_idle_time(cputime_one_jiffy); } /** * 减少当前进程的时间片计数值,必要时设置need_resched */ void scheduler_tick(void) { int cpu = smp_processor_id(); struct rq *rq = cpu_rq(cpu); struct task_struct *curr = rq->curr; sched_clock_tick(); raw_spin_lock(&rq->lock); update_rq_clock(rq); update_cpu_load(rq); //更新CPU负载统计信息 curr->sched_class->task_tick(rq, curr, 0); raw_spin_unlock(&rq->lock); perf_event_task_tick(curr); #ifdef CONFIG_SMP rq->idle_at_tick = idle_cpu(cpu); trigger_load_balance(rq, cpu); //在SMP上负责平衡运行队列 #endif } |
当天的真实时间定义在 kernel/time/timekeeping.c 中: struct timespec xtime; ,其类型定义在:
1 2 3 4 |
struct timespec { __kernel_time_t tv_sec; /* 从19700101到现在的秒数 */ long tv_nsec; /* 从上一秒开始流逝的纳秒数 */ }; |
读写此变量需要获得序列锁 xtime_lock 进行写锁定:
1 2 3 |
write_seqlock(&xtime_lock); /* 在这里更新时间 */ write_sequnlock(&xtime_lock); |
读取此变量时则需要获得读锁:
1 2 3 4 5 6 7 8 9 10 |
do { unsigned long lost; seq = read_seqbegin(&xtime_lock); //获得读锁 usec = timer->get_offset(); lost = jiffies - wall_jiffies; if (lost) usec += lost * (1000000 / HZ); sec = xtime.tv_sec; usec += (xtime.tv_nsec / 1000); }while (read_seqretry(&xtime_lock, seq)); //如果乐观并发失败,重试 |
从用户空间获得墙上时间,需要调用 gettimeofday() ,它对应系统调用 sys_gettimeofday() 。
除了更新墙上时间以外,内核本身很少使用xtime变量,文件系统代码是一个例外,因为它需要存储多种时间戳在inode中。
又称内核定时器(kernel timers),或者简称定时器,是内核能够管理时间流逝的关键。内核常常需要在一定时间后执行特定函数,内核定时器恰恰满足需要。
定时器由下面的结构表示:
1 2 3 4 5 6 7 8 |
struct timer_list { struct list_head entry; /* 定时器链表的入口 */ unsigned long expires; /* jiffies到达多少时超时 */ void (*function)( unsigned long ); /* 定时器处理函数 */ unsigned long data; /* 处理函数入参 */ struct tvec_t_base_s *base; /* 内部字段,不需要使用 */ }; |
下面的代码例示了如何使用定时器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
//定义一个定时器 struct timer_list my_timer; //初始化 init_timer(&my_timer); my_timer.expires = jiffies + delay; my_timer.data = 0; my_timer.function = my_function; //激活定时器 add_timer(&my_timer); //修改超时时间 mod_timer(&my_timer, jiffies + new_delay); //删除定时器(必须在超时前) del_timer(&my_timer); /** * 注意删除定时器存在潜在的竞态条件:该函数返回后,只能保证定时器不会在将来执行 * 但是在SMP上,其它CPU可能正在运行定时器中断,因此删除定时器时可能需要等待 * 其它处理器上的定时器处理程序都退出。 * 在几乎全部场景下,都应该使用下面的函数代替del_timer */ del_timer_sync(&my_timer); //注意该函数可能休眠,不能在中断上下文使用 //任何使用不要尝试用下面的代码代替mod_timer,在SMP上不安全 del_timer(my_timer);//此时其它CPU可能正在执行定时器… my_timer->expires = jiffies + new_delay; add_timer(my_timer); |
需要注意的是,定时器不能用来执行硬实时的任务,因为它可能在超时后的下一个节拍时才执行。
由于定时器代码和当前代码是异步的,因此存在潜在的竞争条件,需要注意同步。
内核在时钟中断发生后执行定时器,定时器作为软中断在下半部上下文(bottom-half context)中执行。其核心逻辑位于:
1 2 3 4 5 6 7 8 9 10 11 |
/* * Called by the local, per-CPU timer interrupt on SMP. */ void run_local_timers( void ) { hrtimer_run_queues(); //执行定时器软中断 //所有超时的定时器将在本处理器上执行 raise_softirq( TIMER_SOFTIRQ ); softlockup_tick(); } |
为避免遍历定时器列表导致的耗时,内核将定时器划分为5组,定时器超时时间接近时,定时器将随组一起下移。
除了使用定时器和下半部机制以外,内核代码(特别是驱动)还需要更特殊的手段延迟执行,这种延迟时间往往极短。例如,设置网卡的以太网模式可能需要2ms,因此设置网卡速度后,驱动程序需要等待至少2ms才能继续运行。
内核提供了若干延迟执行的方法,其中某些会挂起CPU,防止它执行任何其它工作;另外一些则不会挂起CPU,也就不能确保延迟代码能够在特定延迟后执行。
最简单(也最不理想)的延迟方式,忙等待就是空转CPU,此方法只能在需要延迟的时间是节拍的整数倍或者精确度要求不高时使用:
1 2 3 4 5 6 7 8 |
unsigned long timeout = jiffies + 10; /* 10个节拍 */ //由于jiffies是volatile变量,因此循环中每次都会去主存加载新值 while (time_before(jiffies, timeout)) cond_resched(); //单纯的忙等待很蹩脚,因此可以调用cond_resched() //该函数可以调度一个新程序运行,但是仅仅在need_resched被设置后才会调度 //也就是说,只有存在一个“更重要的”任务时,才会进行调度 //由于该方法需要调用调度程序,因而不能在中断上下文中执行 |
有时内核代码(通常是驱动)需要更短的、精确的延迟,这个延迟往往小于节拍率,这是任何基于jiffies的方法都不可用了。幸好内核提供了可以处理毫秒、微秒(1/1000ms)、纳秒(1/1000us)的函数:
1 2 3 |
void udelay(unsigned long usecs);//延迟指定微秒 void ndelay(unsigned long nsecs);//延迟指定纳秒 void mdelay(unsigned long msecs);//延迟指定毫秒 |
这些方法的原理是:内核知道CPU的运行速度——指定时间内能够执行的忙循环的次数 。这一速度称为BogoMIPS(伪的每秒百万指令数),存放在变量 loops_per_jiffy 中,通过文件 /proc/cpuinfo 可以读到bogomips。
内核就是利用 loops_per_jiffy 来精确的判断需要多少次循环,进而实现超短延迟执行。
这些函数不应当在长延迟中使用。
该函数可以让任务睡眠若干节拍,然后在投入运行, 注意它不能保证时间精度,用法如下:
1 2 3 4 |
/* 设置任务为不可中断睡眠 */ set_current_state( TASK_INTERRUPTIBLE ); /* 导致当前任何小睡s秒,然后唤醒 */ schedule_timeout( s * HZ ); |
Leave a Reply