はじめに
「C言語でトライ! デザインパターン」
今回はマルチスレッドのパターンです。Threadpoolパターン。今回も自分が使いたいように解釈し、ざっくりいうと「FDイベントのロードバランサー」といった形で表現しました。
デザインパターン一覧
作成したライブラリパッケージの説明
公開コードサンプルはこちら, ライブラリパッケージはこちら
##Threadpoolパターン
wikipedia英語版のgoogle翻訳
スレッドプールは、監視プログラムによって並行して実行するためにタスクが割り当てられるのを待つ複数のスレッドを維持します(レプリケートされたワーカーまたはワーカークルーモデルとも呼ばれます)。
裏でいくつかスレッドを常駐させておいて、非同期処理をしたい時に利用しよう!という感じですね。ちょっとした非同期処理を沢山行う場合には、スレッドを作成する手間が省けるのでいいんですかね?
ただ、Cでコマンド実行系のプログラムを非同期で沢山作るなら、スクリプト言語でサクッと書いた方がいいと思います。前に上記のような形のプールを頑張って作った人がいましたが、使い勝手はう~ん、って感じでした。
そんな中、ちょうどサーバーの抱えるThe C10K problemというものを目にして、実際の1スレッドで頑張っているlighttpdを見ながらどうするのといいんだろうって考えていたところ、このスレッドプールっぽい表現でのコネクション管理がよさそうだと思い、今回のライブラリ作成に至りました。
ライブラリ
概要
基本方針としてはこう。
- epollのようなイベントを待ち受けするスレッドを複数プールさせる。
- ユーザーは、そのスレッドプールに対してイベント待ちしたいファイルディスクリプタをコールバック関数を登録する。
- イベントトリガーが発生すると、非同期でコールバック関数が登録される。
といった形です。epoll等の登録先を複数にして、偏らないようバランスをとるだけのライブラリです。
これはスレッドプールか?と思われるかもしれませんが、有意義な使い方だしいいかなと思ってこうしました。
(実は今まとめながら調べてたら.Netのスレッドプールなんかもこんな感じっぽいですね。)
クラス設計
Threadpoolの管理クラスをEventPoolManagerと、そのAPIをevent_threadpoolと表現しました。
ユーザーはEventPoolManagerのインスタンスを作成した後は、add/add_threadのAPIを使ってEventSubscriberを登録する形となります。
いい点
- 必ずと言っていいほど実装することになるイベント待ち受けスレッドを作ってくれる。
- 追加、削除もAPI一つなので楽
- スレッド指定も可能なので、特定のFD(例えば特定ユーザーのコネクション)を同じスレッドに登録して、疑似シングルスレッドな実装も可能
使いどころ
- イベントの多いシステム
- 接続クライアントの多いサーバー
欠点
- 非同期処理前提
- libevent 2以上を利用する為、ディストリビューションによっては構築が大変かも
動作環境:
Ubuntu 18.04 Desktop
libevent-devel 2.1.8-stable-4build1
、で動作確認済み
(sudo apt install libevent-devでインストール)
Cent OS5.1 Desktopはlibevent2の導入に少し時間がかかりそうなので未確認。
詳細
API定義
#include <event2/event.h>
struct event_tpool_manager_t;
typedef event_tpool_manager_t * EventTPollManager;
struct event_subscriber_t {
int fd;/*!< file descriptor of this subscriber */
int eventflag;/**< event flag related to event.h
*(or of EV_READ, EV_WRITE and EV_PERSIST.
*If you want to keep notification many times, please set EV_PERSIST.)
*/
event_callback_fn event_callback;/*!< defined in event.h, void (*event_callback_fn)(socketfd, short eventflag, void * event_arg);*/
} event_subscriber_t;
typedef event_subscriber_t * EventSubscriber;
EventTPoolManager event_tpool_manager_new(int thread_num, int is_threadsafe);
void event_tpool_manager_free(EventTPoolManager this);
size_t event_tpool_manager_get_threadnum(EventTPoolManager this);
int event_tpool_add(EventTPoolManager this, EventSubscriber subscriber, void * arg);
int event_tpool_add_thread(EventTPoolManager this, int threadid, EventSubscriber subscriber, void * arg);
void event_tpool_del(EventTPoolManager this, int fd);
#endif
使い方:
-
event_tpool_manager_new
でスレッドプールの管理インスタンスを作成します。thread_numが負の場合はデフォルト(とりあえずある記事を参考にCPU数×2としています)。数を知りたい場合はevent_tpool_manager_get_threadnum
を利用してみてください。また、利用する待ち受け処理はプラグイン化されており、ユーザー選択が可能です。※ -
event_subscriber_t
にFDとコールバック関数、イベント発行フラグを設定し、event_tpool_add
で登録。event_tpool_add_thread
でスレッド指定も出来ます。 -
event_tpool_del
で登録したイベントを削除。上書きもevent_tpool_add
で同じFDに対して上書きすることが出来ます。 - 使い終わったら
event_tpool_manager_free
で解放。
※libevを使用する場合のライセンスはGPLになるので注意です。
###コード
以下に置いてあります。
https://github.com/developer-kikikaikai/design_pattern_for_c/tree/master/threadpool
###サンプル
まずnewします。とりあえず3つスレッドをプールします。
EventTPoolManager tpool = event_tpool_manager_new(3, 0);
event_subscriber_tを用意してevent_tpool_add
していきます。登録イベント数は分散するので、3つevent_subscriber_tを登録すると各スレッドに1つずつ登録される形になります。
event_subscriber_t subscriber[TESTDATA]={
{.fd = testdata[0].sockpair[SUBSCRIBER_FD], .eventflag=EV_READ, .event_callback = test_1},
{.fd = testdata[1].sockpair[SUBSCRIBER_FD], .eventflag=EV_READ|EV_PERSIST, .event_callback = test_2},
{.fd = testdata[2].sockpair[SUBSCRIBER_FD], .eventflag=EV_READ|EV_PERSIST, .event_callback = test_3},
{.fd = testdata[3].sockpair[SUBSCRIBER_FD], .eventflag=EV_READ|EV_PERSIST, .event_callback = test_4},
};
int tid[TESTDATA];
for(int i=0; i<TESTDATA-1; i++) {
printf("add[%d] fd:%d\n", i, subscriber[i].fd);
tid[i] = event_tpool_add(tpool, &subscriber[i], &testdata[i]);
if(tid[i] < 0) {
DEBUG_ERRPRINT("####Failed to call event_tpool_add[%d]\n", i);
return -1;
}
}
「subscriber[3]はsubscriber[1]と同じスレッドにしたい!」という場合は、event_tpool_add_thread
を使って、同じスレッドに登録することも出来ます。
if(event_tpool_add_thread(tpool, tid[1], &subscriber[3], &testdata[3]) != tid[1]) {
DEBUG_ERRPRINT("####Failed to add event_tpool_add_thread[%d]\n", 3);
return -1;
}
登録が完了したら、そのFDに対してイベントを飛ばすとコールバックtest_1~test_4が呼ばれます。
//call write
for(int i=0;i<TESTDATA;i++) {
int tmp=0;
write(testdata[i].sockpair[TEST_FD], &tmp, sizeof(tmp));
}
###API変更履歴
2018/07/21 newのインターフェイスをプラグイン指定形式に変更
##参考
libeventでの実装時参考:
[manページ](http://www.manpagez.com/man/3/event_del/
公式のテストコード
苦戦したところは以下にメモって行きます。あまりにも遅かったらライブラリを差し替えます。
https://qiita.com/developer-kikikaikai/items/69f889e03133441ffbe0