はじめに
この記事はRuby3で導入される並列・並行処理のための新機能Ractor
に興味のある方向けの記事になります。
簡単なRactor
を使ったサンプルコードを解説しつつ、理解を深めることができるように書いてみました(ほとんど未来の僕へ向けた記事になっている感じはありますが……)。
また、Ractor
自体でどういったことができるのかを調べた結果をまとめた記事でもあります。
そのため後半はRactor
で色々遊んだ時のコードをもとに挙動を解説しています。
環境
- Windows10 2004
- WSL2(Ubuntu 18.04)
- ruby 3.0.0dev (2020-09-07T04:29:42Z master 17a27060a7) [x86_64-linux]
Ractorとは?
そもそもRactor
について知らない人もいると思いますので、簡単に紹介します。
Ractor
とはRuby3で導入される並列・並行処理のための新機能になります。機能自体の提案は数年前からあり、当時はGuild
という名前で提案されていました。
しかし、ゲーム業界から「Guildという名前は使っているので、別の名前にしてほしい」という声があり、現在のRactor
へと変わりました。
Actor
モデルを参考にしているようで、そのためRactor(Ruby’s Actor)
という名前に変更されたようです。
Ractor
は並行実行の単位であり、それぞれが並行して実行されます。
たとえば以下のコードではputs :hello
とputs :hello
はそれぞれ並行して実行されます。
Ractor.new do
5.times do
puts :hello
end
end
5.times do
puts :world
end
このコードを実行すると以下のような結果になります。
world
helloworld
hello
world
helloworld
helloworld
hello
このようにそれぞれの処理を並行して実行できます。
またRactor
は別のRactor
へとオブジェクトを送受信し、同期しながら実行することもできます。同期の方法としてはpush型
とpull型
の二つがあります。
たとえばpush型
の場合は以下のようなコードになります。
r1 = Ractor.new do
:hoge
end
r2 = Ractor.new do
puts :fuga, Ractor.recv
end
r2.send(r1.take)
r2.take
# => :fuga, :hoge
Ractor
ではsend
メソッドを使い、別のRactor
へとオブジェクトを送ることができます。上記のコードだと
r2.send(r1.take)
の部分でr2
へと送信しています。
送信されたオブジェクトはRactor
内でRactor.recv
で受け取ることができ
r2 = Ractor.new do
puts :fuga, Ractor.recv #
end
r2.send(r1.take)
で送られたオブジェクトを受け取ってputs
メソッドに渡すことができます。
またRactor
が実行された結果を受け取る際にtake
メソッドを使います。
ですのでr1.take
は:hoge
を受け取っています。
つまり、r2.send(r1.take)
ではr1
の実行結果を受け取り、それをr2
へと送信しています。
そして、r2
内のputs :fuga, Ractor.recv
はputs :fuga, :hoge
となり、fuga
とhoge
がそれぞれ出力されるということです。
これがpush型
でのオブジェクトをやり取りしている流れになります。
対してpull型
は以下のようなコードになります。
r1 = Ractor.new 42 do |arg|
Ractor.yield arg
end
r2 = Ractor.new r1 do |r1|
r1.take
end
puts r2.take
Ractor.new
で渡した引数は|arg|
のようにブロック内で使用できる変数として受け取ることができます。
例えば以下のコードはr1
でtake
メソッドが実行されるまで処理を待ちます。
r1 = Ractor.new 42 do |arg|
Ractor.yield arg
end
またRactor.new
には別のRactor
を渡すことができるため以下のように書くことができます。
r2 = Ractor.new r1 do |r1|
r1.take
end
これでr1
が引数として受け取った42
をr2
の中で受け取ることができます。
最後にputs r2.take
で42
を受け取って出力しています。
pull型
はこういった流れになります。
ざっくりと解説すると
-
push型
:Ractor#send
+Ractor.recv
-
pull型
:Ractor.yield
+Ractor#take
という感じです。
より詳細なRactor
の解説に関しては下記のリンクを参照していただければと思います。
- A proposal of new concurrency model for Ruby 3
- Guild Prototype
- Ruby向け並列化機構Guildの試作
- Guild → Ractor
- [JA] Ractor report / Koichi Sasada ko1
- https://github.com/ko1/ruby/blob/ractor/ractor.ja.md
- Ractor - Ruby's Actor-like concurrent abstraction
- Ractor: a proposal for a new concurrent abstraction without thread-safety issues
Ractorのコード
Ractorの生成
Ractor
はRactor.new
でブロックに実行したい処理を書きます。
Ractor.new do
# このブロックが並行に実行される
end
このブロック内の処理が並行実行されます。
つまり、以下のようなコードの場合
Ractor.new do
10.times do
puts :hoge
end
end
10.times do
puts :fuga
end
:hoge
と:fuga
がそれぞれ並行に出力されます。
またRactor.new
に実行したい処理をブロックとして渡しますので以下のように書くこともできます。
Ractor.new{
10.times{
puts :hoge
}
}
またキーワード引数name
を使って名前を付けることもでき、Ractor#name
で名前をを受け取ることもできます。
r = Ractor.new name: 'r1' do
puts :hoge
end
p r.name
# => "r1"
これにより、どのRactor
で処理が実行されているかを確認することもできそうです。
Ractorへ引数を渡す
Ractor.new
に引数を渡すことでブロック内にオブジェクトを渡すことができます。
r = Ractor.new :hoge do |a|
p a
end
r.take
# => :hoge
このように引数を経由してオブジェクトを渡すことができます。
また複数の引数を渡すこともできます
r = Ractor.new :hoge, :fuga do |a, b|
p a
p b
end
r.take
# => fuga
# => hoge
このようにArray
を渡すこともできます。
r = Ractor.new [:hoge, :fuga] do |a|
p a.inspect
end
r.take
# => "[:hoge, :fuga]"
ちなみに、|a|
を|a, b|
に変更すると
r = Ractor.new [:hoge, :fuga] do |a, b|
p a
p b
end
r.take
# => :hoge
# => :fuga
という出力結果になります。これはa, b = [:hoge, :fuga]
と同じ挙動と解釈されているようです。
またHash
の場合は
r = Ractor.new({:hoge => 42, :fuga => 21}) do |a|
p a
p a[:hoge]
end
r.take
# => {:hoge=>42, :fuga=>21}
# => 42
と出力されます。
ちなみに、Ractor.new
の後に()
で括っていないとSyntaxError
になるので注意が必要です。
r = Ractor.new({:hoge => 42, :fuga => 21}) do |a|
p a
p a[:hoge]
end
r.take
# => SyntaxError
Ractorでの返り値
Ractorでは実行されるブロック内の返り値をtake
メソッドで受け取ることができます。
r = Ractor.new do
:hoge
end
p r.take
# => :hoge
ちなみに、ブロック内でreturn
をするとLocalJumpError
となるようです。
r = Ractor.new do
return :fuga
:hoge
end
p r.take
# => LocalJumpError
Ractor内での例外
Ractor内での例外は以下のようにして受け取ることができます。
r = Ractor.new do
raise 'error'
end
begin
r.take
rescue Ractor::RemoteError => e
p e.message
end
ちなみに、Ractor内でも例外処理を書くことはできます。
r = Ractor.new name: 'r1' do
begin
raise 'error'
rescue => e
p e.message
end
end
r.take
またドキュメントによるとRactorのブロック内から返ってきた値を受け取る領域で例外をキャッチできるようです。
つまり以下のようなコードも書くことができます。
r1 = Ractor.new do
raise 'error'
end
r2 = Ractor.new r1 do |r1|
begin
r1.take
rescue Ractor::RemoteError => e
p e.message
end
end
r2.take
# => "thrown by remote Ractor."
Ractorでの並行実行
簡単な例
こんな感じでRactorで並行実行を行うことができます。
Ractor.new do
3.times do
puts 42
end
end
3.times do
puts 21
end
実行すると42
と21
という出力がそれぞればらばらに表示されます。
ちょっとした例
以下のような感じで複数のworker
をRactor
で生成し、それをpipe
で経由して値を渡し、結果をまとめることができます。
require 'prime'
pipe = Ractor.new do
loop do
Ractor.yield Ractor.recv
end
end
N = 1000
RN = 10
workers = (1..RN).map do
Ractor.new pipe do |pipe|
while n = pipe.take
Ractor.yield [n, n.prime?]
end
end
end
(1..N).each{|i|
pipe << i
}
pp (1..N).map{
r, (n, b) = Ractor.select(*workers)
[n, b]
}.sort_by{|(n, b)| n}
# => 0 ~ 999 までの数値が素数かどうかの結果を出力
このコードでは10個のworker
を生成し、pipe
を経由してそれぞれのworker
にオブジェクトを渡しています。
また、受け取ったオブジェクトをRactor.yield [n, n.prime?]
で返しています。
こんな感じでworker
を複数作り、pipe
経由で処理させたり、結果を受け取ることができます。
よしなにworkerなどを生成して処理させるクラスを書いてみる
先ほどのコードだとworker
内の処理が後々で大きくなることもありそうだったので、以下のようにworker
をよしなに生成してくれるクラスを書いてみました。
class Ninsoku
def initialize(task, worker_count: 10)
@task = task
@pipe = create_pipe
@workers = create_workers(worker_count)
end
def send(arg)
@pipe.send arg
end
def run
yield Ractor.select(*@workers)
end
def create_pipe
Ractor.new do
loop do
Ractor.yield Ractor.recv
end
end
end
def create_workers(worker_count)
(1..worker_count).map do
Ractor.new @pipe, @task do |pipe, task|
loop do
arg = pipe.take
task.send arg
Ractor.yield task.take
end
end
end
end
end
Ninsoku.new
でpipe
とworker
を生成しています。またtask
は処理させたい内容をRactor
で渡し、worker
で実行させています。
実際に使うケースとしてはこんな感じです。
task = Ractor.new do
func = lambda{|n| n.downcase }
loop do
Ractor.yield func.call(Ractor.recv)
end
end
ninsoku = Ninsoku.new(task)
('A'..'Z').each{|i|
ninsoku.send i
}
('A'..'Z').map{
ninsoku.run{|r, n|
puts n
}
}
# => a ~ z までが並行して出力される
このクラスはあとでgem
にでもしてみようかと思います。
gem
にしてみました(rubygemsにはpushしていませんが……)
例えば、僕の住んでいる島根県浜田市のAED位置情報をRactorを使って処理してみます。
なお、島根県のAED位置情報は島根県が公開しているオープンデータを使用させていただきました。この場を借りて感謝を申し上げます。
require "rorker"
require "csv"
task = Ractor.new do
func = lambda{|row|
row.map{|value|
if value =~ /浜田市/
row
end
}.compact
}
loop do
Ractor.yield func.call(Ractor.recv)
end
end
rorker = Rorker.new(task)
csv = CSV.read "a.csv"
csv.each do |row|
rorker.send row
end
n = 0
while n < csv.count
rorker.run{|worker, result|
if !result.empty?
puts result
end
}
n += 1
end
こんな感じで読み取ったCSVを一行づつworkerに渡して並行処理で必要なデータをとってくることもできます。
RactorでNumbered Parameterを使う
Ractor
では処理をブロックで渡しますので、Numbered Parameter
を使って引数を受け取ることもできます。
r = Ractor.new :hoge do
puts _1
end
r.take
# => hoge
ちなみに、複数の引数でも動作します。
r = Ractor.new :hoge, :hoge do
puts _1
puts _2
end
r.take
# => hoge
# => fuga
複数渡した場合は渡された順番通りに_1
から_9
に渡されるみたいです。
ちなみに、Hashを渡した場合はこんな感じになります。
r = Ractor.new ({hoge: 1, fuga: 2}) do
_1.map do |key, value|
p ":#{key} => #{value}"
end
end
r.take
# => ":hoge => 1"
# => ":fuga => 2"
=>
を使ったHashも同様の結果になりました
r = Ractor.new({:hoge => 1, :fuga => 2}) do
_1.map do |key, value|
p ":#{key} => #{value}"
end
end
r.take
# => ":hoge => 1"
# => ":fuga => 2"
ただし、Arrayの場合は少し挙動が異なります
r = Ractor.new [1, 2, 3] do
puts _1
puts _1.class
puts _2
puts _2.class
puts _3
puts _3.class
end
r.take
#=> 1
#=> Integer
#=> 2
#=> Integer
#=> 3
#=> Integer
どうやら通常通り複数の引数を渡した時のようにArrayの先頭から順番に渡されるようです。
おそらくは以下のように解釈されているのではないかと思います。
_1, _2, _3 = [1, 2, 3]
ちなみに、Numbered Parameter
で受け取れる数より大きなArray
を渡すと
r = Ractor.new [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] do
puts _1
puts _2
puts _3
puts _4
puts _5
puts _6
puts _7
puts _8
puts _9
end
r.take
#=> 1
#=> 2
#=> 3
#=> 4
#=> 5
#=> 6
#=> 7
#=> 8
#=> 9
このように取得できるのはNumbered Parameter
が受け取れる範囲までのようです
Ractor
内でNumbered Parameter
を使う場合はHash
を引数に渡した際か
r = Ractor.new ({hoge: 1, fuga: 2}) do |hash|
hash.map do
p ":#{_1} => #{_2}"
end
end
r.take
":hoge => 1"
":fuga => 2"
またはいくつかの引数を渡した時に省略して書きたいときに使うことになりそうです
r = Ractor.new :hoge, :fuga do
p _1
p _2
end
r.take
# => :hoge
# => :fuga
おわりに
この記事を読んでRactor
について興味を持って頂ければ幸いです。
今後もRactor
を使って試したコードを追加していこうと思います
参考
- A proposal of new concurrency model for Ruby 3
- Guild Prototype
- Ruby向け並列化機構Guildの試作
- Guild → Ractor
- [JA] Ractor report / Koichi Sasada ko1
- https://github.com/ko1/ruby/blob/ractor/ractor.ja.md
- Ractor - Ruby's Actor-like concurrent abstraction
- https://gist.github.com/niku/c19da11edf0b97470af27844b44d12fa
- Ractor: a proposal for a new concurrent abstraction without thread-safety issues