ワークキューとは
一言でいうと予め登録していた処理(関数)を後から実行する遅延実行の仕組みです。名前の通り、処理をキューイングしておき、実行要求があった時点で処理を行います。
Linuxでもワークキューは存在し、役割もほぼ同じです。
ワークキューの種類
ワークキューを大別すると以下の3種類があります。
1)自分自身でワークキューを定義する方法
2)時刻を指定して遅延実行する方法
3)システムが用意しているワークキューを利用する方法
1)と3)はワークキューを自分で用意するか、システムが用意してくれているワークキュー(k_sys_work_q)を使用するかの違いのみです。
いきなり内部構造を見ても理解し辛いと思いますので、まずはワークキューの使用例を次節で示します。
なお、3)の構造を図にすると下記のようになります。この構造の仕組みは1)でも同様です。
ワークキューの使用例
「1)自分自身でワークキューを定義する方法」の例
本節では、Zephyrのソースコードに含まれているテストコード(tests/kernel/critical/src/critical.c)を例に使用方法を見てみます。
a)ワークキューの初期化
下記の通り、k_work_q_start ( )でワークキューoffload_work_qを初期化します。
offload_work_qは上図のk_sys_work_qに相当します。
本関数を実行すると内部でcooperativeクラスの最低優先度 (-1)で実行するoffload_work_q用のスレッドを生成します。(スレッドの生成部については後述します)
static void init_objects(void)
{
critical_var = 0;
alt_task_iterations = 0;
k_work_q_start(&offload_work_q, //ワークキュー
offload_work_q_stack, //ワークキュー用スタックアドレス
K_THREAD_STACK_SIZEOF(offload_work_q_stack), //ワークキュー用スタックサイズ
CONFIG_OFFLOAD_WORKQUEUE_PRIORITY);//本ワークキューを実行するスレッドの優先度
}
b) ワークアイテムの初期化
下記(★)でワークアイテムwork_itemとこのワークアイテム用の関数critical_rtn ( )を登録しています。☆についてはc) で述べます。
u32_t critical_loop(u32_t count)
{
s64_t mseconds;
mseconds = k_uptime_get();
while (k_uptime_get() < mseconds + NUM_MILLISECONDS) {
struct k_work work_item;
k_work_init(&work_item, critical_rtn); ★
k_work_submit_to_queue(&offload_work_q, &work_item);☆
:
}
c) ワークアイテムの実行要求と実行
b)の★の段階ではワークキューoffload_work_qとワークアイテムwork_itemは関連付けられていません。b)の☆でワークキューoffload_work_qにワークアイテムwork_itemをキューイングし、critical_rtn ( )の実行を要求します。
「2)時刻を指定して遅延実行する方法」の例
本節では、Zephyrのソースコードに含まれているテストコード(tests/kernel/workq/work_queue/src/main.c)を例に見ていきます。
a)対象の遅延ワークアイテムと関数の登録
下記★でワークアイテムtests[ i ].workとそれに対応する関数delayed_work_handler ( )を登録します。
static void test_delayed_init(void)
{
int i;
for (i = 0; i < NUM_TEST_ITEMS; i++) {
tests[i].key = i + 1;
k_delayed_work_init(&tests[i].work, delayed_work_handler); ★
}
}
b) ワークアイテムの実行要求と実行
下記★で実行要求を発行します。
第二引数が待ち時間(ms)です。この例ではWORK_ITEM_WAITが100で定義されているので、実行するごとに100 × ( i + 1 ) msの待ち時間を設定します。
static void coop_delayed_work_main(int arg1, int arg2)
{
int i;
ARG_UNUSED(arg1);
ARG_UNUSED(arg2);
/* Let the preempt thread submit the first work item. */
k_sleep(SUBMIT_WAIT / 2);
for (i = 1; i < NUM_TEST_ITEMS; i += 2) {
TC_PRINT(" - Submitting delayed work %d from"
" coop thread\n", i + 1);
k_delayed_work_submit(&tests[i].work,
(i + 1) * WORK_ITEM_WAIT);★
}
}
「3)システムが用意しているワークキューを利用する方法」の例
ここではアラートを受信し、配送する関数を例に見ます。
a)対象関数の登録
下記★の_K_WORK_INITIALIZER ( )で__alert_deliver ( )を登録し、ワークアイテムalert->work_itemに設定します。
void k_alert_init(struct k_alert *alert, k_alert_handler_t handler,
unsigned int max_num_pending_alerts)
{
:
alert->handler = handler;
alert->send_count = ATOMIC_INIT(0);
alert->work_item = (struct k_work)_K_WORK_INITIALIZER(_alert_deliver);★
:
}
b)ワークアイテムの実行要求と実行
下記のk_work_submit_to_queueによってk_sys_work_qワークキューで__alert_deliver ( )を実行します。
void _impl_k_alert_send(struct k_alert *alert)
{
:
/* add alert's work item to system work queue */
k_work_submit_to_queue(&k_sys_work_q,
&alert->work_item);
:
}
ワークキューの内部構造
サッとですが使用方法について一通り見ました。
では、内部でこれらをどのように管理しているのでしょうか。
上記1)、2)、3)を順に見ていきます。
内部構造「1)自分自身でワークキューを定義する方法」
a) 対象ワークキューの初期化
void k_work_q_start(struct k_work_q *work_q, k_thread_stack_t *stack,
size_t stack_size, int prio)
{
k_queue_init(&work_q->queue);
k_thread_create(&work_q->thread, stack, stack_size, work_q_main,
work_q, 0, 0, prio, 0, 0);
_k_object_init(work_q);
}
1行目:k_queue_init ( )で対象のワークキューのqueueメンバーを初期化します。
2行目:対象のワークキュー用のスレッドを生成します。引数で与えられた優先度で実行し、work_q_main ( )を実行します。
3行目:k_objectについては本稿では割愛します。
static void work_q_main(void *work_q_ptr, void *p2, void *p3)
{
struct k_work_q *work_q = work_q_ptr;
ARG_UNUSED(p2);
ARG_UNUSED(p3);
while (1) {
struct k_work *work;
k_work_handler_t handler;
work = k_queue_get(&work_q->queue, K_FOREVER);
if (!work) {
continue;
}
handler = work->handler;
/* Reset pending state so it can be resubmitted by handler */
if (atomic_test_and_clear_bit(work->flags,
K_WORK_STATE_PENDING)) {
handler(work);
}
/* Make sure we don't hog up the CPU if the FIFO never (or
* very rarely) gets empty.
*/
k_yield();
}
}
下記の処理をwhileループで実行し続けます。
- k_queue_get ( )でワークアイテムを取り出す(k_queue_get ( )を下記に示します)
- ワークアイテムの状態がペンディングであれば、ハンドラを実行する
3)次のループのk_queue_get ( )で★の部分で休眠する。(CONFIG_POLLが無効の場合)。この休眠状態から起床する契機はk_work_submit_to_queue ( )を発行された時になります。(厳密にはk_work_submit_to_queue ( )の内部で呼び出されるk_queue_insert ( )で起床させます)
void *k_queue_get(struct k_queue *queue, s32_t timeout)
{
unsigned int key;
void *data;
key = irq_lock();
if (likely(!sys_slist_is_empty(&queue->data_q))) {
data = sys_slist_get_not_empty(&queue->data_q);
irq_unlock(key);
return data;
}
if (timeout == K_NO_WAIT) {
irq_unlock(key);
return NULL;
}
_pend_current_thread(&queue->wait_q, timeout);
return _Swap(key) ? NULL : _current->base.swap_data; ★
}
b)対象ワークキューと関数の登録
static inline void k_work_init(struct k_work *work, k_work_handler_t handler)
{
1 atomic_clear_bit(work->flags, K_WORK_STATE_PENDING);
2 work->handler = handler;
3 _k_object_init(work);
}
1行目で対象ワークキューのflagsのK_WORK_STATE_PENDINGをクリアします。
このビットは、設定されている = 実行が保留されている状態を示します。
2行目で引数で指定したハンドラをワークアイテムのハンドラに設定します。
3行目でkobjの初期化をしています。
c) キューの登録と実行
static inline void k_work_submit_to_queue(struct k_work_q *work_q,
struct k_work *work)
{
if (!atomic_test_and_set_bit(work->flags, K_WORK_STATE_PENDING)) {
k_queue_append(&work_q->queue, work);
}
}
対象ワークアイテムのflagに保留フラグ (K_WORK_STATE_PENDING) がセットされていない場合、保留フラグをセットして該当処理をキューに繋ぎます。
すでに保留フラグが設定されている時はキューには繋ぎません。つまり、ワークアイテムは同時に一度しか登録できません。
ちなみに、k_work_submit_to_queue ( )以外にも実行要求を行うk_work_submit ( )というAPIがありますが、この関数は下記の通りワークキューk_sys_work_qを指定してk_work_submit_to_queue ( )を呼び出すのみです。
static inline void k_work_submit(struct k_work *work)
{
k_work_submit_to_queue(&k_sys_work_q, work);
}
次に、k_work_submit_to_queue ( )で使用されているk_queue_append ( )を見てみましょう。
void k_queue_append(struct k_queue *queue, void *data)
{
return k_queue_insert(queue, queue->data_q.tail, data);
}
この関数はk_queue_insert ( )を呼び出すだけです。
k_queue_insertは汎用関数ですが、サッと見てみましょう。
void k_queue_insert(struct k_queue *queue, void *prev, void *data)
{
1 unsigned int key = irq_lock();
2 struct k_thread *first_pending_thread;
3 first_pending_thread = _unpend_first_thread(&queue->wait_q);
4 if (first_pending_thread) {
5 prepare_thread_to_run(first_pending_thread, data);
6 if (!_is_in_isr() && _must_switch_threads()) {
7 (void)_Swap(key);
8 return;
9 }
10 irq_unlock(key);
11 return;
12 }
13 sys_slist_insert(&queue->data_q, prev, data);
14 irq_unlock(key);
}
3行目:ペンディングされている最初のスレッドを設定しています。
6~9行目:割込みコンテキストでない、かつ_currentスレッドがpreemptibleスレッドで、first_pending_threadが_currentスレッドより優先度が高い場合、_Swap ( )でfirst_pending_threadを実行します。
内部構造「2)時刻を指定して遅延実行する方法」
a)対象の遅延ワークキューと関数の登録
void k_delayed_work_init(struct k_delayed_work *work, k_work_handler_t handler)
{
k_work_init(&work->work, handler);
_init_timeout(&work->timeout, work_timeout);
work->work_q = NULL;
_k_object_init(work);
}
k_work_init ( )は 「通常のワークキュー」のa)で述べた通りでワークアイテムwork->workのhandlerに指定したハンドラを設定します。
その後、_init_timeout ( )でワークアイテムwork->timeoutのハンドラwork_timeout ( )を設定します。
そしてワークキューworkのワークアイテムwork_qをNULLで初期化します。
b)キューの登録と実行
static inline int k_delayed_work_submit(struct k_delayed_work *work,
s32_t delay)
{
return k_delayed_work_submit_to_queue(&k_sys_work_q, work, delay);
}
実体は以下の通りです。
int k_delayed_work_submit_to_queue(struct k_work_q *work_q,
struct k_delayed_work *work,
s32_t delay)
{
1 int key = irq_lock();
2 int err;
/* Work cannot be active in multiple queues */
3 if (work->work_q && work->work_q != work_q) {
4 err = -EADDRINUSE;
5 goto done;
6 }
/* Cancel if work has been submitted */
7 if (work->work_q == work_q) {
8 err = k_delayed_work_cancel(work);
9 if (err < 0) {
10 goto done;
11 }
12 }
/* Attach workqueue so the timeout callback can submit it */
13 work->work_q = work_q;
14 if (!delay) {
/* Submit work if no ticks is 0 */
15 k_work_submit_to_queue(work_q, &work->work);
16 } else {
/* Add timeout */
17 _add_timeout(NULL, &work->timeout, NULL,
18 _TICK_ALIGN + _ms_to_ticks(delay));
19 }
20 err = 0;
21 done:
22 irq_unlock(key);
23 return err;
24 }
ポイントを絞って説明します。
本関数の核は14-19行目です。
引数delayが0の場合は即座に実行要求を発行します。
引数delayが0以外の場合はタイムアウトキューに設定します。タイマー満了後に実行する関数は第二引数&work->timeoutのfuncメンバ(下記★)となります。
struct _timeout {
sys_dnode_t node;
struct k_thread *thread;
sys_dlist_t *wait_q;
s32_t delta_ticks_from_prev;
_timeout_func_t func; ★
};
内部構造「3)システムが用意しているワークキューを利用する方法」
a)ハンドラの登録
K_WORK_INTIALIZER (_K_WORK_INITIALIZERと同等)でハンドラの登録やその他メンバを初期化した状態のものをワークアイテムに設定します。(■に使用例を示します)
#define _K_WORK_INITIALIZER(work_handler) \
{ \
._reserved = NULL, \
.handler = work_handler, \
.flags = { 0 } \
}
work_handlerをhandlerメンバを設定します。
_reservedメンバはNULL、flagsには0で初期化しています。
■ K_WORK_INITIALIZERの使用例
void k_alert_init(struct k_alert *alert, k_alert_handler_t handler,
unsigned int max_num_pending_alerts)
{
: (略)
alert->work_item = (struct k_work)_K_WORK_INITIALIZER(_alert_deliver);
:(略)
}
ワークアイテムのハンドラ_aleart_deliverを用いて初期化したものをalert->work_itemに設定します。
b)キューの登録と実行
static inline void k_work_submit_to_queue(struct k_work_q *work_q,
struct k_work *work)
{
if (!atomic_test_and_set_bit(work->flags, K_WORK_STATE_PENDING)) {
k_queue_append(&work_q->queue, work);
}
}
「通常のワークキュー」の実行要求と同じ関数です。k_sys_work_qの場合は、第一引数でk_sys_work_qを指定します。
ここまで述べた関数以外にも
- k_work_pending ( ):引数で指定したワークアイテムがペンディング状態か否かを返す
- k_delayed_work_cancel ( ):対象のワークアイテムがキューに繋がっている場合にキューから削除する
などの関数などがあります。
少し駆け足でしたが、理解できたでしょうか。
説明不足の部分もあると思うので、要望があれば遠慮せずにコメントしてください。
次回は、スケジューラにも関連しますが、排他制御について投稿したいと思っています。
それでは、また。
前回:Zephyr入門(スケジューラ:システムスレッド編)
次回:Zephyr入門(排他制御:mutex編)
『各種製品名は、各社の製品名称、商標または登録商標です。本記事に記載されているシステム名、製品名には、必ずしも商標表示((R)、TM)を付記していません。』