LoginSignup
5
0

ITRONプログラミングTIPS 第2回:セマフォを用いたリングバッファの実装

Last updated at Posted at 2023-12-06

前回に引き続き,リングバッファの実装方法についてです。第1回では,OSのAPIを使わない範囲を説明しましたが,今回は,μITRON仕様のAPIを用いて,リングバッファへの書き込み/読み出し関数を複数のタスクから並行に呼べるように排他制御を行ったり,リングバッファが満杯や空の場合に待つ機能を追加します。

なお,この記事は第1回を読んでいることを前提に書いていますので,第1回をお読みでない方は,まずはこちらをお読みください。

この記事に載せているコードは,テストしていません。不具合があった場合にはご容赦ください。

ミューテックスによる排他制御

最初に,write_ringbufを複数のタスクから並行に呼び出せるようにしましょう。そのためにまず,第1回の最後の実装が,write_ringbufを複数のタスクから並行に呼び出した場合に正しく動作しない理由を明らかにしたいと思います。

第1回の最後の実装.
bool
write_ringbuf(DATA_T data)
{
	if (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
							 : read_idx - write_idx == RINGBUF_SIZE) {
		/* リングバッファが満杯の場合 */
		return(false);
	}
	else {
		ringbuf[write_idx] = data;
		write_idx = NEXT_IDX(write_idx);
		return(true);
	}
}

最初にタスク1がwrite_ringbuf(data1)を呼び出したとします。リングバッファが満杯でなければ,ringbuf[write_idx]data1を書き込みます。ちょうどこの直後に,優先度の高いタスク2が起動され,タスク2もwrite_ringbuf(data2)を呼び出したとします。そうすると,タスク2もringbuf[write_idx]に自分のdata2を書き込みますので,タスク1のdata1は上書きされてしまいます。その後,タスク2もタスク1もwrite_idxを1つ進めますので,write_idxは2つ進みますが,write_idxが1つ進んだ時のringbuf[write_idx]にはデータが書き込まれません。理解できたでしょうか?

これを解決するには,単純に排他制御すればOKです。排他制御には,ミューテックスを使うことにします。まず,システムコンフィギュレーションファイルに以下の記述を追加し,ミューテックスを1つ生成します。

システムコンフィギュレーションファイル.
CRE_MTX(WRITE_MTX, { TA_CEILING, WRITE_HIGH_PRIORITY });

排他制御にはセマフォを使うこともできますが,単なる排他制御であれば,ミューテックスを使うのがお勧めです。上の記述中のTA_CEILINGは,優先度上限プロトコルを使用することを指示するものです。これにより,ミューテックスをロックしたタスクの優先度は,一時的にWRITE_HIGH_PRIORITYまで引き上げられ,優先度逆転を防止することができます(優先度上限プロトコルによる優先度逆転の防止については,ネットで調べてください)。WRITE_HIGH_PRIORITYは,write_ringbufを呼び出すタスクの中で,最も優先度の高いタスクの優先度に定義します。

このミューテックスを用いて,同時に1つのタスクのみがwrite_ringbufの本体を実行できるように排他制御します。

bool
write_ringbuf(DATA_T data)
{
	ER		rercd;
	bool	result;

	rercd = loc_mtx(WRITE_MTX);
	assert(rercd == E_OK);
	if (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
							 : read_idx - write_idx == RINGBUF_SIZE) {
		/* リングバッファが満杯の場合 */
		result = false;
	}
	else {
		ringbuf[write_idx] = data;
		write_idx = NEXT_IDX(write_idx);
		result = true;
	}
	rercd = unl_mtx(WRITE_MTX);
	assert(rercd == E_OK);
	return(result);
}

前のコードでは,if文の中から直接returnしていましたが,そのようにするとミューテックスの解放漏れが起こりますので,注意しましょう。

一般に,OSのAPIを用いる場合には,サービスコールがエラーとなる場合を考えることが必要です。が,エラーの処理を網羅的に行うのは面倒で,この記事の本質から外れるため別の機会に譲ることとして,上のコードではエラーは起こらないと仮定しました。ただし,エラーが起こっているのにそれに気付かないのは危険なので,エラーが起こった場合にはassertを用いてプログラムを停止させるようにしました。

エラーに気付かないと危険なのは,例えば上のコードでloc_mtxunl_mtxの返り値を無視すると,排他制御ができていないのに,それなりに動くプログラムができてしまいます。それなりに動くが,排他制御ができていないために時々誤動作するというわけです。このような不具合は,再現性が低く,見つけるのが大変です。

write_ringbufに加えて,read_ringbufも複数のタスクから並行に呼び出せるようにするには,もう1つミューテックスを用意して,同時に1つのタスクのみがread_ringbufの本体を実行できるように排他制御します。

システムコンフィギュレーションファイル.
CRE_MTX(READ_MTX, { TA_CEILING, READ_HIGH_PRIORITY });
bool
read_ringbuf(DATA_T *p_data)
{
	ER		rercd;
	bool	result;

	rercd = loc_mtx(READ_MTX);
	assert(rercd == E_OK);
	if (write_idx == read_idx) {
		/* リングバッファが空の場合 */
		result = false;
	}
	else {
		*p_data = ringbuf[read_idx];
		read_idx = NEXT_IDX(read_idx);
		result = true;
	}
	rercd = unl_mtx(READ_MTX);
	assert(rercd == E_OK);
	return(result);
}

ここでは,書き込み側と読み出し側がなるべく同時に実行できるように,write_ringbufread_ringbufに別のミューテックスを用意しましたが,性能にこだわりがないなら,1つのミューテックスを両方の関数で使用することもできます。ちなみに,第1回のdata_countを用いた実装では,data_countを書き込み側と読み出し側の両方からアクセスするため,むしろ1つのミューテックスを両方のサービスに使用し,write_ringbufread_ringbufが並行に実行されないように排他制御する必要があります。

セマフォによる満杯や空の場合への対処

いよいよ本論です。

ここまでの実装では,リングバッファが満杯の時にwrite_ringbufを呼んだ場合と,空の場合にread_ringbufを呼んだ場合には,関数からfalseを返す仕様としていました。これを,リングバッファへの書き込みや読み出しができるようになるまで待つように変更したいと思います。

まずは,write_ringbufからです。write_ringbufは,リングバッファが満杯の時に待ちたいわけですが,これを,リングバッファの各要素を資源とみなして,それを取り合うと考えると,セマフォを使って制御することができます。具体的には,初期値がRINGBUF_SIZEのセマフォを用意し,リングバッファに書き込む時に,セマフォを取得するようにします。これにより,write_ringbufRINGBUF_SIZE回を超えて呼び出されると,セマフォが取得できなくなり,待ち状態となります。一方,セマフォを返却するのは,リングバッファに空きができた時ですから,read_ringbufの中ということになります。

コードを作ってみましょう。

システムコンフィギュレーションファイル.
CRE_MTX(WRITE_MTX, { TA_CEILING, WRITE_HIGH_PRIORITY });
CRE_MTX(READ_MTX, { TA_CEILING, READ_HIGH_PRIORITY });
CRE_SEM(WRITE_SEM, { TA_TPRI, BINGBUF_SIZE, RINGBUF_SIZE });

TA_TPRIはセマフォの待ちキューを優先度順とすることを,2つのRINGBUF_SIZEはセマフォの初期値と最大値を指定しています。TA_TPRIは,指定しなくても問題ありません(代わりに,TA_NULLを指定してください)。

void
write_ringbuf(DATA_T data)
{
	ER		rercd;

	rercd = wai_sem(WRITE_SEM);
	assert(rercd == E_OK);
	rercd = loc_mtx(WRITE_MTX);
	assert(rercd == E_OK);

	ringbuf[write_idx] = data;
	write_idx = NEXT_IDX(write_idx);

	rercd = unl_mtx(WRITE_MTX);
	assert(rercd == E_OK);
}

bool
read_ringbuf(DATA_T *p_data)
{
	ER		rercd;
	bool	result;

	rercd = loc_mtx(READ_MTX);
	assert(rercd == E_OK);
	if (write_idx == read_idx) {
		/* リングバッファが空の場合 */
		result = false;
	}
	else {
		*p_data = ringbuf[read_idx];
		read_idx = NEXT_IDX(read_idx);
		result = true;
	}
	rercd = sig_sem(WRITE_SEM);
	assert(rercd == E_OK);
	rercd = unl_mtx(READ_MTX);
	assert(rercd == E_OK);
	return(result);
}

write_ringbufの中のバッファが満杯かどうかを判断する複雑なif文がなくなり,コードが読みやすくなりました!これは,セマフォが取得できたということは,バッファが空いていることが保証されるためです。ついでに,write_ringbufの返り値をなくしました。

write_ringbuf中のwai_semloc_mtxの順序が気になる方がいるかもしれませんが,これはどちらが先でも大丈夫です(振る舞いは少し変わります)。read_ringbuf中のsig_semunl_mtxも同様です。

同様に,リングバッファが空の場合に,read_ringbufを待つように変更してみましょう。今度は,リングバッファに入っているデータを資源とみなして,それを取り合うと考えて,セマフォで制御します。

具体的には,初期値が0のセマフォを用意し,リングバッファから読み出す時に,セマフォを取得するようにします。これにより,セマフォが空の時にread_ringbufが呼び出されると,セマフォが取得できず,待ち状態となります。セマフォを返却するのは,リングバッファにデータが入った時ですから,write_ringbufの中ということになります。

システムコンフィギュレーションファイル.
CRE_MTX(WRITE_MTX, { TA_CEILING, WRITE_HIGH_PRIORITY });
CRE_MTX(READ_MTX, { TA_CEILING, READ_HIGH_PRIORITY });
CRE_SEM(WRITE_SEM, { TA_TPRI, BINGBUF_SIZE, RINGBUF_SIZE });
CRE_SEM(READ_SEM, { TA_TPRI, 0, RINGBUF_SIZE });
void
write_ringbuf(DATA_T data)
{
	ER		rercd;

	rercd = wai_sem(WRITE_SEM);
	assert(rercd == E_OK);
	rercd = loc_mtx(WRITE_MTX);
	assert(rercd == E_OK);

	ringbuf[write_idx] = data;
	write_idx = NEXT_IDX(write_idx);

	rercd = sig_sem(READ_SEM);
	assert(rercd == E_OK);
	rercd = unl_mtx(WRITE_MTX);
	assert(rercd == E_OK);
}

void
read_ringbuf(DATA_T *p_data)
{
	ER		rercd;

	rercd = wai_sem(READ_SEM);
	assert(rercd == E_OK);
	rercd = loc_mtx(READ_MTX);
	assert(rercd == E_OK);

	*p_data = ringbuf[read_idx];
	read_idx = NEXT_IDX(read_idx);

	rercd = sig_sem(WRITE_SEM);
	assert(rercd == E_OK);
	rercd = unl_mtx(READ_MTX);
	assert(rercd == E_OK);
}

これで,やりたいことは一通りできました。

トリッキーなセマフォの使い方

やりたいことは一通りできましたが,セマフォ2つとミューテックス2つを使う結構重いコードになってしまいました(前述の通り,ミューテックス1つは節約できます)。実は,セマフォ1つで,セマフォ1つとミューテックス1つの役割を兼ねさせることができます。ちょっとトリッキーで,私も人に教えてもらいました。

原理がやや複雑なので,先にコードをお見せします。まずは,書き込み側のセマフォとミューテックスを,1つのセマフォで実現します。

システムコンフィギュレーションファイル.
CRE_MTX(READ_MTX, { TA_CEILING, READ_HIGH_PRIORITY });
CRE_SEM(WRITE_SEM, { TA_TPRI, 1, 1 });
CRE_SEM(READ_SEM, { TA_TPRI, 0, RINGBUF_SIZE });
void
write_ringbuf(DATA_T data)
{
	ER		rercd;

	rercd = wai_sem(WRITE_SEM);
	assert(rercd == E_OK);

	ringbuf[write_idx] = data;
	write_idx = NEXT_IDX(write_idx);

	rercd = sig_sem(READ_SEM);
	assert(rercd == E_OK);
	if (write_idx > read_idx ? write_idx - read_idx != RINGBUF_SIZE
							 : read_idx - write_idx != RINGBUF_SIZE) {
		/* リングバッファが満杯にならなかった場合 */
		rercd = sig_sem(WRITE_SEM);
		assert(rercd == E_OK);
	}
}

void
read_ringbuf(DATA_T *p_data)
{
	bool	was_full;
	ER		rercd;

	rercd = wai_sem(READ_SEM);
	assert(rercd == E_OK);
	rercd = loc_mtx(READ_MTX);
	assert(rercd == E_OK);

	was_full = (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
									 : read_idx - write_idx == RINGBUF_SIZE);
	*p_data = ringbuf[read_idx];
	read_idx = NEXT_IDX(read_idx);

	if (was_full) {
		/* リングバッファが満杯だった場合 */
		rercd = sig_sem(WRITE_SEM);
		assert(rercd == E_OK);
	}
	rercd = unl_mtx(READ_MTX);
	assert(rercd == E_OK);
}

使用するセマフォ(WRITE_SEM)は,初期値,最大値とも1です。これは,排他制御に使用する場合と同じ設定です。セマフォを排他制御に用いるのであれば,write_ringbufの先頭で取得して,リターン前に返却することになります。上のコードでは,write_ringbufの先頭で取得していますが,リターン前にはリングバッファが満杯にならなかった場合のみ返却しています。逆に言うと,リングバッファが満杯になった時は,セマフォを返却しません。これにより,リングバッファが満杯の時は,新たに呼ばれたwrite_ringbufがセマフォを取得できず,待ち状態になります。この待ち状態を解除するのは,read_ringbufでリングバッファに新たに空きができた時ということになります。

整理すると,このセマフォ(WRITE_SEM)は,リングバッファに空きがあり,かつ,write_ringbuf中のクリティカルセクションを処理中のタスクがない場合に,値が1になります。逆に言うと,リングバッファが満杯か,write_ringbufを処理中のタスクがある場合には,0になります。

同様に,読み出し側のセマフォとミューテックスも,1つのセマフォで実現することができます。読み出し側のセマフォは,初期値を0にしておきます。

システムコンフィギュレーションファイル.
CRE_SEM(WRITE_SEM, { TA_TPRI, 1, 1 });
CRE_SEM(READ_SEM, { TA_TPRI, 0, 1 });
void
write_ringbuf(DATA_T data)
{
	bool	was_empty;
	ER		rercd;

	rercd = wai_sem(WRITE_SEM);
	assert(rercd == E_OK);

	was_empty = (write_idx == read_idx);
	ringbuf[write_idx] = data;
	write_idx = NEXT_IDX(write_idx);

	if (was_empty) {
		/* リングバッファが空だった場合 */
		rercd = sig_sem(READ_SEM);
		assert(rercd == E_OK);
	}
	if (write_idx > read_idx ? write_idx - read_idx != RINGBUF_SIZE
							 : read_idx - write_idx != RINGBUF_SIZE) {
		/* リングバッファが満杯にならなかった場合 */
		rercd = sig_sem(WRITE_SEM);
		assert(rercd == E_OK);
	}
}

void
read_ringbuf(DATA_T *p_data)
{
	bool	was_full;
	ER		rercd;

	rercd = wai_sem(READ_SEM);
	assert(rercd == E_OK);

	was_full = (write_idx > read_idx ? write_idx - read_idx == RINGBUF_SIZE
									 : read_idx - write_idx == RINGBUF_SIZE);
	*p_data = ringbuf[read_idx];
	read_idx = NEXT_IDX(read_idx);

	if (was_full) {
		/* リングバッファが満杯だった場合 */
		rercd = sig_sem(WRITE_SEM);
		assert(rercd == E_OK);
	}
	if (write_idx != read_idx) {
		/* リングバッファが空にならなかった場合 */
		rercd = sig_sem(READ_SEM);
		assert(rercd == E_OK);
	}
}

セマフォのこの使い方は,排他制御と似ていますが,ミューテックスでは実現できません。ミューテックスは,ロックしたタスクが解放しなければならないためです。

ちなみに,TOPPERSカーネルに同梱されているシリアルインタフェースドライバでは,セマフォをこのパターンで使っています。シリアルインタフェースドライバでは,タスクと割込みハンドラの間でリングバッファを使っていますので,コードの構成は少し違っています。

データキュー:カーネルの提供するリングバッファ機能

μITRON仕様のAPIを用いたリングバッファの実装TIPSは以上ですが,リングバッファはしばしば使われるタスク間通信機構であるため,μITRON仕様には,カーネルの機能としてリングバッファがサポートされています。それがデータキューです。

データキューでは,リングバッファに格納するデータ型(DATA_T)は,intptr_tに決められています。それより大きいデータを扱いたい場合には,共有メモリ上にデータを置いて,その先頭番地をデータキュー送受信する必要があります。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0