7
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RubyでgRPCのストリーミング

Posted at

概要

  • gRPCの双方向なストリーミング機能をRubyで利用する方法

前提

  • 以下でgRPC用のgemをインストール
$ gem install grpc
$ gem install grpc-tools

方法

protoファイル

  • service定義で、パラメータと戻り値に"stream"を付ける
sample.proto
syntax = 'proto3';

package sample;

service Echo {
    rpc Talk (stream Request) returns (stream Response);
}

message Request {
    string message = 1;
}

message Response {
    string message = 1;
}

コンパイル

grpc_tools_ruby_protoc --ruby_out=lib --grpc_out=lib sample.proto
  • 上記protoファイルをコンパイル
    • sample_pb.rbとsample_services_pb.rbが生成される

プログラム実装(例)

  • サーバ側、クライアント側双方、Enumeratorを介してやりとりする

サーバ側

sample_server.rb
#!/usr/bin/env ruby

this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(this_dir, 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)

require 'grpc'
require 'sample_services_pb'

class EchoServer < Sample::Echo::Service
  def talk(requests)
    return enum_for(:talk, requests) unless block_given?

    requests.each do |request|
      sleep rand(5)
      message = "received (#{request.message})"
      yield Sample::Response.new(message: message)
    end
  end
end

def main
  s = GRPC::RpcServer.new
  s.add_http2_port('localhost:50051', :this_port_is_insecure)
  s.handle(EchoServer)
  s.run_till_terminated
end

main
  • サーバ用クラスのメソッドの引数には、クライアント側から渡されたEnumeratorなオブジェクト(後述)が渡ってくる
  • 上記Enumeratorオブジェクトに対して、eachでブロックを渡す
    • yieldでレスポンス用のオブジェクトを渡すようにしておく
  • メソッドの戻り値としてEnumeratorオブジェクトを返す
    • 実行時にはブロックなしで呼ばれるので、ブロックなしで呼ばれたらObject#enum_forで自メソッドをEnumerator化して返すようにしておく
    • このEnumeratorオブジェクトがクライアント側のスタブの戻り値になる

クライアント側

  • サーバ側がObject#enum_forだったので、サンプル的にEnumerator.newで
sample_client.rb
#!/usr/bin/env ruby

this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(this_dir, 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)

require 'grpc'
require 'sample_services_pb'

def main
  requests = Enumerator.new do |yielder|
    loop do
      print "\ninput: "
      message = STDIN.gets.chomp
      next if message.size == 0
      break if %w(quit exit).include?(message)

      yielder << Sample::Request.new(message: message)
    end
  end

  stub = Sample::Echo::Stub.new('localhost:50051', :this_channel_is_insecure)

  responses = stub.talk(requests)
  responses.each do |response|
    puts "\nresponse: #{response.message}"
  end
end

main
  • リクエスト用にスタブのメソッドにEnumeratorオブジェクトを渡す
    • これがサーバ側メソッドの引数になる
  • スタブの戻り値にサーバ側から返したEnumeratorオブジェクトが渡ってくる
    • each実行でサーバ側メソッド(のenum_forでEnumerator化されたもの)が実行される
    • サーバ側メソッド内でeachが実行されることで、リクエスト用に渡したクライアント側のEnumerator(Enumerator.newしたもの)が実行される
    • Enumerator.newのブロック内でEnumerator::Yielder#<<に渡したリクエスト用オブジェクトがサーバ側のeachのブロック引数として渡る
    • サーバ側でyieldに渡されたレスポンス用オブジェクトが、クライアント側のeachのブロック引数として渡ってくる
7
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?