- Ruby 4.0 の Ractor 使ってみた (1) 触ってみるまで
- Ruby 4.0 の Ractor 使ってみた (2) 題材
- Ruby 4.0 の Ractor 使ってみた (3) Ractor::Port に馴染む
- Ruby 4.0 の Ractor 使ってみた (4) Ractor に馴染む
- Ruby 4.0 の Ractor 使ってみた (5) 設計方針
- Ruby 4.0 の Ractor 使ってみた (6) 実装
- Ruby 4.0 の Ractor 使ってみた (7) ベンチマークテスト
RactorFilterProcessor
いよいよ実装だ。
まず,汎用の仕組みである RactorFilterProcessor クラス。独立のファイルとして作っておこう。
class RactorFilterProcessor
def initialize(worker_count, &)
@controller = Ractor::Port.new
@result_port = Ractor::Port.new
@filter = Ractor.shareable_proc(&)
worker_count.times do
Ractor.new @controller, @result_port, @filter do |controller, result_port, filter|
while true
controller << Ractor.current
index, value = Ractor.receive
result_port << [index, filter[value]]
end
end
end
end
def filter(values)
value_count = values.count
values.each_with_index do |value, index|
worker = @controller.receive
worker << [index, value]
end
Array.new(value_count){ @result_port.receive }.sort.map{ _2 }
end
end
以下,解説していく。
分かりにくいところがあったら気軽にコメントで尋ねてくださいね。
initialize
ブロックからフィルターを作る
initialize メソッド中で,最初に「?」となるのは
@filter = Ractor.shareable_proc(&)
だろう。
これは,基本的には RactorFilterProcessor.new に与えられたブロック(=各ワーカーにやらせたい変換処理)を Proc 化して @filter に収めているもの。
ただ,一般の Proc オブジェクトは「共有可能」(shareable)ではない。
「共有可能」というのは Ractor プログラミングにおいて非常に重要な概念だ。
たとえば,freeze されていない配列がもしも複数の Ractor の間で共有されてしまったらどうか。非同期に動くそいつらが配列に変更を加えてしまったら,全く予測不能な事態に陥る。
Ractor はそういうことが起こらないように,「freeze されていない配列」などは「共有不能」としている。
Ractor.shareable_proc は「共有可能な Proc オブジェクト」を作るために Ruby 4.0 で導入されたらしい。
Ractor を作る
次にやっているのは,引数に与えられた worker_count 個の Ractor オブジェクトを作ること。作った Ractor はとくに変数にしまったりしていない。
Ractor を作るところは
Ractor.new @controller, @result_port, @filter do |controller, result_port, filter|
# 云々
end
と書いている。
コントローラー,結果ポート,フィルター Proc の三つを引数に渡している。それをブロックパラメーターで受け取っている(Ractor.new は受け取った引数をそのままブロックパラメーターに渡す)。
「えっ?」と思わないか?
この三つはブロックの中で使うものだが,ブロックはそもそも,その外で定義された変数(インスタンス変数もローカル変数も)にアクセスできるはずでは? こんな回りくどい書き方をしなくても,ブロック内で直接 @controller などとして参照すればよいのでは?
……できないんである。
Ractor.new に与えたブロックの中は別世界なのだ。よくは知らないが,「分離されている(isolated)」というらしい。
分離しないと安全に並列処理なんてできないのだ。
一つの Ractor がオブジェクトを他とやり取りできるのは
- 〈
Ractor.newの引数 → ブロックパラメーター〉という経路 - 何らかのポート経由
のみに制限されているようだ(間違ってたら教えて)。
ブロック内の記述
Ractor.new のブロック内では,ワーカーとして
- 仕事紹介所で申し込み
- 仕事内容を受け取り
- 仕事結果を納品
を永遠に繰り返すので,
while true
# 云々
end
と書いている。
無限ループを書くのに while true/end が良いのか loop do/end がよいのかよく分からない。
前者はスコープを作らないが後者は作る。しかし,いまの場合,この点についてはどちらでもよいと思う。
ブロックの実行はかすかにコストがかかるはずだと思うが,パフォーマンスに影響を及ぼすレベルかは計測していないので分からない。
ともかく,ループの中は
controller << Ractor.current
index, value = Ractor.receive
result_port << [index, filter[value]]
となっている。
Ractor.current は,Ractor.new のブロック中で「その Ractor」を得る手段だ。
つまり,この 1 行目は「この Ractor が自分自身をコントローラーに送る」ことを意味する。
controller はポート(Ractor::Port オブジェクト)であった。
ポートにオブジェクトを送るには << を使う。これは send メソッドのエイリアスなので,send を使ってもよい。
次の行の Ractor.receive は,自身(Ractor)のデフォルトポートからオブジェクトを受け取るもの。
全ての Ractor には,唯一つの「デフォルトポート」があるんだったね。
あとで説明するが,ワーカーのデフォルトポートには [インデックス, 値] の形の配列が渡される。
それを受け取って,ローカル変数 index, value に多重代入している。
3 行目の filter[value] は,Proc オブジェクト filter に value を渡してその処理結果を受け取っている。
それをインデックスとともに結果ポートに送っている。
filter
では,「処理したい値の配列を与えると処理結果の配列を返す」メソッド filter はどうなっているのだろう。
メソッドシグニチャーは
def filter(values)
となっている。
values が「処理したい値の配列」だ。
最初に要素数を value_count に収めている。
これは,「結果ポートからいくつ値をもらってくるか」を知るために必要になる。
その次に
values.each_with_index do |value, index|
worker = @controller.receive
worker << [index, value]
end
とある。
ブロックの中身はもちろん 1 行で書けるが,説明変数として worker を使っている。
@controller.receive は,コントローラー(と名付けられたポート)に送られたワーカー(と名付けられた Ractor オブジェクト)を受け取るものだ。
そのワーカーのデフォルトポートに [index, value] を送っている。
Ractor のデフォルトポートは Ractor#default_port で得られるのだが,わざわざ
worker.default_port << [index, value]
と書かなくても,Ractor オブジェクトに直接 << すればよい。
ポートと同様,Ractor#<< も send のエイリアスなので,send と書いてかまわない。
全ての要素をインデックスと共にワーカーに送り終わったら,こんどは結果ポートから処理結果を取り出す:
Array.new(value_count){ @result_port.receive }.sort.map{ _2 }
sort によって,インデックスの順に並べ替えておき,それが済んだらインデックスは用済みなので,「処理結果の値」だけの配列仕立てている。
使用
では,これを使ってみる。
まず,小さな「テキストの配列」を用意し,動作が正しいかを確認:
require_relative "ractor_filter_processor"
texts = [
"Mississippi",
"Colorado",
"a"
]
def char_histogram(text)
text.chars.tally.sort_by{ -_2 }.to_h
end
processor = RactorFilterProcessor.new(2) do |text|
char_histogram(text)
end
result = processor.filter(texts)
pp result
動作テストなのでワーカー数はいくつでもよいが,仮に 2 としてみた。
結果は
[{"i" => 4, "s" => 4, "p" => 2, "M" => 1},
{"o" => 3, "C" => 1, "l" => 1, "r" => 1, "a" => 1, "d" => 1},
{"a" => 1}]
なので,正しく動作しているようだ。
さてさて,果たしてこのやり方で実行時間は縮まっているのだろうか。
処理の並列化というのは,しばしば期待に反して
- ちょっとしか効果なかった
- かえって時間がかかった
ということが起こる。
ベンチマークテストしてみるまでドキドキだ。
次回(最終回?)はベンチマークテストを実行する。