前回はna_event_loopを読んだ。その中でna_front_server_callbackを下のようにイベントループに登録していた。
env->fs_watcher.data = env;
ev_io_init(&env->fs_watcher, na_front_server_callback, env->fsfd, EV_READ);
ev_io_start(EV_A_ &env->fs_watcher);
ev_loop(EV_A_ 0);
evn->fs_watcherはev_io型で、コールバック関数の引数として渡される。このdataメンバーには(void *)を格納できて、コールバック関数に好きなものを渡すことができる。
今回はこのna_front_server_callbackを読んでいく。
void na_front_server_callback (EV_P_ struct ev_io *w, int revents)
{
int fsfd, cfd, tsfd, cur_pool, cur_cli;
int th_ret;
na_env_t *env;
na_client_t *client;
na_connpool_t *connpool;
th_ret = 0;
fsfd = w->fd;
env = (na_env_t *)w->data;
cfd = -1;
tsfd = -1;
cur_pool = -1;
cur_cli = -1;
libevのコールバック関数の型。ev_io wというのはさっき見たfs_watcherが入っているので、w->dataをna_env_tにキャストしなおしてenvを得ている。
..
pthread_rwlock_rdlock(&env->lock_refused);
if (env->is_refused_accept) {
pthread_rwlock_unlock(&env->lock_refused);
return;
}
pthread_rwlock_unlock(&env->lock_refused);
if (env->error_count_max > 0 && (env->error_count > env->error_count_max)) {
env->error_count = 0;
return;
}
pthread_mutex_lock(&env->lock_current_conn);
if (env->current_conn >= env->conn_max) {
pthread_mutex_unlock(&env->lock_current_conn);
return;
}
pthread_mutex_unlock(&env->lock_current_conn);
env(≒グローバル)の状態によってこのコールバックは終了。ウェブアプリケーション側からはKVSに接続できなかったという状態に見えるのかな…?(よくわからない)
connpool = na_connpool_select(env);
na_connpool_selectはconnpool.cにある。アクティブのKVS用のコネクションプールかスタンバイのKVS用のコネクションプールを選択するらしい。基本はアクティブで、env->is_refused_activeだったらスタンバイということになっている。
if (env->is_connpool_only) {
pthread_mutex_lock(&env->lock_current_conn);
if (env->current_conn >= connpool->max) {
pthread_mutex_unlock(&env->lock_current_conn);
return;
}
pthread_mutex_unlock(&env->lock_current_conn);
if (!na_connpool_assign(env, &cur_pool, &tsfd)) {
na_error_count_up(env);
NA_STDERR("failed assign connection from connpool.");
return;
}
} else {
コネクションプールのみの設定になっている場合、コネクション数の上限を超えてたりna_connpool_assign(env, &cur_pool, &tsfd)できなければreturn。
na_connpool_assignはコネクションプールから適当に一つ選んで、番号をcur_poolに、ソケットをtsfdに書きだして返すらしい。コネクションプールがいっぱいならfalseを返す。
次のelseはコネクションプールのみじゃない設定のとき。その場合、コネクションプールがいっぱになったら新たにKVSへのコネクションを開くことになるらしい。
if ((cfd = na_server_accept(fsfd)) < 0) {
if (cur_pool == -1) {
close(tsfd);
} else {
pthread_mutex_lock(&env->lock_connpool);
connpool->mark[cur_pool] = 0;
pthread_mutex_unlock(&env->lock_connpool);
}
NA_STDERR_MESSAGE(NA_ERROR_INVALID_FD);
return;
}
ここではfsfdをacceptしてcfdを作っている。
最初はこれがよく分からなかったのだけど、socketプログラミングにおいては常識の概念だそうで、サーバー側のソケットというのはsocket()で生成したソケットをbind()でポートに登録して、listen()で待ち受け状態にしてaccept()を呼ぶことで接続を確立するというフローになるらしい。ただし、このソケットに直接read()/write()することはできない。実はaccept()は別のソケットを生成して返してくるので、accept()によって作られたソケットに対して読み書きするらしい。
つまり、fsfdはウェブアプリケーションからのリクエスト待ち専用のソケット、cfdは読み書き専用のソケットということになる。
na_set_nonblock(cfd);
これはそのソケットに対してNONBLOCKフラグをつけている。これを立てると、readしようとしたときにまだ読むべきデータが届いてない場合に(待ち続けずに)失敗してくれるらしい。入力待ちになるのをepollとかで監視することで、IO待ちの発生しないプログラムになる。そのへんの面倒なことをうまくやってくれるのがlibevなのだと思う(自信ない)。
cur_cli = na_client_assign(env);
if (cur_cli >= 0) {
client = &ClientPool[cur_cli];
if (client->tsfd > 0) {
close(client->tsfd);
}
} else {
na_client_assignはクライアントプールから空いてるクライアントを適当に一つ選ぶ関数。空きがあればそのクライアントを使う。
neoagentでは、①ウェブアプリケーションからのリクエストを読み込んで、②KVSに書き出して、③KVSからのレスポンスをまた読み込んで、④またウェブアプリケーションに返すというのを行うのに「クライアント」(na_client_t)というものを割り当てている。クライアントはウェブアプリケーションからのリクエストそれぞれに対して一つずつあり、現在自分が①〜④のどの状態にあるかを管理するステートマシンとなる。
client = (na_client_t *)malloc(sizeof(na_client_t));
if (client == NULL) {
close(cfd);
if (cur_pool == -1) {
close(tsfd);
} else {
pthread_mutex_lock(&env->lock_connpool);
connpool->mark[cur_pool] = 0;
pthread_mutex_unlock(&env->lock_connpool);
}
na_error_count_up(env);
NA_STDERR_MESSAGE(NA_ERROR_OUTOF_MEMORY);
return;
}
memset(client, 0, sizeof(*client));
client->crbuf = (char *)malloc(env->request_bufsize + 1);
client->srbuf = (char *)malloc(env->response_bufsize + 1);
if (client->crbuf == NULL ||
client->srbuf == NULL) {
NA_FREE(client->crbuf);
NA_FREE(client->srbuf);
NA_FREE(client);
close(cfd);
if (cur_pool == -1) {
close(tsfd);
} else {
pthread_mutex_lock(&env->lock_connpool);
connpool->mark[cur_pool] = 0;
pthread_mutex_unlock(&env->lock_connpool);
}
na_error_count_up(env);
NA_STDERR_MESSAGE(NA_ERROR_OUTOF_MEMORY);
return;
}
クライアントプールに空いてるのがなければmallocする。
client->cfd = cfd;
client->tsfd = tsfd;
client->env = env;
client->c_watcher.data = client;
client->ts_watcher.data = client;
pthread_rwlock_rdlock(&env->lock_refused);
client->is_refused_active = env->is_refused_active;
pthread_rwlock_unlock(&env->lock_refused);
client->is_use_connpool = cur_pool != -1 ? true : false;
client->is_use_client_pool = cur_cli != -1 ? true : false;
client->cur_pool = cur_pool;
client->crbufsize = 0;
client->cwbufsize = 0;
client->srbufsize = 0;
client->swbufsize = 0;
client->request_bufsize = env->request_bufsize;
client->response_bufsize = env->response_bufsize;
client->event_state = NA_EVENT_STATE_CLIENT_READ;
client->req_cnt = 0;
client->res_cnt = 0;
client->loop_cnt = 0;
client->cmd = NA_MEMPROTO_CMD_NOT_DETECTED;
client->connpool = na_connpool_select(env);
..
クライアントはcfd(ウェブアプリケーションと通信するソケット)とtsfd(KVSと通信するソケット)を持ち、それぞれのソケットをイベントループで監視するためのc_watcherとts_watcherを持つ。
いよいよ今回の最後。
if (!na_is_worker_busy(env)) {
if (!na_event_queue_push(EventQueue, client)) {
na_error_count_up(env);
NA_STDERR("Too Many Connections!");
ev_io_init(&client->c_watcher, na_client_callback, client->cfd, EV_READ);
ev_io_init(&client->ts_watcher, na_target_server_callback, client->tsfd, EV_NONE);
ev_io_start(EV_A_ &client->c_watcher);
}
} else {
ev_io_init(&client->c_watcher, na_client_callback, client->cfd, EV_READ);
ev_io_init(&client->ts_watcher, na_target_server_callback, client->tsfd, EV_NONE);
ev_io_start(EV_A_ &client->c_watcher);
}
}
まずna_is_worker_busyというのがある。workerは前回出てきた、na_event_observerのスレッドのこと。workerスレッドがbusyならメインスレッドでウェブアプリケーションからリクエストを読み込むイベントループ(c_watcher)を開始する。
workerがbusyじゃなければクライアントをイベントキューに追加。
キューがいっぱいならメインスレッドでイベントループをまわす。
長かった。まとめると、na_front_server_callbackはウェブアプリケーションからの接続があると呼ばれるコールバックで、クライアントというものを初期化して、KVSに接続して(あるいはコネクションプールから使って)、イベントキューに追加していた。
次回はna_event_observerを読む。