TOPPERSカーネルのAPI仕様は,μITRON仕様を拡張(一部,削除した機能もあります)したものになっていますので,TOPPERSカーネルを使いこなすには,μITRON仕様のAPIの使いこなしが重要です。そこで,今年のAdvent Calendarでは,μITRON仕様のAPIを用いたプログラミングTIPSをいくつか紹介したいと思っています(何回書くことができるか,自信がありませんが…)。
最初の題材として,セマフォを使ってリングバッファを実現する方法を紹介しようと思ったのですが,その前に,リングバッファの実装方法のTIPSが意外とネット上に見つからなかったので,まずはそこから紹介したいと思います。というわけで,今回はμITRON仕様のAPIは登場しません。
リングバッファとは?
リングバッファ(循環バッファとも呼ばれる)は,タスク間の通信や,デバイスドライバ内で良く使われるデータ構造で,今さら説明するまでもないと思います。ご存じのない方は,ネット上で検索すれば,解説したページが見つかるものと思います。私が見つけたページを2つ挙げておきます。
https://future-architect.github.io/articles/20210705a/
https://ufcpp.net/study/algorithm/col_circular.html
リングバッファのシンプルな実装
リングバッファを実装する時に迷うのが,バッファへの書き込み位置(リングバッファ中の次にデータを書く場所)と読み出し位置(リングバッファ中の次にデータを読む場所)の持ち方です。最もシンプルな方法は,次の方法でしょう(DATA_T
は,リングバッファに入れるデータの型です)。
DATA_T ringbuf[RINGBUF_SIZE];
unsigned int write_idx, read_idx;
void
init_ringbuf(void)
{
write_idx = 0;
read_idx = 0;
}
bool
write_ringbuf(DATA_T data)
{
if (write_idx - read_idx == RINGBUF_SIZE) {
/* リングバッファが満杯の場合 */
return(false);
}
else {
ringbuf[write_idx % RINGBUF_SIZE] = data;
write_idx += 1;
return(true);
}
}
bool
read_ringbuf(DATA_T *p_data)
{
if (write_idx == read_idx) {
/* リングバッファが空の場合 */
return(false);
}
else {
*p_data = ringbuf[read_idx % RINGBUF_SIZE];
read_idx += 1;
return(true);
}
}
この方法は,シンプルな上に,後で述べるような良い性質があるのですが,write_idx
とread_idx
がオーバーフローした場合が問題です。具体的には,RINGBUF_SIZE
が2の冪乗でない場合には,write_idx
とread_idx
がUINT_MAX
から0に戻った時に,正しく動作しません。
インデックスのオーバーフローへの対処
write_idx
とread_idx
のオーバーフローに対処するために,これらがRINGBUF_SIZE
に達したら,0に戻す方法が考えられます。
DATA_T ringbuf[RINGBUF_SIZE];
unsigned int write_idx, read_idx;
void
init_ringbuf(void)
{
write_idx = 0;
read_idx = 0;
}
#define NEXT_IDX(idx) (((idx) + 1) % RINGBUF_SIZE)
bool
write_ringbuf(DATA_T data)
{
if (NEXT_IDX(write_idx) == read_idx) {
/* リングバッファが満杯の場合 */
return(false);
}
else {
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
return(true);
}
}
bool
read_ringbuf(DATA_T *p_data)
{
if (write_idx == read_idx) {
/* リングバッファが空の場合 */
return(false);
}
else {
*p_data = ringbuf[read_idx];
read_idx = NEXT_IDX(read_idx);
return(true);
}
}
このコードでは,write_idx
を1つ進めるとread_idx
に一致する場合に,リングバッファが満杯と判断しています。そのため,リングバッファに入れられるデータの数がRINGBUF_SIZE-1
となってしまい,リングバッファをすべて使い切ることができません。これは,メモリ制約が厳しい組込みシステムでは,避けたいところです。
リングバッファをすべて使い切ろうとすると,write_idx
とread_idx
が一致してしまい,リングバッファが満杯なのか空なのかが,区別できなくなります。write_idx
とread_idx
の差は,0〜RINGBUF_SIZE-1
のRINGBUF_SIZE-1
種類の状態しか取れないのに対して,リングバッファに入るデータの数は,0〜RINGBUF_SIZE
のRINGBUF_SIZE
種類の状態があるので,どうやっても表し切れません。
リングバッファをすべて使い切る
リングバッファをすべて使い切るために,リングバッファに入っているデータの数を保持する変数を持つ方法が考えられます。
DATA_T ringbuf[RINGBUF_SIZE];
unsigned int write_idx, read_idx;
unsigned int data_count;
void
init_ringbuf(void)
{
write_idx = 0;
read_idx = 0;
data_count = 0;
}
#define NEXT_IDX(idx) (((idx) + 1) % RINGBUF_SIZE)
bool
write_ringbuf(DATA_T data)
{
if (data_count == RINGBUF_SIZE) {
/* リングバッファが満杯の場合 */
return(false);
}
else {
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
data_count += 1;
return(true);
}
}
bool
read_ringbuf(DATA_T *p_data)
{
if (data_count == 0) {
/* リングバッファが空の場合 */
return(false);
}
else {
*p_data = ringbuf[read_idx];
read_idx = NEXT_IDX(read_idx);
data_count -= 1;
return(true);
}
}
このコードは,変数は1つ増えてしまいますが,リングバッファをすべて使い切ることができますし,コードもシンプルで読みやすいです。そのため,TOPPERSカーネル内のリングバッファには,この方法を用いています。
ところが,このコードは,write_ringbuf
とread_ringbuf
を別のタスクから並行に呼び出すと問題が起こります(なぜかは,わかりますね?)。逆に言うと,これまでのコードは,何の排他制御もしていなかったのに,write_ringbuf
とread_ringbuf
を別のタスクから並行に呼び出すことができたのです!これは,write_idx
はwrite_ringbuf
のみが書き込み,read_idx
はread_ringbuf
のみが書き込むためです。また,ringbuf
への書き込み/読み出しの後に,write_idx
/read_idx
を更新していることも重要です。
なお,ここでは,シングルプロセッサに話しを限ります。マルチプロセッサになると話しは違ってきますが,それについては後述します。
ちなみに,カーネル内のリングバッファは,排他制御した上でアクセスしますので,このことは問題になりません。
ちょっとトリッキーな方法
実は,write_idx
とread_idx
のオーバーフローに対処でき,リングバッファをすべて使い切ることができ,write_ringbuf
とread_ringbuf
を並行に呼び出せる方法があります。それは,write_idx
とread_idx
を,RINGBUF_SIZE
の2倍まで回す方法です。
DATA_T ringbuf[RINGBUF_SIZE];
unsigned int write_idx, read_idx;
void
init_ringbuf(void)
{
write_idx = 0;
read_idx = 0;
}
#define NEXT_IDX(idx) (((idx) + 1) % (RINGBUF_SIZE * 2))
bool
write_ringbuf(DATA_T data)
{
if (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
: read_idx - write_idx == RINGBUF_SIZE) {
/* リングバッファが満杯の場合 */
return(false);
}
else {
ringbuf[write_idx] = data;
write_idx = NEXT_IDX(write_idx);
return(true);
}
}
bool
read_ringbuf(DATA_T *p_data)
{
if (write_idx == read_idx) {
/* リングバッファが空の場合 */
return(false);
}
else {
*p_data = ringbuf[read_idx];
read_idx = NEXT_IDX(read_idx);
return(true);
}
}
write_idx
とread_idx
を,RINGBUF_SIZE
の2倍まで回すことで,write_idx
とread_idx
の差でRINGBUF_SIZE
種類の状態を表すことが可能になったわけです。
マルチプロセッサでは?
先ほど,マルチプロセッサになると話しは違ってきますと書きましたが,どう違ってくるのでしょうか?
マルチプロセッサのアーキテクチャにもよりますが,最近のマルチプロセッサシステムでは,プロセッサが実行した命令の順序で,メモリアクセスがされるとは限りません。そのため例えば,write_ringbuf
の中で,ringbuf[write_idx]
への書き込み前に,write_idx
が書き込まれると,read_ringbuf
側で,書き込み前のringbuf
を読んでしまう可能性があります。同様に,read_ringbuf
の中で,ringbuf[read_idx]
からの読み出しとread_idx
の書き込みが逆順になるのも不都合です。そこで,それらの間にメモリバリア命令を入れる必要があります。
前述のコードは,メモリバリア命令を入れることで,マルチプロセッサでも正しく動作するようになりますが,性能を上げようとすると,さらに,write_idx
とread_idx
へのアクセス競合が問題になります。この対処方法を以下のページに見つけましたので,この議論はそちらに譲りたいと思います。
次回予告
今回紹介したリングバッファの実装は,いずれも,write_ringbuf
を複数のタスクから並行に呼び出した場合や,read_ringbuf
を複数のタスクから並行に呼び出した場合には,正しく動作しません。このような場合には,タスク間で排他制御が必要になり,いよいよOSのAPIの出番です。
また,リングバッファが満杯の場合にwrite_ringbuf
を待ち状態にしたり,リングバッファが空の場合にread_ringbuf
を待ち状態にする場合にも,OSのAPIを使うことが必要です。
次回は,μITRON仕様のAPIを用いて,これらの課題を解決します。