LoginSignup
1
1

More than 5 years have passed since last update.

ruote - implementing participants を訳してみた

Posted at

(2012-04-22) オレオレ訳。
Amazon SWF との連携には、participant を使うのもありかも。


参加者の実装をする

参加者(Participant)はワークアイテムを受取り、ワークアイテムに何かを行う。そして、必要なら、ruote に「更新された」ワークアイテムを返す。

最も単純な参加者の実装は、block participant である。

engine.register_participant :flush_garbage do |workitem|
  GarbageCan.find(workitem.fields['garbage_can_id']).flush
end

これは、下記の参加者の実装(と登録)と等価である:

class Flusher
  include Ruote::LocalParticipant

  def consume(workitem)
    GarbageCan.find(workitem.fields['garbage_can_id']).flush
    reply_to_engine(workitem)
  end

  def cancel(fei, flavour)
    # nothing to do
  end

  #def on_reply(workitem)
  #end
end

engine.register_participant :flush_garbage, Flusher

block participant のブロックは、reply_to_engine(workitem) が追加されているが、この参加者の consume メソッドの複製である。

#consume が参加者の唯一のメソッドではなく、#cancel もある。cancel メソッドは、プロセス·インスタンスまたはそのセグメントがキャンセルされたときに呼び出される。そして、参加者は、そのセグメントのためにワークアイテムを保持する。ワークアイテムは、キャンセル/削除/ロールバックされなければならない。

二つの付加的なメソッドがある: #accept?#on_reply である。accept? メソッドを持つ参加者は、ワークアイテムを扱えない場合はワークアイテムを破棄しても良い。on_reply メソッドも持つ参加者は、通常、実際の(リモートの)参加者のプロキシーである。on_reply メソッドは、実際の参加者から返答(ワークアイテム)が返って来た時に呼ばれる。

デフォルトでは、プロセスインスタンスの実行が参加者に達した時、参加者の #consume メソッドが専用の Ruby スレッドで実行される。もし、#do_not_thread スレッドが false を返すなら、スレッドの生成が行われずに直ちに実行される。(すなわち、ワーカーはブロックされる)

ruote 2.3.0 から、上記の Flusher は、次のように書き換え可能である。

class Flusher
  include Ruote::LocalParticipant

  def on_workitem
    GarbageCan.find(workitem.fields['garbage_can_id']).flush
    reply
  end

  def on_cancel
    # nothing to do
  end

  #def on_reply
  #end
end

engine.register_participant :flush_garbage, Flusher

consumeon_workitem に、cancelon_cancel にリネームされる。これは、参加者の反応性を強調する。

ワークアイテム、フロー式ID(fei)、flavour は、引数として定義可能であるが、定義しなくても暗黙的に渡されることに注意。

Ruote::LocalParticipant

殆どの場合、あなたの参加者は Ruote::LocalParticipant をインクルードすることになる

ruote ストレージへのアクセスを持っているはずなので、'local' と名付けられている。


参加者を実装する場合に実装、オーバーライドすべきメソッドと、Ruote::LocalParticipant からのヘルパーメソッド。

メソッド

  • #consume (#on_workitem)
  • #cancel (#on_cancel)
  • #accept?
  • #on_reply
  • #do_not_thread (#dont_thread?)
  • #re_dispatch
  • #unschedule_re_dispatch
  • #rtimeout

ヘルパー

  • reply_to_engine (reply)
  • fexp
  • workitem
  • applied_workitem
  • participant_name
  • lookup_variable
  • put and get

#consume #on_workitem

参加者がワークアイテムを受け取った時、このメソッドがワークアイテムを引数に呼ばれる。

# ワークアイテムとしてパスされた請求書の合計を計算する
#
class TotalParticipant
  include Ruote::LocalParticipant

  def consume(workitem)
    workitem.fields['total'] =
      workitem.fields['items'].inject(0.0) { |t, item|
        t + item['count'] * PricingService.lookup(item['id'])
      }
    reply_to_engine(workitem)
  end
end

ワークアイテムの処理が終わったら、ワークアイテムを引数にして #reply_to_engine を呼ぶことに注意。このメソッドを呼ばないと、プロセスはこの参加者の後続を再開できない。

ruote 2.3.0 から、上記の参加者は下記のように書ける:

# computing the total for a invoice being passed in the workitem.
#
class TotalParticipant
  include Ruote::LocalParticipant

  def on_workitem
    workitem.fields['total'] =
      workitem.fields['items'].inject(0.0) { |t, item|
        t + item['count'] * PricingService.lookup(item['id'])
      }
    reply
  end
end

#cancel #on_cancel

consume メソッドの中でエンジンに対する返答が迅速に行えない場合、#cancel を実装するべきだ。

require 'fileutils'
require 'yaml'

# 平易な "worklist" 実装, workitems は、ファイルとして worklist/ に
# 置かれる
#
# アプリケーションは yaml ファイルを読み、ワークアイテムの処理が終わったら
# PlainWorklist.reply(workitem_h) を呼び出す
#
class PlainWorklist
  include Ruote::LocalParticipant

  # ruote から呼び出される
  #
  def consume(workitem)
    h = workitem.to_h
    h['fei'] = workitem.fei.sid
    File.open("worklist/workitem_#{workitem.fei.sid}.yaml", wb) do |f|
      f.puts(h.to_yaml)
    end
    # no reply_to_engine(workitem)
  end

  # ruote がプロセスインスタンスのキャンセルをする場合に呼ばれる
  #
  def cancel(fei, flavour)
    FileUtils.rm("worklist/workitem_#{fei.sid}.yaml")
  end

  # 外部のシステムからワークアイテムの処理が終わったら呼ばれる
  #
  def self.reply(workitem_h)
    FileUtils.rm("worklist/workitem_#{workitem_h['fei']}.yaml")
    reply_to_engine(workitem)
  end
end

この consume メソッドは、エンジンに応答しない。キャンセルの要求が来ると、ディスクに格納されたワークアイテムは削除される。

ruote 2.3.0 から、下記のように書ける:

require 'fileutils'
require 'yaml'


# 平易な "worklist" 実装, workitems は、ファイルとして worklist/ に
# 置かれる
#
# アプリケーションは yaml ファイルを読み、ワークアイテムの処理が終わったら
# PlainWorklist.reply(workitem_h) を呼び出す
#
class PlainWorklist
  include Ruote::LocalParticipant

  # ruote から呼び出される
  #
  def on_workitem
    h = workitem.to_h
    h['fei'] = workitem.fei.sid
    File.open("worklist/workitem_#{workitem.fei.sid}.yaml", wb) do |f|
      f.puts(h.to_yaml)
    end
    # no reply_to_engine
  end

  # ruote がプロセスインスタンスのキャンセルをする場合に呼ばれる
  #
  def on_cancel
    FileUtils.rm("worklist/workitem_#{fei.sid}.yaml")
  end

  # 外部のシステムからワークアイテムの処理が終わったら呼ばれる
  #
  def self.reply(workitem_h)
    FileUtils.rm("worklist/workitem_#{workitem_h['fei']}.yaml")
    receive(workitem)
  end
end

ruote 2.3.0 より、#on_cancelfalse を返したら、ruote に対して何も応答しない。これは、#on_workitem それ自身がキャンセルメッセージを扱うシナリオで有用だ。

cancel flavour

この変数は、通常、‘cancel’ がセットされる。キャンセルされる代わりにプロセスがキルされた場合、flavour は “kill” がセットされる。通常は、この変数はシンプルに無視される。しかし、誰が知っていようか。同じコンテキストで異なる振る舞いをするために必要とされるかもしれない。

#accept?

次の参加者が与えられた時、

engine.register_participant 'al.+', FirstParticipantImplementation
engine.register_participant 'al.+', SecondParticipantImplementation

参加者の名前がマッチすると、最初の参加者が使われる。もし、最初の参加者が #accept?(workitem) メソッドを持っており、それが呼び出され false が帰ると、二番目の参加者が試される。

class FirstParticipantImplementation
  include Ruote::LocalParticipant

  # Do something...
  #
  def consume(workitem)
    workitem.fields['message'] = 'kilroy was here'
    reply_to_engine(workitem)
  end

  # この参加者は、コンテクストが'軍事'でない受け付けない
  # 次にマッチした参加者が試される
  #
  def accept?(workitem)
    workitem.fields['context'] == 'military'
  end
end

ruote 2.3.0 からは、accept? メソッドは次のように書ける:

def accept?
  workitem.fields['context'] == 'military'
end

#on_reply

大まかに、参加者には二つのカテゴリーがある。ワークアイテムを受取り、即座に返すものと、応答するのに時間が掛かるものである。

二番目のカテゴリーに属するものに、リモートの参加者にワークアイテムを送信するものがある。ワークアイテムは、通常、リスナーを経由して戻ってくる。

#on_reply(workitem) を実装している参加者は、ワークアイテムが戻ってきたとき、微調整する機会を与えられる。

下記の例では、二つの ruote システムの間に仮想的なワークキュー(WorkQueue)が置かれており、リモートの参加者は "remote" という名前でバインドされている。受信者(Receiver)は、メッセージをポップしエンジンに返す。ワークアイテムが返って来たら、ワーカーは参加者の on_reply メソッドへ渡す。

class MyRemoteParticipant
  #include Ruote::LocalParticipant

  def consume(workitem)
    WorkQueue.push(workitem_to_msg(workitem))
  end

  def on_reply(workitem)
    workitem.fields['msg'] = "back at #{Time.now.to_s}"
  end

  protected

  def workitem_to_msg(workitem)
    # ...
  end
end

class Receiver < Ruote::Receiver

  def initialize(engine)
    super
    Thread.new { listen }
  end

  protected

  def listen

    loop { reply_to_engine(workitem_from_msg(WorkQueue.pop)) }
  end

  def workitem_from_msg(msg)
    # ...
  end
end

engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))

engine.register_participant :remote, MyRemoteParticipant

receiver = Receiver.new(engine)

# ...

ruote 2.3.0 からは、on_reply メソッドは下記のように書ける:

def on_reply
  workitem.fields['msg'] = "back at #{Time.now.to_s}"
end

#do_not_thread #dont_thread?

デフォルトでは、ワークアイテムが参加者へディスパッチされた時、ruote ワーカーは、参加者の consume メソッドを新しいスレッドで実行する。このアイデアの目的は、[潜在的に]大量の作業を、迅速にワーカーのループの外に移すことにあります。

これは、常に適切な訳ではない。参加者は、呼ばれたらtrue を返す do_not_thread メソッドを実装することにより、ワーカーに新しいスレッドを生成しないように伝えることが出来る。

class MyParticipant
  include Ruote::LocalParticipant

  # Do something...
  #
  def consume(workitem)
    workitem.fields['message'] = 'kilroy was here'
    reply_to_engine(workitem)
  end

  # consume メソッドは、スレッドの生成に値しないような些細な事を実行する
  #
  def do_not_thread
    true
  end
end

ワークアイテムを引数として受け取るような do_not_thread メソッドでも良い。

def do_not_thread(workitem)
  workitem.fields['colour'] == 'blue'
end

ディスパッチ中のスレッド化に より劇的な影響を与えたい場合は、dispatch_pool をオーバーライドするのが恐らくベストな解決策だ。

#
# この実装は、新しいスレッドに決してディスパッチしない
#
class MyDispatchPool < Ruote::DispatchPool

  def dispatch(msg)

    participant = @context.plist.lookup(
      msg['participant'] || msg['participant_name'], msg['workitem'])

    do_dispatch(participant, msg)
  end
end

# ...

opts = {
  's_dispatch_pool' => [ 'path/to/my_dispatch_pool', 'MyDispatchPool' ]
}

engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new(opts)))

ruote 2.3.0 からは、dont_thread? メソッドも使えるため、下記のように出来る:

def dont_thread?
  workitem.fields['colour'] == 'blue'
end

#re_dispatch

Use this method to re_dispatch the workitem.

このメソッドは、ワークアイテムを再度、ディスパッチするために使う。

It takes two options :in and :at for “later re_dispatch”. Without one of those options, the method is a “reject”.

このメソッドは、:in:at の二つのオプションを取る。これらのオプションが無いと、メソッドの実行は拒否される。

Here is an example where re_dispatch is used to implement a retry tactic.

この例では、再実行を実装するのに、re_dispatch を使う。

class RetryParticipant
  include Ruote::LocalParticipant

  def initialize(opts)
    @opts = opts
  end

  def consume(workitem)
    begin
      do_the_job
      reply(workitem)
    rescue
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    end
  end

  def cancel(fei, flavour)
    unschedule_re_dispatch(fei)
  end
end

re_dispatch/reject makes sense too in a multi worker context where a participant on a given host wants to reject a task and let another worker (on another host) do the work. Maybe the accept? method is better for those cases, though.

re_dispatch/reject は、あるホスト上の参加者がタスクを却下し、別のホストのワーカーが作業を行うような、複数ワーカーコンテキストにおいて理に適っている。恐らく、accept? メソッドの使用もこのようなケースでは良い。

ruote 2.3.0 からは、上記の例は下記のように単純化できる:

class RetryParticipant
  include Ruote::LocalParticipant

  def initialize(opts)
    @opts = opts
  end

  def on_workitem
    begin
      do_the_job
      reply
    rescue
      re_dispatch(:in => @opts['delay'] || '1s')
    end
  end

  def on_cancel
    unschedule_re_dispatch
  end
end

#unschedule_re_dispatch

前のセクションの例を参照。

#rtimeout

通常、タイムアウトは、プロセス定義中の式で与えられる。

participant 'alice', :timeout => '2d'

ここでは、アリスは彼女のタスクを終えるのに2日掛かる。(ワークアイテムを返すのに)

しかし、エンジンに登録される参加者のクラスは、自分自身のタイムアウトの値を提供できる。参加者のインスタンスは、#rtimeout メソッドに応答し、意味のあるタイムアウト値を返す。例を挙げる:

class MyParticipant
  # ...
  def rtimeout(workitem)
    workitem.fields['category'] == 'shoes' ? '2w' : '1w'
  end
  # ...
end

ruote 2.3.0 からは、workitem は暗黙的に定義される:

class MyParticipant
  # ...
  def rtimeout
    workitem.fields['category'] == 'shoes' ? '2w' : '1w'
  end
  # ...
end

しかしながら、プロセス定義のタイムアウト(もしあれば)が、参加者で定義されたタイムアウト値よりも優先される。


reply_to_engine reply

しばしば consume メソッドの実装に使われる。

class Total
  include Ruote::LocalParticipant

  def consume(workitem)
    workitem.fields['total'] =
      workitem.fields['items'].inject(0) { |t, (i, c)|
        item = Item.find(i)
        t = t + c * item.price
      }
    reply_to_engine(workitem)
  end
end

engine.register 'total', Total

この後の applied_workitem の例で、このメソッドが、consume メソッドの外側で呼ばれる例を示す。

#reply_to_engine(workitem) メソッドは、#reply(workitem) と短く書ける。

ruote 2.3.0 からは、‘workitem’ を省ける。

class Total
  include Ruote::LocalParticipant

  def on_workitem
    workitem.fields['total'] =
      workitem.fields['items'].inject(0) { |t, (i, c)|
        item = Item.find(i)
        t = t + c * item.price
      }
    reply
  end
end

engine.register 'total', Total

fexp

Via this method, the participant implementation has access to its [participant] expression.

このメソッドを介して、参加者の実装は[参加者の]式にアクセスする。

class PickCustomer
  include Ruote::LocalParticipant

  def consume(workitem)
    customers = fexp(workitem).lookup_variable('customers')
    workitem.fields['customer'] = customers[rand(customers.length)]
    reply_to_engine(workitem)
  end
end

engine.register 'pick_customer', PickCustomer

ruote 2.3.0 からは、fexp の引数は省略できる。

class PickCustomer
  include Ruote::LocalParticipant

  def on_workitem
    customers = fexp.lookup_variable('customers')
    workitem.fields['customer'] = customers[rand(customers.length)]
    reply
  end
end

engine.register 'pick_customer', PickCustomer

workitem

Whereas applied_workitem returns the workitem as was applied (to the participant expression), workitem returns the workitem as was dispatched to this participant [implementation].

applied_workitem が(参加者式が)適用されたワークアイテムが返されるの対して、workitem は参加者[の実装]へディスパッチされたワークアイテムを返す。

The main difference is that a workitem as dispatched contains the ‘params’ field corresponding to the attributes specified in the process definition.

主な違いは、ディスパッチされたワークアイテムが、プロセス定義で定義された属性に対応する ‘params’ 項目を含むことだ。

applied_workitem

特に、参加者がタスクリストとして扱われる時、ワークアイテムは変更されるかもしれないし、すぐに処理されてエンジンに返されるとは限らない。

参加者[式]へ達した時のワークアイテムのコピーを見ることが必要だ。

class TaskList
  include Ruote::LocalParticipant

  def initialize(opts)
    @tasks = connect_to_some_db(opts)
  end

  def consume(workitem)
    @tasks[workitem.fei] = workitem
  end

  def cancel(fei, flavour)
    @tasks.delete(fei)
  end

  def by_user(username)
    @tasks.by(:user => username)
  end

  def update(workitem)
    @tasks[workitem.fei] = workitem
  end

  def original(workitem)
    applied_workitem(workitem)
  end

  def self.reply(workitem)
    if @tasks.delete[workitem.fei]
      reply_to_engine(workitem)
    end
  end
end

# ...

engine.register 'user_.+', TaskList

# ...

tasklist = engine.participant('user_x')
  # returns an instance of our participant, 'user_x' will do since
  # it's registered for any participant starting with 'user_'

workitem = tasklist.by_user('user_fred').first

if workitem.fields != tasklist.original(workitem).fields
  puts "fred's workitem got modified"
end

tasklist.reply(workitem)

Engine#participant(name) の使用は、エンジンを介して参加者のインスタンスをフェッチすることであることに注意。

participant_name

ruote 2.3.0 から、次のような例は、

def consume(workitem)
  (workitem.fields['supervisors'] ||= []) << workitem.participant_name
  reply_to_engine(workitem)
end

次のように短縮できる。

def on_workitem
  (workitem.fields['supervisors'] ||= []) << participant_name
  reply
end

lookup_variable

ruote 2.3.0 から使用できる。

def on_workitem
  workitem.fields['my_var'] = lookup_variable('my_var')
  workitem.fields['my_other_var'] = fexp.lookup_variable('my_other_var')
  reply
end

put and get

ときには、参加者の実装はデータを遮蔽することを必要とする。しかし、参加者は、ワークアイテム自身を遮蔽することは出来ない。putget メソッドが使える。

スレッド の最後に例が載っている。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1