42
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Rubyのマルチスレッド

Last updated at Posted at 2019-08-10

環境

% ruby -v
ruby 2.3.7p456 (2018-03-28 revision 63024) [x86_64-darwin18]

シングルスレッド

mainスレッド内で順番に実行する。

p Time.now  # => 2019-06-30 12:35:46 +0900
sleep(3)
p Time.now  # => 2019-06-30 12:35:49 +0900

出力結果には3秒のズレが生じる。

Thread#new

スレッドが実行されるのは、Thread#new/fork/startで生成されたり、Thread#run, Thread#wakeupなどが呼ばれたとき。

th = Thread.new do
  p Time.now  # => 2019-06-30 12:36:41 +0900
  sleep(3)
end

p Time.now  # => 2019-06-30 12:36:41 +0900

1つ目のTime.nowとsleepが子スレッドで、2つ目のTime.nowがmainスレッドで、並行的に実行されるので出力結果は同時刻となる。

Thread#join

スレッドは基本的に互いに干渉せず実行されるが、Thread#joinをつかうとレシーバのスレッドが終了するまで待つ。

th = Thread.new do
  p Time.now  # => 2019-06-30 12:50:05 +0900
  sleep(3)
end

th.join
p Time.now  # => 2019-06-30 12:50:08 +0900

子スレッドの終了を待ってから親スレッドのp Time.nowを実行するので、3秒のズレが生じる。

メモリの共有

マルチスレッドではメモリを共有する(<=>マルチプロセスでは原則としてメモリを共有しない)。

hoge = 3

th = Thread.new do
  hoge = 10
end

th.join
p hoge  # => 10

Thread::Queue

積み上がったタスクを各スレッドがワーカーとして処理するために、Thread::Queueというクラスをつかえる。

q = Thread::Queue.new
[*1..100].each {|n| q.push(n)}

threads = [*1..5].map do |i|
  Thread.new do 
    until q.empty?
      n = q.pop
      puts "square of #{n} is #{n**2} (thread #{i})\n"
      sleep(rand(0.01..0.1))
    end
  end
end

threads.each(&:join)

結果

square of 1 is 1 (thread 3)
square of 2 is 4 (thread 4)
square of 3 is 9 (thread 2)
square of 4 is 16 (thread 1)
square of 5 is 25 (thread 5)
square of 6 is 36 (thread 5)
square of 7 is 49 (thread 3)
square of 8 is 64 (thread 1)
...(以下略)
  • 各スレッドが手が空き次第、値をキューから順次1つずつ取り出して処理している様子が確認できる。
  • (どうせメモリを共有しているならArrayをつかってもいいのでは?と思ったけど、Arrayはスレッドセーフではないらしい。)

スレッド固有の変数

基本的にはメモリを共有するけど、「このスレッドだけでつかえる変数」がほしいときもある。

スレッド生成時に宣言する

hoge = 3

th = Thread.new do |hoge|
  hoge = 10
end

th.join
p hoge  # => 3

Thread#newにブロック引数を渡すと、そのスレッド固有の変数になる。

Thread#[] で操作する

th = Thread.new do
  sleep(1)
  p Thread.current[:hoge]  # => 10
end

th[:hoge] = 10

th.join

スレッドセーフティ

  • 先述のとおり、複数スレッド間ではメモリを共有するため、互いに望ましくない影響を及ぼし合う可能性がある。
  • マルチスレッド処理において、各スレッドが互いに影響し合わないような性質をスレッドセーフという。

GVL

現在の実装では Ruby VM は Giant VM lock (GVL) を有しており、同時に実行されるネイティブスレッドは常にひとつです。
ただし、IO 関連のブロックする可能性があるシステムコールを行う場合には GVL を解放します。その場合にはスレッドは同時に実行され得ます。

つまり、IO 関連のブロックする可能性があるシステムコールを...

  • 呼ばない場合
    • GVLがかかる
    • スレッドは常にひとつしか実行されないので、コードによりスレッドセーフティを担保する必要はない
    • 並行性の恩恵を受けえない
      • (そもそも「IO 関連のブロックする可能性があるシステムコールを行わない」場合なので、ボトルネックはCPUであり、GVLがかかっていようがいまいが並行性による恩恵は生じないことが多い)
  • 呼ぶ場合
    • GVLが解放される
    • スレッドは複数同時に実行されるので、コードによりスレッドセーフティを担保する必要がある
    • 並行性の恩恵を受けうる
      • (上述のサンプルコードで並行性がみられたのも、putsとかsleepとかのせいでGVLが解放されていたから)

確認していく。

IO 関連のブロックする可能性があるシステムコールを呼ばない場合、コードによりスレッドセーフティを担保する必要はない

以下のコードは明示的にスレッドセーフティを担保されてはいないが、GVLによって結果的にスレッドセーフティが実現している。

n = nil

threads = (1..5).map do |i|
  Thread.new do
    n = i
    100_000.times {}  # a time consuming line
    safe = n == i
  end
end

p threads.map(&:join).map(&:value)  # => [true, true, true, true, true]

IO 関連のブロックする可能性があるシステムコールを呼ぶ場合、コードによりスレッドセーフティを担保する必要がある

以下はスレッドセーフではない。

n = nil

threads = (1..5).map do |i|
  Thread.new do
    n = i
    puts ''
    safe = n == i
  end
end

p threads.map(&:join).map(&:value)  # => [false, false, false, true, false]

ちなみにn = in == iの間に記述された処理にかかる時間は以下のとおり。
データレースが起こる時間的な間隙はtimesの方がずっと長いことがわかる。
にも関わらずデータレースが発生しているのがputsの方だけなのは「前者ではGVLがかかり/後者ではGVLが解放されたから」と考えられる。1

require 'benchmark'

Benchmark.bm do |x|
  x.report(:times) { 100_000.times {} }
  x.report(:puts) { puts '' }
end
       user     system      total        real
times  0.010000   0.000000   0.010000 (  0.002932)
puts   0.000000   0.000000   0.000000 (  0.000006)

Thread::Mutex

ではどうやってコード上でスレッドセーフティを担保すればよいのか。
そのためのツールとして、Thread::Mutexというクラスが提供されている。

  • Mutexとはmutal exclusion(相互排他)
  • あるスレッドXにおいてMutexオブジェクトがlockされている間は、他のスレッドYではMutexオブジェクトをlockすることができず、MutexオブジェクトがスレッドXでunlockされるまで、スレッドYの実行は停止される。
  • これにより、Mutex#lockとMutex#unlockで囲まれた部分の処理は複数スレッドで並行して行われることがなくなり、スレッドセーフであることが担保される。
mutex = Thread::Mutex.new
n = nil

threads = (1..5).map do |i|
  Thread.new do
    mutex.lock
    n = i
    puts ''
    safe = n == i
    mutex.unlock
    safe
  end
end

p threads.map(&:join).map(&:value)  # => [true, true, true, true, true]
  1. ちなみに10_000_000.timesとかにするとGVLが解放されないはずなのに、データレースが発生した。これはどうやらGVLとは別の、単一のスレッドに余りに長い時間がかかっているとタイマーでスレッドを切り替えるという仕組みによるものみたい。

42
21
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
42
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?