前回に引き続き,リングバッファの実装方法についてです。第1回では,OSのAPIを使わない範囲を説明しましたが,今回は,μITRON仕様のAPIを用いて,リングバッファへの書き込み/読み出し関数を複数のタスクから並行に呼べるように排他制御を行ったり,リングバッファが満杯や空の場合に待つ機能を追加します。
なお,この記事は第1回を読んでいることを前提に書いていますので,第1回をお読みでない方は,まずはこちらをお読みください。
この記事に載せているコードは,テストしていません。不具合があった場合にはご容赦ください。
ミューテックスによる排他制御
最初に,write_ringbuf
を複数のタスクから並行に呼び出せるようにしましょう。そのためにまず,第1回の最後の実装が,write_ringbuf
を複数のタスクから並行に呼び出した場合に正しく動作しない理由を明らかにしたいと思います。
bool
write_ringbuf(DATA_T data)
{
if (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
: read_idx - write_idx == RINGBUF_SIZE) {
/* リングバッファが満杯の場合 */
return(false);
}
else {
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
return(true);
}
}
最初にタスク1がwrite_ringbuf(data1)
を呼び出したとします。リングバッファが満杯でなければ,ringbuf[write_idx]
にdata1
を書き込みます。ちょうどこの直後に,優先度の高いタスク2が起動され,タスク2もwrite_ringbuf(data2)
を呼び出したとします。そうすると,タスク2もringbuf[write_idx]
に自分のdata2
を書き込みますので,タスク1のdata1
は上書きされてしまいます。その後,タスク2もタスク1もwrite_idx
を1つ進めますので,write_idx
は2つ進みますが,write_idx
が1つ進んだ時のringbuf[write_idx]
にはデータが書き込まれません。理解できたでしょうか?
これを解決するには,単純に排他制御すればOKです。排他制御には,ミューテックスを使うことにします。まず,システムコンフィギュレーションファイルに以下の記述を追加し,ミューテックスを1つ生成します。
CRE_MTX(WRITE_MTX, { TA_CEILING, WRITE_HIGH_PRIORITY });
排他制御にはセマフォを使うこともできますが,単なる排他制御であれば,ミューテックスを使うのがお勧めです。上の記述中のTA_CEILING
は,優先度上限プロトコルを使用することを指示するものです。これにより,ミューテックスをロックしたタスクの優先度は,一時的にWRITE_HIGH_PRIORITY
まで引き上げられ,優先度逆転を防止することができます(優先度上限プロトコルによる優先度逆転の防止については,ネットで調べてください)。WRITE_HIGH_PRIORITY
は,write_ringbuf
を呼び出すタスクの中で,最も優先度の高いタスクの優先度に定義します。
このミューテックスを用いて,同時に1つのタスクのみがwrite_ringbuf
の本体を実行できるように排他制御します。
bool
write_ringbuf(DATA_T data)
{
ER rercd;
bool result;
rercd = loc_mtx(WRITE_MTX);
assert(rercd == E_OK);
if (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
: read_idx - write_idx == RINGBUF_SIZE) {
/* リングバッファが満杯の場合 */
result = false;
}
else {
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
result = true;
}
rercd = unl_mtx(WRITE_MTX);
assert(rercd == E_OK);
return(result);
}
前のコードでは,if文の中から直接returnしていましたが,そのようにするとミューテックスの解放漏れが起こりますので,注意しましょう。
一般に,OSのAPIを用いる場合には,サービスコールがエラーとなる場合を考えることが必要です。が,エラーの処理を網羅的に行うのは面倒で,この記事の本質から外れるため別の機会に譲ることとして,上のコードではエラーは起こらないと仮定しました。ただし,エラーが起こっているのにそれに気付かないのは危険なので,エラーが起こった場合にはassert
を用いてプログラムを停止させるようにしました。
エラーに気付かないと危険なのは,例えば上のコードでloc_mtx
とunl_mtx
の返り値を無視すると,排他制御ができていないのに,それなりに動くプログラムができてしまいます。それなりに動くが,排他制御ができていないために時々誤動作するというわけです。このような不具合は,再現性が低く,見つけるのが大変です。
write_ringbuf
に加えて,read_ringbuf
も複数のタスクから並行に呼び出せるようにするには,もう1つミューテックスを用意して,同時に1つのタスクのみがread_ringbuf
の本体を実行できるように排他制御します。
CRE_MTX(READ_MTX, { TA_CEILING, READ_HIGH_PRIORITY });
bool
read_ringbuf(DATA_T *p_data)
{
ER rercd;
bool result;
rercd = loc_mtx(READ_MTX);
assert(rercd == E_OK);
if (write_idx == read_idx) {
/* リングバッファが空の場合 */
result = false;
}
else {
*p_data = ringbuf[read_idx];
read_idx = NEXT_IDX(read_idx);
result = true;
}
rercd = unl_mtx(READ_MTX);
assert(rercd == E_OK);
return(result);
}
ここでは,書き込み側と読み出し側がなるべく同時に実行できるように,write_ringbuf
とread_ringbuf
に別のミューテックスを用意しましたが,性能にこだわりがないなら,1つのミューテックスを両方の関数で使用することもできます。ちなみに,第1回のdata_count
を用いた実装では,data_count
を書き込み側と読み出し側の両方からアクセスするため,むしろ1つのミューテックスを両方のサービスに使用し,write_ringbuf
とread_ringbuf
が並行に実行されないように排他制御する必要があります。
セマフォによる満杯や空の場合への対処
いよいよ本論です。
ここまでの実装では,リングバッファが満杯の時にwrite_ringbuf
を呼んだ場合と,空の場合にread_ringbuf
を呼んだ場合には,関数からfalse
を返す仕様としていました。これを,リングバッファへの書き込みや読み出しができるようになるまで待つように変更したいと思います。
まずは,write_ringbuf
からです。write_ringbuf
は,リングバッファが満杯の時に待ちたいわけですが,これを,リングバッファの各要素を資源とみなして,それを取り合うと考えると,セマフォを使って制御することができます。具体的には,初期値がRINGBUF_SIZE
のセマフォを用意し,リングバッファに書き込む時に,セマフォを取得するようにします。これにより,write_ringbuf
がRINGBUF_SIZE
回を超えて呼び出されると,セマフォが取得できなくなり,待ち状態となります。一方,セマフォを返却するのは,リングバッファに空きができた時ですから,read_ringbuf
の中ということになります。
コードを作ってみましょう。
CRE_MTX(WRITE_MTX, { TA_CEILING, WRITE_HIGH_PRIORITY });
CRE_MTX(READ_MTX, { TA_CEILING, READ_HIGH_PRIORITY });
CRE_SEM(WRITE_SEM, { TA_TPRI, BINGBUF_SIZE, RINGBUF_SIZE });
TA_TPRI
はセマフォの待ちキューを優先度順とすることを,2つのRINGBUF_SIZE
はセマフォの初期値と最大値を指定しています。TA_TPRI
は,指定しなくても問題ありません(代わりに,TA_NULL
を指定してください)。
void
write_ringbuf(DATA_T data)
{
ER rercd;
rercd = wai_sem(WRITE_SEM);
assert(rercd == E_OK);
rercd = loc_mtx(WRITE_MTX);
assert(rercd == E_OK);
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
rercd = unl_mtx(WRITE_MTX);
assert(rercd == E_OK);
}
bool
read_ringbuf(DATA_T *p_data)
{
ER rercd;
bool result;
rercd = loc_mtx(READ_MTX);
assert(rercd == E_OK);
if (write_idx == read_idx) {
/* リングバッファが空の場合 */
result = false;
}
else {
*p_data = ringbuf[read_idx];
read_idx = NEXT_IDX(read_idx);
result = true;
}
rercd = sig_sem(WRITE_SEM);
assert(rercd == E_OK);
rercd = unl_mtx(READ_MTX);
assert(rercd == E_OK);
return(result);
}
write_ringbuf
の中のバッファが満杯かどうかを判断する複雑なif文がなくなり,コードが読みやすくなりました!これは,セマフォが取得できたということは,バッファが空いていることが保証されるためです。ついでに,write_ringbuf
の返り値をなくしました。
write_ringbuf
中のwai_sem
とloc_mtx
の順序が気になる方がいるかもしれませんが,これはどちらが先でも大丈夫です(振る舞いは少し変わります)。read_ringbuf
中のsig_sem
とunl_mtx
も同様です。
同様に,リングバッファが空の場合に,read_ringbuf
を待つように変更してみましょう。今度は,リングバッファに入っているデータを資源とみなして,それを取り合うと考えて,セマフォで制御します。
具体的には,初期値が0のセマフォを用意し,リングバッファから読み出す時に,セマフォを取得するようにします。これにより,セマフォが空の時にread_ringbuf
が呼び出されると,セマフォが取得できず,待ち状態となります。セマフォを返却するのは,リングバッファにデータが入った時ですから,write_ringbuf
の中ということになります。
CRE_MTX(WRITE_MTX, { TA_CEILING, WRITE_HIGH_PRIORITY });
CRE_MTX(READ_MTX, { TA_CEILING, READ_HIGH_PRIORITY });
CRE_SEM(WRITE_SEM, { TA_TPRI, BINGBUF_SIZE, RINGBUF_SIZE });
CRE_SEM(READ_SEM, { TA_TPRI, 0, RINGBUF_SIZE });
void
write_ringbuf(DATA_T data)
{
ER rercd;
rercd = wai_sem(WRITE_SEM);
assert(rercd == E_OK);
rercd = loc_mtx(WRITE_MTX);
assert(rercd == E_OK);
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
rercd = sig_sem(READ_SEM);
assert(rercd == E_OK);
rercd = unl_mtx(WRITE_MTX);
assert(rercd == E_OK);
}
void
read_ringbuf(DATA_T *p_data)
{
ER rercd;
rercd = wai_sem(READ_SEM);
assert(rercd == E_OK);
rercd = loc_mtx(READ_MTX);
assert(rercd == E_OK);
*p_data = ringbuf[read_idx];
read_idx = NEXT_IDX(read_idx);
rercd = sig_sem(WRITE_SEM);
assert(rercd == E_OK);
rercd = unl_mtx(READ_MTX);
assert(rercd == E_OK);
}
これで,やりたいことは一通りできました。
トリッキーなセマフォの使い方
やりたいことは一通りできましたが,セマフォ2つとミューテックス2つを使う結構重いコードになってしまいました(前述の通り,ミューテックス1つは節約できます)。実は,セマフォ1つで,セマフォ1つとミューテックス1つの役割を兼ねさせることができます。ちょっとトリッキーで,私も人に教えてもらいました。
原理がやや複雑なので,先にコードをお見せします。まずは,書き込み側のセマフォとミューテックスを,1つのセマフォで実現します。
CRE_MTX(READ_MTX, { TA_CEILING, READ_HIGH_PRIORITY });
CRE_SEM(WRITE_SEM, { TA_TPRI, 1, 1 });
CRE_SEM(READ_SEM, { TA_TPRI, 0, RINGBUF_SIZE });
void
write_ringbuf(DATA_T data)
{
ER rercd;
rercd = wai_sem(WRITE_SEM);
assert(rercd == E_OK);
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
rercd = sig_sem(READ_SEM);
assert(rercd == E_OK);
if (write_idx > read_idx ? write_idx - read_idx != RINGBUF_SIZE
: read_idx - write_idx != RINGBUF_SIZE) {
/* リングバッファが満杯にならなかった場合 */
rercd = sig_sem(WRITE_SEM);
assert(rercd == E_OK);
}
}
void
read_ringbuf(DATA_T *p_data)
{
bool was_full;
ER rercd;
rercd = wai_sem(READ_SEM);
assert(rercd == E_OK);
rercd = loc_mtx(READ_MTX);
assert(rercd == E_OK);
was_full = (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
: read_idx - write_idx == RINGBUF_SIZE);
*p_data = ringbuf[read_idx];
read_idx = NEXT_IDX(read_idx);
if (was_full) {
/* リングバッファが満杯だった場合 */
rercd = sig_sem(WRITE_SEM);
assert(rercd == E_OK);
}
rercd = unl_mtx(READ_MTX);
assert(rercd == E_OK);
}
使用するセマフォ(WRITE_SEM)は,初期値,最大値とも1です。これは,排他制御に使用する場合と同じ設定です。セマフォを排他制御に用いるのであれば,write_ringbuf
の先頭で取得して,リターン前に返却することになります。上のコードでは,write_ringbuf
の先頭で取得していますが,リターン前にはリングバッファが満杯にならなかった場合のみ返却しています。逆に言うと,リングバッファが満杯になった時は,セマフォを返却しません。これにより,リングバッファが満杯の時は,新たに呼ばれたwrite_ringbuf
がセマフォを取得できず,待ち状態になります。この待ち状態を解除するのは,read_ringbuf
でリングバッファに新たに空きができた時ということになります。
整理すると,このセマフォ(WRITE_SEM)は,リングバッファに空きがあり,かつ,write_ringbuf
中のクリティカルセクションを処理中のタスクがない場合に,値が1になります。逆に言うと,リングバッファが満杯か,write_ringbuf
を処理中のタスクがある場合には,0になります。
同様に,読み出し側のセマフォとミューテックスも,1つのセマフォで実現することができます。読み出し側のセマフォは,初期値を0にしておきます。
CRE_SEM(WRITE_SEM, { TA_TPRI, 1, 1 });
CRE_SEM(READ_SEM, { TA_TPRI, 0, 1 });
void
write_ringbuf(DATA_T data)
{
bool was_empty;
ER rercd;
rercd = wai_sem(WRITE_SEM);
assert(rercd == E_OK);
was_empty = (write_idx == read_idx);
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
if (was_empty) {
/* リングバッファが空だった場合 */
rercd = sig_sem(READ_SEM);
assert(rercd == E_OK);
}
if (write_idx > read_idx ? write_idx - read_idx != RINGBUF_SIZE
: read_idx - write_idx != RINGBUF_SIZE) {
/* リングバッファが満杯にならなかった場合 */
rercd = sig_sem(WRITE_SEM);
assert(rercd == E_OK);
}
}
void
read_ringbuf(DATA_T *p_data)
{
bool was_full;
ER rercd;
rercd = wai_sem(READ_SEM);
assert(rercd == E_OK);
was_full = (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
: read_idx - write_idx == RINGBUF_SIZE);
*p_data = ringbuf[read_idx];
read_idx = NEXT_IDX(read_idx);
if (was_full) {
/* リングバッファが満杯だった場合 */
rercd = sig_sem(WRITE_SEM);
assert(rercd == E_OK);
}
if (write_idx != read_idx) {
/* リングバッファが空にならなかった場合 */
rercd = sig_sem(READ_SEM);
assert(rercd == E_OK);
}
}
セマフォのこの使い方は,排他制御と似ていますが,ミューテックスでは実現できません。ミューテックスは,ロックしたタスクが解放しなければならないためです。
ちなみに,TOPPERSカーネルに同梱されているシリアルインタフェースドライバでは,セマフォをこのパターンで使っています。シリアルインタフェースドライバでは,タスクと割込みハンドラの間でリングバッファを使っていますので,コードの構成は少し違っています。
データキュー:カーネルの提供するリングバッファ機能
μITRON仕様のAPIを用いたリングバッファの実装TIPSは以上ですが,リングバッファはしばしば使われるタスク間通信機構であるため,μITRON仕様には,カーネルの機能としてリングバッファがサポートされています。それがデータキューです。
データキューでは,リングバッファに格納するデータ型(DATA_T
)は,intptr_t
に決められています。それより大きいデータを扱いたい場合には,共有メモリ上にデータを置いて,その先頭番地をデータキュー送受信する必要があります。