Ruby 3.0 の予定どおりのリリース、おめでとうございます。3.0 では Ruby で並列並行プログラミングを可能にする Ractor という機能が入りました。Ractor については Qiita でも既によい記事があります。が、プログラミング完全に独学素人のわたしには少しレヴェルが高すぎるので、以下はもう少し初歩的な部分を、ただ自分用にメモしたものです。もし他人のお役に立てば幸いです。
なお、Ruby のバージョンは ruby 3.0.0p0 (2020-12-25 revision 95aff21468) [x86_64-linux] です。
#範囲外の参照
Ractor の外のローカル変数を参照することはできません。
i = 0
Ractor.new {
i
}.take
#=><internal:ractor>:267:in `new': can not isolate a Proc because it accesses outer variables (i). (ArgumentError)
ローカル変数を渡したければ、Ractor.new
の引数として渡すことができます。
i = 0
Ractor.new(i) {|j|
puts j #=>0
}.take
ただし、定数は参照できます。
L = 0
Ractor.new {
puts L #=>0
}.take
クラスのインスタンスも渡せます。
class Counter
def initialize
@value = 0
end
attr_reader :value
def inc
@value += 1
end
end
c = Counter.new
r = Ractor.new(c) {|counter|
10.times { counter.inc }
counter.value
}
puts r.take #=>10
#ミュータブル・オブジェクト
なんか Ractor ではミュータブル・オブジェクト(破壊可能オブジェクト)は使えないみたいなイメージがありました。では、Array や String はダメなのだろうか。
ary = [1, 2, "3"]
str = "Ruby"
Ractor.new(ary, str) {|a, s|
p a #=>[1, 2, "3"]
p s #=>"Ruby"
}.take
別にそんなことはないようですね。
破壊的操作も可能なようです。
str = "Ruby"
Ractor.new(str) {|s|
s << "!"
p s #=>"Ruby!"
}.take
以下でも大丈夫みたいです。
str = "Ruby"
p 3.times.map {
Ractor.new(str) {|s|
s << ["!", "?"].sample
s
}
}.map(&:take) #=>["Ruby?", "Ruby!", "Ruby?"]
p str #=>"Ruby"
つまり、new(str)
のstr
は複製されてブロック変数のs
に入るみたいですね。
これは自作クラスのインスタンスでも同様なようです。
c = Counter.new
p 4.times.map {
Ractor.new(c) {|counter|
counter.inc
counter.value
}
}.map(&:take) #=>[1, 1, 1, 1]
p c.value #=>0
配列が[1, 2, 3, 4]
みたいになることはないようです。インスタンスc
が共有されていません。
#クラス
クラスは外で定義されていても参照できます。
class A
def sum(ary)
ary.sum
end
end
p Ractor.new {
a = A.new
a.sum([*1..10])
}.take #=>55
なので、当然トップレベルのメソッドも参照できます。
def sum(ary)
ary.sum
end
p Ractor.new {
sum([*1..10])
}.take #=>55
というか、「当然」と書いたけれど、Ractor 内の self は何なんだ?
Ractor.new {
p self #=>#<Ractor:#2 ractor_sample.rb:1 running>
p self.class #=>Ractor
p self.class.ancestors #=>[Ractor, Object, Kernel, BasicObject]
}.take
Ractor 内で定義されたクラスを、外で使うこともできるみたいです。
Ractor.new {
class A
def rand
Random.rand(10)
end
end
p A.new.rand #=>7
}.take
p A.new.rand #=>1
でもこれ、たまたまうまくいっているだけなのかも…。
#Proc, Object#method
Proc は Ractor に渡せないみたいですね…。残念です。
f = ->(x) { x * 3 }
Ractor.new(f) {|func|
(1..4).map(&func)
}.take
#=><internal:ractor>:267:in `new': allocator undefined for Proc (TypeError)
では、これもダメですよね。
def f(x) = x * 3
Ractor.new(method(:f)) {|func|
(1..4).map(&func)
}.take
#=><internal:ractor>:267:in `new': allocator undefined for Method (TypeError)
あきらめてこうするか…
def f(x) = x * 3
Ractor.new {
p (1..4).map(&method(:f)) #=>[3, 6, 9, 12]
}.take
まあ、Ruby で無理に関数型プログラミングっぽくしなくてもいいのだろうけれど、ちょっと残念。
ちなみに「Endless method definition」(1行def)をここでは使ってみました。
#Pull型通信
Ractor.yield と Ractor#take を使います。Ractor.yield は外部の Ractor#take があるまで待ち、take で値を取り出します。Ractor の返り値は自動的に Ractor.yield されます。余分に take するとエラー。take されない Ractor.yield は、捨てられるのかな?
r = Ractor.new do
Ractor.yield 1
Ractor.yield 2
3
end
p r.take #=>1
sleep(1)
p r.take #=>(1秒後に)2
p r.take #=>3
p r.take #=><internal:ractor>:694:in `take': The outgoing-port is already closed (Ractor::ClosedError)
Ractor.yield より先に take した場合は、yield されるまで待ちます。
r = Ractor.new do
sleep(1)
Ractor.yield 1
end
p r.take #=>(1秒後に)1
つまり、必要がある(Ractor#take される)まで評価されません。なんか「遅延評価」っぽいです。
#Push型通信
Ractor#send で Ractor 内の Queue に突っ込み、Ractor.receive で Queue から取り出します。Queue が空ならばブロックします。
r = Ractor.new do
loop do
p Ractor.receive
end
end
r.send(1)
r.send(2)
sleep(1)
r.send(3)
r.take
これを実行するとまず 1, 2 が出力され、1秒後に 3 が出力されたあと、フリーズします。
内部に Queue があるので、Ractor#send による入力を溜めておけます。
#Push + Pull
並行処理の例
r = Ractor.new do
a = Ractor.receive
#(何か a を使った処理)
#(返り値)
end
#r と do_something を並行して処理
r.send(data)
do_something
result = r.take
r
はこうすれば使い回せるかな。
r = Ractor.new do
loop do
a = Ractor.receive
#(何か a を使った処理)
Ractor.yield result #返り値
end
end
#これは何をやっているのか
1〜1000 の素数判定なのですが…
require 'prime'
N = 1000
RN = 10
pipe = Ractor.new do
loop do
Ractor.yield Ractor.receive
end
end
workers = (1..RN).map do
Ractor.new(pipe) do |pipe|
loop do
n = pipe.take
Ractor.yield [n, n.prime?]
end
end
end
(1..N).each { |i| pipe.send(i) }
pp (1..N).map {
r, (n, b) = Ractor.select(*workers)
[n, b]
}.sort_by { |(n, b)| n }
Ractor には別の Ractor を渡すことができます。
pipe
はpipe.send
されたものを内部の Queue に溜めて、pipe.take
があるまで待って流します。ただここでは先にworker
の定義でpipe.take
されているので、後からpipe.send
したのを待ってpipe.take
することになります。これはどちらが先でもいいので、(1..N).each { |i| pipe.send(i) }
をworker
の定義の前にもっていくこともできます。
worker
は全部でRN
個作られます。つまり、pipe
の 1個+RN
個の並列処理になります。
pipe
にはN
個のデータが流し込まれます。それを適当にRN
個のworker
に振り分けて処理します。
それぞれのworker
は処理が終わったら結果をRactor.yield
して待ちます。それをRactor.select(*workers)
で、出来た順に受け取ります。渡したworker
はloop
でまた繰り返し同じ処理をします。
なんだかすごく「遅延評価」っぽいです。
#最後に
以下がとても参考になりました。
間違いがあったら御指摘下さい。