Threadとは
スレッド(Thread)とはメモリ空間を共有して同時に実行される制御の流れのことで、
スレッド(Thread)という処理を使えば、複数の処理を並行して実施することが可能
です。
RubyではThreadクラスを使用します。
基本的な使い方
###Threadの作成
Threadの作成には、Thread#fork
もしくはThread#new
などを使う
files = %w(.xinitrc .zshrc .vimrc)
threads = files.map {|file|
Thread.fork {
num = File.readlines(file).length
"#{file}: #{num}"
}
}
# スレッドの終了後に戻り値を返す
threads.map(&:value) #=> [".xinitrc: 58", ".zshrc: 51", ".vimrc: 209"]
# 単にスレッドの終了を待つ
threads.join
Threadの取得
# メインスレッドの取得
Thread.main #=> <Thread:...>
# 現在実行中のスレッドを取得
Thread.current #=> <Thread:...>
# 全てのスレッドの配列を取得
Thread.list #=> [<Thread:...>, #=> <Thread:...>, ...]
Threadでの変数の扱い方
他のスレッドと共有したくない値は、ブロックに引数を渡すことができます。
items = %w(foo bar baz)
Thread.fork(items) do |item|
sleep 1
puts item #=> "foo", "bar", ...
end
# 子スレッドの終了を待つ
(Thread.list - [Thread.current]).each(&:join)
スレッド固有のデータを格納
threads = Thread {
Thread[:status] = 'in progress'
sleep 1
}
# スレッド固有のデータにアクセス
thread[:status] #=> "in progress"
ライフサイクル
スレッドの状態はライフサイクルの中で変化していきます。
t = Thread.fork { sleep 1 }
# 状態を取得
t.status #=> "sleep"
# スレッドが生きているか
t.alive? #=> true
# スレッドが終了or停止しているか
t.stop #=> true
スレッドの状態
状態 | 意味 |
---|---|
run | 実行可能、実行中 |
sleep | 停止 |
aborting | 終了処理中 |
false | 正常終了 |
nil | 異常終了 |
Threadの操作
t = Thread.fork {
# 自スレッドを停止
Thread.stop
# 停止しているスレッドを実行可能にする
Thread.wakeup
puts 'Thread!'
}
# 即座にスレッドの処理を実行
t.run #=> "Thread!"
# スレッドを終了させる
t.kill
Threadの優先順位
優先順位を数値で設定することが出来て、値が大きいほど優先度は高くなります。
current = Thread.current
current.priority #=> 0
current.priority = 3
current.priority #=> 3
ThreadGroup
スレッドを1つのグループにまとめるためにはThreadGroupを利用します。
デフォルトでは、全てのスレッドはThreadGroup::Default
に所属しています。
group = ThreadGroup.new
thread = Thread.fork {
sleep 1
}
# グループにスレッドを追加
group.add(thread)
Mutex
複数のスレッドが同時に1つのデータを参照・更新する処理には適切なはいた処理が必要です。
Mutexを利用することで、相互排他処理を行うためのロックを提供
します。
def countup
File.open('counter', File::RDWR | File::CREAT) do |f|
last_count = f.read.to_i
f.rewind
f.write last_count + 1
end
end
mutex = Mutex.new
10.times.map {
Thread.fork {
# mutexのロックを取得してブロックを実行後に開放する
mutex.synchronize { countup }
}
}.map(&:join)
# 正しく10回カウントアップされる
puts File.read('counter').to_i
thread.rb
標準添付ライブラリであるthread.rbを利用することで、組み込み機能に加えて、キューや状態変数を使えるようになります。
Queue
Queueとは、スレッドセーフなFIFOキューです。
スレッドセーフとは、複数のスレッドから同時にアクセスしてのプログラムが壊れないような性質を持っている
ことです。
require 'thread'
queue = Queue.new
# ワーカースレッドを3つ用意する
workers = 3.times.map {|t|
Thread.fork {
while req = queue.deq # キューが空になると待機する
puts "Worker#{t} processing..."
req.call
end
}
}
# 10個のリクエストをenqueueする
10.times do |t|
# キューを末尾に追加
queue.enq -> {
sleep 1
}
end
# 全てのキューが処理されるまで待機する
sleep until queue.empty?
# キューが空になったので全スレッドがqueueを待っている
p workers.map(&*status) #=> ["sleep", "sleep", "sleep"]
ConditionVariable
ConditionVariableは状態変数を表現する
クラスです。
状態変数とは、排他領域で処理を行っているスレッド同士がお互いに通信し合う手段を提供する
ための仕組みのことです。
require 'thread'
class Bucket
def initialize(limit = 5)
@appendable = ConditionVariable.new
@flushable = ConditionVariable.new
@lock = Mutex.new
@limit = limit
@out = ''
end
def append(str)
@lock.syncronize {
# ロックを一時的に開放してカレンとスレッドを停止
@appendable.wait(@lock) unless appendable?
@out << str
# waitしているスレッドがあれば、そのうち1つを再開させて再開させたスレッドの配列を返す
@flushable.signal if flashable?
}
end
def flush
@lock.syncronize {
# ロックを一時的に開放してカレンとスレッドを停止
@flashable.wait(@lock) unless flasable?
puts '-' * 10
puts @out
@out = ''
# waitしているスレッドがあれば、そのうち1つを再開させて再開させたスレッドの配列を返す
@appendable.signal if appendable?
}
end
private
def appendable?
@out.lines.count < @limit
end
def flasable?
!appendable?
end
end
require_relative 'list11.2'
bucket = Bucket.new
# bucketに結果を書き込む
t1 = Thread.fork {
25.times do |t|
sleep rand
bucket.append("line: #{t}\n")
end
}