软中断、tasklet 以及工作队列,均是 linux 中将任务推后执行的机制。其中工作队列与用户态使用的线程池类似。
什么是任务推后执行呢 ?
可以借助于开发应用时经常使用的线程池来理解。任务推后执行,就是任务本该执行的时候没有立即执行,而是将任务放到任务容器(标志位或者任务队列)中,相当于生产者;任务容器还有消费者,消费者从容器中取出任务来执行。这就是推后执行,其中有 3 个组成元素,生产者,任务容器和消费者。
1 tasklet 通过软中断实现
tasklet 是通过软中断来实现的,相当于在软中断的基础上又扩展了一层。有一种场景的思路和 tasklet 与软中断的关系比较类似,在开发网络应用时,由于 tcp 端口是有限的,如果系统的规格下 tcp 端口号不够用的话,在应用层有时会做一层虚拟化,用户使用的连接是虚拟连接,并不是独占一个 tcp 端口,而是多个虚拟连接共享一个 tcp 端口。tasklet 对软中断的使用类似于使用 tcp 时的端口复用。
enum
{HI_SOFTIRQ=0,TIMER_SOFTIRQ,NET_TX_SOFTIRQ,NET_RX_SOFTIRQ,BLOCK_SOFTIRQ,IRQ_POLL_SOFTIRQ,TASKLET_SOFTIRQ,SCHED_SOFTIRQ,HRTIMER_SOFTIRQ,RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */NR_SOFTIRQS
};
tasklet 使用了两个软中断:HI_SOFTIRQ 和 TASKLET_SOFTIRQ。两者优先级不同,前者比后者优先级高。软中断的优先级是怎么实现的呢 ?不同的软中断触发,就是设置软中断的标志位,类似于一个 bitmap, 软中断数值越小,对应的优先级就越高,因为在函数 __do_softirq() 中遍历软中断时,是从小向大进行遍历的,这样保证了数值小的软中断被先处理。软中断的优先级类似于调度策略中的 rt 调度策略,rt 调度策略在内核中的优先级取值范围是 [0, 98],也使用了一个 bitmap 来标志这个优先级下有没有待执行的线程,优先级数值越小,越早被遍历,表示优先级越高。
这两个软中断在函数 softirq_init() 中进行注册。软中断处理函数分别是 tasklet_action 和 tasklet_hi_action。
void __init softirq_init(void)
{int cpu;for_each_possible_cpu(cpu) {per_cpu(tasklet_vec, cpu).tail =&per_cpu(tasklet_vec, cpu).head;per_cpu(tasklet_hi_vec, cpu).tail =&per_cpu(tasklet_hi_vec, cpu).head;}open_softirq(TASKLET_SOFTIRQ, tasklet_action);open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}
2 tasklet 数据结构和 api
2.1 struct tasklet_struct
struct tasklet_struct 表示一个 tasklet。其中最主要的成员是这个 tasklet 的处理函数,处理函数有两个选择,一个是 func,func 的入参是用户自定义的参数,data 在 struct tasklet_struct 中的 data 成员中保存;一个是 callback,callback 的参数是 tasklet 本身,当使用 callback 的时候需要将 use_callback 设置为 true。
struct tasklet_struct
{struct tasklet_struct *next;unsigned long state;atomic_t count;bool use_callback;union {void (*func)(unsigned long data);void (*callback)(struct tasklet_struct *t);};unsigned long data;
};
state 字段表示 tasklet 当前的状态。状态可取如下两个值:
enum
{// 说明 tasklet 已经被提交了,等待被执行TASKLET_STATE_SCHED, /* Tasklet is scheduled for execution */// 说明 tasklet 当前正在被执行// 该状态只在多处理器系统上才会生效// 单处理器系统上不需要对 tasklet 做是不是 running 的判断TASKLET_STATE_RUN /* Tasklet is running (SMP only) */
};// 只用定义了 CONFIG_SMP,TASKLET_STATE_RUN 才会被使用
#ifdef CONFIG_SMP
static inline int tasklet_trylock(struct tasklet_struct *t)
{return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}static inline void tasklet_unlock(struct tasklet_struct *t)
{smp_mb__before_atomic();clear_bit(TASKLET_STATE_RUN, &(t)->state);
}static inline void tasklet_unlock_wait(struct tasklet_struct *t)
{while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
}
#else
#define tasklet_trylock(t) 1
#define tasklet_unlock_wait(t) do { } while (0)
#define tasklet_unlock(t) do { } while (0)
#endif
count 是 tasklet 的计数器,当 count 不是 0,tasklet 会被禁止,不允许执行;只有当 count 为 0 的时候,tasklet 才可以被执行。与之对应的是 tasklet_disable() 和 tasklet_enable() 两个函数,分别是禁用 tasklet 和使能 tasklet,在 tasklet_disable() 中增加 count 的值,在 tasklet_enable() 中减小 count 的值。
static inline void tasklet_disable(struct tasklet_struct *t)
{tasklet_disable_nosync(t);tasklet_unlock_wait(t);smp_mb();
}static inline void tasklet_enable(struct tasklet_struct *t)
{smp_mb__before_atomic();atomic_dec(&t->count);
}
2.2 tasklet 初始化
初始化一个 tasklet,就是定义一个 struct tasklet_struct 结构体,然后初始化其中的成员。可以通过宏 DECLARE_TASKLET 来定义一个 tasklet,name 是 struct tasklet_struct 的名字,_callback 是 tasklet 的处理函数;tasklet_setup() 可以初始化一个 tasklet,处理函数是 callback,tasklet_init() 也可以初始化一个 tasklet,处理函数是 func。
#define DECLARE_TASKLET(name, _callback) \
struct tasklet_struct name = { \.count = ATOMIC_INIT(0), \.callback = _callback, \.use_callback = true, \
}void tasklet_setup(struct tasklet_struct *t,void (*callback)(struct tasklet_struct *))
{t->next = NULL;t->state = 0;atomic_set(&t->count, 0);t->callback = callback;t->use_callback = true;t->data = 0;
}
EXPORT_SYMBOL(tasklet_setup);void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data)
{t->next = NULL;t->state = 0;atomic_set(&t->count, 0);t->func = func;t->use_callback = false;t->data = data;
}
EXPORT_SYMBOL(tasklet_init);
2.3 tasklet_schedule
使用 TASKLET_SOFTIRQ 的 tasklet 通过 task_schedule 来调度;使用 HI_SOFTIRQ 的 tasklet 通过 tasklet_hi_schedule 来调度。
tasklet_schedule() 这样的命名并不是太好理解,触发软中断的时候使用的关键字是 raise,而触发 tasklet 的时候使用的关键字是 schedule。意思都是将任务设置到待执行的状态,可以执行了。
① 判断 tasklet 是不是处于 SCHED 状态
test_and_set_bit() 将这一 bit 设置为 1,并返回设置之前的值,如果设置之前是 0 那么就会调用__tasklet_schedule() 将 tasklet 设置到待执行状态;否则的话,说明 tasklet 已经调度了,直接返回。
static inline void tasklet_schedule(struct tasklet_struct *t)
{if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))__tasklet_schedule(t);
}
② __tasklet_schedule() 最终调用到函数 __tasklet_schedule_common(),在这个函数中主要做了两件事,一个是将这个 tasklet 加入到 tasklet_vec 链表中,这样在函数 tasklet_action 中就能遍历到这个 tasklet 并进行处理;第二个工作是触发 tasklet 软中断。
struct tasklet_head {struct tasklet_struct *head;struct tasklet_struct **tail;
};static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);static void __tasklet_schedule_common(struct tasklet_struct *t,struct tasklet_head __percpu *headp,unsigned int softirq_nr)
{struct tasklet_head *head;unsigned long flags;local_irq_save(flags);head = this_cpu_ptr(headp);t->next = NULL;*head->tail = t;head->tail = &(t->next);raise_softirq_irqoff(softirq_nr);local_irq_restore(flags); tasklet_schedule
}
tasklet_vec 和 tasklet_hi_vec 是 per cpu 的变量,每个 cpu 都有一个对应的链表。这两个链表的维护非常绕,有两个成员,head 和 tail,head 是 struct tasklet_struct 指针,tail 是 struct task_struct 二级指针,在初始化的时候,使 tail 指向了 head。
在 tasklet_schedule() 中将 tasklet 加到链表中,只需要维护 tail 就可以了。
void __init softirq_init(void)
{int cpu;for_each_possible_cpu(cpu) {per_cpu(tasklet_vec, cpu).tail =&per_cpu(tasklet_vec, cpu).head;per_cpu(tasklet_hi_vec, cpu).tail =&per_cpu(tasklet_hi_vec, cpu).head;}open_softirq(TASKLET_SOFTIRQ, tasklet_action);open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}
2.4 tasklet 执行
tasklet 的执行在函数 tasklet_action() 中进行。最终调用到函数 tasklet_action_common()。
(1) tl_head 是 tasklet_vec 链表,首先从 tasklet_vec 中把所有的 tasklet 取下来,然后将 tasklet_vec 链表设置为空
(2) 处理 tasklet 链表中的任务
① tasklet_trylock() 会判断当前 tasklet 是不是出于 RUN 状态,如果是的话那么不处理这个 tasklet
② atomic_read 读取 count 的值,来判断这个 tasklet 是不是被禁用,如果被禁用,则不处理
③ 清除 SCHED 标志,清除之后,可以再次调度这个 tasklet 了
④ 如果这个 tasklet 没有被处理,那么会被加回到 tasklet_vec 链表中
static void tasklet_action_common(struct softirq_action *a,struct tasklet_head *tl_head,unsigned int softirq_nr)
{struct tasklet_struct *list;local_irq_disable();list = tl_head->head;tl_head->head = NULL;tl_head->tail = &tl_head->head;local_irq_enable();while (list) {struct tasklet_struct *t = list;list = list->next;if (tasklet_trylock(t)) {if (!atomic_read(&t->count)) {if (!test_and_clear_bit(TASKLET_STATE_SCHED,&t->state))BUG();if (t->use_callback)t->callback(t);elset->func(t->data);tasklet_unlock(t);continue;}tasklet_unlock(t);}local_irq_disable();t->next = NULL;*tl_head->tail = t;tl_head->tail = &t->next;__raise_softirq_irqoff(softirq_nr);local_irq_enable();}
}
3 tasklet 使用示例
如下是在一个内核模块中使用 tasklet。
① 定义了一个 tasklet my_tasklet,在模块初始化函数中通过 tasklet_init() 进行初始化,
tasklet 的处理函数是 tasklet_handler() 在该函数中打印了一条日志 "Tasklet executed successfully!\n"。
② 在内核模块中创建了一个线程,在该线程中每隔一秒通过 tasklet_schedule() 调度一次。
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/kthread.h>
#include <linux/delay.h>
#include <linux/interrupt.h>// 定义 Tasklet 处理函数
void tasklet_handler(unsigned long data);// 声明一个 Tasklet 结构体
static struct tasklet_struct my_tasklet;// 内核线程执行函数
static int thread_func(void *data)
{while (!kthread_should_stop()) {// 调度 Tasklet 执行tasklet_schedule(&my_tasklet);// 等待 1smsleep(1000);}return 0;
}// 模拟的 Tasklet 处理函数
void tasklet_handler(unsigned long data)
{printk(KERN_INFO "Tasklet executed successfully!\n");
}// 模块初始化函数
static int __init tasklet_example_init(void)
{// 初始化 Tasklet 结构体tasklet_init(&my_tasklet, tasklet_handler, 0);// 创建内核线程kthread_run(thread_func, NULL, "tasklet_thread");printk(KERN_INFO "Tasklet example module initialized\n");return 0;
}// 模块清理函数
static void __exit tasklet_example_exit(void)
{// 停止 Tasklet 的调度tasklet_kill(&my_tasklet);printk(KERN_INFO "Tasklet example module exited\n");
}// 注册模块初始化和清理函数
module_init(tasklet_example_init);
module_exit(tasklet_example_exit);MODULE_LICENSE("GPL");
MODULE_AUTHOR("wyl");
4 软中断,tasklet 的区别
软中断和 tasklet 的最主要区别,同一个软中断可以在多个 cpu 上并发处理,而对于一个 tasklet 来说,虽然也可以调度到不同的 cpu 上,但是对于同一个 tasklet 不会并发处理。
从 tasklet 的处理函数 tasklet_action_common() 中也可以看出来,如果当前这个 tasklet 处于 RUN 状态,那么就不会处理它。