(2012-04-22) オレオレ訳。
Amazon SWF との連携には、participant を使うのもありかも。
参加者の実装をする
参加者(Participant)はワークアイテムを受取り、ワークアイテムに何かを行う。そして、必要なら、ruote に「更新された」ワークアイテムを返す。
最も単純な参加者の実装は、block participant
である。
engine.register_participant :flush_garbage do |workitem|
GarbageCan.find(workitem.fields['garbage_can_id']).flush
end
これは、下記の参加者の実装(と登録)と等価である:
class Flusher
include Ruote::LocalParticipant
def consume(workitem)
GarbageCan.find(workitem.fields['garbage_can_id']).flush
reply_to_engine(workitem)
end
def cancel(fei, flavour)
# nothing to do
end
#def on_reply(workitem)
#end
end
engine.register_participant :flush_garbage, Flusher
block participant
のブロックは、reply_to_engine(workitem)
が追加されているが、この参加者の consume
メソッドの複製である。
#consume
が参加者の唯一のメソッドではなく、#cancel
もある。cancel
メソッドは、プロセス·インスタンスまたはそのセグメントがキャンセルされたときに呼び出される。そして、参加者は、そのセグメントのためにワークアイテムを保持する。ワークアイテムは、キャンセル/削除/ロールバックされなければならない。
二つの付加的なメソッドがある: #accept?
と #on_reply
である。accept?
メソッドを持つ参加者は、ワークアイテムを扱えない場合はワークアイテムを破棄しても良い。on_reply
メソッドも持つ参加者は、通常、実際の(リモートの)参加者のプロキシーである。on_reply
メソッドは、実際の参加者から返答(ワークアイテム)が返って来た時に呼ばれる。
デフォルトでは、プロセスインスタンスの実行が参加者に達した時、参加者の #consume
メソッドが専用の Ruby スレッドで実行される。もし、#do_not_thread スレッドが false
を返すなら、スレッドの生成が行われずに直ちに実行される。(すなわち、ワーカーはブロックされる)
ruote 2.3.0 から、上記の Flusher は、次のように書き換え可能である。
class Flusher
include Ruote::LocalParticipant
def on_workitem
GarbageCan.find(workitem.fields['garbage_can_id']).flush
reply
end
def on_cancel
# nothing to do
end
#def on_reply
#end
end
engine.register_participant :flush_garbage, Flusher
consume
は on_workitem
に、cancel
は on_cancel
にリネームされる。これは、参加者の反応性を強調する。
ワークアイテム、フロー式ID(fei)、flavour は、引数として定義可能であるが、定義しなくても暗黙的に渡されることに注意。
Ruote::LocalParticipant
殆どの場合、あなたの参加者は Ruote::LocalParticipant
をインクルードすることになる
ruote ストレージへのアクセスを持っているはずなので、'local' と名付けられている。
参加者を実装する場合に実装、オーバーライドすべきメソッドと、Ruote::LocalParticipant
からのヘルパーメソッド。
メソッド
- #consume (#on_workitem)
- #cancel (#on_cancel)
- #accept?
- #on_reply
- #do_not_thread (#dont_thread?)
- #re_dispatch
- #unschedule_re_dispatch
- #rtimeout
ヘルパー
- reply_to_engine (reply)
- fexp
- workitem
- applied_workitem
- participant_name
- lookup_variable
- put and get
#consume #on_workitem
参加者がワークアイテムを受け取った時、このメソッドがワークアイテムを引数に呼ばれる。
# ワークアイテムとしてパスされた請求書の合計を計算する
#
class TotalParticipant
include Ruote::LocalParticipant
def consume(workitem)
workitem.fields['total'] =
workitem.fields['items'].inject(0.0) { |t, item|
t + item['count'] * PricingService.lookup(item['id'])
}
reply_to_engine(workitem)
end
end
ワークアイテムの処理が終わったら、ワークアイテムを引数にして #reply_to_engine
を呼ぶことに注意。このメソッドを呼ばないと、プロセスはこの参加者の後続を再開できない。
ruote 2.3.0 から、上記の参加者は下記のように書ける:
# computing the total for a invoice being passed in the workitem.
#
class TotalParticipant
include Ruote::LocalParticipant
def on_workitem
workitem.fields['total'] =
workitem.fields['items'].inject(0.0) { |t, item|
t + item['count'] * PricingService.lookup(item['id'])
}
reply
end
end
#cancel #on_cancel
consume
メソッドの中でエンジンに対する返答が迅速に行えない場合、#cancel
を実装するべきだ。
require 'fileutils'
require 'yaml'
# 平易な "worklist" 実装, workitems は、ファイルとして worklist/ に
# 置かれる
#
# アプリケーションは yaml ファイルを読み、ワークアイテムの処理が終わったら
# PlainWorklist.reply(workitem_h) を呼び出す
#
class PlainWorklist
include Ruote::LocalParticipant
# ruote から呼び出される
#
def consume(workitem)
h = workitem.to_h
h['fei'] = workitem.fei.sid
File.open("worklist/workitem_#{workitem.fei.sid}.yaml", wb) do |f|
f.puts(h.to_yaml)
end
# no reply_to_engine(workitem)
end
# ruote がプロセスインスタンスのキャンセルをする場合に呼ばれる
#
def cancel(fei, flavour)
FileUtils.rm("worklist/workitem_#{fei.sid}.yaml")
end
# 外部のシステムからワークアイテムの処理が終わったら呼ばれる
#
def self.reply(workitem_h)
FileUtils.rm("worklist/workitem_#{workitem_h['fei']}.yaml")
reply_to_engine(workitem)
end
end
この consume
メソッドは、エンジンに応答しない。キャンセルの要求が来ると、ディスクに格納されたワークアイテムは削除される。
ruote 2.3.0 から、下記のように書ける:
require 'fileutils'
require 'yaml'
# 平易な "worklist" 実装, workitems は、ファイルとして worklist/ に
# 置かれる
#
# アプリケーションは yaml ファイルを読み、ワークアイテムの処理が終わったら
# PlainWorklist.reply(workitem_h) を呼び出す
#
class PlainWorklist
include Ruote::LocalParticipant
# ruote から呼び出される
#
def on_workitem
h = workitem.to_h
h['fei'] = workitem.fei.sid
File.open("worklist/workitem_#{workitem.fei.sid}.yaml", wb) do |f|
f.puts(h.to_yaml)
end
# no reply_to_engine
end
# ruote がプロセスインスタンスのキャンセルをする場合に呼ばれる
#
def on_cancel
FileUtils.rm("worklist/workitem_#{fei.sid}.yaml")
end
# 外部のシステムからワークアイテムの処理が終わったら呼ばれる
#
def self.reply(workitem_h)
FileUtils.rm("worklist/workitem_#{workitem_h['fei']}.yaml")
receive(workitem)
end
end
ruote 2.3.0 より、#on_cancel
が false
を返したら、ruote に対して何も応答しない。これは、#on_workitem
それ自身がキャンセルメッセージを扱うシナリオで有用だ。
cancel flavour
この変数は、通常、‘cancel’ がセットされる。キャンセルされる代わりにプロセスがキルされた場合、flavour
は “kill” がセットされる。通常は、この変数はシンプルに無視される。しかし、誰が知っていようか。同じコンテキストで異なる振る舞いをするために必要とされるかもしれない。
#accept?
次の参加者が与えられた時、
engine.register_participant 'al.+', FirstParticipantImplementation
engine.register_participant 'al.+', SecondParticipantImplementation
参加者の名前がマッチすると、最初の参加者が使われる。もし、最初の参加者が #accept?(workitem)
メソッドを持っており、それが呼び出され false
が帰ると、二番目の参加者が試される。
class FirstParticipantImplementation
include Ruote::LocalParticipant
# Do something...
#
def consume(workitem)
workitem.fields['message'] = 'kilroy was here'
reply_to_engine(workitem)
end
# この参加者は、コンテクストが'軍事'でない受け付けない
# 次にマッチした参加者が試される
#
def accept?(workitem)
workitem.fields['context'] == 'military'
end
end
ruote 2.3.0 からは、accept?
メソッドは次のように書ける:
def accept?
workitem.fields['context'] == 'military'
end
#on_reply
大まかに、参加者には二つのカテゴリーがある。ワークアイテムを受取り、即座に返すものと、応答するのに時間が掛かるものである。
二番目のカテゴリーに属するものに、リモートの参加者にワークアイテムを送信するものがある。ワークアイテムは、通常、リスナーを経由して戻ってくる。
#on_reply(workitem)
を実装している参加者は、ワークアイテムが戻ってきたとき、微調整する機会を与えられる。
下記の例では、二つの ruote システムの間に仮想的なワークキュー(WorkQueue)が置かれており、リモートの参加者は "remote" という名前でバインドされている。受信者(Receiver)は、メッセージをポップしエンジンに返す。ワークアイテムが返って来たら、ワーカーは参加者の on_reply
メソッドへ渡す。
class MyRemoteParticipant
#include Ruote::LocalParticipant
def consume(workitem)
WorkQueue.push(workitem_to_msg(workitem))
end
def on_reply(workitem)
workitem.fields['msg'] = "back at #{Time.now.to_s}"
end
protected
def workitem_to_msg(workitem)
# ...
end
end
class Receiver < Ruote::Receiver
def initialize(engine)
super
Thread.new { listen }
end
protected
def listen
loop { reply_to_engine(workitem_from_msg(WorkQueue.pop)) }
end
def workitem_from_msg(msg)
# ...
end
end
engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
engine.register_participant :remote, MyRemoteParticipant
receiver = Receiver.new(engine)
# ...
ruote 2.3.0 からは、on_reply
メソッドは下記のように書ける:
def on_reply
workitem.fields['msg'] = "back at #{Time.now.to_s}"
end
#do_not_thread #dont_thread?
デフォルトでは、ワークアイテムが参加者へディスパッチされた時、ruote ワーカーは、参加者の consume
メソッドを新しいスレッドで実行する。このアイデアの目的は、[潜在的に]大量の作業を、迅速にワーカーのループの外に移すことにあります。
これは、常に適切な訳ではない。参加者は、呼ばれたらtrue
を返す do_not_thread
メソッドを実装することにより、ワーカーに新しいスレッドを生成しないように伝えることが出来る。
class MyParticipant
include Ruote::LocalParticipant
# Do something...
#
def consume(workitem)
workitem.fields['message'] = 'kilroy was here'
reply_to_engine(workitem)
end
# consume メソッドは、スレッドの生成に値しないような些細な事を実行する
#
def do_not_thread
true
end
end
ワークアイテムを引数として受け取るような do_not_thread
メソッドでも良い。
def do_not_thread(workitem)
workitem.fields['colour'] == 'blue'
end
ディスパッチ中のスレッド化に より劇的な影響を与えたい場合は、dispatch_pool をオーバーライドするのが恐らくベストな解決策だ。
#
# この実装は、新しいスレッドに決してディスパッチしない
#
class MyDispatchPool < Ruote::DispatchPool
def dispatch(msg)
participant = @context.plist.lookup(
msg['participant'] || msg['participant_name'], msg['workitem'])
do_dispatch(participant, msg)
end
end
# ...
opts = {
's_dispatch_pool' => [ 'path/to/my_dispatch_pool', 'MyDispatchPool' ]
}
engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new(opts)))
ruote 2.3.0 からは、dont_thread?
メソッドも使えるため、下記のように出来る:
def dont_thread?
workitem.fields['colour'] == 'blue'
end
#re_dispatch
Use this method to re_dispatch the workitem.
このメソッドは、ワークアイテムを再度、ディスパッチするために使う。
It takes two options :in and :at for “later re_dispatch”. Without one of those options, the method is a “reject”.
このメソッドは、:in
と :at
の二つのオプションを取る。これらのオプションが無いと、メソッドの実行は拒否される。
Here is an example where re_dispatch is used to implement a retry tactic.
この例では、再実行を実装するのに、re_dispatch
を使う。
class RetryParticipant
include Ruote::LocalParticipant
def initialize(opts)
@opts = opts
end
def consume(workitem)
begin
do_the_job
reply(workitem)
rescue
re_dispatch(workitem, :in => @opts['delay'] || '1s')
end
end
def cancel(fei, flavour)
unschedule_re_dispatch(fei)
end
end
re_dispatch/reject makes sense too in a multi worker context where a participant on a given host wants to reject a task and let another worker (on another host) do the work. Maybe the accept? method is better for those cases, though.
re_dispatch
/reject
は、あるホスト上の参加者がタスクを却下し、別のホストのワーカーが作業を行うような、複数ワーカーコンテキストにおいて理に適っている。恐らく、accept?
メソッドの使用もこのようなケースでは良い。
ruote 2.3.0 からは、上記の例は下記のように単純化できる:
class RetryParticipant
include Ruote::LocalParticipant
def initialize(opts)
@opts = opts
end
def on_workitem
begin
do_the_job
reply
rescue
re_dispatch(:in => @opts['delay'] || '1s')
end
end
def on_cancel
unschedule_re_dispatch
end
end
#unschedule_re_dispatch
前のセクションの例を参照。
#rtimeout
通常、タイムアウトは、プロセス定義中の式で与えられる。
participant 'alice', :timeout => '2d'
ここでは、アリスは彼女のタスクを終えるのに2日掛かる。(ワークアイテムを返すのに)
しかし、エンジンに登録される参加者のクラスは、自分自身のタイムアウトの値を提供できる。参加者のインスタンスは、#rtimeout
メソッドに応答し、意味のあるタイムアウト値を返す。例を挙げる:
class MyParticipant
# ...
def rtimeout(workitem)
workitem.fields['category'] == 'shoes' ? '2w' : '1w'
end
# ...
end
ruote 2.3.0 からは、workitem は暗黙的に定義される:
class MyParticipant
# ...
def rtimeout
workitem.fields['category'] == 'shoes' ? '2w' : '1w'
end
# ...
end
しかしながら、プロセス定義のタイムアウト(もしあれば)が、参加者で定義されたタイムアウト値よりも優先される。
reply_to_engine reply
しばしば consume
メソッドの実装に使われる。
class Total
include Ruote::LocalParticipant
def consume(workitem)
workitem.fields['total'] =
workitem.fields['items'].inject(0) { |t, (i, c)|
item = Item.find(i)
t = t + c * item.price
}
reply_to_engine(workitem)
end
end
engine.register 'total', Total
この後の applied_workitem
の例で、このメソッドが、consume
メソッドの外側で呼ばれる例を示す。
#reply_to_engine(workitem)
メソッドは、#reply(workitem)
と短く書ける。
ruote 2.3.0 からは、‘workitem’ を省ける。
class Total
include Ruote::LocalParticipant
def on_workitem
workitem.fields['total'] =
workitem.fields['items'].inject(0) { |t, (i, c)|
item = Item.find(i)
t = t + c * item.price
}
reply
end
end
engine.register 'total', Total
fexp
Via this method, the participant implementation has access to its [participant] expression.
このメソッドを介して、参加者の実装は[参加者の]式にアクセスする。
class PickCustomer
include Ruote::LocalParticipant
def consume(workitem)
customers = fexp(workitem).lookup_variable('customers')
workitem.fields['customer'] = customers[rand(customers.length)]
reply_to_engine(workitem)
end
end
engine.register 'pick_customer', PickCustomer
ruote 2.3.0 からは、fexp
の引数は省略できる。
class PickCustomer
include Ruote::LocalParticipant
def on_workitem
customers = fexp.lookup_variable('customers')
workitem.fields['customer'] = customers[rand(customers.length)]
reply
end
end
engine.register 'pick_customer', PickCustomer
workitem
Whereas applied_workitem returns the workitem as was applied (to the participant expression), workitem returns the workitem as was dispatched to this participant [implementation].
applied_workitem
が(参加者式が)適用されたワークアイテムが返されるの対して、workitem
は参加者[の実装]へディスパッチされたワークアイテムを返す。
The main difference is that a workitem as dispatched contains the ‘params’ field corresponding to the attributes specified in the process definition.
主な違いは、ディスパッチされたワークアイテムが、プロセス定義で定義された属性に対応する ‘params’ 項目を含むことだ。
applied_workitem
特に、参加者がタスクリストとして扱われる時、ワークアイテムは変更されるかもしれないし、すぐに処理されてエンジンに返されるとは限らない。
参加者[式]へ達した時のワークアイテムのコピーを見ることが必要だ。
class TaskList
include Ruote::LocalParticipant
def initialize(opts)
@tasks = connect_to_some_db(opts)
end
def consume(workitem)
@tasks[workitem.fei] = workitem
end
def cancel(fei, flavour)
@tasks.delete(fei)
end
def by_user(username)
@tasks.by(:user => username)
end
def update(workitem)
@tasks[workitem.fei] = workitem
end
def original(workitem)
applied_workitem(workitem)
end
def self.reply(workitem)
if @tasks.delete[workitem.fei]
reply_to_engine(workitem)
end
end
end
# ...
engine.register 'user_.+', TaskList
# ...
tasklist = engine.participant('user_x')
# returns an instance of our participant, 'user_x' will do since
# it's registered for any participant starting with 'user_'
workitem = tasklist.by_user('user_fred').first
if workitem.fields != tasklist.original(workitem).fields
puts "fred's workitem got modified"
end
tasklist.reply(workitem)
Engine#participant(name)
の使用は、エンジンを介して参加者のインスタンスをフェッチすることであることに注意。
participant_name
ruote 2.3.0 から、次のような例は、
def consume(workitem)
(workitem.fields['supervisors'] ||= []) << workitem.participant_name
reply_to_engine(workitem)
end
次のように短縮できる。
def on_workitem
(workitem.fields['supervisors'] ||= []) << participant_name
reply
end
lookup_variable
ruote 2.3.0 から使用できる。
def on_workitem
workitem.fields['my_var'] = lookup_variable('my_var')
workitem.fields['my_other_var'] = fexp.lookup_variable('my_other_var')
reply
end
put and get
ときには、参加者の実装はデータを遮蔽することを必要とする。しかし、参加者は、ワークアイテム自身を遮蔽することは出来ない。put
と get
メソッドが使える。
スレッド の最後に例が載っている。