41
41

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[翻訳] Elixirの外側へ: Portを使って外部プログラムを実行する

Last updated at Posted at 2015-08-29

Saša Jurićさんの2015年8月18日付のブログ記事、OUTSIDE ELIXIR: RUNNING EXTERNAL PROGRAMS WITH PORTSの翻訳です。
ElixirはBeagle Bone BlackやRaspberry Pi 2でも動作するのでそれならば最低でもLチカぐらいはさせたいところ。
そう考えて少し調べたらPortというものを使えば外部プログラムを走らせられる…つまり外部のC言語あたりで書いたLチカプログラムとElixirが連携できそうです。
この記事では主にRubyと話をする例が出てきます。

ちょっと長いです。


場合によっては何かのシステムの一部を実装するのにErlang/Elixirではない言語を使う方が有益なことがあります。私は少なくとも2つ、そうするほうがいい理由を知っています。まず、いくつかの特定の機能についてライブラリがないか他の言語に比べて同じような機能を持つライブラリが十分成熟しておらず、それをElixirで実装しようとすると大変な努力を要する場合。もうひとつの理由は、私の経験上まず出くわさないのですが、Elixirが得意としない、例えばネイティブのCPU速度が欲しい場合です。それでも、もしCPUを集中して使わなければならない箇所に速度に対する強い要求があって、1マイクロ秒1マイクロ秒が重要なのであればErlangでは歯がたたないでしょう。

Erlangが最適なツールではないかもしれない状況もあるでしょう。それでも、それはErlangを全く捨て去るに十分な理由ではありません。なぜならそれはErlangがある機能に対して適していないというだけで、他のシステムのほとんどを動かすためのよい選択肢ではない、ということにはならないからです。さらにもしErlangを使い続けるにしてもいくつかの部分の実装を他の言語に頼ることも可能です。Erlangはそのためにいくつかの手段を提供していますが、私の個人的な意見として、最も説得力のあるオプションは外部のプログラムをErlangから起動するのにportを使うことです。私はまずこのやり方を最初に考えてみて、特殊な場合にだけ他の方法に切り替えます。なのでこの記事ではportについてお話し、話を終わる前に簡単に他のオプションとトレード・オフについて議論することにします。

基礎理論

Erlangのportはプロセス固有のリソースです。portはあるプロセスによって所有され、そのプロセスだけがportと話すことができます。オーナープロセスが終了するとportもクローズされます。システム中では多数のportを生成することができ、ひとつのプロセスは複数のportを持つことができます。あるプロセスから別のプロセスにportの所有権を渡すことができる、ということも言っておいた方がいいですね。

portの実例としてはファイルハンドルやネットワークソケットがあります。それらはオーナープロセスに関連付けられており、プロセスが終了すればクローズされます。これによりちゃんと構成されたOTPアプリケーションは適切なクリーンアップ処理を行うことができます。監視ツリーの一部を停止させたならば、終了したプロセスによって所有されていた全てのリソースがクローズされます。

実装の観点から言うと、portには2つの種類があります。VM自体の中で直接実行されるコードで(port driver)、またはBEAM1の外で外部OSプロセスとして動作します。どちらにしても先の原則は順守していますし、使うのは:erlangモジュールのport関連の関数の小さなラッパーであるPortモジュールが公開しているほとんど同じ関数のセットです。この記事では外部プロセスとしてのportに焦点を当てます。これは最速のオプションではありませんが、フォールトトレラントな特性を維持しているという点からこれが賢明なやり方であると信じています。

始める前に、Alexei SholikのPorcelainライブラリについても言及しておきましょう。これはいくつかの場合においてportの作業を簡単にすることができます。必ずチェックしておくべきだと思いますが、この記事では抽象化の余計なレイヤーを避けるためにPortモジュールだけを使います。

最初の一手

簡単な例を見てみましょう。この練習ではErlang VMからRubyのコードを実行するためのサポート機能について紹介します。この状況でErlangからRubyのプロセスを起動し、それにRubyのコマンドを送ります。プロセスはそれらのコマンドを評価しレスポンスをErlangに返すこともあります。またRubyのコマンドが同じ状態を共有できるようにRubyインタプリタをステートフルにします。もちろん複数のRubyのインスタンスを起動できますし、各インスタンスを分離しておけます。

最初の手順は簡単です。portを通して外部のプログラムを走らせるにはPort.open/2を使ってportをオープンし、外部のプログラムを起動するコマンドを与える必要があります。その後、Port.command/2でそのプログラムにリクエストを発行します。もしプログラムが何かを送り返したならpipeのオーナープロセスはそれを受け取ることができます。これは古典的なメッセージパッシングのやり方に極めて類似しています。

一方で、外部プログラムは標準入出力をオーナープロセスとの会話に使います。基本的に標準入力から読み込み、入力を解釈し、やるべき作業を行い、場合によっては結果を標準出力にメッセージとして出力しそのメッセージはErlangのプロセスに返されます。プログラムが標準入力からEOFを検出するとオーナープロセスがportをクローズしたと考えられます。

実際に動かしてみてみましょう。まず、外部プログラム、この場合はRubyインタプリタを開始するコマンドを定義します1

cmd = ~S"""
  ruby -e '
    STDOUT.sync = true
    context = binding

    while (cmd = gets) do
      eval(cmd, context)
    end
  '
"""

この簡単なプログラムは行を標準入力から読み込んでそれを同じコンテキストで評価するので前のコマンドの現在のコンテキストに与える副作用が明白になっています。STDOUT.sync = trueによってどんな出力も即座にフラッシュされオーナーのErlangプロセスに返されます。

これでportを開始できます。

port = Port.open({:spawn, cmd}, [:binary])

2番めの引数はportのオプションを含みます。今は:binaryオプションだけ与えて外部プログラムからの受信データをバイナリとして受け取ることだけ指定します。後で他のオプションも利用しますが、利用可能な全オプションについては公式ドキュメントを読むことをおすすめします。

Rubyインタプリタが実行パス上のどこかにあるならば、上記のコードにより関連するOSプロセスが起動します。そしてPort.command/2を使ってそのプロセスと話せます。

Port.command(port, "a = 1\n")
Port.command(port, "a += 2\n")
Port.command(port, "puts a\n")

思い切り素直ですね。いくつかのメッセージを、受け取り側がちゃんと受け取れるように改行を挟んでportに送るだけです(受け取り側が各行を受け取るのにgetsを使っているから)。Rubyプログラムはこれらの式を(そう上で記述したように)評価します。一番最後の式で変数の値を出力しています。最後のステートメントはオーナープロセスへのメッセージに結果を返します。そのメッセージはいつもどおりreceiveで受け取ることができます。

receive do
  {^port, {:data, result}} ->
    IO.puts("Elixir got: #{inspect result}")
end

全ソースはこちらにあります。

プログラムの終了

大事なことなので繰り返しますが、portはオーナープロセスが終了するときにクローズされます。加えてオーナープロセスは明示的にPort.close/1を使ってもportをクローズすることができます。portがクローズされても外部プログラムは勝手に終了はしませんが通信に使われていたパイプはクローズされます。外部プログラムは標準入力からEOFを受け取ったときに何かする…例えば終了することができます。

これは既に我々が書いたRubyプログラムでは実現できていて:

while (cmd = gets) do
  eval(cmd, context)
end

portが閉じられるとgetsがnilを返してループを抜けるため、プログラムは確かに終了します。

しかしいくつか注意があります。ループの中で評価していることにお気づきでしょうか。もしcmdの中のコードが実行に長い時間を要すると外部プログラムはportがクローズされたあとも生き残ってしまいます。これは単純に現在のリクエストの処理についてプログラムがビジー状態であるということなんですが、そのため反対側のオーナープロセスがportをクローズしたということが検知できません。もし確実にすぐに終了して欲しいなら処理を別スレッドで実行し、メインスレッドは通信に集中し続けるようなやり方を考えてください。

他に問題としてはportをクローズするということは双方のパイプも閉じてしまうということが挙げられます。これによりもしEOFを与えてから出力を生成するようなツールを直接使いたいときに問題が見えてきます。portの状況としてこれが起こると既に入出力どちらのパイプもクローズされているのでツールは標準出力を通じて何も返せません。これについては数多くの議論(例えばここ)が行われています。本質的に、外部プログラムをリクエストを待ち受け、何か処理して、場合によっては結果を投げ返すサーバーとして作ってしまえばこれについては気にする必要はありません。しかしながらもし元々portとして実行されるように書かれていないプログラムを再利用する場合は特別に書いたスクリプトでラップするなり何かの回避策を得られるライブラリ、例えば前述したPorcelainなどに頼ることになります。

メッセージをパックする

オーナープロセスとport間の通信はデフォルトでストリームになっています。これはメッセージがどこからどこまでひとかたまりであるかについては保証されないということです。ですから何らかの方法でメッセージを自力で一文字ずつパースする必要があります。

前のRubyの例では(getsを使うことで)改行に頼って、コマンドの区切りとしての役割を持たせていました。これは手っ取り早い対処法ではありますが、複数行のコマンドを渡せません。そのうえElixirでメッセージを受信した場合、どこまでが一区切りかについて保証されません。データはストリームから出力されたそのままで返されるのでひとつのメッセージが複数のレスポンスを含んでいる場合もしくはひとつのレスポンスが複数のメッセージにまたがっている場合があるからです。

単純な解決策としてはメッセージのサイズに関する情報をメッセージ自身に含めることです。これは{:packet, n}オプションをPort.open/2に与えることで可能です:

port = Port.open({:spawn, cmd}, [:binary, {:packet, 4}])

portに送られる各メッセージはnバイト(この例では4バイト)の残りのメッセージのサイズを表す値から始まります。このサイズは符号なしのbig-endianの整数としてエンコードされます。

外部プログラムは4バイトの整数を読み込みその値のバイト数を読み込んでメッセージのペイロードを取得します:

def receive_input
  encoded_length = STDIN.read(4)                # メッセージの大きさを取得する
  return nil unless encoded_length

  length = encoded_length.unpack("N").first     # 整数に変換
  STDIN.read(length)                            # メッセージを読み込む
end

このreceive_inputを評価ループで使えば:

while (cmd = receive_input) do
  eval(cmd, context)
end

この変更でElixirのクライアントが複数行のステートメントを送ることができるようになります:

Port.command(port, "a = 1")
Port.command(port, ~S"""
  while a < 10 do
    a *= 3
  end
""")

もしRubyのプログラムがErlangにメッセージを送り返す必要がある場合にはメッセージのサイズも含む必要があります。

def send_response(value)
  response = value.inspect
  STDOUT.write([response.bytesize].pack("N"))
  STDOUT.write(response)
  true
end

Elixirのコードはsend_responseをRubyのコードに何かを返させるのに使えます。レスポンスが正しく区切られていることを証明するために2つのレスポンスを送ってみましょう。

Port.command(port, ~S"""
  send_response("response")
  send_response(a)
""")

これはElixir側で2つのメッセージになります:

receive do
  {^port, {:data, result}} ->
    IO.puts("Elixir got: #{inspect result}")
end

receive do
  {^port, {:data, result}} ->
    IO.puts("Elixir got: #{inspect result}")
end

# Elixir got: "\"response\""
# Elixir got: "27"

完全なソースはここにあります。

メッセージのエンコード/デコード

これまでの例は素のままのの文字列をメッセージとして扱ってきました。もっと複雑なシナリオでは様々なデータ型を扱う必要があります。そのための特別なサポートというのはありません。基本的にあるプロセスとあるポートはバイトのシーケンスをやりとりしているだけ2なのでデータの型付けを容易にする何らかのエンコード/デコードの仕組みをプログラマが実装しなければなりません。この目的のためには例えばJSONのようなよく知られたフォーマットも使えます。

この例では、Erlang's External Term Format(ETF)を使います。:erlang.term_to_binary/1:erlang.binary_to_term/1で簡単にErlangの表現をETFにエンコード/ETFからデコードすることができます。これの優れた利点はElixir側でサードパーティ製のライブラリを全く必要としないことです。

では実際の例を見てみましょう。素のままの文字列の代わりに、{:eval, command}タプルをRuby側に送ります。Rubyのプログラムは:evalのタグがついたタプルを受信した場合だけコマンドを実行します。加えてレスポンスを返す場合にはやはり{:resonse, value}のタプルの形式をとることにします。ここでvalueがErlangの表現になります。

Elixir側では{:eval, command}タプルをportに送るためのヘルパーとなるラムダ式を導入します。これは単純にcommandをタプルにパックしてETFバイナリにエンコードするだけです。

send_eval = fn(port, command) ->
  Port.command(port, :erlang.term_to_binary({:eval, command}))
end

この関数は次のように使います:

send_eval.(port, "a = 1")
send_eval.(port, ~S"""
  while a < 10 do
    a *= 3
  end
""")
send_eval.(port, "send_response(a)")

Ruby側ではETFバイナリをデコードしなければなりません。このためにはサードパーティ製のライブラリの助けが必要です。さーっと(そしてかる~く)探してみてerlang-etf gemを使うことにしました。次のような内容のGemfileがいります:

source "https://rubygems.org"

gem 'erlang-etf'

gemを取得するのにbundle installの実行が必要です。

さて、Rubyコードに必要なgemをrequireします。

require "bundler"
require "erlang/etf"
require "stringio"

次にread_input関数をバイトシーケンスをデコードできるように変更します:

def receive_input
  # ...

  Erlang.binary_to_term(STDIN.read(length))
end

評価ループは入力されたメッセージがタプルで、:evalアトムを最初の要素として持っていることをチェックする必要があります:

while (cmd = receive_input) do
  if cmd.is_a?(Erlang::Tuple) && cmd[0] == :eval
    eval(cmd[1], context)
  end
end

次はsend_response{:response, value}の形にエンコードできるようにします。

def send_response(value)
  response = Erlang.term_to_binary(Erlang::Tuple[:response, value])
  # ...
end

Elixir側に戻って、レスポンスを:erlang.binary_to_term/1でデコードできるようにします:

receive do
  {^port, {:data, result}} ->
    IO.puts("Elixir got: #{inspect :erlang.binary_to_term(result)}")
end

# Elixir got: {:response, 27}

注意して欲しいのは今度は値が整数(前は文字列でした)として受け取られている、ということです。これはレスポンスがRuby側でETFとしてエンコードされていたからです。

完全なコードはここにあります。

標準入力をバイパスする

標準入出力を経由した通信はちょっと残念なところがあります。もし外部プログラムで何か出力したい場合、例えばデバッグ目的などでしょうが、その出力はErlangに送り返されてしまうだけだからです。幸運にもこれはErlangにプログラムと通信するのにファイルディスクリプタの3番と4番を使うように教えてあげるだけで回避できます。ありそうな警告として…この機能はWindowsでも動くかどうかはわかりません3

この変更はとても簡単です。:nouse_stdioオプションをPort.open/2に与えるだけです:

port = Port.open({:spawn, cmd}, [:binary, {:packet, 4}, :nouse_stdio])

Ruby側でも3番、4番のファイルをオープンしてやります。出力ファイルはバッファーなしにしてください:

@input = IO.new(3)
@output = IO.new(4)
@output.sync = true

最後にSTDINSTDOUTへの参照をそれぞれ@input@outputに置き換えてやります。このコードは話を簡単にするために省きます。

これらの変更を加えたら、Rubyプロセスでデバッグメッセージを出力できるようになります:

while (cmd = receive_input) do
  if cmd.is_a?(Erlang::Tuple) && cmd[0] == :eval
    puts "Ruby: #{cmd[1]}"
    res = eval(cmd[1], context)
    puts "Ruby: => #{res.inspect}\n\n"
  end
end

puts "Ruby: exiting"

これは以下の様な出力になります:

Ruby: a = 1
Ruby: => 1

Ruby:   while a < 10 do
    a *= 3
  end
Ruby: => nil

Ruby: send_response(a)
Ruby: => true

Elixir got: {:response, 27}
Ruby: exiting

このコードはここで取得可能です。

サーバープロセスの中にportをラップする

portでの通信はメッセージパッシングに非常に依存しているのでportをGenServerの中で管理するに値します。これにはいくつかの利点があります:

  • サーバープロセスはクライアントに抽象化されたAPIを提供できます。例えばRubyServer.castRubyServer.call'といったものを公開することもできます。最初の操作は出力生成なしでコマンドを発行します。二番目のはRubyプログラムにsend_response`を起動するように指示し、レスポンスを送り返させます。加えてサーバープロセスはレスポンスのメッセージをクライアントプロセスに通知することで扱います。Erlangとプログラムの結合はサーバープロセスのコード内に置かれます。
  • サーバープロセスはportに発行される各リクエストについてユニークなIDを追加で含めることができます。RubyプログラムはこのIDをレスポンスメッセージに入れることもでき、サーバーはレスポンスと特定のクライアントリクエストを確実に一致させることができます。
  • サーバープロセスはRubyプログラムがクラッシュした場合にそれを検出でき、サーバープロセス自身もクラッシュさせることができます。

そのようなサーバーの利用例を見てみましょう:

{:ok, server} = RubyServer.start_link

RubyServer.cast(server, "a = 1")
RubyServer.cast(server, ~S"""
  while a < 10 do
    a *= 3
  end
""")

RubyServer.call(server, "Erlang::Tuple[:response, a]")
|> IO.inspect

# {:response, 27} 

もちろん、他のRubyインタプリタを生成しても問題ありません:

{:ok, another_server} = RubyServer.start_link
RubyServer.cast(another_server, "a = 42")
RubyServer.call(another_server, "Erlang::Tuple[:response, a]")
|> IO.inspect

# {:response, 42}

これらの2つのサーバーは異なるインタプリタと通信でき、オーバーラップはありません:

RubyServer.call(server, "Erlang::Tuple[:response, a]")
|> IO.inspect

# {:response, 27}

最後に、Rubyプログラム内でのクラッシュはGenServerで補足されそれ自身をクラッシュさせます:

RubyServer.call(server, "1/0")

# ** (EXIT from #PID<0.48.0>) an exception was raised:
#     ** (ErlangError) erlang error: {:port_exit, 1}
#         ruby_server.ex:43: RubyServer.handle_info/2
#         (stdlib) gen_server.erl:593: :gen_server.try_dispatch/4
#         (stdlib) gen_server.erl:659: :gen_server.handle_msg/5
#         (stdlib) proc_lib.erl:237: :proc_lib.init_p_do_apply/3

この実装はほとんど以前に述べたテクニックの焼き直しなので特にここで説明しません。唯一新しいのは:exit_statusオプションをPort.open/2に付け足したことです。このオプションにより確実にオーナープロセスが{port, {:exit_status, status}}メッセージを受信しportのクラッシュに対して何かをすることができます。このようにGenServer`自身を自分で実装するか、私の基本的な解法を解析するのがよいでしょう。

portの代替手段

他の事柄と同じように、portsにも関連したトレード・オフがあります。最も明白なものはエンコードとパイプ経由の通信によるパフォーマンスへの影響です。portを使っての実際の処理の時間が短いものであればオーバーヘッドは許容しがたいものになるでしょう。「そんなんだめじゃん」って声が聞こえるようですが
portは外部プログラムがある程度のガッツリした仕事量、例えばミリ秒オーダーで測られるような、をこなす場合に適していると言っておきましょう。

加えて、portはオーナーと結合されます(逆もしかりです)。もしオーナーが停止したら外部プログラムも止めたいところですよね。そうでなければ再起動されたオーナーは新しいプログラムのインスタンスを起動し、前のインスタンスはErlangと何も通信できなくなったままになります。

案件があなたの特定のケースに適切であれば、代替手段を考えてもよいでしょう。

  • Portドライバー(リンクドインドライバーとも呼ばれる)はportと同等の特性を持ちますが、外部プログラムは起動されません。その代わりにC/C++で実装されたコードがVM内部で直接実行されます。
  • NIF(Native Implemented Functions)はErlangの関数をCで実装してBEAM内部で走らせるために使います。Portドライバーとは異なりNIFは特定のプロセスに結び付けられません。
  • プログラムをErlangのノードのようにすることも可能です。CやJavaによるヘルパーライブラリがいくつか提供されています。クラスタ内の他のノードと同じようにあなたが書いたErlangノードはプログラムと通信できます。
  • もちろん、"microservice"スタイルをとることも可能です: 分離されたプログラムを起動し、HTTPインターフェイスを公開しておいてErlangシステムがそれと話せるようにするわけです。

最初の2つの代替手段は安全性は犠牲になりますが、大幅の実行速度改善が見込めるでしょう。NIFやPortドライバの中でのちゃんと対応されていない例外はBEAM全体をクラッシュさせるかもしれません。そのうえNIFもportドライバもスケジューラスレッドの中で実行されるため、実行にかかる時間を短く(<=1ms)しなくてはいけません。さもないとスケジューラの動作が怪しいものになってしまいます。これはスレッド動作の回避技でしかも汚いスケジューラの使い方である上、実装はもっと難解なものになるでしょう。

三番目のオプションは2つの部分のより緩やかな結合を提供し、個別の再起動を可能にします。分散Erlangを使っているため、反対側でも、もう一方のクラッシュを検知できます。

カスタムなHTTPインターフェイスのはErlangライクなノードよりもっと一般的なものであると言えます(Erlangのクライアントが不要なので)が、クラッシュの検知ができなくなります。もし片方が他方がクラッシュしたことを検出したい場合は独自のヘルスチェック機能を作り出す(もしくはサードパーティ製のこの機能のためのコンポーネントを再利用する)必要が出てくるでしょう。

ノードと分離されたサービスは2つの部分がピアであるように見える、つまりもう一方がなくても存在できるような場合ほど適していると言えます。一方でportは外部プログラムがシステム全体のコンテキストの中でのみ意味があって、システムの他のどこか一部が終了したら終了させられるような場合に、より興味深いものになります。

これまで見てきたように様々なオプションがあります。ですから私はErlangは孤立した島ではないとはっきり言えます。Erlang/Elixirに移ることは他の言語でシステムの一部を実装する能力を失うことを意味しません。ですからあなたがどのような理由である特別な機能を動かすためにより適している何か他の言語を使うことにしたとしても、間違いなくErlang/Elixirの利点を他のところで活かしたままそのやり方で進めることができるのです。

  1. この文中のプログラムはiexで動作確認可能です。 2

  2. それを言ったらErlangは元々「文字列」自体「単なるバイトシーケンス」なんですが…。

  3. 試してみましたがまずRuby側で Bad file descriptor (Errno::EBADF)になって動きません。これより前のプログラムも動かないような気が…。

41
41
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
41
41

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?