一 、Tasklets 机制基础知识点
1、Taklets 机制概念
Tasklets 机制是linux中断处理机制中的软中断延迟机制。通常用于减少中断处理的时间,将本应该是在中断服务程序中完成的任务转化成软中断完成。
为了最大程度的避免中断处理时间过长而导致中断丢失,有时候我们需要把一些在中断处理中不是非常紧急的任务放在后面执行,而让中断处理程序尽快返回。在老版本的 linux 中通常将中断处理分为 top half handler 、 bottom half handler 。利用 top half handler 处理中断必须处理的任务,而 bottom half handler 处理不是太紧急的任务。
但是 linux2.6 以后的 linux 采取了另外一种机制,就是软中断来代替 bottom half handler 的处理。而 tasklet 机制正是利用软中断来完成对驱动 bottom half 的处理。 Linux2.6 中软中断通常只有固定的几种: HI_SOFTIRQ( 高优先级的 tasklet ,一种特殊的 tasklet) 、 TIMER_SOFTIRQ (定时器)、 NET_TX_SOFTIRQ (网口发送)、 NET_RX_SOFTIRQ (网口接收) 、 BLOCK_SOFTIRQ (块设备)、 TASKLET_SOFTIRQ (普通 tasklet )。当然也可以通过直接修改内核自己加入自己的软中断,但是一般来说这是不合理的,软中断的优先级比较高,如果不是在内核处理频繁的任务不建议使用。通常驱动用户使用 tasklet 足够了。
机制流程:当linux接收到硬件中断之后,通过 tasklet 函数来设定软中断被执行的优先程度从而导致软中断处理函数被优先执行的差异性。
特点:tasklet的优先级别较低,而且中断处理过程中可以被打断。但被打断之后,还能进行自我恢复,断点续运行。
2、Tasklets 解决什么问题?
a -- tasklet是I/O驱动程序中实现可延迟函数的首选方法;
b -- tasklet和工作队列是延期执行工作的机制,其实现基于软中断,但他们更易于使用,因而更适合与设备驱动程序...tasklet是“小进程”,执行一些迷你任务,对这些人物使用全功能进程可能比较浪费。
c -- tasklet是并行可执行(但是是锁密集型的)软件中断和旧下半区的一种混合体,这里既谈不上并行性,也谈不上性能。引入tasklet是为了替代原来的下半区。
软中断是将操作推迟到未来时刻执行的最有效的方法。但该延期机制处理起来非常复杂。因为多个处理器可以同时且独立的处理软中断,同一个软中断的处理程序可以在几个CPU上同时运行。对软中断的效率来说,这是一个关键,多处理器系统上的网络实现显然受惠于此。但处理程序的设计必须是完全可重入且线程安全的。另外,临界区必须用自旋锁保护(或其他IPC机制),而这需要大量审慎的考虑。
我自己的理解,由于软中断以ksoftirqd的形式与用户进程共同调度,这将关系到OS整体的性能,因此软中断在Linux内核中也仅仅就几个(网络、时钟、调度以及Tasklet等),在内核编译时确定。软中断这种方法显然不是面向硬件驱动的,而是驱动更上一层:不关心如何从具体的网卡接收数据包,但是从所有的网卡接收的数据包都要经过内核协议栈的处理。而且软中断比较“硬”——数量固定、编译时确定、操作函数必须可重入、需要慎重考虑锁的问题,不适合驱动直接调用,因此Linux内核为驱动直接提供了一种使用软中断的方法,就是tasklet。
软中断和 tasklet 的关系如下图:
上图可以看出, ksoftirqd 是一个后台运行的内核线程,它会周期的遍历软中断的向量列表,如果发现哪个软中断向量被挂起了( pend ),就执行对应的处理函数,对于 tasklet 来说,此处理函数就是 tasklet_action ,这个处理函数在系统启动时初始化软中断的就挂接了。Tasklet_action 函数,遍历一个全局的 tasklet_vec 链表(此链表对于 SMP 系统是每个 CPU 都有一个),此链表中的元素为 tasklet_struct 。下面将介绍各个函数
二、tasklet数据结构
tasklet通过软中断实现,软中断中有两种类型属于tasklet,分别是级别最高的HI_SOFTIRQ和TASKLET_SOFTIRQ。
Linux内核采用两个PER_CPU的数组tasklet_vec[]和tasklet_hi_vec[]维护系统种的所有tasklet(kernel/softirq.c),分别维护TASKLET_SOFTIRQ级别和HI_SOFTIRQ级别的tasklet:
- struct tasklet_head
- {
- struct tasklet_struct *head;
- struct tasklet_struct *tail;
- };
- static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
- static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);
tasklet的核心结构体如下(include/linux/interrupt.h):
- struct tasklet_struct
- {
- struct tasklet_struct *next;
- unsigned long state;
- atomic_t count;
- void (*func)(unsigned long);
- unsigned long data;
- };
各成员的含义如下:
a -- next指针:指向下一个tasklet的指针。
b -- state:定义了这个tasklet的当前状态。这一个32位的无符号长整数,当前只使用了bit[1]和bit[0]两个状态位。其中,bit[1]=1表示这个tasklet当前正在某个CPU上被执行,它仅对SMP系统才有意义,其作用就是为了防止多个CPU同时执行一个tasklet的情形出现;bit[0]=1表示这个tasklet已经被调度去等待执行了。对这两个状态位的宏定义如下所示(interrupt.h)
- enum
- {
- TASKLET_STATE_SCHED,
- TASKLET_STATE_RUN
- };
TASKLET_STATE_SCHED置位表示已经被调度(挂起),也意味着tasklet描述符被插入到了tasklet_vec和tasklet_hi_vec数组的其中一个链表中,可以被执行。TASKLET_STATE_RUN置位表示该tasklet正在某个CPU上执行,单个处理器系统上并不校验该标志,因为没必要检查特定的tasklet是否正在运行。
c -- 原子计数count:对这个tasklet的引用计数值。NOTE!只有当count等于0时,tasklet代码段才能执行,也即此时tasklet是被使能的;如果count非零,则这个tasklet是被禁止的。任何想要执行一个tasklet代码段的人都首先必须先检查其count成员是否为0。
d -- 函数指针func:指向以函数形式表现的可执行tasklet代码段。
e -- data:函数func的参数。这是一个32位的无符号整数,其具体含义可供func函数自行解释,比如将其解释成一个指向某个用户自定义数据结构的地址值。
三、tasklet操作接口
tasklet对驱动开放的常用操作包括:
a -- 初始化,tasklet_init(),初始化一个tasklet描述符。
b -- 调度,tasklet_schedule()和tasklet_hi_schedule(),将taslet置位TASKLET_STATE_SCHED,并尝试激活所在的软中断。
c -- 禁用/启动,tasklet_disable_nosync()、tasklet_disable()、task_enable(),通过count计数器实现。
d -- 执行,tasklet_action()和tasklet_hi_action(),具体的执行软中断。
e -- 杀死,tasklet_kill()
即驱动程序在初始化时,通过函数task_init建立一个tasklet,然后调用函数tasklet_schedule将这个tasklet放在 tasklet_vec链表的头部,并唤醒后台线程ksoftirqd。当后台线程ksoftirqd运行调用__do_softirq时,会执行在中断向量表softirq_vec里中断号TASKLET_SOFTIRQ对应的tasklet_action函数,然后tasklet_action遍历 tasklet_vec链表,调用每个tasklet的函数完成软中断操作。
1、tasklet_int()函数实现如下(kernel/softirq.c)
用来初始化一个指定的tasklet描述符
- void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data)
- {
- t->next = NULL;
- t->state = 0;
- atomic_set(&t->count, 0);
- t->func = func;
- t->data = data;
- }
2、tasklet_schedule()函数
与tasklet_hi_schedule()函数的实现很类似,这里只列tasklet_schedule()函数的实现(kernel/softirq.c),都挺明白就不描述了:
- static inline void tasklet_schedule(struct tasklet_struct *t)
- {
- if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
- __tasklet_schedule(t);
- }
- void __tasklet_schedule(struct tasklet_struct *t)
- {
- unsigned long flags;
- local_irq_save(flags);
- t->next = NULL;
- *__this_cpu_read(tasklet_vec.tail) = t;
- __this_cpu_write(tasklet_vec.tail, &(t->next));
- raise_softirq_irqoff(TASKLET_SOFTIRQ);
- local_irq_restore(flags);
- }
该函数的参数t指向要在当前CPU上被执行的tasklet。对该函数的NOTE如下:
a -- 调用test_and_set_bit()函数将待调度的tasklet的state成员变量的bit[0]位(也即TASKLET_STATE_SCHED位)设置为1,该函数同时还返回TASKLET_STATE_SCHED位的原有值。因此如果bit[0]为的原有值已经为1,那就说明这个tasklet已经被调度到另一个CPU上去等待执行了。由于一个tasklet在某一个时刻只能由一个CPU来执行,因此tasklet_schedule()函数什么也不做就直接返回了。否则,就继续下面的调度操作。
b -- 首先,调用local_irq_save()函数来关闭当前CPU的中断,以保证下面的步骤在当前CPU上原子地被执行。
c -- 然后,将待调度的tasklet添加到当前CPU对应的tasklet队列的首部。
d -- 接着,调用__cpu_raise_softirq()函数在当前CPU上触发软中断请求TASKLET_SOFTIRQ。
e -- 最后,调用local_irq_restore()函数来开当前CPU的中断。
3、tasklet_disable()函数、task_enable()函数以及tasklet_disable_nosync()函数(include/linux/interrupt.h)
使能与禁止操作往往总是成对地被调用的
- static inline void tasklet_disable_nosync(struct tasklet_struct *t)
- {
- atomic_inc(&t->count);
- smp_mb__after_atomic_inc();
- }
- static inline void tasklet_disable(struct tasklet_struct *t)
- {
- tasklet_disable_nosync(t);
- tasklet_unlock_wait(t);
- smp_mb();
- }
- static inline void tasklet_enable(struct tasklet_struct *t)
- {
- smp_mb__before_atomic_dec();
- atomic_dec(&t->count);
- }
4、tasklet_action()函数在softirq_init()函数中被调用:
- void __init softirq_init(void)
- {
- ...
- open_softirq(TASKLET_SOFTIRQ, tasklet_action);
- open_softirq(HI_SOFTIRQ, tasklet_hi_action);
- }
tasklet_action()函数
- static void tasklet_action(struct softirq_action *a)
- {
- struct tasklet_struct *list;
- local_irq_disable();
- list = __this_cpu_read(tasklet_vec.head);
- __this_cpu_write(tasklet_vec.head, NULL);
- __this_cpu_write(tasklet_vec.tail, &__get_cpu_var(tasklet_vec).head);
- local_irq_enable();
- while (list)
- {
- struct tasklet_struct *t = list;
- list = list->next;
- if (tasklet_trylock(t))
- {
- if (!atomic_read(&t->count))
- {
- if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
- BUG();
- t->func(t->data);
- tasklet_unlock(t);
- continue;
- }
- tasklet_unlock(t);
- }
- local_irq_disable();
- t->next = NULL;
- *__this_cpu_read(tasklet_vec.tail) = t;
- __this_cpu_write(tasklet_vec.tail, &(t->next));
- __raise_softirq_irqoff(TASKLET_SOFTIRQ);
- local_irq_enable();
- }
- }
注释如下:
①首先,在当前CPU关中断的情况下,“原子”地读取当前CPU的tasklet队列头部指针,将其保存到局部变量list指针中,然后将当前CPU的tasklet队列头部指针设置为NULL,以表示理论上当前CPU将不再有tasklet需要执行(但最后的实际结果却并不一定如此,下面将会看到)。
②然后,用一个while{}循环来遍历由list所指向的tasklet队列,队列中的各个元素就是将在当前CPU上执行的tasklet。循环体的执行步骤如下:
a -- 用指针t来表示当前队列元素,即当前需要执行的tasklet。
b -- 更新list指针为list->next,使它指向下一个要执行的tasklet。
c -- 用tasklet_trylock()宏试图对当前要执行的tasklet(由指针t所指向)进行加锁
如果加锁成功(当前没有任何其他CPU正在执行这个tasklet),则用原子读函atomic_read()进一步判断count成员的值。如果count为0,说明这个tasklet是允许执行的,于是:
(1)先清除TASKLET_STATE_SCHED位;
(2)然后,调用这个tasklet的可执行函数func;
(3)执行barrier()操作;
(4)调用宏tasklet_unlock()来清除TASKLET_STATE_RUN位。
(5)最后,执行continue语句跳过下面的步骤,回到while循环继续遍历队列中的下一个元素。如果count不为0,说明这个tasklet是禁止运行的,于是调用tasklet_unlock()清除前面用tasklet_trylock()设置的TASKLET_STATE_RUN位。
如果tasklet_trylock()加锁不成功,或者因为当前tasklet的count值非0而不允许执行时,我们必须将这个tasklet重新放回到当前CPU的tasklet队列中,以留待这个CPU下次服务软中断向量TASKLET_SOFTIRQ时再执行。为此进行这样几步操作:
(1)先关CPU中断,以保证下面操作的原子性。
(2)把这个tasklet重新放回到当前CPU的tasklet队列的首部;
(3)调用__cpu_raise_softirq()函数在当前CPU上再触发一次软中断请求TASKLET_SOFTIRQ;
(4)开中断。
c -- 最后,回到while循环继续遍历队列。
5、tasklet_kill()实现
- void tasklet_kill(struct tasklet_struct *t)
- {
- if (in_interrupt())
- printk("Attempt to kill tasklet from interruptn");
- while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
- {
- do {
- yield();
- } while (test_bit(TASKLET_STATE_SCHED, &t->state));
- }
- tasklet_unlock_wait(t);
- clear_bit(TASKLET_STATE_SCHED, &t->state);
- }
四、一个tasklet调用例子
找了一个tasklet的例子看一下(drivers/usb/atm,usb摄像头),在其自举函数usbatm_usb_probe()中调用了tasklet_init()初始化了两个tasklet描述符用于接收和发送的“可延迟操作处理”,但此是并没有将其加入到tasklet_vec[]或tasklet_hi_vec[]中:
- tasklet_init(&instance->rx_channel.tasklet,
- usbatm_rx_process, (unsigned long)instance);
- tasklet_init(&instance->tx_channel.tasklet,
- usbatm_tx_process, (unsigned long)instance);
- static int usbatm_atm_send(struct atm_vcc *vcc, struct sk_buff *skb)
- {
- ...
- tasklet_schedule(&instance->tx_channel.tasklet);
- ...
- }
- void usbatm_usb_disconnect(struct usb_interface *intf)
- {
- ...
- tasklet_disable(&instance->rx_channel.tasklet);
- tasklet_disable(&instance->tx_channel.tasklet);
- ...
- tasklet_enable(&instance->rx_channel.tasklet);
- tasklet_enable(&instance->tx_channel.tasklet);
- ...
- }
在其销毁接口usbatm_destroy_instance()中调用tasklet_kill()函数,强行将该tasklet踢出调度队列。
从上述过程以及tasklet的设计可以看出,tasklet整体是这么运行的:驱动应该在其硬中断处理函数的末尾调用tasklet_schedule()接口激活该tasklet;内核经常调用do_softirq()执行软中断,通过softirq执行tasket,如下图所示。图中灰色部分为禁止硬中断部分,为保护软中断pending位图和tasklet_vec链表数组,count的改变均为原子操作,count确保SMP架构下同时只有一个CPU在执行该tasklet: