「その2」でna_event_loopを読んだが、その中でこのようにスレッドを作ってna_event_observerを渡していた。
if (EventQueue == NULL) {
EventQueue = na_event_queue_create(env->conn_max);
}
th_workers = calloc(sizeof(pthread_t), env->worker_max);
for (int i=0;i<env->worker_max;++i) {
pthread_create(&th_workers[i], NULL, na_event_observer, env);
}
「その3」ではna_front_server_callbackで、ウェブアプリケーションからのリクエストがある度にna_event_queue_push(EventQueue, client)でクライアントをキューに入れていた。
今回はna_event_observerを読む。短いのであっさり終わる。
static void *na_event_observer(void *args)
{
struct ev_loop *loop;
na_env_t *env;
na_client_t *client;
static int tid_s = 0;
int tid;
env = (na_env_t *)args;
pthread_mutex_lock(&env->lock_loop);
loop = na_event_loop_create(env->event_model);
pthread_mutex_unlock(&env->lock_loop);
pthread_mutex_lock(&env->lock_tid);
tid = tid_s++;
pthread_mutex_unlock(&env->lock_tid);
argsをenvにキャストしなおしている。
while (true) {
client = na_event_queue_pop(EventQueue);
あとはwhileループでイベントキューをpopしている。
if (client == NULL) {
pthread_mutex_lock(&EventQueue->lock);
if (EventQueue->cnt == 0) {
pthread_cond_wait(&EventQueue->cond, &EventQueue->lock);
}
pthread_mutex_unlock(&EventQueue->lock);
continue;
}
キューが空だったらpthread_cond_waitを呼ぶ。
pthread_cond_waitは、pthread_cond_broadcast(またはpthread_cond_signal)が呼ばれるまでスレッドを止めるらしい。pthread_cond_broadcastはna_event_queue_pushの中に書いてあった。それだったらpthread_cond_waitもna_event_queue_popの中にあって抽象化されてればいいのにと思った。
さておき。
ev_io_init(&client->c_watcher, na_client_callback, client->cfd, EV_READ);
ev_io_init(&client->ts_watcher, na_target_server_callback, client->tsfd, EV_NONE);
ev_io_start(EV_A_ &client->c_watcher);
pthread_rwlock_wrlock(&env->lock_worker_busy[tid]);
env->is_worker_busy[tid] = true;
pthread_rwlock_unlock(&env->lock_worker_busy[tid]);
ev_loop(EV_A_ 0);
pthread_rwlock_wrlock(&env->lock_worker_busy[tid]);
env->is_worker_busy[tid] = false;
pthread_rwlock_unlock(&env->lock_worker_busy[tid]);
ウェブアプリケーション側とKVS側のソケットにコールバックを付けて、ウェブアプリケーション側のほうだけ開始している。
前回読んだところでもイベントキューがいっぱいだったり、ワーカースレッドがbusyだったら同じ事をしていた。
}
return NULL;
}
あとは無限ループするだけ。
今回のまとめ。ワーカースレッドはウェブアプリケーションからのリクエストを順番に処理するだけ。
前回も書いたように、neoagentは①ウェブアプリケーションからのリクエストを読み込んで、②KVSに書き出して、③KVSからのレスポンスをまた読み込んで、④またウェブアプリケーションに返す、という流れを繰り返す。それぞれのリクエストに対して自分が今どの状態にいるかなどを記録しておくのがclientらしい。
na_client_callbackは①と④を、na_target_server_callbackは②と③の処理を担当することになる。なので、それぞれの関数の中でREADとWRITEのコールバックにわかれている。
ウェブアプリケーションのリクエストをまず読み込むのはna_client_callbackのREADなので、次回はそこを見ることにする。
ここで面倒なのが、ステートマシンとしてのclientと、neoagentから見たウェブアプリケーションのリクエストという意味でna_client_callbackというネーミングがあるのだが、これはそういうものだと思うしかない。ステートマシンのほうはcontextという名前にすれば良かったと開発者の方も言っていた。