search
LoginSignup
2

More than 1 year has passed since last update.

posted at

updated at

Ruby 3 の Ractor がわからない

Ruby 3.0 の予定どおりのリリース、おめでとうございます。3.0 では Ruby で並列並行プログラミングを可能にする Ractor という機能が入りました。Ractor については Qiita でも既によい記事があります。が、プログラミング完全に独学素人のわたしには少しレヴェルが高すぎるので、以下はもう少し初歩的な部分を、ただ自分用にメモしたものです。もし他人のお役に立てば幸いです。

なお、Ruby のバージョンは ruby 3.0.0p0 (2020-12-25 revision 95aff21468) [x86_64-linux] です。

範囲外の参照

Ractor の外のローカル変数を参照することはできません。

i = 0
Ractor.new {
  i
}.take
#=><internal:ractor>:267:in `new': can not isolate a Proc because it accesses outer variables (i). (ArgumentError)

ローカル変数を渡したければ、Ractor.newの引数として渡すことができます。

i = 0
Ractor.new(i) {|j|
  puts j    #=>0
}.take

ただし、定数は参照できます。

L = 0
Ractor.new {
  puts L    #=>0
}.take

クラスのインスタンスも渡せます。

class Counter
  def initialize
    @value = 0
  end
  attr_reader :value

  def inc
    @value += 1
  end
end

c = Counter.new

r = Ractor.new(c) {|counter|
  10.times { counter.inc }
  counter.value
}

puts r.take    #=>10

ミュータブル・オブジェクト

なんか Ractor ではミュータブル・オブジェクト(破壊可能オブジェクト)は使えないみたいなイメージがありました。では、Array や String はダメなのだろうか。

ary = [1, 2, "3"]
str = "Ruby"
Ractor.new(ary, str) {|a, s|
  p a    #=>[1, 2, "3"]
  p s    #=>"Ruby"
}.take

別にそんなことはないようですね。
破壊的操作も可能なようです。

str = "Ruby"
Ractor.new(str) {|s|
  s << "!"
  p s    #=>"Ruby!"
}.take

以下でも大丈夫みたいです。

str = "Ruby"
p 3.times.map {
  Ractor.new(str) {|s|
    s << ["!", "?"].sample
    s
  }
}.map(&:take)    #=>["Ruby?", "Ruby!", "Ruby?"]
p str            #=>"Ruby"

つまり、new(str)strは複製されてブロック変数のsに入るみたいですね。
これは自作クラスのインスタンスでも同様なようです。

c = Counter.new

p 4.times.map {
  Ractor.new(c) {|counter|
    counter.inc
    counter.value
  }
}.map(&:take)    #=>[1, 1, 1, 1]

p c.value    #=>0

配列が[1, 2, 3, 4]みたいになることはないようです。インスタンスcが共有されていません。

クラス

クラスは外で定義されていても参照できます。

class A
  def sum(ary)
    ary.sum
  end
end

p Ractor.new {
  a = A.new
  a.sum([*1..10])
}.take    #=>55

なので、当然トップレベルのメソッドも参照できます。

def sum(ary)
  ary.sum
end

p Ractor.new {
  sum([*1..10])
}.take    #=>55

というか、「当然」と書いたけれど、Ractor 内の self は何なんだ?

Ractor.new {
 p self                    #=>#<Ractor:#2 ractor_sample.rb:1 running>
 p self.class              #=>Ractor
 p self.class.ancestors    #=>[Ractor, Object, Kernel, BasicObject]
}.take

Ractor 内で定義されたクラスを、外で使うこともできるみたいです。

Ractor.new {
  class A
    def rand
      Random.rand(10)
    end
  end

  p A.new.rand    #=>7
}.take

p A.new.rand      #=>1

でもこれ、たまたまうまくいっているだけなのかも…。

Proc, Object#method

Proc は Ractor に渡せないみたいですね…。残念です。

f = ->(x) { x * 3 }
Ractor.new(f) {|func|
  (1..4).map(&func)
}.take
#=><internal:ractor>:267:in `new': allocator undefined for Proc (TypeError)

では、これもダメですよね。

def f(x) = x * 3
Ractor.new(method(:f)) {|func|
  (1..4).map(&func)
}.take
#=><internal:ractor>:267:in `new': allocator undefined for Method (TypeError)

あきらめてこうするか…

def f(x) = x * 3
Ractor.new {
  p (1..4).map(&method(:f))    #=>[3, 6, 9, 12]
}.take

まあ、Ruby で無理に関数型プログラミングっぽくしなくてもいいのだろうけれど、ちょっと残念。
ちなみに「Endless method definition」(1行def)をここでは使ってみました。

Pull型通信

Ractor.yield と Ractor#take を使います。Ractor.yield は外部の Ractor#take があるまで待ち、take で値を取り出します。Ractor の返り値は自動的に Ractor.yield されます。余分に take するとエラー。take されない Ractor.yield は、捨てられるのかな?

r = Ractor.new do
  Ractor.yield 1
  Ractor.yield 2
  3
end

p r.take    #=>1
sleep(1)
p r.take    #=>(1秒後に)2
p r.take    #=>3

p r.take    #=><internal:ractor>:694:in `take': The outgoing-port is already closed (Ractor::ClosedError)

Ractor.yield より先に take した場合は、yield されるまで待ちます。

r = Ractor.new do
  sleep(1)
  Ractor.yield 1
end

p r.take    #=>(1秒後に)1

つまり、必要がある(Ractor#take される)まで評価されません。なんか「遅延評価」っぽいです。

Push型通信

Ractor#send で Ractor 内の Queue に突っ込み、Ractor.receive で Queue から取り出します。Queue が空ならばブロックします。

r = Ractor.new do
  loop do
    p Ractor.receive
  end
end

r.send(1)
r.send(2)
sleep(1)
r.send(3)

r.take

これを実行するとまず 1, 2 が出力され、1秒後に 3 が出力されたあと、フリーズします。
内部に Queue があるので、Ractor#send による入力を溜めておけます。

Push + Pull

並行処理の例

r = Ractor.new do
  a = Ractor.receive
  #(何か a を使った処理)
  #(返り値)
end

#r と do_something を並行して処理
r.send(data)
do_something
result = r.take

rはこうすれば使い回せるかな。

r = Ractor.new do
  loop do
    a = Ractor.receive
    #(何か a を使った処理)
    Ractor.yield result    #返り値
  end
end

これは何をやっているのか

1〜1000 の素数判定なのですが…

require 'prime'

N = 1000
RN = 10

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

workers = (1..RN).map do
  Ractor.new(pipe) do |pipe|
    loop do
      n = pipe.take
      Ractor.yield [n, n.prime?]
    end
  end
end

(1..N).each { |i| pipe.send(i) }

pp (1..N).map {
  r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by { |(n, b)| n }

Ractor には別の Ractor を渡すことができます。

pipepipe.sendされたものを内部の Queue に溜めて、pipe.takeがあるまで待って流します。ただここでは先にworkerの定義でpipe.takeされているので、後からpipe.sendしたのを待ってpipe.takeすることになります。これはどちらが先でもいいので、(1..N).each { |i| pipe.send(i) }workerの定義の前にもっていくこともできます。

workerは全部でRN個作られます。つまり、pipeの 1個+RN個の並列処理になります。

pipeにはN個のデータが流し込まれます。それを適当にRN個のworkerに振り分けて処理します。

それぞれのworkerは処理が終わったら結果をRactor.yieldして待ちます。それをRactor.select(*workers)で、出来た順に受け取ります。渡したworkerloopでまた繰り返し同じ処理をします。

なんだかすごく「遅延評価」っぽいです。

最後に

以下がとても参考になりました。

間違いがあったら御指摘下さい。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
2