2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

gRPCでクライアント側からストリーミングキャンセル

Posted at

概要

gRPCにはクライアントからストリーミングをキャンセルする機能が備わっている。
RESTだとクライアント側からキャンセルするために、イベントキャンセル用のAPIを作成したりするのだろうが、gRPCだと各言語に適した形でキャンセルが簡単にできる1
ただし、ドキュメントや記事があまりないので、pythonのgrpcioで簡単な例を書いてみた。

実装したもの

ユーザーからキーを送ると、サーバー側でそのキーと時間経過のメッセージが作成され、サーバー側のプロンプトで表示、ユーザー側にメッセージをストリーミングで送るというもの。
ただし、クライアントがリクエストしたキーと同じキーがもう一度サーバに送られた場合、ストリーミングがキャンセルされる。

  1. ユーザーがキー入力
  2. サーバーでリクエストされてからの経過時間に応じたメッセージ作成
  3. 逐次サーバーからクライアントにメッセージ送信
  4. クライアントでリクエストしたキーと同じキーが入力された場合、ストリーミングをキャンセル
Screen Shot 2019-09-08 at 14.10.09.png

実装

protoファイル

gRPCではサービスの定義をprotoファイルに書く。
API定義が実装に必ず必要なので、ドキュメント作成が面倒なズボラ人間に最適(私のこと)。
サービス内のリクエスト、レスポンス側にstreamを付けるとストリーム処理になる。
ちなみに、ストリームの付け方で次のように定義される(ドキュメントより)

  • Unary RPC: 1リクエスト、1レスポンス
  • Server Streaming RPC: 1リクエスト、Nレスポンス(サーバー側がストリームを送る)
  • Client Streaming RPC: Nリクエスト、1レスポンス(クライアント側がストリームを送る)
  • Bidirectional Streaming RPC: Nリクエスト、Nレスポンス(両方ともストリームを送る)

つまり今回のはServer Streaming RPC。
protoファイル書いたら、各言語のツール(pythonならgrpc_tools)を使ってコードを自動生成する。

message.proto
syntax = 'proto3';

message Request {
  string key = 1;
}

message Response {
  string message = 1;
}

service Message {
  rpc Stream(Request) returns (stream Response) {}
}

サーバー

grpcioのフレームワークに則って実装するだけでいいので簡単だった。
チュートリアルも充実しているので悩むところはあまりない。
ちなみに、サーバーから通信をキャンセルするにはcontext.cancel()すればいい。

grpc_server.py
import time
from concurrent import futures

import grpc
import message_pb2
import message_pb2_grpc


class MessageServicer(message_pb2_grpc.MessageServicer):
    def Stream(self, request, context):
        for i in range(1, 11):
            time.sleep(1)
            message = 'key: {}, progress: {}'.format(request.key, '+' * i)
            print(message)
            yield message_pb2.Response(message=message)


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    message_pb2_grpc.add_MessageServicer_to_server(
        MessageServicer(),
        server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('Server started !')
    try:
        while True:
            time.sleep(24 * 60 * 60)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

クライアント

これもチュートリアルが充実しているの悩むポイントは少なかった。
しかし、チュートリアルやドキュメントにクライアントからキャンセルするAPIに関する説明がない。
grpcioのドキュメントからcontextとfutureにcancelが実装されていることがわかるが、スタブから受け取るストリームは_Rendezvousオブジェクトでドキュメントにはその説明が全くない。
どこからキャンセルすればいいのか分からなくなったが、ソースを見ると_Rendezvous2オブジェクトがFutureクラスを継承しておりcancelを実装していたので一安心した。
_Rendezvousの実装

戒め: ソースをみろ。

grpc_client.py
from concurrent import futures

import grpc
import message_pb2
import message_pb2_grpc


def stream(stub, key, key_stack):
    # _Rendezvousオブジェクトを受け取る
    responses = stub.Stream(message_pb2.Request(key=key))
    for res in responses:
        if key not in key_stack:
            responses.cancel()
            # これでもいい
            # res.cancel()
    key_stack.discard(key)


def run():
    executor = futures.ThreadPoolExecutor(max_workers=10)
    key_stack = set()
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = message_pb2_grpc.MessageStub(channel)
        while True:
            print('Stacking keys: {}'.format(key_stack))
            user_input = input('Input key: > ')
            if user_input not in key_stack:
                print('Create stream, key: ', user_input)
                executor.submit(lambda: stream(stub, user_input, key_stack))
                key_stack.add(user_input)
            else:
                print('Cancel stream, key: ', user_input)
                key_stack.discard(user_input)


if __name__ == '__main__':
    run()

まとめ

pythonには型ヒントが基本的にないので、protoファイルからの自動生成コードの利点が薄れた。
やはりGoで書くのが一番わかりやすいのかもしれない(Goはサーバーもクライアントもcontextでcancelなどを制御する)。
gRPCの体験的には定義ファイル書けばおおよそのコードが自動生成されるので、思ったより楽だった。
全実装は以下のリポジトリに置いた。
https://github.com/crambon/grpc-stream-cancel-example

※gRPCの勉強の真っ最中なので間違っていたら、教えていただけると嬉しいです。

  1. 各言語によってcancelの実装は結構まちまちな印象。

  2. ちなみにRandezvous(ランデブー)の意味は待ち合わせなど。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?