はじめに
この記事はRuby3で導入される並列・並行処理のための新機能Ractorに興味のある方向けの記事になります。
簡単なRactorを使ったサンプルコードを解説しつつ、理解を深めることができるように書いてみました(ほとんど未来の僕へ向けた記事になっている感じはありますが……)。
また、Ractor自体でどういったことができるのかを調べた結果をまとめた記事でもあります。
そのため後半はRactorで色々遊んだ時のコードをもとに挙動を解説しています。
環境
- Windows10 2004
- WSL2(Ubuntu 18.04)
- ruby 3.0.0dev (2020-09-07T04:29:42Z master 17a27060a7) [x86_64-linux]
Ractorとは?
そもそもRactorについて知らない人もいると思いますので、簡単に紹介します。
RactorとはRuby3で導入される並列・並行処理のための新機能になります。機能自体の提案は数年前からあり、当時はGuildという名前で提案されていました。
しかし、ゲーム業界から「Guildという名前は使っているので、別の名前にしてほしい」という声があり、現在のRactorへと変わりました。
Actorモデルを参考にしているようで、そのためRactor(Ruby’s Actor)という名前に変更されたようです。
Ractorは並行実行の単位であり、それぞれが並行して実行されます。
たとえば以下のコードではputs :helloとputs :helloはそれぞれ並行して実行されます。
Ractor.new do
5.times do
puts :hello
end
end
5.times do
puts :world
end
このコードを実行すると以下のような結果になります。
world
helloworld
hello
world
helloworld
helloworld
hello
このようにそれぞれの処理を並行して実行できます。
またRactorは別のRactorへとオブジェクトを送受信し、同期しながら実行することもできます。同期の方法としてはpush型とpull型の二つがあります。
たとえばpush型の場合は以下のようなコードになります。
r1 = Ractor.new do
:hoge
end
r2 = Ractor.new do
puts :fuga, Ractor.recv
end
r2.send(r1.take)
r2.take
# => :fuga, :hoge
Ractorではsendメソッドを使い、別のRactorへとオブジェクトを送ることができます。上記のコードだと
r2.send(r1.take)
の部分でr2へと送信しています。
送信されたオブジェクトはRactor内でRactor.recvで受け取ることができ
r2 = Ractor.new do
puts :fuga, Ractor.recv #
end
r2.send(r1.take)で送られたオブジェクトを受け取ってputsメソッドに渡すことができます。
またRactorが実行された結果を受け取る際にtakeメソッドを使います。
ですのでr1.takeは:hogeを受け取っています。
つまり、r2.send(r1.take)ではr1の実行結果を受け取り、それをr2へと送信しています。
そして、r2内のputs :fuga, Ractor.recvはputs :fuga, :hogeとなり、fugaとhogeがそれぞれ出力されるということです。
これがpush型でのオブジェクトをやり取りしている流れになります。
対してpull型は以下のようなコードになります。
r1 = Ractor.new 42 do |arg|
Ractor.yield arg
end
r2 = Ractor.new r1 do |r1|
r1.take
end
puts r2.take
Ractor.newで渡した引数は|arg|のようにブロック内で使用できる変数として受け取ることができます。
例えば以下のコードはr1でtakeメソッドが実行されるまで処理を待ちます。
r1 = Ractor.new 42 do |arg|
Ractor.yield arg
end
またRactor.newには別のRactorを渡すことができるため以下のように書くことができます。
r2 = Ractor.new r1 do |r1|
r1.take
end
これでr1が引数として受け取った42をr2の中で受け取ることができます。
最後にputs r2.takeで42を受け取って出力しています。
pull型はこういった流れになります。
ざっくりと解説すると
-
push型:Ractor#send+Ractor.recv -
pull型:Ractor.yield+Ractor#take
という感じです。
より詳細なRactorの解説に関しては下記のリンクを参照していただければと思います。
- A proposal of new concurrency model for Ruby 3
- Guild Prototype
- Ruby向け並列化機構Guildの試作
- Guild → Ractor
- [JA] Ractor report / Koichi Sasada ko1
- https://github.com/ko1/ruby/blob/ractor/ractor.ja.md
- Ractor - Ruby's Actor-like concurrent abstraction
- Ractor: a proposal for a new concurrent abstraction without thread-safety issues
Ractorのコード
Ractorの生成
RactorはRactor.newでブロックに実行したい処理を書きます。
Ractor.new do
# このブロックが並行に実行される
end
このブロック内の処理が並行実行されます。
つまり、以下のようなコードの場合
Ractor.new do
10.times do
puts :hoge
end
end
10.times do
puts :fuga
end
:hogeと:fugaがそれぞれ並行に出力されます。
またRactor.newに実行したい処理をブロックとして渡しますので以下のように書くこともできます。
Ractor.new{
10.times{
puts :hoge
}
}
またキーワード引数nameを使って名前を付けることもでき、Ractor#nameで名前をを受け取ることもできます。
r = Ractor.new name: 'r1' do
puts :hoge
end
p r.name
# => "r1"
これにより、どのRactorで処理が実行されているかを確認することもできそうです。
Ractorへ引数を渡す
Ractor.newに引数を渡すことでブロック内にオブジェクトを渡すことができます。
r = Ractor.new :hoge do |a|
p a
end
r.take
# => :hoge
このように引数を経由してオブジェクトを渡すことができます。
また複数の引数を渡すこともできます
r = Ractor.new :hoge, :fuga do |a, b|
p a
p b
end
r.take
# => fuga
# => hoge
このようにArrayを渡すこともできます。
r = Ractor.new [:hoge, :fuga] do |a|
p a.inspect
end
r.take
# => "[:hoge, :fuga]"
ちなみに、|a|を|a, b|に変更すると
r = Ractor.new [:hoge, :fuga] do |a, b|
p a
p b
end
r.take
# => :hoge
# => :fuga
という出力結果になります。これはa, b = [:hoge, :fuga]と同じ挙動と解釈されているようです。
またHashの場合は
r = Ractor.new({:hoge => 42, :fuga => 21}) do |a|
p a
p a[:hoge]
end
r.take
# => {:hoge=>42, :fuga=>21}
# => 42
と出力されます。
ちなみに、Ractor.newの後に()で括っていないとSyntaxErrorになるので注意が必要です。
r = Ractor.new({:hoge => 42, :fuga => 21}) do |a|
p a
p a[:hoge]
end
r.take
# => SyntaxError
Ractorでの返り値
Ractorでは実行されるブロック内の返り値をtakeメソッドで受け取ることができます。
r = Ractor.new do
:hoge
end
p r.take
# => :hoge
ちなみに、ブロック内でreturnをするとLocalJumpErrorとなるようです。
r = Ractor.new do
return :fuga
:hoge
end
p r.take
# => LocalJumpError
Ractor内での例外
Ractor内での例外は以下のようにして受け取ることができます。
r = Ractor.new do
raise 'error'
end
begin
r.take
rescue Ractor::RemoteError => e
p e.message
end
ちなみに、Ractor内でも例外処理を書くことはできます。
r = Ractor.new name: 'r1' do
begin
raise 'error'
rescue => e
p e.message
end
end
r.take
またドキュメントによるとRactorのブロック内から返ってきた値を受け取る領域で例外をキャッチできるようです。
つまり以下のようなコードも書くことができます。
r1 = Ractor.new do
raise 'error'
end
r2 = Ractor.new r1 do |r1|
begin
r1.take
rescue Ractor::RemoteError => e
p e.message
end
end
r2.take
# => "thrown by remote Ractor."
Ractorでの並行実行
簡単な例
こんな感じでRactorで並行実行を行うことができます。
Ractor.new do
3.times do
puts 42
end
end
3.times do
puts 21
end
実行すると42と21という出力がそれぞればらばらに表示されます。
ちょっとした例
以下のような感じで複数のworkerをRactorで生成し、それをpipeで経由して値を渡し、結果をまとめることができます。
require 'prime'
pipe = Ractor.new do
loop do
Ractor.yield Ractor.recv
end
end
N = 1000
RN = 10
workers = (1..RN).map do
Ractor.new pipe do |pipe|
while n = pipe.take
Ractor.yield [n, n.prime?]
end
end
end
(1..N).each{|i|
pipe << i
}
pp (1..N).map{
r, (n, b) = Ractor.select(*workers)
[n, b]
}.sort_by{|(n, b)| n}
# => 0 ~ 999 までの数値が素数かどうかの結果を出力
このコードでは10個のworkerを生成し、pipeを経由してそれぞれのworkerにオブジェクトを渡しています。
また、受け取ったオブジェクトをRactor.yield [n, n.prime?]で返しています。
こんな感じでworkerを複数作り、pipe経由で処理させたり、結果を受け取ることができます。
よしなにworkerなどを生成して処理させるクラスを書いてみる
先ほどのコードだとworker内の処理が後々で大きくなることもありそうだったので、以下のようにworkerをよしなに生成してくれるクラスを書いてみました。
class Ninsoku
def initialize(task, worker_count: 10)
@task = task
@pipe = create_pipe
@workers = create_workers(worker_count)
end
def send(arg)
@pipe.send arg
end
def run
yield Ractor.select(*@workers)
end
def create_pipe
Ractor.new do
loop do
Ractor.yield Ractor.recv
end
end
end
def create_workers(worker_count)
(1..worker_count).map do
Ractor.new @pipe, @task do |pipe, task|
loop do
arg = pipe.take
task.send arg
Ractor.yield task.take
end
end
end
end
end
Ninsoku.newでpipeとworkerを生成しています。またtaskは処理させたい内容をRactorで渡し、workerで実行させています。
実際に使うケースとしてはこんな感じです。
task = Ractor.new do
func = lambda{|n| n.downcase }
loop do
Ractor.yield func.call(Ractor.recv)
end
end
ninsoku = Ninsoku.new(task)
('A'..'Z').each{|i|
ninsoku.send i
}
('A'..'Z').map{
ninsoku.run{|r, n|
puts n
}
}
# => a ~ z までが並行して出力される
このクラスはあとでgemにでもしてみようかと思います。
gemにしてみました(rubygemsにはpushしていませんが……)
例えば、僕の住んでいる島根県浜田市のAED位置情報をRactorを使って処理してみます。
なお、島根県のAED位置情報は島根県が公開しているオープンデータを使用させていただきました。この場を借りて感謝を申し上げます。
require "rorker"
require "csv"
task = Ractor.new do
func = lambda{|row|
row.map{|value|
if value =~ /浜田市/
row
end
}.compact
}
loop do
Ractor.yield func.call(Ractor.recv)
end
end
rorker = Rorker.new(task)
csv = CSV.read "a.csv"
csv.each do |row|
rorker.send row
end
n = 0
while n < csv.count
rorker.run{|worker, result|
if !result.empty?
puts result
end
}
n += 1
end
こんな感じで読み取ったCSVを一行づつworkerに渡して並行処理で必要なデータをとってくることもできます。
RactorでNumbered Parameterを使う
Ractorでは処理をブロックで渡しますので、Numbered Parameterを使って引数を受け取ることもできます。
r = Ractor.new :hoge do
puts _1
end
r.take
# => hoge
ちなみに、複数の引数でも動作します。
r = Ractor.new :hoge, :hoge do
puts _1
puts _2
end
r.take
# => hoge
# => fuga
複数渡した場合は渡された順番通りに_1から_9に渡されるみたいです。
ちなみに、Hashを渡した場合はこんな感じになります。
r = Ractor.new ({hoge: 1, fuga: 2}) do
_1.map do |key, value|
p ":#{key} => #{value}"
end
end
r.take
# => ":hoge => 1"
# => ":fuga => 2"
=>を使ったHashも同様の結果になりました
r = Ractor.new({:hoge => 1, :fuga => 2}) do
_1.map do |key, value|
p ":#{key} => #{value}"
end
end
r.take
# => ":hoge => 1"
# => ":fuga => 2"
ただし、Arrayの場合は少し挙動が異なります
r = Ractor.new [1, 2, 3] do
puts _1
puts _1.class
puts _2
puts _2.class
puts _3
puts _3.class
end
r.take
# => 1
# => Integer
# => 2
# => Integer
# => 3
# => Integer
どうやら通常通り複数の引数を渡した時のようにArrayの先頭から順番に渡されるようです。
おそらくは以下のように解釈されているのではないかと思います。
_1, _2, _3 = [1, 2, 3]
ちなみに、Numbered Parameterで受け取れる数より大きなArrayを渡すと
r = Ractor.new [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] do
puts _1
puts _2
puts _3
puts _4
puts _5
puts _6
puts _7
puts _8
puts _9
end
r.take
# => 1
# => 2
# => 3
# => 4
# => 5
# => 6
# => 7
# => 8
# => 9
このように取得できるのはNumbered Parameterが受け取れる範囲までのようです
Ractor内でNumbered Parameterを使う場合はHashを引数に渡した際か
r = Ractor.new ({hoge: 1, fuga: 2}) do |hash|
hash.map do
p ":#{_1} => #{_2}"
end
end
r.take
":hoge => 1"
":fuga => 2"
またはいくつかの引数を渡した時に省略して書きたいときに使うことになりそうです
r = Ractor.new :hoge, :fuga do
p _1
p _2
end
r.take
# => :hoge
# => :fuga
おわりに
この記事を読んでRactorについて興味を持って頂ければ幸いです。
今後もRactorを使って試したコードを追加していこうと思います
参考
- A proposal of new concurrency model for Ruby 3
- Guild Prototype
- Ruby向け並列化機構Guildの試作
- Guild → Ractor
- [JA] Ractor report / Koichi Sasada ko1
- https://github.com/ko1/ruby/blob/ractor/ractor.ja.md
- Ractor - Ruby's Actor-like concurrent abstraction
- https://gist.github.com/niku/c19da11edf0b97470af27844b44d12fa
- Ractor: a proposal for a new concurrent abstraction without thread-safety issues