Steepコードリーディング(4日目)
Steep::Drivers::Check#run
の Server::Master
の処理を追っていく。
Server::Master
Server::Master#start
前日読んだ部分で作成したtypecheck_workers
毎にThreadを分けてJobをenqueueしていそう。ReceiveMessageJob
が何をするものなのか今はわからないので置いておく。このクラス内で出てくるQueue
やSizedQueue
は、Rubyの組み込みライブラリのクラス。
def start
Steep.logger.tagged "master" do
tags = Steep.logger.formatter.current_tags.dup
# @type var worker_threads: Array[Thread]
worker_threads = []
if interaction_worker
worker_threads << Thread.new do
Steep.logger.formatter.push_tags(*tags, "from-worker@interaction")
interaction_worker.reader.read do |message|
job_queue << ReceiveMessageJob.new(source: interaction_worker, message: message)
end
end
end
typecheck_workers.each do |worker|
worker_threads << Thread.new do
Steep.logger.formatter.push_tags(*tags, "from-worker@#{worker.name}")
worker.reader.read do |message|
job_queue << ReceiveMessageJob.new(source: worker, message: message)
end
end
end
read_client_thread = Thread.new do
reader.read do |message|
job_queue << ReceiveMessageJob.new(source: :client, message: message)
break if message[:method] == "exit"
end
end
write_thread = Thread.new do
Steep.logger.formatter.push_tags(*tags)
Steep.logger.tagged "write" do
while job = write_queue.deq
# @type var job: SendMessageJob
case job.dest
when :client
Steep.logger.info { "Processing SendMessageJob: dest=client, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
writer.write job.message
when WorkerProcess
Steep.logger.info { "Processing SendMessageJob: dest=#{job.dest.name}, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
job.dest << job.message
end
end
end
end
loop_thread = Thread.new do
Steep.logger.formatter.push_tags(*tags)
Steep.logger.tagged "main" do
while job = job_queue.deq
case job
when ReceiveMessageJob
src = case job.source
when :client
:client
else
job.source.name
end
Steep.logger.tagged("ReceiveMessageJob(#{src}/#{job.message[:method]}/#{job.message[:id]})") do
if job.response? && result_controller.process_response(job.message)
# nop
Steep.logger.info { "Processed by ResultController" }
else
case job.source
when :client
process_message_from_client(job.message)
if job.message[:method] == "exit"
job_queue.close()
end
when WorkerProcess
process_message_from_worker(job.message, worker: job.source)
end
end
end
when Proc
job.call()
end
end
end
end
waiter = ThreadWaiter.new
each_worker do |worker|
waiter << worker.wait_thread
end
waiter.wait_one()
unless job_queue.closed?
# Exit by error
each_worker do |worker|
worker.kill(force: true)
end
raise "Unexpected worker process exit"
end
write_queue.close()
write_thread.join
read_client_thread.join()
worker_threads.each do |thread|
thread.join
end
loop_thread.join
end
end
ReceiveMessageJob
ReceiveMessageJob = _ = Struct.new(:source, :message, keyword_init: true) do
# @implements ReceiveMessageJob
def response?
message.key?(:id) && !message.key?(:method)
end
include MessageUtils
end
MessageUtils
module MessageUtils
def request?
if method && id
true
else
false
end
end
def response?
if id && !method
true
else
false
end
end
def notification?
if method && !id
true
else
false
end
end
def method
message[:method]
end
def id
message[:id]
end
def result
message[:result]
end
def params
message[:params]
end
end
ResultController
class ResultController
attr_reader :handlers
def initialize()
@handlers = []
end
def <<(handler)
@handlers << handler
end
def request_group()
group = GroupHandler.new()
yield group
group
end
def process_response(message)
handlers.each do |handler|
return true if handler.process_response(message)
end
false
ensure
handlers.reject!(&:completed?)
end
end
$ steep worker --typecheck --name=typecheck@1 --steepfile=Steepfile --log-level=error --max-index=1 --index=1 --delay-shutdown
の結果を受け取ってゴニョゴニョしていそうだけど、コマンドの実行結果含めよくわかっていないのでこのまま追うのは難しそう。