目的
理解が足りていないIO、プロセス、ソケット、スレッド
と、Ruby
で使われるFiber
について調べているうちに、チャットプログラムのソースに出会いました。
コードが読めるようになり、解説もできるようになれば
理解が進み、かつ理解の証明にもなるはず。
- 理解するため
- 理解の証明とするため
以上が目的となります。
概要
どなたか知りませんが、今回、誠に勝手ながら、教材として頂きます。
Github
解説開始
プログラムを上から順に読んでいってもわかりづらいので、
実際に処理される順番で解説していきます。
require 'rubygems'
これはRubyにバンドルされているパッケージ管理ツール。
include Socket::Constants
ソケット操作の指定のための定数を定義したモジュール
def initialize
@reading = Array.new
@writing = Array.new
@clients = Hash.new
end
ChatServerインスタンス
を作成時に、インスタンス変数を作成します。
def start
@server_socket = TCPServer.new('localhost', 4242)
@reading.push(@server_socket)
run_acceptor
end
localhost
のポート4242番を利用してTCPServer
を作成し、@server_socket
にTCPServerインスタンス
を格納しています。
そして、作成したTCPServerインスタンス
を@reading
の配列に追加(push
)
ここまではまあいいんですよ。
この後のソケットの入出力待ちとか、Fiber
の挙動とかを把握するのに結構時間がかかりました。
メソッド少ないくせにようわからんかったなあ。
IO
の挙動とFiber
の挙動が分かっていればすんなりいきそう。
ということで、run_acceptor
中身を見ていきましょう。
def run_acceptor
puts "accepting on shared socket (localhost:4242)"
loop do
puts "current clients: #{@clients.length}"
# 何かしらの入出力があるまではここで処理が止まります。
readable, writable = IO.select(@reading, @writing)
# 何かしらの入出力があったとき、配列で返ってくるのでeachしてます
readable.each do |socket|
if socket == @server_socket
# 初めてアクセス要求があった時、socketにはTCPServerクラスのインスタンスが入っている
# なので、ここでクライアントが追加されることになります。
add_client
else
# ① 一旦省略。後ほど解説。
end
end
end
end
最初、@reading
にはTCPServerクラスのインスタンス
だけが入っている。
@writing
は便宜上設定しているだけ。ここで引数設定をしていないと、返り値が崩れる。
具体的には・・・ちょっと待って今考え中だから。
IO.select
は入力or出力or例外の準備ができたもの(TCPServerクラスのインスタンス
か、
もしくはTCPSocketクラスのインスタンス
)を配列にして、配列の配列として返す。
初期起動時(ruby /path/to/fiberchat.rb
をした時)は、TCPServerクラスのインスタンス
が入ります。
クライアント側から接続要求があった時はTCPServerクラスのインスタンス
が入ります。
データ送信がクライアント側から行われた時はTCPSocketクラスのインスタンス
が入ります。
ここで気をつけるのは、何かしらの入出力があるまではここで処理が止まる
ということ。
さて、ここから飛び飛びになっていきます。
まずはadd_client
から
def add_client
socket = @server_socket.accept_nonblock
@reading.push(socket)
@clients[socket] = Fiber.new do |message|
loop {
if message.nil?
# ② 後ほど
else
# ③ 後ほど
end
}
end
puts "client #{socket} connected"
return @clients[socket]
end
socket = @server_socket.accept_nonblock
@reading.push(socket)
accept_nonblock
でソケットをノンブロッキングモード(複数のアクセスを受け付ける)にします。そして、クライアントからの接続要求を受け付け、接続したTCPSocket
のインスタンスを返します。TCPSocket
のインスタンスは@reading
に追加されています。
@clients[socket] = Fiber.new do |message|
...
end
@clients
(ハッシュ)のkey
を
接続したTCPSocketのインスタンス
にして、そのvalue
にはFiberインスタンス
を格納している。
格納時はブロックの中身は実行されません。
Fiber
はThreadより軽くて柔軟なクラス
って感じ。
ここで説明するには長くなるので別途調べて頂きたい!
覚えておいて欲しいのは、
- Fiberクラスのインスタンス
のブロック内でFiber.yield
が行われたら途中で処理が止まるということ。返り値はFiber.yield
の引数か
- 再度呼び出した時に(Fiber#resume
)処理が止まったところから処理が再開されるということ。
add_client
がひとまずここで終了し、
readable.each do |socket| ...
の処理が終わります。
そうしたらまたrun_acceptor
のloop do
の処理が再開します。
つまり、readable, writable = IO.select(@reading, @writing)
にまた返ってくるわけですね。
そして、接続されているクライアントからデータ通信が行われた時にどうなるか。
readable, writable = IO.select(@reading, @writing)
が処理され、
readable.each do |socket|
に移り、
if socket == @server_socket
ではないelse
内の処理(①)が始まります。
client = @clients[socket]
message = client.resume
まずclient
変数に@clients
(ハッシュ)のkey
がsocket
(ここではTCPSocketクラスのインスタンス
)のvalue
であるFiberインスタンス
が格納されます。
④message = client.resume
この時引数は渡されていないので
if message.nil?
内の処理(②)がスタート。
chat = socket.gets
socket.flush
message = Fiber.yield(chat.strip) #⑤
socket.getsで送信されたデータを取得しています。
socket.flushで更新内容を反映しています。
message = Fiber.yield(chat.strip)
ですが、
この時点ではmessage
にFiber.yield(chat.strip)
は格納されていません。
Fiber.yield(chat.strip)
が評価された時点で一旦処理はストップします。(この処理が止まったタイミングを⑤とします)このFiber
クラスのインスタンスのブロックの処理、すなわちclient.resume
の返り値はchat.strip
になります。
処理はストップされ、④に戻ります。
puts "client #{socket} sent: #{message}"
broadcast(message)
message = client.resume
のmessage
には
先ほど処理されたchat.strip
が格納されています。
そのmessage
がbroadcast
メソッドに引数として渡されて処理が実行されます。
def broadcast(message)
@clients.each_pair do |key, value|
puts "invoking client #{key}"
value.resume(message) #⑥
end
end
each_pair
はeach
と同じです。
@clients
にはTCPSocket
クラスのインスタンスをkey
としたハッシュが入っています。そして、value
はFiber
クラスのインスタンスになります。
value.resume(message)
を実行する(この時点を⑥とします。)ことで、引数が渡された状態でFiber
クラスのインスタンスのブロック処理が行われます。
ということで⑤に戻ります。
loop {
if message.nil?
chat = socket.gets
socket.flush
message = Fiber.yield(chat.strip) #⑤
else
socket.puts("chat: #{message.strip}")
socket.flush
message = Fiber.yield
end
}
⑤の処理が再開します。
message
にFiber.yield(chat.strip)
が代入されます。
処理が終了し、またloop
が走ります。
message
には値が入っているのでelse
内の処理が走ります。
socket.puts("chat: #{message.strip}")
ここでソケットを通してクライアントにデータが送られています。
socket.puts
message = Fiber.yield
の時点でまた、代入処理は行われずにストップします。⑥に戻りましょう。
まあ⑥に戻っても特に何もないのですが。
def broadcast(message)
@clients.each_pair do |key, value|
puts "invoking client #{key}"
value.resume(message) #⑥
end
end
broadcast
の処理が終了し、
readable.each do |socket|
の処理も終了し、
またrun_acceptor
内のloop
の処理がスタートします。
これまでの流れを踏襲しつつも、ここだけは一応理解しておくべきなのですが、次にmessage = client.resume
をした時、Fiber
インスタンスのloop
内の処理が走るのですが、始まるのはelse
内のmessage = Fiber.yield
の代入処理からです。
これが終了し、またloop
が走ります。
この時message
には値が入っていないので・・・と、先ほど説明した処理がまた繰り返されます。
んー、Fiber
の処理の把握が難しいですね。
[WIP]細かい情報。Fiber
の返り値とか