LoginSignup
1
1

More than 5 years have passed since last update.

neoagent1人読書会 その3

Last updated at Posted at 2013-01-27

前回はna_event_loopを読んだ。その中でna_front_server_callbackを下のようにイベントループに登録していた。

    env->fs_watcher.data = env;
    ev_io_init(&env->fs_watcher, na_front_server_callback, env->fsfd, EV_READ);
    ev_io_start(EV_A_ &env->fs_watcher);
    ev_loop(EV_A_ 0);

evn->fs_watcherはev_io型で、コールバック関数の引数として渡される。このdataメンバーには(void *)を格納できて、コールバック関数に好きなものを渡すことができる。

今回はこのna_front_server_callbackを読んでいく。

void na_front_server_callback (EV_P_ struct ev_io *w, int revents)
{
    int fsfd, cfd, tsfd, cur_pool, cur_cli;
    int th_ret;
    na_env_t *env;
    na_client_t *client;
    na_connpool_t *connpool;

    th_ret   = 0;
    fsfd     = w->fd;
    env      = (na_env_t *)w->data;
    cfd      = -1;
    tsfd     = -1;
    cur_pool = -1;
    cur_cli  = -1;

libevのコールバック関数の型。ev_io wというのはさっき見たfs_watcherが入っているので、w->dataをna_env_tにキャストしなおしてenvを得ている。

..

    pthread_rwlock_rdlock(&env->lock_refused);
    if (env->is_refused_accept) {
        pthread_rwlock_unlock(&env->lock_refused);
        return;
    }
    pthread_rwlock_unlock(&env->lock_refused);

    if (env->error_count_max > 0 && (env->error_count > env->error_count_max)) {
        env->error_count = 0;
        return;
    }

    pthread_mutex_lock(&env->lock_current_conn);
    if (env->current_conn >= env->conn_max) {
        pthread_mutex_unlock(&env->lock_current_conn);
        return;
    }
    pthread_mutex_unlock(&env->lock_current_conn);

env(≒グローバル)の状態によってこのコールバックは終了。ウェブアプリケーション側からはKVSに接続できなかったという状態に見えるのかな…?(よくわからない)

    connpool = na_connpool_select(env);

na_connpool_selectはconnpool.cにある。アクティブのKVS用のコネクションプールかスタンバイのKVS用のコネクションプールを選択するらしい。基本はアクティブで、env->is_refused_activeだったらスタンバイということになっている。

    if (env->is_connpool_only) {
        pthread_mutex_lock(&env->lock_current_conn);
        if (env->current_conn >= connpool->max) {
            pthread_mutex_unlock(&env->lock_current_conn);
            return;
        }
        pthread_mutex_unlock(&env->lock_current_conn);
        if (!na_connpool_assign(env, &cur_pool, &tsfd)) {
            na_error_count_up(env);
            NA_STDERR("failed assign connection from connpool.");
            return;
        }
    } else {

コネクションプールのみの設定になっている場合、コネクション数の上限を超えてたりna_connpool_assign(env, &cur_pool, &tsfd)できなければreturn。

na_connpool_assignはコネクションプールから適当に一つ選んで、番号をcur_poolに、ソケットをtsfdに書きだして返すらしい。コネクションプールがいっぱいならfalseを返す。

次のelseはコネクションプールのみじゃない設定のとき。その場合、コネクションプールがいっぱになったら新たにKVSへのコネクションを開くことになるらしい。

    if ((cfd = na_server_accept(fsfd)) < 0) {
        if (cur_pool == -1) {
            close(tsfd);
        } else {
            pthread_mutex_lock(&env->lock_connpool);
            connpool->mark[cur_pool] = 0;
            pthread_mutex_unlock(&env->lock_connpool);
        }
        NA_STDERR_MESSAGE(NA_ERROR_INVALID_FD);
        return;
    }

ここではfsfdをacceptしてcfdを作っている。

最初はこれがよく分からなかったのだけど、socketプログラミングにおいては常識の概念だそうで、サーバー側のソケットというのはsocket()で生成したソケットをbind()でポートに登録して、listen()で待ち受け状態にしてaccept()を呼ぶことで接続を確立するというフローになるらしい。ただし、このソケットに直接read()/write()することはできない。実はaccept()は別のソケットを生成して返してくるので、accept()によって作られたソケットに対して読み書きするらしい。

つまり、fsfdはウェブアプリケーションからのリクエスト待ち専用のソケット、cfdは読み書き専用のソケットということになる。

    na_set_nonblock(cfd);

これはそのソケットに対してNONBLOCKフラグをつけている。これを立てると、readしようとしたときにまだ読むべきデータが届いてない場合に(待ち続けずに)失敗してくれるらしい。入力待ちになるのをepollとかで監視することで、IO待ちの発生しないプログラムになる。そのへんの面倒なことをうまくやってくれるのがlibevなのだと思う(自信ない)。

    cur_cli = na_client_assign(env);

    if (cur_cli >= 0) {
        client = &ClientPool[cur_cli];
        if (client->tsfd > 0) {
            close(client->tsfd);
        }
    } else {

na_client_assignはクライアントプールから空いてるクライアントを適当に一つ選ぶ関数。空きがあればそのクライアントを使う。

neoagentでは、①ウェブアプリケーションからのリクエストを読み込んで、②KVSに書き出して、③KVSからのレスポンスをまた読み込んで、④またウェブアプリケーションに返すというのを行うのに「クライアント」(na_client_t)というものを割り当てている。クライアントはウェブアプリケーションからのリクエストそれぞれに対して一つずつあり、現在自分が①〜④のどの状態にあるかを管理するステートマシンとなる。

        client = (na_client_t *)malloc(sizeof(na_client_t));
        if (client == NULL) {
            close(cfd);
            if (cur_pool == -1) {
                close(tsfd);
            } else {
                pthread_mutex_lock(&env->lock_connpool);
                connpool->mark[cur_pool] = 0;
                pthread_mutex_unlock(&env->lock_connpool);
            }
            na_error_count_up(env);
            NA_STDERR_MESSAGE(NA_ERROR_OUTOF_MEMORY);
            return;
        }
        memset(client, 0, sizeof(*client));
        client->crbuf = (char *)malloc(env->request_bufsize + 1);
        client->srbuf = (char *)malloc(env->response_bufsize + 1);
        if (client->crbuf == NULL ||
            client->srbuf == NULL) {
            NA_FREE(client->crbuf);
            NA_FREE(client->srbuf);
            NA_FREE(client);
            close(cfd);
            if (cur_pool == -1) {
                close(tsfd);
            } else {
                pthread_mutex_lock(&env->lock_connpool);
                connpool->mark[cur_pool] = 0;
                pthread_mutex_unlock(&env->lock_connpool);
            }
            na_error_count_up(env);
            NA_STDERR_MESSAGE(NA_ERROR_OUTOF_MEMORY);
            return;
        }

クライアントプールに空いてるのがなければmallocする。

    client->cfd               = cfd;
    client->tsfd              = tsfd;
    client->env               = env;
    client->c_watcher.data    = client;
    client->ts_watcher.data   = client;
    pthread_rwlock_rdlock(&env->lock_refused);
    client->is_refused_active = env->is_refused_active;
    pthread_rwlock_unlock(&env->lock_refused);
    client->is_use_connpool   = cur_pool != -1 ? true : false;
    client->is_use_client_pool = cur_cli != -1 ? true : false;
    client->cur_pool          = cur_pool;
    client->crbufsize         = 0;
    client->cwbufsize         = 0;
    client->srbufsize         = 0;
    client->swbufsize         = 0;
    client->request_bufsize   = env->request_bufsize;
    client->response_bufsize  = env->response_bufsize;
    client->event_state       = NA_EVENT_STATE_CLIENT_READ;
    client->req_cnt           = 0;
    client->res_cnt           = 0;
    client->loop_cnt          = 0;
    client->cmd               = NA_MEMPROTO_CMD_NOT_DETECTED;
    client->connpool          = na_connpool_select(env);

..

クライアントはcfd(ウェブアプリケーションと通信するソケット)とtsfd(KVSと通信するソケット)を持ち、それぞれのソケットをイベントループで監視するためのc_watcherとts_watcherを持つ。

いよいよ今回の最後。

    if (!na_is_worker_busy(env)) {
        if (!na_event_queue_push(EventQueue, client)) {
            na_error_count_up(env);
            NA_STDERR("Too Many Connections!");
            ev_io_init(&client->c_watcher,  na_client_callback,        client->cfd,  EV_READ);
            ev_io_init(&client->ts_watcher, na_target_server_callback, client->tsfd, EV_NONE);
            ev_io_start(EV_A_ &client->c_watcher);
        }
    } else {
        ev_io_init(&client->c_watcher,  na_client_callback,        client->cfd,  EV_READ);
        ev_io_init(&client->ts_watcher, na_target_server_callback, client->tsfd, EV_NONE);
        ev_io_start(EV_A_ &client->c_watcher);
    }
}

まずna_is_worker_busyというのがある。workerは前回出てきた、na_event_observerのスレッドのこと。workerスレッドがbusyならメインスレッドでウェブアプリケーションからリクエストを読み込むイベントループ(c_watcher)を開始する。

workerがbusyじゃなければクライアントをイベントキューに追加。

キューがいっぱいならメインスレッドでイベントループをまわす。

長かった。まとめると、na_front_server_callbackはウェブアプリケーションからの接続があると呼ばれるコールバックで、クライアントというものを初期化して、KVSに接続して(あるいはコネクションプールから使って)、イベントキューに追加していた。

次回はna_event_observerを読む。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1