はじめに
たまたま pthread_cond_wait(3) の内部実装を調べる機会がありました。 glibc と bionic の実装を見てみましたが、どちらも肝心なところで futex(2) を使っています。futex(2) のオンライン・マニュアル (日本語, 英語) には以下のように書かれていて敷居が高そうです。
裸の futex はエンドユーザーが容易に使うことのできる概念として
意図されたものではない (glibc にはこのシステムコールに対する
ラッパー関数はない)。実装者は、アセンブリ言語に慣れており、
以下に挙げる futex ユーザー空間ライブラリのソースを読み終えて
いることが要求される。
ここでは以下のことを試みたいと思います。
- マルチスレッドな環境で futex() の動きがわかる簡単なサンプルプログラムを用意しました。まずはこれについて説明します。
- futex() が pthread_cond_wait() や pthread_cond_signal() でどのように使われているかその概要をざっくり説明します。
以下のことについては今回説明しません。よそに行ってください。
- FUTEX_WAIT と FUTEX_WAKE __以外__の使い方
- __atomic_fetch_add() などのアトミック操作
futex(2) にはサンプルプログラムとして futex_demo.c が掲載されています。これは 2 つのプロセス間で futex() を使うものです。これがわかる方はこれ以降の説明は必要がないかもしれません。
サンプルプログラム
ここでは FUTEX_WAIT をシグナル待ち、FUTEX_WAKE をシグナルの送信と表現します。
サンプルプログラムを以下に示します。シグナル待ちのスレッドを 1 個、シグナル送信スレッドを 2 個同時に走らせるプログラムです。スレッドの数はあまり重要ではないです。1 秒走った後に計測した数値を表示して終了します。
重要でない部分のエラー処理は省略しています。
#define _GNU_SOURCE
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <linux/futex.h>
int mem = 0;
int quit = 0;
int waitquit = 0;
struct {
int interrupted;
int woken;
} wait = { 0, 0 };
#define N 2
struct {
int sent;
int lost;
int received;
} sig[N] = { { 0, }, };
int sys_futex(int *uaddr, int op, int val)
{
return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
}
void *wait_thread(void *unused)
{
int ret;
int old;
while (!quit) {
/* old = mem */
old = __atomic_load_n(&mem, __ATOMIC_RELAXED); /* [A] */
ret = sys_futex(&mem, FUTEX_WAIT_PRIVATE, old); /* [B] */
if (ret == -1) {
if (errno == EAGAIN) {
wait.interrupted++;
} else {
perror("futex(WAIT) returns unexpected error: ");
abort();
}
} else {
wait.woken++;
}
}
waitquit = 1;
return NULL;
}
void *signal_thread(void *arg)
{
int ret;
int i = (intptr_t) arg;
struct timespec delay = { 0, 1 }; /* 1 nsec :p */
while (!waitquit) {
/* mem++ */
__atomic_fetch_add(&mem, 1, __ATOMIC_RELAXED);
ret = sys_futex(&mem, FUTEX_WAKE_PRIVATE, 1);
sig[i].sent++;
if (ret == 0) {
sig[i].lost++;
} else if (ret == 1) {
sig[i].received++;
} else {
perror("futex(WAKE) returns unexpected error: ");
abort();
}
nanosleep(&delay, NULL);
}
return NULL;
}
int main()
{
int i;
pthread_t w;
pthread_t s[N];
struct timespec sleep = { 1, 0 }; /* 1.0 sec */
pthread_create(&w, NULL, wait_thread, NULL);
for (i = 0; i < N; i++)
pthread_create(&s[i], NULL, signal_thread, (void *) (intptr_t) i);
nanosleep(&sleep, NULL);
quit = 1;
pthread_join(w, NULL);
for (i = 0; i < N; i++)
pthread_join(s[i], NULL);
printf(" woken intr sent recv lost\n");
printf("---------------------------------------------------\n");
printf("wait %8d %8d\n", wait.woken, wait.interrupted);
for (i = 0; i < N; i++)
printf("sigal %d %8d %8d %8d\n",
i, sig[i].sent, sig[i].received, sig[i].lost);
return 0;
}
コンパイルして実行すると例えば以下のような結果が得られます。
$ gcc sample.c -pthread
$ ./a.out
woken intr sent recv lost
---------------------------------------------------
wait 21639 11
sigal 0 16932 9499 7433
sigal 1 16943 12140 4803
サンプルプログラムを見ればわかると思いますが、wati_thread() がシグナル待ちのスレッド、signal_thread() がシグナル送信スレッドです。ユーザープログラムから直接 futex() をリンクできないので sys_futex() を用意してあります。以降、sys_futex(第1引数, FUTEX_WAIT_PRIVATE, 第3引数) を futex(WAIT) と、sys_futex(第1引数, FUTEX_WAKE_PRIVATE, 第3引数) を futex(WAKE) と略記します。
FUTEX_WAIT_PRIVATE は (FUTEX_WAIT | FUTEX_PRIVATE_FLAG) と定義されています。スレッド間で futex() を用いる場合は FUTEX_PRIVATE_FLAG を付加します。FUTEX_WAKE_PRIVATE も同様です。
シグナル送信側から説明します。futex(WAKE) の前にグローバル変数 mem の値をインクリメントしています。ここは mem の値が変わればいいので +1 でなくても構いません。futex(WAKE) した後は以下の状態を判別することが出来ます。
- シグナルが futex(WAIT) にて受信された (received++)
- シグナルを外した (lost++)
外したというのはシグナル受信スレッドが futex(WAIT) で待ちになっていない時にシグナルが送信されたということです。
シグナルの送受信を行う futex() は第 1 引数で示されるアドレスが同じでなくてはなりません。
次にシグナル受信側です。futex(WAIT) の前に変数 mem の値を old に保存しておきます。futex(WAIT) の引数には &mem アドレスと old 整数値が指定されています。なので、futex() の内部では、mem の値が変化しているかどうかを検知することが出来ます。
mem の値が変化していない場合 futex() はシグナル待ちに入ります。また、シグナル待ちに入った時点で mem の値が変化していないことを保証してくれます。器用ですね。
futex(WAIT) から戻った後は以下の状態を判別することが出来ます。
- mem の値が old と異なるので処理を中断 (interrupted++)
- シグナル待ちに入りシグナルを受信した (woken++)
つまりシグナル受信側ではソースコードの [A] - [B] の区間で変数 mem が書き換えられた場合と、[B] でシグナル待ちになっている状態でシグナルを受信したことを検知できます。[B] から while() をぐるっと回って [A] までの区間では何も検知できません。
以上がサンプルプログラムの説明です。なにも面白くないですね futex(WAIT) では old = mem という単純な操作を起点として、途中で邪魔された場合を含めて、シグナルの検出が可能だということがわかればいいと思います。
pthread_cond_wait() の内部
bionic での pthread_cond_wait() の実装は futex() をそのまま利用しています。一方、glibc のそれはかなり複雑です。bionic では pthread_cancel() がサポートされていないのでその関係もあるのでしょう。glibc 版はバージョンとともにその実装が大きく変わっています。古いバージョンを参照したほうが理解しやすいかもしれません。私はバージョン 2.19 と 2.26 を見てみました。2.26 は現時点での最新版です。
ここでは、bionic の pthread_cond_wait() をざっくり説明したいと思います。以下のコードは bionic の pthread_cond_wait() と pthread_cond_signal() を擬似的に示したものです。
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) {
int old = cond->state;
pthread_mutex_unlock(mutex); /* [C] */
futex(&cond->state, FUTEX_WAIT, old, NULL, NULL, 0); /* [D] */
pthread_mutex_lock(mutex); /* [E] */
return 0;
}
int pthread_cond_signal(pthread_cond_t *cond) {
cond->state++;
futex(&cond->state, FUTEX_WAKE, 1, NULL, NULL, 0);
return 0;
}
ここで cond->state は int 型の変数です。pthread_cond_signal() は上記のサンプルプログラムとほぼ同じですので説明は省略します。futex() の第 3 引数に INT_MAX を指定すると pthread_cond_broadcast() が実現できます。
pthread_cond_wait() は old = cond->state と futex() の間に pthread_mutex_unlock() が入っています。上記のサンプルプログラムではこの部分は空でした。
擬似コードの [C] のところで mutex をアンロックします。その瞬間 mutex でロック解除待ちしていたスレッドが走り出します。pthread_cond_wait() の使い方ではよくあるパターンだと思います。このスレッドが pthread_cond_signal() を呼び出したとします。[D] で futex() がシグナル待ちに入る前にシグナルが送信されたりすると、そのシグナルは受信されません。破棄されてしまいます。つまり [D] で待ちに入ったらそこから脱出できません。シグナルが 1 回しか送信されない場合はとても困ったことになります。
しかし実際には [D] の futex() で old と cond->state が比較されるのでこの問題は回避されます。
ちなみに [D] と [E] の間でシグナルが送信されてもなにも起こりません。無視されるだけです。プログラムは止まらないので pthread_cond_wait() の後の処理でうまく対処してね。ということでしょう。
最後に
futex(2) の使い方をサンプルプログラムと bionic の pthread_cond_wait() を参考に説明してみました。futex(2) の使い方の一例を示せたかと思います。