10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

TOPPERSAdvent Calendar 2023

Day 1

ITRONプログラミングTIPS 第1回:リングバッファの実装

Last updated at Posted at 2023-11-26

TOPPERSカーネルのAPI仕様は,μITRON仕様を拡張(一部,削除した機能もあります)したものになっていますので,TOPPERSカーネルを使いこなすには,μITRON仕様のAPIの使いこなしが重要です。そこで,今年のAdvent Calendarでは,μITRON仕様のAPIを用いたプログラミングTIPSをいくつか紹介したいと思っています(何回書くことができるか,自信がありませんが…)。

最初の題材として,セマフォを使ってリングバッファを実現する方法を紹介しようと思ったのですが,その前に,リングバッファの実装方法のTIPSが意外とネット上に見つからなかったので,まずはそこから紹介したいと思います。というわけで,今回はμITRON仕様のAPIは登場しません。

リングバッファとは?

リングバッファ(循環バッファとも呼ばれる)は,タスク間の通信や,デバイスドライバ内で良く使われるデータ構造で,今さら説明するまでもないと思います。ご存じのない方は,ネット上で検索すれば,解説したページが見つかるものと思います。私が見つけたページを2つ挙げておきます。

https://future-architect.github.io/articles/20210705a/
https://ufcpp.net/study/algorithm/col_circular.html

リングバッファのシンプルな実装

リングバッファを実装する時に迷うのが,バッファへの書き込み位置(リングバッファ中の次にデータを書く場所)と読み出し位置(リングバッファ中の次にデータを読む場所)の持ち方です。最もシンプルな方法は,次の方法でしょう(DATA_Tは,リングバッファに入れるデータの型です)。

DATA_T ringbuf[RINGBUF_SIZE];
unsigned int write_idx, read_idx;

void
init_ringbuf(void)
{
	write_idx = 0;
	read_idx = 0;
}

bool
write_ringbuf(DATA_T data)
{
	if (write_idx - read_idx == RINGBUF_SIZE) {
		/* リングバッファが満杯の場合 */
		return(false);
	}
	else {
		ringbuf[write_idx % RINGBUF_SIZE] = data;
		write_idx += 1;
		return(true);
	}
}

bool
read_ringbuf(DATA_T *p_data)
{
	if (write_idx == read_idx) {
		/* リングバッファが空の場合 */
		return(false);
	}
	else {
		*p_data = ringbuf[read_idx % RINGBUF_SIZE];
		read_idx += 1;
		return(true);
	}
}

この方法は,シンプルな上に,後で述べるような良い性質があるのですが,write_idxread_idxがオーバーフローした場合が問題です。具体的には,RINGBUF_SIZEが2の冪乗でない場合には,write_idxread_idxUINT_MAXから0に戻った時に,正しく動作しません。

インデックスのオーバーフローへの対処

write_idxread_idxのオーバーフローに対処するために,これらがRINGBUF_SIZEに達したら,0に戻す方法が考えられます。

DATA_T ringbuf[RINGBUF_SIZE];
unsigned int write_idx, read_idx;

void
init_ringbuf(void)
{
	write_idx = 0;
	read_idx = 0;
}

#define NEXT_IDX(idx)	(((idx) + 1) % RINGBUF_SIZE)

bool
write_ringbuf(DATA_T data)
{
	if (NEXT_IDX(write_idx) == read_idx) {
		/* リングバッファが満杯の場合 */
		return(false);
	}
	else {
		ringbuf[write_idx] = data;
		write_idx = NEXT_IDX(write_idx);
		return(true);
	}
}

bool
read_ringbuf(DATA_T *p_data)
{
	if (write_idx == read_idx) {
		/* リングバッファが空の場合 */
		return(false);
	}
	else {
		*p_data = ringbuf[read_idx];
		read_idx = NEXT_IDX(read_idx);
		return(true);
	}
}

このコードでは,write_idxを1つ進めるとread_idxに一致する場合に,リングバッファが満杯と判断しています。そのため,リングバッファに入れられるデータの数がRINGBUF_SIZE-1となってしまい,リングバッファをすべて使い切ることができません。これは,メモリ制約が厳しい組込みシステムでは,避けたいところです。

リングバッファをすべて使い切ろうとすると,write_idxread_idxが一致してしまい,リングバッファが満杯なのか空なのかが,区別できなくなります。write_idxread_idxの差は,0〜RINGBUF_SIZE-1RINGBUF_SIZE-1種類の状態しか取れないのに対して,リングバッファに入るデータの数は,0〜RINGBUF_SIZERINGBUF_SIZE種類の状態があるので,どうやっても表し切れません。

リングバッファをすべて使い切る

リングバッファをすべて使い切るために,リングバッファに入っているデータの数を保持する変数を持つ方法が考えられます。

DATA_T ringbuf[RINGBUF_SIZE];
unsigned int write_idx, read_idx;
unsigned int data_count;

void
init_ringbuf(void)
{
	write_idx = 0;
	read_idx = 0;
	data_count = 0;
}

#define NEXT_IDX(idx)	(((idx) + 1) % RINGBUF_SIZE)

bool
write_ringbuf(DATA_T data)
{
	if (data_count == RINGBUF_SIZE) {
		/* リングバッファが満杯の場合 */
		return(false);
	}
	else {
		ringbuf[write_idx] = data;
		write_idx = NEXT_IDX(write_idx);
		data_count += 1;
		return(true);
	}
}

bool
read_ringbuf(DATA_T *p_data)
{
	if (data_count == 0) {
		/* リングバッファが空の場合 */
		return(false);
	}
	else {
		*p_data = ringbuf[read_idx];
		read_idx = NEXT_IDX(read_idx);
		data_count -= 1;
		return(true);
	}
}

このコードは,変数は1つ増えてしまいますが,リングバッファをすべて使い切ることができますし,コードもシンプルで読みやすいです。そのため,TOPPERSカーネル内のリングバッファには,この方法を用いています。

ところが,このコードは,write_ringbufread_ringbufを別のタスクから並行に呼び出すと問題が起こります(なぜかは,わかりますね?)。逆に言うと,これまでのコードは,何の排他制御もしていなかったのに,write_ringbufread_ringbufを別のタスクから並行に呼び出すことができたのです!これは,write_idxwrite_ringbufのみが書き込み,read_idxread_ringbufのみが書き込むためです。また,ringbufへの書き込み/読み出しの後に,write_idxread_idxを更新していることも重要です。

なお,ここでは,シングルプロセッサに話しを限ります。マルチプロセッサになると話しは違ってきますが,それについては後述します。

ちなみに,カーネル内のリングバッファは,排他制御した上でアクセスしますので,このことは問題になりません。

ちょっとトリッキーな方法

実は,write_idxread_idxのオーバーフローに対処でき,リングバッファをすべて使い切ることができ,write_ringbufread_ringbufを並行に呼び出せる方法があります。それは,write_idxread_idxを,RINGBUF_SIZEの2倍まで回す方法です。

DATA_T ringbuf[RINGBUF_SIZE];
unsigned int write_idx, read_idx;

void
init_ringbuf(void)
{
	write_idx = 0;
	read_idx = 0;
}

#define NEXT_IDX(idx)	(((idx) + 1) % (RINGBUF_SIZE * 2))

bool
write_ringbuf(DATA_T data)
{
	if (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
							 : read_idx - write_idx == RINGBUF_SIZE) {
		/* リングバッファが満杯の場合 */
		return(false);
	}
	else {
		ringbuf[write_idx] = data;
		write_idx = NEXT_IDX(write_idx);
		return(true);
	}
}

bool
read_ringbuf(DATA_T *p_data)
{
	if (write_idx == read_idx) {
		/* リングバッファが空の場合 */
		return(false);
	}
	else {
		*p_data = ringbuf[read_idx];
		read_idx = NEXT_IDX(read_idx);
		return(true);
	}
}

write_idxread_idxを,RINGBUF_SIZEの2倍まで回すことで,write_idxread_idxの差でRINGBUF_SIZE種類の状態を表すことが可能になったわけです。

マルチプロセッサでは?

先ほど,マルチプロセッサになると話しは違ってきますと書きましたが,どう違ってくるのでしょうか?

マルチプロセッサのアーキテクチャにもよりますが,最近のマルチプロセッサシステムでは,プロセッサが実行した命令の順序で,メモリアクセスがされるとは限りません。そのため例えば,write_ringbufの中で,ringbuf[write_idx]への書き込み前に,write_idxが書き込まれると,read_ringbuf側で,書き込み前のringbufを読んでしまう可能性があります。同様に,read_ringbufの中で,ringbuf[read_idx]からの読み出しとread_idxの書き込みが逆順になるのも不都合です。そこで,それらの間にメモリバリア命令を入れる必要があります。

前述のコードは,メモリバリア命令を入れることで,マルチプロセッサでも正しく動作するようになりますが,性能を上げようとすると,さらに,write_idxread_idxへのアクセス競合が問題になります。この対処方法を以下のページに見つけましたので,この議論はそちらに譲りたいと思います。

次回予告

今回紹介したリングバッファの実装は,いずれも,write_ringbufを複数のタスクから並行に呼び出した場合や,read_ringbufを複数のタスクから並行に呼び出した場合には,正しく動作しません。このような場合には,タスク間で排他制御が必要になり,いよいよOSのAPIの出番です。

また,リングバッファが満杯の場合にwrite_ringbufを待ち状態にしたり,リングバッファが空の場合にread_ringbufを待ち状態にする場合にも,OSのAPIを使うことが必要です。

次回は,μITRON仕様のAPIを用いて,これらの課題を解決します。

10
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?