LoginSignup
19

More than 5 years have passed since last update.

helloworld で学ぶ gRPC の C++ におけるスレッド処理

Posted at

はじめに

前回の記事で、gRPC ライブラリを C++ から利用するときに必要になる初歩の部分 (ライブラリのビルド、ソースコードの検索、デバッグなど) を紹介しました。

サンプル プログラムをベースにして単に動くものを作るのは簡単ですが、 gRPC の内部動作を理解することで、自分の要件にあったパラメーター設定を正しく選べたり、トラブルシューティングが効率よく行えるようになります。helloworld サンプルの動作をデバッグしながら、gRPC の内部動作、特にスレッド操作の部分を調べたので本記事にまとめました。前回が「helloworld の次に書く...」とかいうタイトルのくせに、今回は「helloworld で学ぶ...」だと退行してるだろ、という突っ込みはご勘弁を。

デバッグしたのはサンプルの helloworld ですが、そのままだとビルドやデバッグが扱いにくかったので、一つの実行可能ファイルに 4 つの役割 (同期/非同期 x サーバー/クライアント) を統合しました。

gRPC ワーカースレッド

helloword の同期型サーバー プログラムを起動すると、スレッドが 2 つ実行される状態になります。

$ gdb ../bin/helloworld
(gdb) r -s
Starting program: /data/src/grpc-tutorial/bin/helloworld -s
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
I0430 10:43:54.640469173   12104 server_builder.cc:258]      Synchronous server. Num CQs: 1, Min pollers: 1, Max Pollers: 2, CQ timeout (msec): 10000
I0430 10:43:54.640622473   12104 ev_epoll_linux.c:95]        epoll engine will be using signal: 40
D0430 10:43:54.640639673   12104 ev_posix.c:107]             Using polling engine: epoll
D0430 10:43:54.640693573   12104 dns_resolver.c:316]         Using native dns resolver
[New Thread 0x7ffff5d1d700 (LWP 12108)]
[Thread 0x7ffff5d1d700 (LWP 12108) exited]
[New Thread 0x7ffff551c700 (LWP 12109)]
Server listening on 0.0.0.0:50051
^C
Thread 1 "helloworld" received signal SIGINT, Interrupt.
pthread_cond_wait@@GLIBC_2.3.2 () at ../sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:185
185     ../sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S: No such file or directory.
=> 0x00007ffff65bd360 <pthread_cond_wait@@GLIBC_2.3.2+192>:     8b 3c 24        mov    (%rsp),%edi
(gdb) i threads
  Id   Target Id         Frame
* 1    Thread 0x7ffff7fd6900 (LWP 12104) "helloworld" pthread_cond_wait@@GLIBC_2.3.2 ()
    at ../sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:185
  3    Thread 0x7ffff551c700 (LWP 12109) "helloworld" 0x00007ffff68d3a37 in __GI_epoll_pwait (
    epfd=7, events=0x7ffff551b3a0, maxevents=100, timeout=1000, set=0x7ffff551c670)
    at ../sysdeps/unix/sysv/linux/epoll_pwait.c:42

上記ログにおけるスレッド #1 は main 関数を実行しているメイン スレッドで、 grpc::ServerBuilder::Wait で待機状態に入っています。より正確には、条件変数 Server::shutdown_cv_ によってスレッドの実行がブロックされています。

(gdb) bt
#0  pthread_cond_wait@@GLIBC_2.3.2 ()
    at ../sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:185
#1  0x00007ffff6e5f91c in std::condition_variable::wait(std::unique_lock<std::mutex>&) ()
   from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#2  0x00007ffff72360f2 in grpc::Server::Wait (this=0x640370) at src/cpp/server/server_cc.cc:602
#3  0x000000000041eb12 in RunServer (endpoint=0x0) at greeter_server.cpp:76
#4  0x000000000041f25a in main (argc=2, argv=0x7fffffffe558) at main.cpp:16

もう一つのスレッドは、grpc::ThreadManager::WorkerThread::Run というエントリポイントから始まるワーカースレッドで、コールスタックを見ると、epoll_pwait で I/O イベントを待機しています。デバッグすると分かりますが、メソッドの実体である GreeterServiceImpl::SayHello は、メインスレッドではなくこのワーカースレッドで実行されます。

(gdb) bt
#0  0x00007ffff68d3a37 in __GI_epoll_pwait (epfd=7, events=0x7ffff551b3a0, maxevents=100,
    timeout=1000, set=0x7ffff551c670) at ../sysdeps/unix/sysv/linux/epoll_pwait.c:42
#1  0x00007ffff753a434 in pollset_work_and_unlock (exec_ctx=0x7ffff551bab0, pollset=0x6402e0,
    worker=0x7ffff551b8c0, timeout_ms=1000, sig_mask=0x7ffff551c670, error=0x7ffff551b8b8)
    at src/core/lib/iomgr/ev_epoll_linux.c:1430
#2  0x00007ffff753aae4 in pollset_work (exec_ctx=0x7ffff551bab0, pollset=0x6402e0,
    worker_hdl=0x0, now=..., deadline=...) at src/core/lib/iomgr/ev_epoll_linux.c:1571
#3  0x00007ffff753fd26 in grpc_pollset_work (exec_ctx=0x7ffff551bab0, pollset=0x6402e0,
    worker=0x0, now=..., deadline=...) at src/core/lib/iomgr/ev_posix.c:207
#4  0x00007ffff7560d75 in grpc_completion_queue_next (cc=0x6401c0, deadline=..., reserved=0x0)
    at src/core/lib/surface/completion_queue.c:595
#5  0x00007ffff72205a0 in grpc::CompletionQueue::AsyncNextInternal (this=0x647a60,
    tag=0x7ffff551bc28, ok=0x7ffff551bc22, deadline=...)
    at src/cpp/common/completion_queue_cc.cc:71
#6  0x00007ffff7238e0b in grpc::CompletionQueue::AsyncNext<gpr_timespec> (this=0x647a60,
    tag=0x7ffff551bc28, ok=0x7ffff551bc22, deadline=...)
    at include/grpc++/impl/codegen/completion_queue.h:140
#7  0x00007ffff7238024 in grpc::Server::SyncRequestThreadManager::PollForWork (this=0x6404b0,
    tag=0x7ffff551bc28, ok=0x7ffff551bc22) at src/cpp/server/server_cc.cc:277
#8  0x00007ffff723f37d in grpc::ThreadManager::MainWorkLoop (this=0x6404b0)
    at src/cpp/thread_manager/thread_manager.cc:128
#9  0x00007ffff723ed9f in grpc::ThreadManager::WorkerThread::Run (this=0x641a80)
    at src/cpp/thread_manager/thread_manager.cc:48
#10 0x00007ffff7240e13 in std::_Mem_fn_base<void (grpc::ThreadManager::WorkerThread::*)(), true>::operator()<, void>(grpc::ThreadManager::WorkerThread*) const (this=0x641a60, __object=0x641a80)
    at /usr/include/c++/5/functional:600
#11 0x00007ffff7240da7 in std::_Bind_simple<std::_Mem_fn<void (grpc::ThreadManager::WorkerThread::*)()> (grpc::ThreadManager::WorkerThread*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (
    this=0x641a58) at /usr/include/c++/5/functional:1531
#12 0x00007ffff7240cae in std::_Bind_simple<std::_Mem_fn<void (grpc::ThreadManager::WorkerThread::*)()> (grpc::ThreadManager::WorkerThread*)>::operator()() (this=0x641a58)
    at /usr/include/c++/5/functional:1520
#13 0x00007ffff7240c3e in std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void (grpc::ThreadManager::WorkerThread::*)()> (grpc::ThreadManager::WorkerThread*)> >::_M_run() (this=0x641a40)
    at /usr/include/c++/5/thread:115
#14 0x00007ffff6e64c80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#15 0x00007ffff65b76ba in start_thread (arg=0x7ffff551c700) at pthread_create.c:333
#16 0x00007ffff68d382d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

ワーカースレッドは基本的に grpc::ThreadManager::MainWorkLoop の PollForWork で I/O を待機しており、リクエストが来ると PollForWork から制御が返ってきて、同じく MainWorkLoop から呼ばれる grpc::Server::SyncRequestThreadManager::DoWork 経由で gRPC メソッドが呼ばれます。MainWorkLoop の処理が while (true) {...} で囲まれていることから分かるように、スレッドを終了するリクエストが来ない限りは、MainWorkLoop は PollForWork と DoWork をひたすら繰り返します。

上述の実行ログで、以下の行が出力されていました。ここで表示されている pollers はワーカースレッドのことであり、Min pollers: 1, Max Pollers: 2 という部分は、ワーカースレッド実行数の最小値 が 1、最大値が 2 になるようにサーバーが設定されていることを意味しています。

I0430 10:43:54.640469173 12104 server_builder.cc:258] Synchronous server. Num CQs: 1, Min pollers: 1, Max Pollers: 2, CQ timeout (msec): 10000

Min/Max pollers を含めたこれらの設定は、ServerBuilder のインスタンスが SyncServerSettings 型の メンバー変数 sync_server_settings_ として保持しています。具体的な数値はデフォルト コンストラクターが以下のように設定しています。

server_builder.h
  struct SyncServerSettings {
    SyncServerSettings()
        : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}

    // Number of server completion queues to create to listen to incoming RPCs.
    int num_cqs;

    // Minimum number of threads per completion queue that should be listening
    // to incoming RPCs.
    int min_pollers;

    // Maximum number of threads per completion queue that can be listening to
    // incoming RPCs.
    int max_pollers;

    // The timeout for server completion queue's AsyncNext call.
    int cq_timeout_msec;
  };

個々の設定値は、ServerBuilder::SetSyncServerOption を使って変更できます。

builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 1);
builder.SetSyncServerOption(ServerBuilder::SyncServerOption::MIN_POLLERS, 1);
builder.SetSyncServerOption(ServerBuilder::SyncServerOption::MAX_POLLERS, 1);
builder.SetSyncServerOption(ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10000);

上記では、ワーカースレッドの最大値、最小値をともに 1 に設定しています。この場合、実行直後はワーカースレッドが 1 つだけ開始されます。しかしクライアントのプログラムを実行すると、必ずワーカースレッドが新しく作られ、メソッドの実行が終わってもワーカースレッドは 2 つのままになります。この動作は、ThreadManager::MainWorkLoop の中で PollForWork から制御が返ってきた後に num_pollers_ をデクリメントしているためです。

MainWorkLoop において、DoWork を実行する前に、ワーカースレッド数が不足していればスレッドを 1 つ作成する処理ブロックがあります。その判断のための比較は、デクリメントした値と、設定した最小値 min_pollers_ との比較で行うため、最初のメソッド実行時には必ずワーカースレッドが作成されます。PollForWork からの戻り値が CompletionQueue::TIMEOUT だった場合にも比較が行われ、ワーカースレッドが最大値を超えている場合にはスレッドを終了します。このときもデクリメント後の値と比較しているため、常に最大値 + 1 のスレッドが実行されることになります。デクリメントの処理は mu_ でロックされるため、デクリメントが 2 回以上行われることはありません。

PollForWork を実行してから一定時間を超えると、PollForWork は TIMEOUT という戻り値を返します。そのタイムアウト値が CQ_TIMEOUT_MSEC で設定する cq_timeout_msec で、デフォルトは 10 秒です。この時間ごとに PollForWork が制御を返してスレッド数を調整するため、あまり短いと MainWorkLoop() の中で余計な処理が走りすぎます。かといって長すぎる時間を設定すると、スレッド数の調整がその間一切行われないため、短時間に大量のリクエストが来るような場合、最大値を超えるスレッドがピーク時に作られて、ピークが過ぎても長い間余剰のスレッドが維持されることになります。

残る設定値の NUM_CQS は、名前の通り、作成する CompletionPort の数を指定するパラメーターです。CompletionPort は ServerBuilder::BuildAndStart 内で作成され、grpc::Server のデフォルト コンストラクターに渡されます。grpc::Server は CompletionQueue の配列を保持しており、デフォルト コンストラクターの中で CompletionPort の要素ごとに SyncRequestThreadManager という ThreadManager の派生クラスのインスタンスを作成します。スレッド数の制御が ThreadManager::MainWorkLoop で行われていたことから分かるように、それぞれの SyncRequestThreadManager インスタンスがワーカースレッドの管理を行います。num_cqs はその単位を増やすもので、結果的に、プログラム起動時に作られるワーカースレッドの合計数は num_cqs * min_pollers になります。

同じ CompletionPort を共有するスレッドを増やすか、新たに CompletionPort を作ってThreadManager を増やすかどうかは、送受信するデータのサイズによって決めることになると思います。データサイズが小さければ、仮にメソッドが長時間スレッドを占有していても CompletionPort は次のリクエストを受け取って同じ ThreadManager の別のワーカースレッドで処理を始めることができます。一方、もしデータサイズが大きい場合は、仮にメソッド自体の処理が軽いものだったとしても、そもそものデータの受信に時間がかかってしまい、受信中は他のリクエストが処理できなくなることが予想されます。実際に試していないので確証はありませんが、そのような場合には ThreadManager を増やす方がよい結果が得られるはずです。

非同期サーバー

非同期型の gRPC サーバーを使う場合、すなわち grpc::ServerBuilder::AddCompletionQueue を使って自分で CompletionQueue を追加する場合は、gRPC 側でワーカー スレッドを作るようなことはしません。例えば helloworld サンプルにおける非同期サーバーでは、全ての gRPC メソッドをメイン スレッドで処理します。

以下に示したものが、非同期型の helloword サーバーにおける待ち状態のコールスタックで、同期型サーバーの待ち状態とほとんど同じです。同期型サーバーでは、内部の PollForWork 関数が CompleletionPort の I/O を待っていましたが、非同期型の場合には、自分で CompleletionPort を追加し、メッセージ ループのようなものを書いて自分で CompletionPort で待機する必要があります。helloworld サンプルでは、これをメインスレッドでそのまま行っています。複数のリクエストを同時に処理したい場合には、結局は自分でワーカースレッドのような仕組みを自分で書かないといけません。

(gdb) bt
#0  0x00007ffff6851a37 in __GI_epoll_pwait (epfd=7, events=0x7fffffffda00, maxevents=100,
    timeout=1000, set=0x7ffff7fcd870) at ../sysdeps/unix/sysv/linux/epoll_pwait.c:42
#1  0x00007ffff748d0af in pollset_work_and_unlock (exec_ctx=0x7fffffffe110, pollset=0x645750,
    worker=0x7fffffffdf20, timeout_ms=1000, sig_mask=0x7ffff7fcd870, error=0x7fffffffdf18)
    at src/core/lib/iomgr/ev_epoll_linux.c:1430
#2  0x00007ffff748d75f in pollset_work (exec_ctx=0x7fffffffe110, pollset=0x645750, worker_hdl=0x0,
    now=..., deadline=...) at src/core/lib/iomgr/ev_epoll_linux.c:1571
#3  0x00007ffff74929a1 in grpc_pollset_work (exec_ctx=0x7fffffffe110, pollset=0x645750,
    worker=0x0, now=..., deadline=...) at src/core/lib/iomgr/ev_posix.c:207
#4  0x00007ffff74b39ed in grpc_completion_queue_next (cc=0x645630, deadline=..., reserved=0x0)
    at src/core/lib/surface/completion_queue.c:595
#5  0x00007ffff719e480 in grpc::CompletionQueue::AsyncNextInternal (this=0x64eca0,
    tag=0x7fffffffe220, ok=0x7fffffffe21f, deadline=...)
    at src/cpp/common/completion_queue_cc.cc:71
#6  0x000000000042262b in grpc::CompletionQueue::Next (this=0x64eca0, tag=0x7fffffffe220,
    ok=0x7fffffffe21f)
    at /data/bin/grpc/grpc-dev/include/grpc++/impl/codegen/completion_queue.h:151
#7  0x00000000004245fa in ServerImpl::HandleRpcs (this=0x7fffffffe3c0)
    at greeter_async_server.cpp:166
#8  0x0000000000424129 in ServerImpl::Run (this=0x7fffffffe3c0, endpoint=0x0)
    at greeter_async_server.cpp:80
#9  0x0000000000423c28 in RunAsyncServer (endpoint=0x0) at greeter_async_server.cpp:179
#10 0x00000000004235f6 in main (argc=2, argv=0x7fffffffe568) at main.cpp:42

gRPC においては、同期サーバーと言っても、内部ではワーカースレッドを作って複数スレッドで非同期に I/O を待機して処理しているため、「同期サーバー = 非同期サーバー + Google 実装のワーカースレッド モデル」という図式が成り立ちます。I/O 待機のループに何らかのカスタマイズを加える必要がない限りは、同期サーバーを使うのが楽で、効率もよくなると思います。

SIG40 の謎

同期型の helloworld サーバーを gdb から起動し、クライアント プログラムを何度か実行すると、リアルタイム シグナルを受信してブレークすることがあります。結論から言うと、これは gRPC の既定の動作の一つです。当初 SIG40 がプログラム側の実装ミスか何かだと思ってデバッグを行い、結果として gRPC の内部動作をより詳しく見るはめになったので、何の役に立つかは分かりませんが調査の過程を紹介します。

Server listening on 0.0.0.0:50051
[New Thread 0x7fffeffff700 (LWP 2473)]
Thread 3 "helloworld" received signal SIG40, Real-time event 40.
[Switching to Thread 0x7ffff4bec700 (LWP 2467)]
0x00007ffff6a6ea37 in __GI_epoll_pwait (epfd=7, events=0x7ffff4beafa0, maxevents=100, timeout=17,
    set=0x7ffff4bec670) at ../sysdeps/unix/sysv/linux/epoll_pwait.c:42
42      ../sysdeps/unix/sysv/linux/epoll_pwait.c: No such file or directory.
   0x00007ffff6a6ea14 <__GI_epoll_pwait+100>:   e8 c7 da 00 00  callq  0x7ffff6a7c4e0 <__libc_enable_asynccancel>
   0x00007ffff6a6ea19 <__GI_epoll_pwait+105>:   41 b9 08 00 00 00       mov    $0x8,%r9d
   0x00007ffff6a6ea1f <__GI_epoll_pwait+111>:   89 c5   mov    %eax,%ebp
   0x00007ffff6a6ea21 <__GI_epoll_pwait+113>:   4d 89 f8        mov    %r15,%r8
   0x00007ffff6a6ea24 <__GI_epoll_pwait+116>:   4d 63 d5        movslq %r13d,%r10
   0x00007ffff6a6ea27 <__GI_epoll_pwait+119>:   49 63 d4        movslq %r12d,%rdx
   0x00007ffff6a6ea2a <__GI_epoll_pwait+122>:   4c 89 f6        mov    %r14,%rsi
   0x00007ffff6a6ea2d <__GI_epoll_pwait+125>:   48 63 fb        movslq %ebx,%rdi
   0x00007ffff6a6ea30 <__GI_epoll_pwait+128>:   b8 19 01 00 00  mov    $0x119,%eax
   0x00007ffff6a6ea35 <__GI_epoll_pwait+133>:   0f 05   syscall
=> 0x00007ffff6a6ea37 <__GI_epoll_pwait+135>:   48 3d 00 f0 ff ff       cmp    $0xfffffffffffff000,%rax
(gdb) bt 11
#0  0x00007ffff6a6ea37 in __GI_epoll_pwait (epfd=7, events=0x7ffff4beafa0, maxevents=100,
    timeout=17, set=0x7ffff4bec670) at ../sysdeps/unix/sysv/linux/epoll_pwait.c:42
#1  0x00007ffff76aa0af in pollset_work_and_unlock (exec_ctx=0x7ffff4beb6c0,
    pollset=0x7ffff0001080, worker=0x7ffff4beb4c0, timeout_ms=17, sig_mask=0x7ffff4bec670,
    error=0x7ffff4beb4b8) at src/core/lib/iomgr/ev_epoll_linux.c:1430
#2  0x00007ffff76aa75f in pollset_work (exec_ctx=0x7ffff4beb6c0, pollset=0x7ffff0001080,
    worker_hdl=0x7ffff4beb620, now=..., deadline=...) at src/core/lib/iomgr/ev_epoll_linux.c:1571
#3  0x00007ffff76af9a1 in grpc_pollset_work (exec_ctx=0x7ffff4beb6c0, pollset=0x7ffff0001080,
    worker=0x7ffff4beb620, now=..., deadline=...) at src/core/lib/iomgr/ev_posix.c:207
#4  0x00007ffff76d1519 in grpc_completion_queue_pluck (cc=0x7ffff0000f60, tag=0x7ffff4beb8f0,
    deadline=..., reserved=0x0) at src/core/lib/surface/completion_queue.c:787
#5  0x00007ffff73bb65d in grpc::CoreCodegen::grpc_completion_queue_pluck (
    this=0x7ffff7668330 <grpc::internal::g_core_codegen>, cq=0x7ffff0000f60, tag=0x7ffff4beb8f0,
    deadline=..., reserved=0x0) at src/cpp/common/core_codegen.cc:87
#6  0x0000000000417b4d in grpc::CompletionQueue::Pluck (this=0x7ffff4beba80, tag=0x7ffff4beb8f0)
    at /data/bin/grpc/grpc-dev/include/grpc++/impl/codegen/completion_queue.h:238
#7  0x000000000041ce8c in grpc::RpcMethodHandler<helloworld::Greeter::Service, helloworld::HelloRequest, helloworld::HelloReply>::RunHandler (this=0x640820, param=...)
    at /data/bin/grpc/grpc-dev/include/grpc++/impl/codegen/method_handler_impl.h:76
#8  0x00007ffff73d2bc5 in grpc::Server::SyncRequest::CallData::Run (this=0x7ffff4beba80,
    global_callbacks=std::shared_ptr (count 4, weak 0) 0x63e590) at src/cpp/server/server_cc.cc:221
#9  0x00007ffff73d30af in grpc::Server::SyncRequestThreadManager::DoWork (this=0x642dd0,
    tag=0x647a90, ok=true) at src/cpp/server/server_cc.cc:311
#10 0x00007ffff73da371 in grpc::ThreadManager::MainWorkLoop (this=0x642dd0)
    at src/cpp/thread_manager/thread_manager.cc:158
(More stack frames follow...)
(gdb) p grpc_wakeup_signal
$1 = 40

上記例では SIG40 となっていますが、環境によっては別のシグナル番号が使われることも考えられます。この番号の正体は、ev_epoll_linux.c にある grpc_init_epoll_linux 関数で指定されている SIGRTMIN + 6 という値で、グローバル変数 grpc_wakeup_signal の値を見ることで実際の値を確認できます。

grpc/ev_epoll_linux.c at master · grpc/grpc
https://github.com/grpc/grpc/blob/master/src/core/lib/iomgr/ev_epoll_linux.c

シグナルの目的は、同じファイルの pollset_work 関数にコメントとして説明があります。簡潔に言えば、epoll_wait() を呼び出して待ち状態になっているスレッドに対して、epoll_wait() から強制的に制御を取り戻すのが目的です。

(epoll_wait と epoll_pwait の違いは、epoll_pwait のマニュアル によると select と pselect の違いと同じです。さらに pselect のマニュアル を見ると、epoll_wait の処理前後に pthread_sigmask を呼び出し、その 3 つのステップ pthread_sigmask-epoll_wait-pthread_sigmask をアトミックに実行する処理が epoll_pwait になります。)

ev_epoll_linux.c抜粋
...
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*"
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
...

待機中のワーカースレッドのコールスタックは、SIG40 を受信したときのコールスタックと全く同じです。pollset_work → pollset_work_and_unlock → epoll_pwait という流れで epoll_pwait が呼ばれており、上でコメントとともに引用したコードの末尾にある pollset_work_and_unlock 呼び出しが待機状態を作り出します。

pollset_work の実装を見ると、epoll_wait 実行時以外では grpc_wakeup_signal をシグナル マスクに加え、epoll_pwait にはシグナル マスクから grpc_wakeup_signal を抜いたセットをパラメーターとして渡すことで、epoll_pwait 内部でのみ grpc_wakeup_signal をハンドルするように実装されています。当たり前ですが上記コメントに書かれている通りです。

もう一つ重要な個所は、シグナルを送信するコードです。同じファイル ev_epoll_linux.c の pollset_worker_kick 関数で、pthread_kill を呼んでいる箇所がそれです。

(gdb) bt 16
#0  __pthread_kill (threadid=140737299531520, signo=40)
    at ../sysdeps/unix/sysv/linux/pthread_kill.c:39
#1  0x00007ffff76a9562 in pollset_worker_kick (worker=0x7ffff4beb4c0)
    at src/core/lib/iomgr/ev_epoll_linux.c:1136
#2  0x00007ffff76a9881 in pollset_kick (p=0x7fffe8005410, specific_worker=0x7ffff4beb4c0)
    at src/core/lib/iomgr/ev_epoll_linux.c:1201
#3  0x00007ffff76af9d2 in grpc_pollset_kick (pollset=0x7fffe8005410,
    specific_worker=0x7ffff4beb4c0) at src/core/lib/iomgr/ev_posix.c:212
#4  0x00007ffff76d00dc in grpc_cq_end_op (exec_ctx=0x7fffefffeab0, cc=0x7fffe80052f0,
    tag=0x7ffff4beb8f0, error=0x0, done=0x7ffff76caf1f <finish_batch_completion>,
    done_arg=0x7ffff000cbe8, storage=0x7ffff000cbf0) at src/core/lib/surface/completion_queue.c:422
#5  0x00007ffff76cb311 in post_batch_completion (exec_ctx=0x7fffefffeab0, bctl=0x7ffff000cbe8)
    at src/core/lib/surface/call.c:1192
#6  0x00007ffff76cb34f in finish_batch_step (exec_ctx=0x7fffefffeab0, bctl=0x7ffff000cbe8)
    at src/core/lib/surface/call.c:1200
#7  0x00007ffff76cbd76 in finish_batch (exec_ctx=0x7fffefffeab0, bctlp=0x7ffff000cbe8, error=0x0)
    at src/core/lib/surface/call.c:1424
#8  0x00007ffff76afe2f in exec_ctx_run (exec_ctx=0x7fffefffeab0, closure=0x7ffff000cc10, error=0x0)
    at src/core/lib/iomgr/exec_ctx.c:102
#9  0x00007ffff76a4258 in grpc_closure_run (exec_ctx=0x7fffefffeab0, c=0x7ffff000cc10, error=0x0)
    at src/core/lib/iomgr/closure.c:132
#10 0x00007ffff76e2581 in grpc_chttp2_complete_closure_step (exec_ctx=0x7fffefffeab0,
    t=0x7ffff0001910, s=0x7ffff000c148, pclosure=0x7ffff000c210, error=0x0,
    desc=0x7ffff7757c40 "send_trailing_metadata_finished")
    at src/core/ext/transport/chttp2/transport/chttp2_transport.c:1125
#11 0x00007ffff76f9097 in grpc_chttp2_end_write (exec_ctx=0x7fffefffeab0, t=0x7ffff0001910,
    error=0x0) at src/core/ext/transport/chttp2/transport/writing.c:418
#12 0x00007ffff76e1d3b in write_action_end_locked (exec_ctx=0x7fffefffeab0, tp=0x7ffff0001910,
    error=0x0) at src/core/ext/transport/chttp2/transport/chttp2_transport.c:960
#13 0x00007ffff76a4d11 in grpc_combiner_continue_exec_ctx (exec_ctx=0x7fffefffeab0)
    at src/core/lib/iomgr/combiner.c:325
#14 0x00007ffff76afd76 in grpc_exec_ctx_flush (exec_ctx=0x7fffefffeab0)
    at src/core/lib/iomgr/exec_ctx.c:83
#15 0x00007ffff76aa76e in pollset_work (exec_ctx=0x7fffefffeab0, pollset=0x63e790, worker_hdl=0x0,
    now=..., deadline=...) at src/core/lib/iomgr/ev_epoll_linux.c:1573
(More stack frames follow...)

フレーム #15 を見ると、pollset_work において、待ち状態に入るときに呼ばれていた pollset_work_and_unlock から制御が返ってきた直後に呼ばれる grpc_exec_ctx_flush の延長上でシグナルが送信されています。grpc_pollset 構造体で管理される pollset は、利用可能なワーカースレッドを双方向の循環リンクトリストとして保持しています。下記に引用した部分では、1) pollset に自分を追加、2) ワークアイテムを取得するまで待機、3) ワークアイテムを実行、4) pollset から自分を削除、していると考えられます。ワークアイテムは、exec_ctx という変数を介してやり取りされています。

ev_epoll_linux.c抜粋
    push_front_worker(pollset, &worker); /* Add worker to pollset */ //<<<< pollset にスレッドを追加

    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error); //<<<< 待機
    grpc_exec_ctx_flush(exec_ctx);  //<<<< タスクを実行 (SIG40 もここから来る)

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);  //<<<< pollset からスレッドを削除

helloworld クライアントを実行したとき、SIG40 が発生しないこともあります。上で示した __pthread_kill のコールスタックにおいて、シグナルが来ない場合にどのフレームまで実行されているかを調べると、フレーム #2 の pollset_kick までは実行されて、#1 の pollset_worker_kick は実行されないことが分かります。pollset_kick の実装を見ると、概ね以下のような構造をしています。

ev_epoll_linux.c抜粋&簡略化
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      // パターン 1: 全ワーカースレッドにブロードキャスト
      for (worker = p->root_worker.next; worker != &p->root_worker;
           worker = worker->next) {
        if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
          append_error(&error, pollset_worker_kick(worker), err_desc);
        }
      }
    } else {
      // パターン 2: 特定のワーカースレッドで処理
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    // パターン 3: 任意のワーカースレッドで処理
    worker = pop_front_worker(p);
    if (worker != NULL) {
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    }
  }

SIG40 が送信されたときのスタックを見ると、上記で「パターン 2」と注釈を入れた部分から pollset_worker_kick が呼ばれています。SIG40 が送信されないときでもパターン 2 のブロックを実行しますが、引数の specific_worker が指し示すスレッドと実行スレッドとが同一であり、スレッドを覚醒させる必要がないため、シグナルは送られません。specific_worker がどこから来るかと言えば、grpc_cq_end_op の中で、CompletionQueue に関連付けられた cc->pluckers という配列から tag を元に検索して見つけています。CompletionQueue の中に Pluck というコンセプトがあるようですが、その意味は深追いしていません。

以上から分かるのは、ワーカースレッドが処理するワークアイテムには、全スレッドにブロードキャストするもの、特定のスレッドで実行しなければならないもの、どのスレッドで実行してよいもの、の 3 種類があり、キューから取り出したアイテムが別スレッドで実行しなければならないワークアイテムだった場合に、シグナルを使ってその対象スレッドを覚醒させているということです。

ワークアイテムを実行する場所の一つが grpc_exec_ctx_flush です。実装を見ると、grpc_exec_ctx は複数のワークアイテムをリンクトリストの構造で保持しており、ループで順次実行しています。各ワークアイテムは、gRPC では grpc_closure という構造体で管理されています。具体的にどのようなワークアイテムが存在するのかという点については、ワークアイテムを初期化する grpc_closure_init が呼ばれる箇所を検索すると分かります。grpc_closure_init の第二引数でコールバック関数を指定しており、例えば、TCP パケットを読み書きすると思われる tcp_handle_read や tcp_handle_write などがあります。

おわりに

だらだらと書いてきましたが、まとめると以下の通りです。ストリーミングのメソッドはまだ試していないので分かりませんが、今回の内容を調べて得られた一つの知見としては、非同期サーバーを使うべき状況は限られる印象です。

  • 同期サーバーでは、ServerBuilder::SyncServerOption を使って CompletionPort 数、ワーカー スレッド数を設定できる
  • 同期サーバーも、内部的には非同期でリクエストを処理している
  • ワークアイテムを特定のスレッドで実行する際、待機状態のスレッドを覚醒させるためにリアルタイム シグナルが使われる

今回デバッグした gRPC のコードは、Linux におけるスレッドやシグナルの実装例としても参考になるものでした。解明しきれなかった点も多々あり抜け漏れだらけですが、目下のところ知りたかった情報は得られたので、とりあえず記事としてはここまでにします。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19