概要
- gRPCの双方向なストリーミング機能をRubyで利用する方法
前提
- 以下でgRPC用のgemをインストール
$ gem install grpc
$ gem install grpc-tools
方法
protoファイル
- service定義で、パラメータと戻り値に"stream"を付ける
sample.proto
syntax = 'proto3';
package sample;
service Echo {
rpc Talk (stream Request) returns (stream Response);
}
message Request {
string message = 1;
}
message Response {
string message = 1;
}
コンパイル
grpc_tools_ruby_protoc --ruby_out=lib --grpc_out=lib sample.proto
- 上記protoファイルをコンパイル
- sample_pb.rbとsample_services_pb.rbが生成される
プログラム実装(例)
- サーバ側、クライアント側双方、Enumeratorを介してやりとりする
サーバ側
sample_server.rb
#!/usr/bin/env ruby
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(this_dir, 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
require 'grpc'
require 'sample_services_pb'
class EchoServer < Sample::Echo::Service
def talk(requests)
return enum_for(:talk, requests) unless block_given?
requests.each do |request|
sleep rand(5)
message = "received (#{request.message})"
yield Sample::Response.new(message: message)
end
end
end
def main
s = GRPC::RpcServer.new
s.add_http2_port('localhost:50051', :this_port_is_insecure)
s.handle(EchoServer)
s.run_till_terminated
end
main
- サーバ用クラスのメソッドの引数には、クライアント側から渡されたEnumeratorなオブジェクト(後述)が渡ってくる
- 上記Enumeratorオブジェクトに対して、eachでブロックを渡す
- yieldでレスポンス用のオブジェクトを渡すようにしておく
- メソッドの戻り値としてEnumeratorオブジェクトを返す
- 実行時にはブロックなしで呼ばれるので、ブロックなしで呼ばれたらObject#enum_forで自メソッドをEnumerator化して返すようにしておく
- このEnumeratorオブジェクトがクライアント側のスタブの戻り値になる
クライアント側
- サーバ側がObject#enum_forだったので、サンプル的にEnumerator.newで
sample_client.rb
#!/usr/bin/env ruby
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(this_dir, 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
require 'grpc'
require 'sample_services_pb'
def main
requests = Enumerator.new do |yielder|
loop do
print "\ninput: "
message = STDIN.gets.chomp
next if message.size == 0
break if %w(quit exit).include?(message)
yielder << Sample::Request.new(message: message)
end
end
stub = Sample::Echo::Stub.new('localhost:50051', :this_channel_is_insecure)
responses = stub.talk(requests)
responses.each do |response|
puts "\nresponse: #{response.message}"
end
end
main
- リクエスト用にスタブのメソッドにEnumeratorオブジェクトを渡す
- これがサーバ側メソッドの引数になる
- スタブの戻り値にサーバ側から返したEnumeratorオブジェクトが渡ってくる
- each実行でサーバ側メソッド(のenum_forでEnumerator化されたもの)が実行される
- サーバ側メソッド内でeachが実行されることで、リクエスト用に渡したクライアント側のEnumerator(Enumerator.newしたもの)が実行される
- Enumerator.newのブロック内でEnumerator::Yielder#<<に渡したリクエスト用オブジェクトがサーバ側のeachのブロック引数として渡る
- サーバ側でyieldに渡されたレスポンス用オブジェクトが、クライアント側のeachのブロック引数として渡ってくる