概要
gRPCにはクライアントからストリーミングをキャンセルする機能が備わっている。
RESTだとクライアント側からキャンセルするために、イベントキャンセル用のAPIを作成したりするのだろうが、gRPCだと各言語に適した形でキャンセルが簡単にできる1。
ただし、ドキュメントや記事があまりないので、pythonのgrpcioで簡単な例を書いてみた。
実装したもの
ユーザーからキーを送ると、サーバー側でそのキーと時間経過のメッセージが作成され、サーバー側のプロンプトで表示、ユーザー側にメッセージをストリーミングで送るというもの。
ただし、クライアントがリクエストしたキーと同じキーがもう一度サーバに送られた場合、ストリーミングがキャンセルされる。
- ユーザーがキー入力
- サーバーでリクエストされてからの経過時間に応じたメッセージ作成
- 逐次サーバーからクライアントにメッセージ送信
- クライアントでリクエストしたキーと同じキーが入力された場合、ストリーミングをキャンセル

実装
protoファイル
gRPCではサービスの定義をprotoファイルに書く。
API定義が実装に必ず必要なので、ドキュメント作成が面倒なズボラ人間に最適(私のこと)。
サービス内のリクエスト、レスポンス側にstreamを付けるとストリーム処理になる。
ちなみに、ストリームの付け方で次のように定義される(ドキュメントより)
- Unary RPC: 1リクエスト、1レスポンス
- Server Streaming RPC: 1リクエスト、Nレスポンス(サーバー側がストリームを送る)
- Client Streaming RPC: Nリクエスト、1レスポンス(クライアント側がストリームを送る)
- Bidirectional Streaming RPC: Nリクエスト、Nレスポンス(両方ともストリームを送る)
つまり今回のはServer Streaming RPC。
protoファイル書いたら、各言語のツール(pythonならgrpc_tools)を使ってコードを自動生成する。
syntax = 'proto3';
message Request {
string key = 1;
}
message Response {
string message = 1;
}
service Message {
rpc Stream(Request) returns (stream Response) {}
}
サーバー
grpcioのフレームワークに則って実装するだけでいいので簡単だった。
チュートリアルも充実しているので悩むところはあまりない。
ちなみに、サーバーから通信をキャンセルするにはcontext.cancel()すればいい。
import time
from concurrent import futures
import grpc
import message_pb2
import message_pb2_grpc
class MessageServicer(message_pb2_grpc.MessageServicer):
def Stream(self, request, context):
for i in range(1, 11):
time.sleep(1)
message = 'key: {}, progress: {}'.format(request.key, '+' * i)
print(message)
yield message_pb2.Response(message=message)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
message_pb2_grpc.add_MessageServicer_to_server(
MessageServicer(),
server)
server.add_insecure_port('[::]:50051')
server.start()
print('Server started !')
try:
while True:
time.sleep(24 * 60 * 60)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()
クライアント
これもチュートリアルが充実しているの悩むポイントは少なかった。
しかし、チュートリアルやドキュメントにクライアントからキャンセルするAPIに関する説明がない。
grpcioのドキュメントからcontextとfutureにcancelが実装されていることがわかるが、スタブから受け取るストリームは_Rendezvousオブジェクトでドキュメントにはその説明が全くない。
どこからキャンセルすればいいのか分からなくなったが、ソースを見ると_Rendezvous2オブジェクトがFutureクラスを継承しておりcancelを実装していたので一安心した。
_Rendezvousの実装
戒め: ソースをみろ。
from concurrent import futures
import grpc
import message_pb2
import message_pb2_grpc
def stream(stub, key, key_stack):
# _Rendezvousオブジェクトを受け取る
responses = stub.Stream(message_pb2.Request(key=key))
for res in responses:
if key not in key_stack:
responses.cancel()
# これでもいい
# res.cancel()
key_stack.discard(key)
def run():
executor = futures.ThreadPoolExecutor(max_workers=10)
key_stack = set()
with grpc.insecure_channel('localhost:50051') as channel:
stub = message_pb2_grpc.MessageStub(channel)
while True:
print('Stacking keys: {}'.format(key_stack))
user_input = input('Input key: > ')
if user_input not in key_stack:
print('Create stream, key: ', user_input)
executor.submit(lambda: stream(stub, user_input, key_stack))
key_stack.add(user_input)
else:
print('Cancel stream, key: ', user_input)
key_stack.discard(user_input)
if __name__ == '__main__':
run()
まとめ
pythonには型ヒントが基本的にないので、protoファイルからの自動生成コードの利点が薄れた。
やはりGoで書くのが一番わかりやすいのかもしれない(Goはサーバーもクライアントもcontextでcancelなどを制御する)。
gRPCの体験的には定義ファイル書けばおおよそのコードが自動生成されるので、思ったより楽だった。
全実装は以下のリポジトリに置いた。
https://github.com/crambon/grpc-stream-cancel-example
※gRPCの勉強の真っ最中なので間違っていたら、教えていただけると嬉しいです。