Linux的中断下半部机制的对比

本文择选自几篇博文,对Linux的几种中断下半部机制进行对比。

转自以下博文:

Linux有以下几种下半部机制:

  • 软中断
  • tasklet
  • workqueue
  • threaded irq

中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。例如,假设一个数据块已经达到了网线,当中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。因此,内核把中断处理分为两部分:上半部(tophalf)和下半部(bottomhalf),上半部(就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理。

首先,一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。

下半部运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别。

但是,内核到底什时候执行下半部,以何种方式组织下半部?

中断处理的tasklet(小任务)机制

小任务是指对要推迟执行的函数进行组织的一种机制

其数据结构为tasklet_struct,每个结构代表一个独立的小任务,其定义如下:

struct  tasklet_struct {
struct tasklet_struct *next;         /*指向链表中的下一个结构*/
          unsigned long state;                /* 小任务的状态*/
          atomic_t count;        /* 引用计数器*/
          void(*func) (unsigned long);                /* 要调用的函数*/
          unsigned long data;                 /* 传递给函数的参数*/
};

结构中的func域就是下半部中要推迟执行的函数,data是它唯一的参数。State域的取值为TASKLET_STATE_SCHED或TASKLET_STATE_RUN。TASKLET_STATE_SCHED表示小任务已被调度,正准备投入运行,TASKLET_STATE_RUN表示小任务正在运行。TASKLET_STATE_RUN只有在多处理器系统上才使用,单处理器系统什么时候都清楚一个小任务是不是正在运行(它要么就是当前正在执行的代码,要么不是)。Count域是小任务的引用计数器。如果它不为0,则小任务被禁止,不允许执行;只有当它为零,小任务才被激活,并且在被设置为挂起时,小任务才能够执行。

1. 声明和使用小任务大多数情况下,为了控制一个寻常的硬件设备,小任务机制是实现下半部的最佳选择。小任务可以动态创建,使用方便,执行起来也比较快。
我们既可以静态地创建小任务,也可以动态地创建它。选择那种方式取决于到底是想要对小任务进行直接引用还是一个间接引用。如果准备静态地创建一个小任务(也就是对它直接引用),使用下面两个宏中的一个:

DECLARE_TASKLET(name,func, data)
DECLARE_TASKLET_DISABLED(name,func, data)

这两个宏都能根据给定的名字静态地创建一个tasklet_struct结构。当该小任务被调度以后,给定的函数func会被执行,它的参数由data给出。这两个宏之间的区别在于引用计数器的初始值设置不同。第一个宏把创建的小任务的引用计数器设置为0,因此,该小任务处于激活状态。另一个把引用计数器设置为1,所以该小任务处于禁止状态。例如:

DECLARE_TASKLET(my_tasklet,my_tasklet_handler, dev);
这行代码其实等价于
structtasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0), tasklet_handler,dev};

这样就创建了一个名为my_tasklet的小任务,其处理程序为tasklet_handler,并且已被激活。当处理程序被调用的时候,dev就会被传递给它。

2. 编写自己的小任务处理程序小任务处理程序必须符合如下的函数类型:

void tasklet_handler(unsigned long data)

由于小任务不能睡眠,因此不能在小任务中使用信号量或者其它产生阻塞的函数。但是小任务运行时可以响应中断。

3. 调度自己的小任务通过调用tasklet_schedule()函数并传递给它相应的tasklt_struct指针,该小任务就会被调度以便适当的时候执行:

tasklet_schedule(&my_tasklet);        /*把my_tasklet标记为挂起 */

在小任务被调度以后,只要有机会它就会尽可能早的运行。在它还没有得到运行机会之前,如果一个相同的小任务又被调度了,那么它仍然只会运行一次

可以调用tasklet_disable()函数来禁止某个指定的小任务。如果该小任务当前正在执行,这个函数会等到它执行完毕再返回。调用tasklet_enable()函数可以激活一个小任务,如果希望把以DECLARE_TASKLET_DISABLED()创建的小任务激活,也得调用这个函数,如:

tasklet_disable(&my_tasklet);        /*小任务现在被禁止,这个小任务不能运行*/tasklet_enable(&my_tasklet);        /*  小任务现在被激活*/

也可以调用tasklet_kill()函数从挂起的队列中去掉一个小任务。该函数的参数是一个指向某个小任务的tasklet_struct的长指针。在小任务重新调度它自身的时候,从挂起的队列中移去已调度的小任务会很有用。这个函数首先等待该小任务执行完毕,然后再将它移去。

中断处理的工作队列机制

工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。

那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。

1.工作、工作队列和工作者线程

如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events,自己也可以创建自己的工作者线程。

工作用<linux/workqueue.h>中定义的work_struct结构表示:

struct work_struct{
unsigned long pending; /* 这个工作正在等待处理吗?*/
struct list_head entry; /* 连接所有工作的链表 */
void (*func) (void *); /* 要执行的函数 */
void *data; /* 传递给函数的参数 */
void *wq_data; /* 内部使用 */
struct timer_list timer; /* 延迟的工作队列所用到的定时器 */
};

这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。

2.创建推后的工作

要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地建该结构:

DECLARE_WORK(name, void (*func) (void *), void *data);

这样就会静态地创建一个名为name,待执行函数为func,参数为data的work_struct结构。

同样,也可以在运行时通过指针创建一个工作:

INIT_WORK(struct work_struct *work, woid(*func) (void *), void *data);

会动态地初始化一个由work指向的工作。

3.工作队列中待执行的函数

工作队列待执行的函数原型是:

void work_handler(void *data)

这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。

4.工作队列中待执行的函数

现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events工作线程,只需调用

schedule_work(&work);

work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。

有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:

schedule_delayed_work(&work, delay);

这时,&work指向的work_struct直到delay指定的时钟节拍用完以后才会执行。

可以在中断服务函数中调用schedule_work(&yd_sync_work_queue);

现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events工作线程,只需调用schedule_work(&work);

work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。

有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:

schedule_delayed_work(&work,delay);

这时,&work指向的work_struct直到delay指定的时钟节拍用完以后才会执行。

中断线程(threaded_irq)

前面讲的硬中断,它是外设中断处理中必不可少的一部分。Softirq和tasklet虽然不会禁用中断,提高了系统对中断的响应性,但是softirq的执行优先级还是比进程的优先级高,有些确实不那么重要的任务其实可以放到进程里执行,和普通进程共同竞争CPU。而且软中断里不能调用会阻塞、休眠的函数,这对软中断函数的编程是很不利的,所以综合各种因素,我们需要把中断处理任务中的与硬件无关有不太紧急的部分放到进程里面来做。为此内核开发了两种方法,中断线程和工作队列。

我们这节先讲中断线程,其接口如下:

/include/linux/interrupt.h

extern int __must_check
request_threaded_irq(unsigned int irq, irq_handler_t handler,
         irq_handler_t thread_fn,
         unsigned long flags, const char *name, void *dev);

如果我们要为某个外设注册中断处理程序,可以使用这个接口。其中handler是硬中断,是处理与硬件密切相关的事物。其处理完成后,可以把接收到的数据、要继续处理的事情放到某个位置,然后返回是否需要唤醒对应的中断线程。如果需要的话,系统会唤醒其对应的中断线程来继续处理任务,这个线程的主函数就是第三个参数thread_fn。

linux中断线程化(threaded irq)和工作队列(work queue)的异同

两者差异:

1、调度优先级不同:

  • threaded irq handler所在的进程(内核线程),调度类别是SCHED_FIFO,是实时内核线程。
  • workqueue所依赖的线程池创建的kworker线程调度类别是SCHED_NORMAL,是普通内核线程。

2、多核并发效率不同:

  • workqueue机制是内核启动时会为每个CPU创建几个不同优先级的kworker(worker_thread)内核线程,用以集中处理各种中断的下半部的work。
  • 新技术threaded irq,为每一个中断都创建一个内核线程;多个中断的内核线程可以分配到多个CPU上执行。
  • 所以多个不同设备中断的work都会由同一个kworker线程来处理,在多CPU系统中并发效率不如threaded irq。

两者相同:

都是可抢占,可调度,可睡眠的内核线程,是内核态的线程上下文。(SCHED_FIFO可以被SCHED_DEADLINE抢占;threaded irq handler也可以被更高优先级的SCHED_FIFO实时线程抢占)

Linux下的内核线程threaded irq机制分析与应用

threaded irq的引入
我们知道,工作队列可以用来处理中断的一些耗时的事情,但是他又一个缺点,就是如果有多个事件同时放入工作队列中,如果前面的事件处理得太久,那么就会影响到后面的事件的执行,因为内核线程事一个个依次执行的,如果一个事件真的非常耗时间,那么我们可以用 threaded irq来处理,大概意思就是中断上半部不变,中断的下半部用一个内核线程来替代tasklet,这个内核线程专门负责来执行某一个中断。

相应的头文件在kernel/irq/manage.h

使用哪一个中断函数
通过下面的这个函数替代掉了我们一直用的request_irq函数,

    //err = request_irq(gpio_keys_100ask[i].irq, gpio_key_isr, IRQF_TRIGGER_RISING | IRQF_TRIGGER_FALLING, "100ask_gpio_key", &gpio_keys_100ask[i]);
    err = request_threaded_irq(gpio_keys_100ask[i].irq, gpio_key_isr, gpio_key_thread_func, IRQF_TRIGGER_RISING | IRQF_TRIGGER_FALLING, "100ask_gpio_key", &gpio_keys_100ask[i]);

内核机制实现
代码追踪图

调用该函数时,也会同时创建一个内核线程

编程要点分析
① 如果不提供上半部中断函数内核会提供默认的上半部处理函数:irq_default_primary_handler,它是直接返回IRQ_WAKE_THREAD。
② 如果提供的话
返回值必须是:IRQ_WAKE_THREAD。
在thread_fn中,如果中断被正确处理了,应该返回IRQ_HANDLED。
其实我们只需要实现这个函数的下半部函数就好了,内部机制也可以不用我们去理解,只用知道单独使用一个内核线程去管理这个中断