歡迎您光臨本站 註冊首頁

Linux內核中工作隊列的操作

←手機掃碼閱讀     火星人 @ 2014-03-26 , reply:0

作者:yfydz

1. 前言

工作隊列(workqueue)的Linux內核中的定義的用來處理不是很緊急事件的回調方式處理方法.


以下代碼的linux內核版本為2.6.19.2, 源代碼文件主要為kernel/workqueue.c.


2. 數據結構

/* include/linux/workqueue.h */

// 工作節點結構

struct work_struct {

// 等待時間

unsigned long pending;

// 鏈表節點

struct list_head entry;

// workqueue回調函數

void (*func)(void *);

// 回調函數func的數據

void *data;

// 指向CPU相關數據, 一般指向struct cpu_workqueue_struct結構

void *wq_data;

// 定時器

struct timer_list timer;

};


struct execute_work {

struct work_struct work;

};


/* kernel/workqueue.c */

/*

* The per-CPU workqueue (if single thread, we always use the first

* possible cpu).

*

* The sequence counters are for flush_scheduled_work(). It wants to wait

* until all currently-scheduled works are completed, but it doesn't

* want to be livelocked by new, incoming ones. So it waits until

* remove_sequence is >= the insert_sequence which pertained when

* flush_scheduled_work() was called.

*/

// 這個結構是針對每個CPU的

struct cpu_workqueue_struct {

// 結構鎖

spinlock_t lock;

// 下一個要執行的節點序號

long remove_sequence; /* Least-recently added (next to run) */

// 下一個要插入節點的序號

long insert_sequence; /* Next to add */

// 工作機構鏈表節點

struct list_head worklist;

// 要進行處理的等待隊列

wait_queue_head_t more_work;

// 處理完的等待隊列

wait_queue_head_t work_done;

// 工作隊列節點

struct workqueue_struct *wq;

// 進程指針

struct task_struct *thread;

int run_depth; /* Detect run_workqueue() recursion depth */

} ____cacheline_aligned;

/*

* The externally visible workqueue abstraction is an array of

* per-CPU workqueues:

*/

// 工作隊列結構

struct workqueue_struct {

struct cpu_workqueue_struct *cpu_wq;

const char *name;

struct list_head list; /* Empty if single thread */

};


kernel/workqueue.c中定義了一個工作隊列鏈表, 所有工作隊列可以掛接到這個鏈表中:

static LIST_HEAD(workqueues);


3. 一些宏定義

/* include/linux/workqueue.h */

// 初始化工作隊列

#define __WORK_INITIALIZER(n, f, d) { \

// 初始化list

.entry = { &(n).entry, &(n).entry }, \

// 回調函數

.func = (f), \

// 回調函數參數

.data = (d), \

// 初始化定時器

.timer = TIMER_INITIALIZER(NULL, 0, 0), \

}


// 聲明工作隊列並初始化

#define DECLARE_WORK(n, f, d) \

struct work_struct n = __WORK_INITIALIZER(n, f, d)

/*

* initialize a work-struct's func and data pointers:

*/

// 重新定義工作結構參數

#define PREPARE_WORK(_work, _func, _data) \

do { \

(_work)->func = _func; \

(_work)->data = _data; \

} while (0)

/*

* initialize all of a work-struct:

*/

// 初始化工作結構, 和__WORK_INITIALIZER功能相同,不過__WORK_INITIALIZER用在

// 參數初始化定義, 而該宏用在程序之中對工作結構賦值

#define INIT_WORK(_work, _func, _data) \

do { \

INIT_LIST_HEAD(&(_work)->entry); \

(_work)->pending = 0; \

PREPARE_WORK((_work), (_func), (_data)); \

init_timer(&(_work)->timer); \

} while (0)


4. 操作函數


4.1 創建工作隊列


一般的創建函數是create_workqueue, 但這其實只是一個宏:

/* include/linux/workqueue.h */

#define create_workqueue(name) __create_workqueue((name), 0)

在workqueue的初始化函數中, 定義了一個針對內核中所有線程可用的事件工作隊列, 其他內核線程建立的事件工作結構就都掛接到該隊列:

void init_workqueues(void)

{

...

keventd_wq = create_workqueue("events");

...

}


核心創建函數是__create_workqueue:


struct workqueue_struct *__create_workqueue(const char *name,

int singlethread)

{

int cpu, destroy = 0;

struct workqueue_struct *wq;

struct task_struct *p;

// 分配工作隊列結構空間

wq = kzalloc(sizeof(*wq), GFP_KERNEL);

if (!wq)

return NULL;

// 為每個CPU分配單獨的工作隊列空間

wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);

if (!wq->cpu_wq) {

kfree(wq);

return NULL;

}

wq->name = name;

mutex_lock(&workqueue_mutex);

if (singlethread) {

// 使用create_workqueue宏時該參數始終為0

// 如果是單一線程模式, 在單線程中調用各個工作隊列

// 建立一個的工作隊列內核線程

INIT_LIST_HEAD(&wq->list);

// 建立工作隊列的線程

p = create_workqueue_thread(wq, singlethread_cpu);

if (!p)

destroy = 1;

else

// 喚醒該線程

wake_up_process(p);

} else {

// 鏈表模式, 將工作隊列添加到工作隊列鏈表

list_add(&wq->list, &workqueues);

// 為每個CPU建立一個工作隊列線程

for_each_online_cpu(cpu) {

p = create_workqueue_thread(wq, cpu);

if (p) {

// 綁定CPU

kthread_bind(p, cpu);

// 喚醒線程

wake_up_process(p);

} else

destroy = 1;

}

}

mutex_unlock(&workqueue_mutex);

/*

* Was there any error during startup? If yes then clean up:

*/

if (destroy) {

// 建立線程失敗, 釋放工作隊列

destroy_workqueue(wq);

wq = NULL;

}

return wq;

}

EXPORT_SYMBOL_GPL(__create_workqueue);


// 創建工作隊列線程

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,

int cpu)

{

// 每個CPU的工作隊列

struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

struct task_struct *p;

spin_lock_init(&cwq->lock);

// 初始化

cwq->wq = wq;

cwq->thread = NULL;

cwq->insert_sequence = 0;

cwq->remove_sequence = 0;

INIT_LIST_HEAD(&cwq->worklist);

// 初始化等待隊列more_work, 該隊列處理要執行的工作結構

init_waitqueue_head(&cwq->more_work);

// 初始化等待隊列work_done, 該隊列處理執行完的工作結構

init_waitqueue_head(&cwq->work_done);

// 建立內核線程work_thread

if (is_single_threaded(wq))

p = kthread_create(worker_thread, cwq, "%s", wq->name);

else

p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);

if (IS_ERR(p))

return NULL;

// 保存線程指針

cwq->thread = p;

return p;

}

static int worker_thread(void *__cwq)

{

struct cpu_workqueue_struct *cwq = __cwq;

// 聲明一個等待隊列

DECLARE_WAITQUEUE(wait, current);

// 信號

struct k_sigaction sa;

sigset_t blocked;

current->flags |= PF_NOFREEZE;

// 降低進程優先順序, 工作進程不是個很緊急的進程,不和其他進程搶佔CPU,通常在系統空閑時運行

set_user_nice(current, -5);

/* Block and flush all signals */

// 阻塞所有信號

sigfillset(&blocked);

sigprocmask(SIG_BLOCK, &blocked, NULL);

flush_signals(current);

/*

* We inherited MPOL_INTERLEAVE from the booting kernel.

* Set MPOL_DEFAULT to insure node local allocations.

*/

numa_default_policy();

/* SIG_IGN makes children autoreap: see do_notify_parent(). */

// 信號處理都是忽略

sa.sa.sa_handler = SIG_IGN;

sa.sa.sa_flags = 0;

siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));

do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

// 進程可中斷

set_current_state(TASK_INTERRUPTIBLE);

// 進入循環, 沒明確停止該進程就一直運行

while (!kthread_should_stop()) {

// 設置more_work等待隊列, 當有新work結構鏈入隊列中時會激發此等待隊列

add_wait_queue(&cwq->more_work, &wait);

if (list_empty(&cwq->worklist))

// 工作隊列為空, 睡眠

schedule();

else

// 進行運行狀態

__set_current_state(TASK_RUNNING);

// 刪除等待隊列

remove_wait_queue(&cwq->more_work, &wait);

// 按鏈表遍歷執行工作任務

if (!list_empty(&cwq->worklist))

run_workqueue(cwq);

// 執行完工作, 設置進程是可中斷的, 重新循環等待工作

set_current_state(TASK_INTERRUPTIBLE);

}

__set_current_state(TASK_RUNNING);

return 0;

}


// 運行工作結構

static void run_workqueue(struct cpu_workqueue_struct *cwq)

{

unsigned long flags;

/*

* Keep taking off work from the queue until

* done.

*/

// 加鎖

spin_lock_irqsave(&cwq->lock, flags);

// 統計已經遞歸調用了多少次了

cwq->run_depth++;

if (cwq->run_depth > 3) {

// 遞歸調用此時太多

/* morton gets to eat his hat */

printk("%s: recursion depth exceeded: %d\n",

__FUNCTION__, cwq->run_depth);

dump_stack();

}

// 遍歷工作鏈表

while (!list_empty(&cwq->worklist)) {

// 獲取的是next節點的

struct work_struct *work = list_entry(cwq->worklist.next,

struct work_struct, entry);

void (*f) (void *) = work->func;

void *data = work->data;

// 刪除節點, 同時節點中的list參數清空

list_del_init(cwq->worklist.next);

// 解鎖

// 現在在執行以下代碼時可以中斷,run_workqueue本身可能會重新被調用, 所以要判斷遞歸深度

spin_unlock_irqrestore(&cwq->lock, flags);

BUG_ON(work->wq_data != cwq);

// 工作結構已經不在鏈表中

clear_bit(0, &work->pending);

// 執行工作函數

f(data);

// 重新加鎖

spin_lock_irqsave(&cwq->lock, flags);

// 執行完的工作序列號遞增

cwq->remove_sequence++;

// 喚醒工作完成等待隊列, 供釋放工作隊列

wake_up(&cwq->work_done);

}

// 減少遞歸深度

cwq->run_depth--;

// 解鎖

spin_unlock_irqrestore(&cwq->lock, flags);

}


4.2 釋放工作隊列

/**

* destroy_workqueue - safely terminate a workqueue

* @wq: target workqueue

*

* Safely destroy a workqueue. All work currently pending will be done first.

*/

void destroy_workqueue(struct workqueue_struct *wq)

{

int cpu;

// 清除當前工作隊列中的所有工作

flush_workqueue(wq);

/* We don't need the distraction of CPUs appearing and vanishing. */

mutex_lock(&workqueue_mutex);

// 結束該工作隊列的線程

if (is_single_threaded(wq))

cleanup_workqueue_thread(wq, singlethread_cpu);

else {

for_each_online_cpu(cpu)

cleanup_workqueue_thread(wq, cpu);

list_del(&wq->list);

}

mutex_unlock(&workqueue_mutex);

// 釋放工作隊列中對應每個CPU的工作隊列數據

free_percpu(wq->cpu_wq);

kfree(wq);

}

EXPORT_SYMBOL_GPL(destroy_workqueue);


/**

* flush_workqueue - ensure that any scheduled work has run to completion.

* @wq: workqueue to flush

*

* Forces execution of the workqueue and blocks until its completion.

* This is typically used in driver shutdown handlers.

*

* This function will sample each workqueue's current insert_sequence number and

* will sleep until the head sequence is greater than or equal to that. This

* means that we sleep until all works which were queued on entry have been

* handled, but we are not livelocked by new incoming ones.

*

* This function used to run the workqueues itself. Now we just wait for the

* helper threads to do it.

*/

void fastcall flush_workqueue(struct workqueue_struct *wq)

{

// 該進程可以睡眠

might_sleep();

// 清空每個CPU上的工作隊列

if (is_single_threaded(wq)) {

/* Always use first cpu's area. */

flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));

} else {

int cpu;

mutex_lock(&workqueue_mutex);

for_each_online_cpu(cpu)

flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));

mutex_unlock(&workqueue_mutex);

}

}

EXPORT_SYMBOL_GPL(flush_workqueue);


flush_workqueue的核心處理函數為flush_cpu_workqueue:

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)

{

if (cwq->thread == current) {

// 如果是工作隊列進程正在被調度

/*

* Probably keventd trying to flush its own queue. So simply run

* it by hand rather than deadlocking.

*/

// 執行完該工作隊列

run_workqueue(cwq);

} else {

// 定義等待

DEFINE_WAIT(wait);

long sequence_needed;

// 加鎖

spin_lock_irq(&cwq->lock);

// 最新工作結構序號

sequence_needed = cwq->insert_sequence;

// 該條件是判斷隊列中是否還有沒有執行的工作結構

while (sequence_needed - cwq->remove_sequence > 0) {

// 有為執行的工作結構

// 通過work_done等待隊列等待

prepare_to_wait(&cwq->work_done, &wait,

TASK_UNINTERRUPTIBLE);

// 解鎖

spin_unlock_irq(&cwq->lock);

// 睡眠, 由wake_up(&cwq->work_done)來喚醒

schedule();

// 重新加鎖

spin_lock_irq(&cwq->lock);

}

// 等待清除

finish_wait(&cwq->work_done, &wait);

spin_unlock_irq(&cwq->lock);

}

}


4.3 調度工作


在大多數情況下, 並不需要自己建立工作隊列,而是只定義工作, 將工作結構掛接到內核預定義的事件工作隊列中調度, 在kernel/workqueue.c中定義了一個靜態全局量的工作隊列keventd_wq:

static struct workqueue_struct *keventd_wq;


4.3.1 立即調度

// 在其他函數中使用以下函數來調度工作結構, 是把工作結構掛接到工作隊列中進行調度

/**

* schedule_work - put work task in global workqueue

* @work: job to be done

*

* This puts a job in the kernel-global workqueue.

*/

// 調度工作結構, 將工作結構添加到事件工作隊列keventd_wq

int fastcall schedule_work(struct work_struct *work)

{

return queue_work(keventd_wq, work);

}

EXPORT_SYMBOL(schedule_work);


/**

* queue_work - queue work on a workqueue

* @wq: workqueue to use

* @work: work to queue

*

* Returns 0 if @work was already on a queue, non-zero otherwise.

*

* We queue the work to the CPU it was submitted, but there is no

* guarantee that it will be processed by that CPU.

*/

int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)

{

int ret = 0, cpu = get_cpu();

if (!test_and_set_bit(0, &work->pending)) {

// 工作結構還沒在隊列, 設置pending標誌表示把工作結構掛接到隊列中

if (unlikely(is_single_threaded(wq)))

cpu = singlethread_cpu;

BUG_ON(!list_empty(&work->entry));

// 進行具體的排隊

__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);

ret = 1;

}

put_cpu();

return ret;

}

EXPORT_SYMBOL_GPL(queue_work);

/* Preempt must be disabled. */

// 不能被搶佔

static void __queue_work(struct cpu_workqueue_struct *cwq,

struct work_struct *work)

{

unsigned long flags;

// 加鎖

spin_lock_irqsave(&cwq->lock, flags);

// 指向CPU工作隊列

work->wq_data = cwq;

// 掛接到工作鏈表

list_add_tail(&work->entry, &cwq->worklist);

// 遞增插入的序列號

cwq->insert_sequence++;

// 喚醒等待隊列準備處理工作結構

wake_up(&cwq->more_work);

spin_unlock_irqrestore(&cwq->lock, flags);

}


4.3.2 延遲調度


4.3.2.1 schedule_delayed_work

/**

* schedule_delayed_work - put work task in global workqueue after delay

* @work: job to be done

* @delay: number of jiffies to wait

*

* After waiting for a given time this puts a job in the kernel-global

* workqueue.

*/

// 延遲調度工作, 延遲一定時間后再將工作結構掛接到工作隊列

int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)

{

return queue_delayed_work(keventd_wq, work, delay);

}

EXPORT_SYMBOL(schedule_delayed_work);


/**

* queue_delayed_work - queue work on a workqueue after delay

* @wq: workqueue to use

* @work: work to queue

* @delay: number of jiffies to wait before queueing

*

* Returns 0 if @work was already on a queue, non-zero otherwise.

*/

int fastcall queue_delayed_work(struct workqueue_struct *wq,

struct work_struct *work, unsigned long delay)

{

int ret = 0;

// 定時器, 此時的定時器應該是不起效的, 延遲將通過該定時器來實現

struct timer_list *timer = &work->timer;

if (!test_and_set_bit(0, &work->pending)) {

// 工作結構還沒在隊列, 設置pending標誌表示把工作結構掛接到隊列中

// 如果現在定時器已經起效, 出錯

BUG_ON(timer_pending(timer));

// 工作結構已經掛接到鏈表, 出錯

BUG_ON(!list_empty(&work->entry));

/* This stores wq for the moment, for the timer_fn */

// 保存工作隊列的指針

work->wq_data = wq;

// 定時器初始化

timer->expires = jiffies + delay;

timer->data = (unsigned long)work;

// 定時函數

timer->function = delayed_work_timer_fn;

// 定時器生效, 定時到期后再添加到工作隊列

add_timer(timer);

ret = 1;

}

return ret;

}

EXPORT_SYMBOL_GPL(queue_delayed_work);



// 定時中斷函數

static void delayed_work_timer_fn(unsigned long __data)

{

struct work_struct *work = (struct work_struct *)__data;

struct workqueue_struct *wq = work->wq_data;

// 獲取CPU

int cpu = smp_processor_id();

if (unlikely(is_single_threaded(wq)))

cpu = singlethread_cpu;

// 將工作結構添加到工作隊列,注意這是在時間中斷調用

__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);

}


4.3.2.2 schedule_delayed_work_on


指定CPU的延遲調度工作結構, 和schedule_delayed_work相比增加了一個CPU參數, 其他都相同

/**

* schedule_delayed_work_on - queue work in global workqueue on CPU after delay

* @cpu: cpu to use

* @work: job to be done

* @delay: number of jiffies to wait

*

* After waiting for a given time this puts a job in the kernel-global

* workqueue on the specified CPU.

*/

int schedule_delayed_work_on(int cpu,

struct work_struct *work, unsigned long delay)

{

return queue_delayed_work_on(cpu, keventd_wq, work, delay);

}


/**

* queue_delayed_work_on - queue work on specific CPU after delay

* @cpu: CPU number to execute work on

* @wq: workqueue to use

* @work: work to queue

* @delay: number of jiffies to wait before queueing

*

* Returns 0 if @work was already on a queue, non-zero otherwise.

*/

int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,

struct work_struct *work, unsigned long delay)

{

int ret = 0;

struct timer_list *timer = &work->timer;

if (!test_and_set_bit(0, &work->pending)) {

BUG_ON(timer_pending(timer));

BUG_ON(!list_empty(&work->entry));

/* This stores wq for the moment, for the timer_fn */

work->wq_data = wq;

timer->expires = jiffies + delay;

timer->data = (unsigned long)work;

timer->function = delayed_work_timer_fn;

add_timer_on(timer, cpu);

ret = 1;

}

return ret;

}

EXPORT_SYMBOL_GPL(queue_delayed_work_on);


5. 結論


工作隊列和定時器函數處理有點類似, 都是執行一定的回調函數, 但和定時器處理函數不同的是定時器回調函數只執行一次, 而且執行定時器回調函數的時候是在時鐘中斷中, 限制比較多, 因此回調程序不能太複雜; 而工作隊列是通過內核線程實現, 一直有效, 可重複執行, 由於執行時降低了線程的優先順序, 執行時可能休眠, 因此工作隊列處理的應該是那些不是很緊急的任務, 如垃圾回收處理等, 通常在系統空閑時執行,在xfrm庫中就廣泛使用了workqueue,使用時,只需要定義work結構,然後調用schedule_(delayed_)work即可。

[火星人 ] Linux內核中工作隊列的操作已經有880次圍觀

http://coctec.com/docs/linux/show-post-188558.html