Steep Code Reading Advent Calendar 2024

Day 4

Steep Code Reading (Day 4)

Posted at 2024-12-08

Today I trace the Server::Master processing used by Steep::Drivers::Check#run.

Server::Master

Server::Master#start

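It looks like this method spawns a separate Thread for each of the typecheck_workers created in the part I read yesterday, and each thread enqueues Jobs. I don't know yet what ReceiveMessageJob is for, so I'll set that aside for now. The Queue and SizedQueue that appear in this class are classes from Ruby's built-in library.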

def start
  Steep.logger.tagged "master" do
    tags = Steep.logger.formatter.current_tags.dup

    # @type var worker_threads: Array[Thread]
    worker_threads = []

    if interaction_worker
      worker_threads << Thread.new do
        Steep.logger.formatter.push_tags(*tags, "from-worker@interaction")
        interaction_worker.reader.read do |message|
          job_queue << ReceiveMessageJob.new(source: interaction_worker, message: message)
        end
      end
    end

    typecheck_workers.each do |worker|
      worker_threads << Thread.new do
        Steep.logger.formatter.push_tags(*tags, "from-worker@#{worker.name}")
        worker.reader.read do |message|
          job_queue << ReceiveMessageJob.new(source: worker, message: message)
        end
      end
    end

    read_client_thread = Thread.new do
      reader.read do |message|
        job_queue << ReceiveMessageJob.new(source: :client, message: message)
        break if message[:method] == "exit"
      end
    end

    write_thread = Thread.new do
      Steep.logger.formatter.push_tags(*tags)
      Steep.logger.tagged "write" do
        while job = write_queue.deq
          # @type var job: SendMessageJob
          case job.dest
          when :client
            Steep.logger.info { "Processing SendMessageJob: dest=client, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
            writer.write job.message
          when WorkerProcess
            Steep.logger.info { "Processing SendMessageJob: dest=#{job.dest.name}, method=#{job.message[:method] || "-"}, id=#{job.message[:id] || "-"}" }
            job.dest << job.message
          end
        end
      end
    end

    loop_thread = Thread.new do
      Steep.logger.formatter.push_tags(*tags)
      Steep.logger.tagged "main" do
        while job = job_queue.deq
          case job
          when ReceiveMessageJob
            src = case job.source
                  when :client
                    :client
                  else
                    job.source.name
                  end
            Steep.logger.tagged("ReceiveMessageJob(#{src}/#{job.message[:method]}/#{job.message[:id]})") do
              if job.response? && result_controller.process_response(job.message)
                # nop
                Steep.logger.info { "Processed by ResultController" }
              else
                case job.source
                when :client
                  process_message_from_client(job.message)

                  if job.message[:method] == "exit"
                    job_queue.close()
                  end
                when WorkerProcess
                  process_message_from_worker(job.message, worker: job.source)
                end
              end
            end
          when Proc
            job.call()
          end
        end
      end
    end

    waiter = ThreadWaiter.new
    each_worker do |worker|
      waiter << worker.wait_thread
    end
    waiter.wait_one()

    unless job_queue.closed?
      # Exit by error
      each_worker do |worker|
        worker.kill(force: true)
      end
      raise "Unexpected worker process exit"
    end

    write_queue.close()
    write_thread.join

    read_client_thread.join()
    worker_threads.each do |thread|
      thread.join
    end

    loop_thread.join
  end
end
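
To check my reading of the thread structure, here is a minimal sketch of the same fan-in pattern using only Ruby's built-in Queue: several reader threads push into one queue, and a single loop consumes until the queue is closed. The names `sources` and `processed` and the message values are made up for illustration and are not part of Steep.

```ruby
# Sketch of the fan-in pattern: many reader threads, one consumer loop.
# `sources` and `processed` are illustrative names, not Steep's.
job_queue = Queue.new

sources = { "typecheck@0" => [:a, :b], "typecheck@1" => [:c] }

# One producer thread per source, similar to one thread per worker in #start.
producers = sources.map do |name, messages|
  Thread.new do
    messages.each { |message| job_queue << [name, message] }
  end
end

processed = []

# Queue#deq blocks while the queue is empty and returns nil once the
# queue has been closed and drained, which ends the `while` loop.
consumer = Thread.new do
  while (job = job_queue.deq)
    processed << job
  end
end

producers.each(&:join)
job_queue.close
consumer.join

p processed.size # 3 jobs; the order across sources is not guaranteed
```

This is why `job_queue.close()` on the "exit" message is enough to stop loop_thread, and why `write_queue.close()` lets write_thread.join return. SizedQueue has the same interface, except that `<<` blocks while the queue already holds its maximum number of items.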

ReceiveMessageJob

ReceiveMessageJob = _ = Struct.new(:source, :message, keyword_init: true) do
  # @implements ReceiveMessageJob

  def response?
    message.key?(:id) && !message.key?(:method)
  end

  include MessageUtils
end
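
One thing I wanted to confirm: `response?` is defined in the Struct block, and MessageUtils (included right after) defines a method with the same name. As far as I understand Ruby's method lookup, a method defined on the class itself is found before one from an included module, regardless of where `include` is written. Below is a small sketch with hypothetical stand-ins (`Job`, `Utils`), not Steep's actual classes.

```ruby
# Hypothetical stand-ins for illustration; not Steep's actual definitions.
module Utils
  def response?
    :from_module
  end

  def id
    message[:id]
  end
end

Job = Struct.new(:source, :message, keyword_init: true) do
  # Defined on the Struct class itself, so it is found before Utils#response?.
  def response?
    message.key?(:id) && !message.key?(:method)
  end

  include Utils
end

# keyword_init: true means members are passed as keyword arguments.
job = Job.new(source: :client, message: { id: 1, result: nil })

p job.response?          # => true
p job.id                 # => 1
p Job.ancestors.take(2)  # => [Job, Utils]
```

So for ReceiveMessageJob, the `key?`-based `response?` appears to be the one that actually runs, while `request?`, `notification?`, `id` and the rest come from the module.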

MessageUtils

module MessageUtils
  def request?
    if method && id
      true
    else
      false
    end
  end

  def response?
    if id && !method
      true
    else
      false
    end
  end

  def notification?
    if method && !id
      true
    else
      false
    end
  end

  def method
    message[:method]
  end

  def id
    message[:id]
  end

  def result
    message[:result]
  end

  def params
    message[:params]
  end
end
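
The three predicates classify a message by whether it carries `method` and `id`, which matches the usual JSON-RPC distinction between requests, responses, and notifications. The following sketch restates that rule as a single helper; `classify` and the sample messages are my own illustration and do not exist in Steep.

```ruby
# Illustrative helper (not part of Steep): classify a message hash using
# the same truthiness rules as request?, response?, and notification?.
def classify(message)
  has_method = message[:method] ? true : false
  has_id = message[:id] ? true : false

  if has_method && has_id
    :request
  elsif has_id
    :response
  elsif has_method
    :notification
  else
    :unknown
  end
end

p classify({ id: 1, method: "textDocument/hover", params: {} }) # => :request
p classify({ id: 1, result: nil })                              # => :response
p classify({ method: "initialized", params: {} })               # => :notification
```

Note that the module defines an instance method named `method`, which shadows Object#method on any object that includes it.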

ResultController

class ResultController
  attr_reader :handlers

  def initialize()
    @handlers = []
  end

  def <<(handler)
    @handlers << handler
  end

  def request_group()
    group = GroupHandler.new()
    yield group
    group
  end

  def process_response(message)
    handlers.each do |handler|
      return true if handler.process_response(message)
    end
    false
  ensure
    handlers.reject!(&:completed?)
  end
end
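
To see how `process_response` and the `ensure` clause interact, I wrote a small sketch. `MiniController` is a trimmed copy of the dispatch logic above, and `OneShotHandler` is a hypothetical handler that I made up; GroupHandler is not shown in this article, so this is only my assumption about what a handler needs to provide (`process_response` and `completed?`).

```ruby
# Trimmed copy of the dispatch logic above, for experimentation only.
class MiniController
  attr_reader :handlers

  def initialize
    @handlers = []
  end

  def <<(handler)
    @handlers << handler
  end

  def process_response(message)
    handlers.each do |handler|
      return true if handler.process_response(message)
    end
    false
  ensure
    # Runs even after the early `return true`; drops finished handlers.
    handlers.reject!(&:completed?)
  end
end

# Hypothetical handler: waits for the response to exactly one request id.
class OneShotHandler
  attr_reader :result

  def initialize(id)
    @id = id
    @completed = false
  end

  def process_response(message)
    return false unless message[:id] == @id

    @result = message[:result]
    @completed = true
  end

  def completed?
    @completed
  end
end

controller = MiniController.new
handler = OneShotHandler.new(42)
controller << handler

p controller.process_response({ id: 7, result: "other" }) # => false
p controller.handlers.size                                # => 1
p controller.process_response({ id: 42, result: "done" }) # => true
p controller.handlers.size                                # => 0
p handler.result                                          # => "done"
```

In Server::Master#start this return value decides the branch: when a handler claims the response, the loop only logs "Processed by ResultController"; otherwise the message goes on to process_message_from_client or process_message_from_worker.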

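It seems to receive the result of $ steep worker --typecheck --name=typecheck@1 --steepfile=Steepfile --log-level=error --max-index=1 --index=1 --delay-shutdown and do something with it, but I don't understand enough yet, including what that command actually outputs, so continuing to trace from here looks difficult.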
