前回までは、ウェブアプリケーションからの接続をイベントループで監視して、キューに突っ込んで、ワーカースレッドがキューから取り出していくところだった。
neoagentの中では接続元という意味でのclientと、リクエスト一つ一つに対して生成する状態管理用の構造体のclientとがあるので最初戸惑うけど、慣れるとどうってことない。
そんなわけで今回は接続元からのリクエストを読み出すコールバックna_client_callbackを見ていく。
static void na_client_callback(EV_P_ struct ev_io *w, int revents)
{
int cfd, tsfd, size;
na_client_t *client;
na_env_t *env;
cfd = w->fd;
client = (na_client_t *)w->data;
env = client->env;
tsfd = client->tsfd;
pthread_rwlock_rdlock(&env->lock_refused);
if ((client->is_refused_active != env->is_refused_active) || env->is_refused_accept) {
pthread_rwlock_unlock(&env->lock_refused);
NA_EVENT_FAIL(NA_ERROR_INVALID_CONNPOOL, EV_A, w, client, env);
return; // request fail
}
pthread_rwlock_unlock(&env->lock_refused);
if (env->loop_max > 0 && client->loop_cnt++ > env->loop_max) {
NA_EVENT_FAIL(NA_ERROR_OUTOF_LOOP, EV_A, w, client, env);
return; // request fail
}
このあたりはいつものlibevのコールバックと同じ。
if (revents & EV_READ) {
もしウェブアプリケーションからのリクエスト読み出しフェーズだったらこの節に入る。今回はこちらしか読まない。
if (!env->is_extensible_request_buf) {
if (client->crbufsize >= client->request_bufsize) {
NA_EVENT_FAIL(NA_ERROR_OUTOF_BUFFER, EV_A, w, client, env);
return; // request fail
}
} else if (client->request_bufsize >= env->request_bufsize_max) {
NA_EVENT_FAIL(NA_ERROR_OUTOF_BUFFER, EV_A, w, client, env);
return; // request fail
} else if (client->crbufsize >= client->request_bufsize) {
size_t es;
es = (client->request_bufsize - 1) * 2;
if (es >= env->request_bufsize_max) {
es = env->request_bufsize_max;
}
client->crbuf = (char *)realloc(client->crbuf, es + 1);
client->request_bufsize = es;
}
リクエストバッファーを動的に確保するか固定長にするかというオプションがあるらしい。動的確保ならrequest_bufsize_maxになるまでreallocする。
- crbuf: リクエストバッファーの先頭を指すポインター
- crbufsize: リクエストバッファーのうち実際に使われているバイト数
- request_bufsize: リクエストバッファーのバイト数
という理解でいいみたい。
size = read(cfd,
client->crbuf + client->crbufsize,
client->request_bufsize - client->crbufsize);
if (size == 0) {
na_event_stop(EV_A_ w, client, env);
return; // request success
} else if (size < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
return; // not ready yet
}
NA_EVENT_FAIL(NA_ERROR_FAILED_READ, EV_A, w, client, env);
return; // request fail
}
client->crbufsize += size;
client->crbuf[client->crbufsize] = '\0';
if (client->crbufsize > env->request_bufsize_current_max) {
env->request_bufsize_current_max = client->crbufsize;
}
リクエストを読み込む。libev (epoll)で監視しているので読み込みできる状態になってるはずだけど、errno == EAGAIN || errno == EWOULDBLOCKが起こりうるのだろうか?
client->cmd = na_memproto_detect_command(client->crbuf);
if (client->cmd == NA_MEMPROTO_CMD_QUIT) {
na_event_stop(EV_A_ w, client, env);
return; // request success
} else if (client->cmd == NA_MEMPROTO_CMD_GET) {
client->req_cnt = na_memproto_count_request_get(client->crbuf, client->crbufsize);
}
ここではmemcachedプロトコル特有の処理をしている。na_memproto_detect_commandでどのコマンドなのか判断して、quitだったらリクエストを終了。getの場合はmulti getの場合があるので、その場合はgetするキーの数を数えて最後に返ってきたvalueの数と照らし合わせていると開発者の方が言っていた。
if (client->crbufsize < 2) {
return; // not ready yet
} else if (client->crbuf[client->crbufsize - 2] == '\r' &&
client->crbuf[client->crbufsize - 1] == '\n')
{
if (client->cmd == NA_MEMPROTO_CMD_UNKNOWN) {
na_event_stop(EV_A_ w, client, env);
return; // request fail
}
client->event_state = NA_EVENT_STATE_TARGET_WRITE;
na_event_switch(EV_A_ w, &client->ts_watcher, tsfd, EV_WRITE);
return;
}
\r\nで終わっていればリクエスト部分が終わったとみなしてクライアントの状態をNA_EVENT_STATE_TARGET_WRITEにして、最後にna_event_switchを呼び出してKVS側ソケットへの書き出しに移っている。(setのvalue部分に\r\nが出てきていたらどうなるんだろうか?)
ちなみにna_event_switchはevent.cにあって、こういうコードになっていた。
inline static void na_event_switch (EV_P_ struct ev_io *old, struct ev_io *new, int fd, int revent)
{
ev_io_stop(EV_A_ old);
ev_io_set(new, fd, revent);
ev_io_start(EV_A_ new);
}
次回はna_target_server_callbackのWRITEを見ていく。