はじめに
おじさんはgRPC でサービスを実装しているのですが、今回、「非同期に」処理することを求められました。非同期でのリクエストの処理とはなんぞ?と思い調べ、動作確認したのでメモします。
はじめに、内容に対してタイトルがミスリーディングだったら申し訳ありません、と謝っておきます。
gRPCの使い方。
RPC=remote procedure controlで、g = google だったと思います。おじさんにはjsonとWEB API が、 protocol buffer と gRPCに置き換わったようなイメージです。
定義ファイルでメッセージ(protocol buffer)の定義とサービス(gRPC)の定義を作ったあと、コンパイルツールを使うと
- 定義したデータを送るためのProtocol bufferのライブラリ
- それらを受け渡しするサービスのひな形
が作られます。プログラマはそのひな形を継承したクラスを実装するという感じです。
何をしたいか(非同期通信?)
実は、おじさんも「クライアントが何をしたいのか」内容を理解していないのですが、どうやら、一つの処理について、実行中の中間状態や、状態遷移したときの状態の情報を取得するために使いたい、ということのようです。クライアント・サーバの通信で考えると、一つのリクエストにより処理が開始し、実行中の状態を逐次レスポンスで返すのかなとイメージしました。
情報整理とテスト実装
というわけで、以下メモです。
最初にサンプルコードを調べた
まず、サンプルコードを読みました。
ここのgrpc/examples/protos/ にproto ファイルの実装例があります。どうやら、やろうとしていることは "responce-streaming gRPC" というものらしいです。
サンプルのhelloworld.proto を見ると、
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
ですが、これが hellostreamingworld.proto では、
// The greeting service definition.
service MultiGreeter {
// Sends multiple greetings
rpc sayHello (HelloRequest) returns (stream HelloReply) {}
}
となっています。これを使った実装を探しましたが、、、良く分からず。
とりあえず、試しに実装してみることにしました。
なお、hello world のpython 実装は grpc/examples/python/helloworld にあり、定義は grpc/proto/helloworld.proto にあります。
実装例
自分でも、動かしてみました。定義は、
- リクエストを受けるとサーバ側で処理が始まる
- サーバ側の処理は状態遷移をして、状態遷移ごとに状態をクライアントに返す
というイメージで作りました。
メッセージとサービスの定義(*.proto)
まず、proto ファイルですが、以下のようにしました。
request と responce で送るメッセージを定義します。サービスは、一つのリクエストに対して、状態遷移ごとにresponce を返すので、stream で定義しています。
syntax = "proto3";
package statedef;
service Worker{
rpc doThis(WorkRequest) returns (stream WorkResponce){}
}
message WorkRequest{
string name = 1;
string val = 2;
}
message WorkResponce{
string report_message = 1;
}
このファイルをコンパイルします。
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. statedef.proto
サーバ側の実装
ひとつのリクエストに対して一つのresponce を返すには、responce のインスタンスを return するだけでした。これを複数返すには、yield で返すようにします。
from concurrent import futures
from logging import basicConfig, getLogger, DEBUG
from time import sleep
from datetime import datetime
import grpc
import statedef_pb2
import statedef_pb2_grpc
class Worker(statedef_pb2_grpc.WorkerServicer):
_logger = getLogger()
def doThis(self, request, context):
Worker._logger.debug("start job")
sleep(3)
yield statedef_pb2.WorkResponce(report_message="Now in 1st state")
sleep(3)
yield statedef_pb2.WorkResponce(report_message="Now in 2nd state ")
sleep(3)
yield statedef_pb2.WorkResponce(report_message="Now in final state.")
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
statedef_pb2_grpc.add_WorkerServicer_to_server(Worker(), server)
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()
if __name__ == '__main__':
basicConfig( level=DEBUG, format='{asctime} [{levelname}] {name}: {message}', style='{')
serve()
yield で返す以外、実装面で特筆すべきことはなく。
実行例
server側を実行します。
$ python worker_server.py
別ターミナルでclient 側を動かします。
$ python client_test.py
2021-08-29 10:31:05,374 [INFO] __main__: report_message: "Now in 1st state"
2021-08-29 10:31:08,379 [INFO] __main__: report_message: "Now in 2nd state "
2021-08-29 10:31:11,384 [INFO] __main__: report_message: "Now in final state."
このとき、サーバ側のターミナルには以下のようにログが出力されます。
2021-08-29 10:31:02,368 [DEBUG] root: start job
2021-08-29 10:31:05,372 [DEBUG] root: 1st task finished.
2021-08-29 10:31:08,377 [DEBUG] root: 2nd task finished.
2021-08-29 10:31:11,382 [DEBUG] root: 3rd task finished.
実際には3秒寝ているだけですが、タスクを終了して進捗している感があります。^^;
(おまけ)gRPC の準備
以下、仮想環境で実行するための準備です。
$ virtualenv venv
$ source ./venv/bin/actovate
$ python -m pip install --upgrade pip
$ python -m pip install grpcio
$ python -m pip install grpcio-tools
終わったら
(venv) $ deactivate
昔の自分向けに書きました。^^;)
参考情報
ぐぐっていたら asynio というのが出てきたが、今回はこれは関係なさそう。
GO言語での実装を紹介されています。
まとめ
gRPCで、一つのリクエストに対して、進捗状況の各ステップを返すようなレスポンスの返し方ができるようになった。これで仕事の方も乗り切れると良いが。
TODO:
- gRPC とprotocol buffer について情報整理してメモしておきたい
- 記事の中で通信方式を表にして分かり易くしたいな。
おじさん、こんなことに使って人生が終わらないか心配。誰かの役に立てばよいが、とりあえずは自分メモにはなりそう。
(2021/08/29)