1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Build your own xxx (Redis編その2 event_loopとparser)

Posted at

Build your own Redis第2回 イベントループを回し、コマンドをパースしようぜ編 🚀

こんにちは!Web DeveloperのJinyangです!
前回の記事「そうだ、Redisを作ろう 🚀」では、クライアントからPINGを送って、PONGと返す簡単なserverをRubyで実装するところまで進めました。

今回は多数のクライアントからの接続に対応していこうと思います。本家のredisではeventloopという概念を用いているので、同じようにやってみましょう。🔥


目次 📚

  1. Event Loopとは?
  2. クラス設計図
  3. ロジックの解説
  4. めでたくCodeCraftesのfirst stageを突破

Event Loopとは? 🤔

Event Loop(イベントループ)は、非同期I/Oを効率的に処理するための基盤技術です。
特にRedisのような高速なデータベースでは、複数のクライアントが同時にアクセスしてもブロックせずに処理できるよう、この仕組みが採用されています。

今回は以下の二つを満たせるようなevent loopを実装してみたいと思います。

  • ソケット(TCPの読み書きイベント)を効率的に監視・処理
  • メインスレッドのみで処理を完結させることでシンプルかつ安全な設計を実現

クラス設計図 📐

以下は、RedisServerのイベントループの設計を表したクラス図です。

  • FileEvent: TCPSocketのラッパークラス。ソケットの読み取り・書き込みイベントに対するコールバックを保持します。

    • set_event_handle: イベントタイプごとのコールバックを登録する。
    • method_missing: TCPSocketのメソッドを透過的に呼び出す。
  • EventLoop: イベントループを管理し、複数のFileEventを監視・処理します。

    • create_file_event: ソケットイベントを登録し、FileEventを管理。
    • process_events: IO.selectでソケットイベントを監視・実行する。
    • main_loop: イベントループを開始し、停止まで処理を繰り返す。
  • Server: クライアント接続を受け付け、EventLoopを使って通信を管理します。

    • start: サーバーを開始し、接続をEventLoopに登録。
    • accept_handler: クライアント接続を受け入れ、読み取りイベントを設定。
  • Command: Redisの基本コマンド(PING, SET, GETなど)を実行します。

    • execute: コマンドと引数を受け取り、適切な結果を返す。
  • Parser: クライアントから送信されたRESP形式のデータを解析します。

    • parse: データを解析してコマンドと引数を返す。

ロジックの解説

プロセスの流れ

ファイルイベント

require 'socket'

# FileEvent クラスは TCPSocket をラップし、
# ソケットに対する特定のイベント(READABLE, WRITABLE)の処理を簡単に管理できるようにする。
class FileEvent < TCPSocket
  # ソケットのインスタンスを取得
  attr_reader :socket
  
  # イベントハンドラ(読み取り・書き込み)とクライアント固有データを保持
  attr_accessor :handler, :client_data

  # FileEvent の初期化
  # - socket: イベントを監視するソケット
  def initialize(socket)
    @socket = socket
    
    # イベントごとのハンドラを格納するハッシュ
    # 初期化時は READABLE と WRITABLE に nil を設定
    @handler = {}
    @handler[EventType::READABLE] = nil
    @handler[EventType::WRITABLE] = nil

    # クライアント固有のデータを保持(任意)
    @client_data = nil
  end

  # メソッドが FileEvent で定義されていない場合、
  # ソケットのメソッドを透過的に呼び出す。
  # 例: FileEvent のインスタンスで `readpartial` を呼び出した場合、
  # 内部の @socket に転送される。
  def method_missing(method, *args, &block)
    if @socket.respond_to?(method)
      @socket.public_send(method, *args, &block)
    else
      super
    end
  end

  # respond_to? メソッドをオーバーライドして、
  # ソケットが対応しているメソッドを動的にサポート
  def respond_to_missing?(method, include_private = false)
    @socket.respond_to?(method) || super
  end

  # 特定のイベント(READABLE, WRITABLE)に対する処理を設定する。
  # - mask: イベントタイプ(例: EventType::READABLE)
  # - proc: イベント発生時に実行するコールバック(Proc オブジェクト)
  def set_event_handle(mask, proc)
    # ハンドラのハッシュにコール

長々と書きましたが、要はソケットとそのソケットが読み取り可能、書き込み可能な時に実行する関数をハッピーセットにしたといった具合です。

EventLoop

下には長々とコードを貼っつけておりますが、main_loopをじゃんじゃん回して、監視対象のソケットに対して、ハンドラーを実行するというのがメインの処理となっています。@firedは間違いなく、必要のない変数ですが、本家のredisリスペクトということで、入れてあります。


class EventLoop
  def initialize
    # ファイルイベントを管理するハッシュ。
    # キー: ファイルディスクリプター番号、値: FileEvent オブジェクト
    @events = {}

    # IO.select で検出されたイベントを保持する配列。
    # 各要素は [ソケット, イベントタイプ] 形式。
    @fired = []

    # イベントループを停止するためのフラグ。
    # true に設定されると、ループが終了します。
    @stop = false

    # ログを記録するための Logger インスタンス。
    @logger = Logger.new
  end

  # 新しいファイルイベントを作成し、@events に登録する。
  # - socket: イベントを監視するソケット
  # - mask: イベントタイプ (READABLE または WRITABLE)
  # - proc: イベント発生時に実行するコールバック
  # - client_data: クライアント固有のデータ (オプション)
  def create_file_event(socket, mask, proc = nil, client_data = nil)
    return false if mask == EventType::NONE

    # 既存のイベントを取得、または新しい FileEvent を作成
    event = @events[socket.fileno] || FileEvent.new(socket)
    event.set_event_handle(mask, proc)
    event.client_data = client_data

    # ファイルディスクリプター番号をキーとして登録
    @events[socket.fileno] = event
    true
  end

  # 指定されたファイルディスクリプターとイベントを削除する。
  # - fd: ファイルディスクリプター番号
  # - mask: 削除対象のイベントタイプ
  def delete_file_event(fd, mask)
    # 対象のイベントを取得
    event = @events[fd]
    return false unless event

    # 指定されたイベントを削除
    event.handler[mask] = nil

    # ハンドラが空の場合、イベント自体を削除
    @events.delete(fd) if event.handler.values.compact.empty?
    true
  end

  # イベントループを開始し、@stop が true になるまで繰り返し実行する。
  def main_loop
    process_events until @stop
  end

  # イベントループを停止する。
  def stop
    @stop = true
  end

  private

  # ファイルイベントを処理する。
  # IO.select を使用してソケットの状態を確認し、対応する処理を実行する。
  def process_events
    # IO.select を使用して監視中のソケットをチェック
    ready_to_read, ready_to_write = IO.select(
      @events.values.map(&:socket), # 読み取り対象
      @events.values.map(&:socket), # 書き込み対象
      nil, # 例外対象 (今回は使用しない)
      10   # タイムアウト (秒)
    )

    # 読み取り可能なソケットを fired に追加
    ready_to_read&.each { |socket| @fired << [socket, EventType::READABLE] }

    # 書き込み可能なソケットを fired に追加
    ready_to_write&.each { |socket| @fired << [socket, EventType::WRITABLE] }

    # 検出されたイベントを順に処理
    @fired.each do |socket, mask|
      file_event = @events[socket.fileno]
      callback = file_event.handler[mask]

      # READABLE イベントの処理
      callback&.call(file_event.client_data) if mask == EventType::READABLE

      # WRITABLE イベントの処理
      callback&.call if mask == EventType::WRITABLE
    end

    # 処理が完了したイベントリストをクリア
    @fired.clear
  end
end

Server

このクラスでは、Redis Serverの初期化などを担当しています。sercer socketがクライアントから接続要求が来た際のハンドラー、クライアントソケットの読み書きハンドラーのクロージャを定義していて、file eventの作成およびそれをevent loopに登録する処理もここに書いています。

# サーバーのメインクラス
# クライアント接続を受け付け、イベントループを通じて通信を管理する。
module RedisServer
  class Server
    # サーバーの初期化
    # - port: 接続ポート
    # - host: 接続ホスト (デフォルト: 127.0.0.1)
    def initialize(port, host: '127.0.0.1')
      @port = port
      @host = host
      @logger = Logger.new
      @el = EventLoop.new
      @clients = {}

      # プロトコルの解析とコマンド実行の管理
      @parser = Parser.new
      @executor = Command.new(@el)
    end

    # サーバーを開始し、接続の監視とイベント処理を行う
    def start
      server = TCPServer.new(@host, @port)
      if @el.create_file_event(server, EventType::READABLE, accept_handler(server))
        @logger.info("Server listening on #{@host}:#{@port}")
        @el.main_loop
      else
        @logger.error("Server failed to listen on #{@host}:#{@port}")
      end
    end

    private

    # クライアント接続を受け入れ、読み取りイベントを設定するハンドラー
    # - socket: サーバーのソケット
    def accept_handler(socket)
      lambda do
        client_socket = socket.accept
        client = Client.new(client_socket)
        @clients[client_socket.fileno] = client
        @logger.info("Accepted connection from #{client_socket.peeraddr.inspect} #{client_socket.fileno.inspect}")
        @el.create_file_event(client_socket, EventType::READABLE, read_handler(client))
      rescue IOError => e
        @logger.error("Error accepting connection: #{e.message}")
      end
    end

    # クライアントからデータを読み取り、コマンドを処理するハンドラー
    # - client: 接続されたクライアント
    def read_handler(client)
      lambda do
        data = client.socket.readpartial(1024)
        if data.empty?
          @logger.info("Client disconnected: #{client.socket.peeraddr.inspect}")
          @el.delete_file_event(client.socket.fileno, EventType::READABLE)
          client.socket.close
          @clients.delete(client.socket.fileno)
        else
          @logger.info("Received data from client: #{client.socket.peeraddr.inspect}")
          client.buffer << data
          @el.create_file_event(client.socket, EventType::WRITABLE, writer_handler(client))
        end
      rescue EOFError, IOError => e
        @logger.error("Error reading from client: #{e.message}")
        @el.delete_file_event(client.socket.fileno, EventType::READABLE)
      end
    end

    # クライアントへの書き込みを処理するハンドラー
    # - client: 接続されたクライアント
    def writer_handler(client)
      lambda do
        client_socket = client.socket
        parsed = @parser.parse(client.buffer)
        client.buffer.clear
        command = parsed[0]
        args = parsed[1..-1]
        client_socket.write(@executor.execute(command, args))
        @el.delete_file_event(client_socket.fileno, EventType::WRITABLE)
      rescue IOError => e
        @logger.error("Error writing to client: #{e.message}")
        @el.delete_file_event(client_socket.fileno, EventType::WRITABLE)
        client.socket.close
      end
    end

    # サーバーの終了処理
    def shutdown
      @clients.each_value { |client| client.socket.close }
      @el.stop
      exit(0)
    end
  end
end

Command

Commandクラスは実際にクライアントから渡ってきて、文字列をパースした結果を受け取り、実行するロジックを書いています。Redisで使われているRESP protocolに関しては、Qiitaに上がっている別記事を参照してみて下さい。とても詳しく書かれております😄

ここで、特徴的?なのは CodeCraftersで出ていた課題で、setコマンドにpxオプションが与えられている(キーに有効期限を持たせる)際の実装です。@time_stamp_storeというインスタンス変数を使い、キーごとの有効期限を管理するようにしました。本家のredisでは、time_eventというスケジューリングされたeventがあり、せっかく
event_loopもどきを実装してみたので、time_eventで一定時間経過後に自動的に@storeから該当するキーを削除するコールバックを実行させようとしましたが、なぜか実行しようとするとフリーズしてしまい動かないんですよね。調査をしている最中です。

# Redisのコマンドを管理・実行するクラス
module RedisServer
  class Command
    # 初期化時にデータストアとタイムスタンプストアを準備
    def initialize(_event_loop)
      @store = {} # キー・バリューのデータを格納
      @time_stamp_store = {} # キーごとの有効期限を管理
    end

    # コマンドを実行して結果を返す
    # - command: コマンド名 (例: "GET", "SET", "PING")
    # - args: コマンドの引数
    def execute(command, args)
      case command.upcase
      when 'PING'
        "+PONG\r\n" # PINGに対する応答
      when 'ECHO'
        "+#{args.join("\r\n")}\r\n" # ECHOに対する応答
      when 'SET'
        set_command(args) # データを設定
      when 'GET'
        get_command(args) # データを取得
      else
        "-ERR unknown command '#{command}'\r\n" # 未実装のコマンド
      end
    end

    private

    # SETコマンドを処理
    # - args: ["key", "value", "PX", "ttl"] の形式
    def set_command(args)
      @store[args[0]] = args[1]
      if args[2]&.upcase == 'PX'
        @time_stamp_store[args[0]] = (Time.now.to_f * 1000).to_i + args[3].to_i
      else
        @time_stamp_store[args[0]] = nil
      end
      "+OK\r\n"
    end

    # GETコマンドを処理
    # - args: ["key"] の形式
    def get_command(args)
      value = @store[args[0]]
      expired_time = @time_stamp_store[args[0]]
      if value.nil? || (!expired_time.nil? && expired_time < (Time.now.to_f * 1000).to_i)
        return "$-1\r\n" # 存在しない場合や期限切れの場合
      end
      "$#{value.length}\r\n#{value}\r\n"
    end
  end
end

Parser

最後に、クライアントから送られてきた文字列をパースするクラスです。RESPは各パーツが\r\n(CRLF)で区切られていることが特徴です。

ここで、簡単な例を一つあげますと、redis-cliを使って、pingを送った際にserver側は以下のような文字列を受け取るが、意味合いとしては、

redis-cli ping

# server側はこんな感じ文字列を受け取る
*1\r\n$4\r\nPING\r\n
  1. *1 -> 要素が一つの配列を受け取ったよ
  2. $4 -> それは長さ4文字の文字列だよ
  3. PING -> PINGという文字列だよ

なので、

*2\r\n$4\r\nPING\r\n$4\r\nPING\r\nのようなコマンドも考えられます。よって、CRLFによって区切られた先頭の文字をくり返し読み込む必要があるので、繰り返して同じような操作を行うのに適したアルゴリズム、つまり、再帰的なロジックが有効と考えました。


# frozen_string_literal: true

# RESP形式のデータを解析し、コマンドと引数に変換するクラス
# RESP (REdis Serialization Protocol) はRedisプロトコルで使用されるデータフォーマットです。
module RedisServer
  class Parser
    # データを解析して、コマンドや引数に変換するメインメソッド
    # - data: クライアントから送信されたRESP形式の文字列
    # 戻り値:
    # - コマンドや引数を表す構造化データ
    def parse(data)
      case data[0] # データの先頭文字で形式を判別
      when '+' then parse_simple_string(data[1..-1]) # 単純文字列
      when '-' then parse_error(data[1..-1])         # エラーメッセージ
      when ':' then parse_integer(data[1..-1])      # 整数
      when '$' then parse_bulk_string(data[1..-1])  # バルク文字列
      when '*' then parse_array(data[1..-1])        # 配列
      else
        raise 'Invalid RESP type' # 未知の形式の場合はエラーをスロー
      end
    end

    private

    # 単純文字列(Simple String)を解析する
    # - data: "+<文字列>\r\n" 形式のデータ
    # 戻り値:
    # - パースされた文字列
    def parse_simple_string(data)
      string, rest_string = data.split("\r\n", 2)
      return string if rest_string.empty? # データが完全であれば文字列を返す

      [string, rest_string] # 残りのデータがある場合は配列で返す
    end

    # エラーメッセージ(Error)を解析する
    # - data: "-<エラーメッセージ>\r\n" 形式のデータ
    # 戻り値:
    # - パースされたエラーメッセージ
    def parse_error(data)
      error, rest_string = data.split("\r\n", 2)
      return error if rest_string.empty? # 完全なエラーを返す

      [error, rest_string] # 残りのデータを含む配列を返す
    end

    # 整数(Integer)を解析する
    # - data: ":<整数>\r\n" 形式のデータ
    # 戻り値:
    # - パースされた整数
    def parse_integer(data)
      num_string, rest_string = data.split("\r\n", 2)
      num_string.to_i if rest_string.empty? # データが完全であれば整数を返す

      [num_string.to_i, rest_string] # 残りのデータを含む配列を返す
    end

    # バルク文字列(Bulk String)を解析する
    # - data: "$<長さ>\r\n<データ>\r\n" 形式のデータ
    # 戻り値:
    # - パースされた文字列、またはnil(NULLバルク文字列の場合)
    def parse_bulk_string(data)
      length, rest_string = data.split("\r\n", 2)
      length = length.to_i

      return nil if length == -1 # NULLバルク文字列の場合

      # 長さ分のデータを返し、残りを含む配列を返す
      return rest_string[0...length] if rest_string[length + 2..].empty?

      [rest_string[0...length], rest_string[length + 2..]]
    end

    # 配列(Array)を解析する
    # - data: "*<要素数>\r\n<要素データ>" 形式のデータ
    # 戻り値:
    # - 配列内のすべての要素を解析した結果
    def parse_array(data)
      elements = []

      # 配列サイズを取得
      split_pos = data.index("\r\n")
      num_elements = data[0..split_pos].to_i
      return nil if num_elements == -1 # NULL配列の場合

      # 残りのデータを繰り返し解析
      rest_data = data[split_pos + 2..-1]
      num_elements.times do
        command, rest_data = parse(rest_data)
        elements << command
      end

      elements
    end
  end
end

めでたくCodeCraftesのfirst stageを突破🎉

さて、ここまでやってようやくfirset stageを突破しました。
ここからはあと4stage 合計40sectionくらい残っているので、まだまだ終わりませんね😅
スクリーンショット 2024-12-03 1.18.43.png

もう眠たいので、ここで一旦おさらばしたいと思います。
無駄な実装も多いですし、rubistの皆様から熱いレビューをお待ちしております。

time_eventはなんで固まったんだろうねえ?もうちょいそこは考えます🛏️

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?