Build your own Redis第2回 イベントループを回し、コマンドをパースしようぜ編 🚀
こんにちは!Web DeveloperのJinyangです!
前回の記事「そうだ、Redisを作ろう 🚀」では、クライアントからPINGを送って、PONGと返す簡単なserverをRubyで実装するところまで進めました。
今回は多数のクライアントからの接続に対応していこうと思います。本家のredisではeventloopという概念を用いているので、同じようにやってみましょう。🔥
目次 📚
Event Loopとは? 🤔
Event Loop(イベントループ)は、非同期I/Oを効率的に処理するための基盤技術です。
特にRedisのような高速なデータベースでは、複数のクライアントが同時にアクセスしてもブロックせずに処理できるよう、この仕組みが採用されています。
今回は以下の二つを満たせるようなevent loopを実装してみたいと思います。
- ソケット(TCPの読み書きイベント)を効率的に監視・処理
- メインスレッドのみで処理を完結させることでシンプルかつ安全な設計を実現
クラス設計図 📐
以下は、RedisServerのイベントループの設計を表したクラス図です。
-
FileEvent: TCPSocketのラッパークラス。ソケットの読み取り・書き込みイベントに対するコールバックを保持します。
-
set_event_handle
: イベントタイプごとのコールバックを登録する。 -
method_missing
: TCPSocketのメソッドを透過的に呼び出す。
-
-
EventLoop: イベントループを管理し、複数のFileEventを監視・処理します。
-
create_file_event
: ソケットイベントを登録し、FileEventを管理。 -
process_events
: IO.selectでソケットイベントを監視・実行する。 -
main_loop
: イベントループを開始し、停止まで処理を繰り返す。
-
-
Server: クライアント接続を受け付け、EventLoopを使って通信を管理します。
-
start
: サーバーを開始し、接続をEventLoopに登録。 -
accept_handler
: クライアント接続を受け入れ、読み取りイベントを設定。
-
-
Command: Redisの基本コマンド(PING, SET, GETなど)を実行します。
-
execute
: コマンドと引数を受け取り、適切な結果を返す。
-
-
Parser: クライアントから送信されたRESP形式のデータを解析します。
-
parse
: データを解析してコマンドと引数を返す。
-
ロジックの解説
プロセスの流れ
ファイルイベント
require 'socket'
# FileEvent クラスは TCPSocket をラップし、
# ソケットに対する特定のイベント(READABLE, WRITABLE)の処理を簡単に管理できるようにする。
class FileEvent < TCPSocket
# ソケットのインスタンスを取得
attr_reader :socket
# イベントハンドラ(読み取り・書き込み)とクライアント固有データを保持
attr_accessor :handler, :client_data
# FileEvent の初期化
# - socket: イベントを監視するソケット
def initialize(socket)
@socket = socket
# イベントごとのハンドラを格納するハッシュ
# 初期化時は READABLE と WRITABLE に nil を設定
@handler = {}
@handler[EventType::READABLE] = nil
@handler[EventType::WRITABLE] = nil
# クライアント固有のデータを保持(任意)
@client_data = nil
end
# メソッドが FileEvent で定義されていない場合、
# ソケットのメソッドを透過的に呼び出す。
# 例: FileEvent のインスタンスで `readpartial` を呼び出した場合、
# 内部の @socket に転送される。
def method_missing(method, *args, &block)
if @socket.respond_to?(method)
@socket.public_send(method, *args, &block)
else
super
end
end
# respond_to? メソッドをオーバーライドして、
# ソケットが対応しているメソッドを動的にサポート
def respond_to_missing?(method, include_private = false)
@socket.respond_to?(method) || super
end
# 特定のイベント(READABLE, WRITABLE)に対する処理を設定する。
# - mask: イベントタイプ(例: EventType::READABLE)
# - proc: イベント発生時に実行するコールバック(Proc オブジェクト)
def set_event_handle(mask, proc)
# ハンドラのハッシュにコール
長々と書きましたが、要はソケットとそのソケットが読み取り可能、書き込み可能な時に実行する関数をハッピーセットにしたといった具合です。
EventLoop
下には長々とコードを貼っつけておりますが、main_loopをじゃんじゃん回して、監視対象のソケットに対して、ハンドラーを実行するというのがメインの処理となっています。@fired
は間違いなく、必要のない変数ですが、本家のredisリスペクトということで、入れてあります。
class EventLoop
def initialize
# ファイルイベントを管理するハッシュ。
# キー: ファイルディスクリプター番号、値: FileEvent オブジェクト
@events = {}
# IO.select で検出されたイベントを保持する配列。
# 各要素は [ソケット, イベントタイプ] 形式。
@fired = []
# イベントループを停止するためのフラグ。
# true に設定されると、ループが終了します。
@stop = false
# ログを記録するための Logger インスタンス。
@logger = Logger.new
end
# 新しいファイルイベントを作成し、@events に登録する。
# - socket: イベントを監視するソケット
# - mask: イベントタイプ (READABLE または WRITABLE)
# - proc: イベント発生時に実行するコールバック
# - client_data: クライアント固有のデータ (オプション)
def create_file_event(socket, mask, proc = nil, client_data = nil)
return false if mask == EventType::NONE
# 既存のイベントを取得、または新しい FileEvent を作成
event = @events[socket.fileno] || FileEvent.new(socket)
event.set_event_handle(mask, proc)
event.client_data = client_data
# ファイルディスクリプター番号をキーとして登録
@events[socket.fileno] = event
true
end
# 指定されたファイルディスクリプターとイベントを削除する。
# - fd: ファイルディスクリプター番号
# - mask: 削除対象のイベントタイプ
def delete_file_event(fd, mask)
# 対象のイベントを取得
event = @events[fd]
return false unless event
# 指定されたイベントを削除
event.handler[mask] = nil
# ハンドラが空の場合、イベント自体を削除
@events.delete(fd) if event.handler.values.compact.empty?
true
end
# イベントループを開始し、@stop が true になるまで繰り返し実行する。
def main_loop
process_events until @stop
end
# イベントループを停止する。
def stop
@stop = true
end
private
# ファイルイベントを処理する。
# IO.select を使用してソケットの状態を確認し、対応する処理を実行する。
def process_events
# IO.select を使用して監視中のソケットをチェック
ready_to_read, ready_to_write = IO.select(
@events.values.map(&:socket), # 読み取り対象
@events.values.map(&:socket), # 書き込み対象
nil, # 例外対象 (今回は使用しない)
10 # タイムアウト (秒)
)
# 読み取り可能なソケットを fired に追加
ready_to_read&.each { |socket| @fired << [socket, EventType::READABLE] }
# 書き込み可能なソケットを fired に追加
ready_to_write&.each { |socket| @fired << [socket, EventType::WRITABLE] }
# 検出されたイベントを順に処理
@fired.each do |socket, mask|
file_event = @events[socket.fileno]
callback = file_event.handler[mask]
# READABLE イベントの処理
callback&.call(file_event.client_data) if mask == EventType::READABLE
# WRITABLE イベントの処理
callback&.call if mask == EventType::WRITABLE
end
# 処理が完了したイベントリストをクリア
@fired.clear
end
end
Server
このクラスでは、Redis Serverの初期化などを担当しています。sercer socketがクライアントから接続要求が来た際のハンドラー、クライアントソケットの読み書きハンドラーのクロージャを定義していて、file eventの作成およびそれをevent loopに登録する処理もここに書いています。
# サーバーのメインクラス
# クライアント接続を受け付け、イベントループを通じて通信を管理する。
module RedisServer
class Server
# サーバーの初期化
# - port: 接続ポート
# - host: 接続ホスト (デフォルト: 127.0.0.1)
def initialize(port, host: '127.0.0.1')
@port = port
@host = host
@logger = Logger.new
@el = EventLoop.new
@clients = {}
# プロトコルの解析とコマンド実行の管理
@parser = Parser.new
@executor = Command.new(@el)
end
# サーバーを開始し、接続の監視とイベント処理を行う
def start
server = TCPServer.new(@host, @port)
if @el.create_file_event(server, EventType::READABLE, accept_handler(server))
@logger.info("Server listening on #{@host}:#{@port}")
@el.main_loop
else
@logger.error("Server failed to listen on #{@host}:#{@port}")
end
end
private
# クライアント接続を受け入れ、読み取りイベントを設定するハンドラー
# - socket: サーバーのソケット
def accept_handler(socket)
lambda do
client_socket = socket.accept
client = Client.new(client_socket)
@clients[client_socket.fileno] = client
@logger.info("Accepted connection from #{client_socket.peeraddr.inspect} #{client_socket.fileno.inspect}")
@el.create_file_event(client_socket, EventType::READABLE, read_handler(client))
rescue IOError => e
@logger.error("Error accepting connection: #{e.message}")
end
end
# クライアントからデータを読み取り、コマンドを処理するハンドラー
# - client: 接続されたクライアント
def read_handler(client)
lambda do
data = client.socket.readpartial(1024)
if data.empty?
@logger.info("Client disconnected: #{client.socket.peeraddr.inspect}")
@el.delete_file_event(client.socket.fileno, EventType::READABLE)
client.socket.close
@clients.delete(client.socket.fileno)
else
@logger.info("Received data from client: #{client.socket.peeraddr.inspect}")
client.buffer << data
@el.create_file_event(client.socket, EventType::WRITABLE, writer_handler(client))
end
rescue EOFError, IOError => e
@logger.error("Error reading from client: #{e.message}")
@el.delete_file_event(client.socket.fileno, EventType::READABLE)
end
end
# クライアントへの書き込みを処理するハンドラー
# - client: 接続されたクライアント
def writer_handler(client)
lambda do
client_socket = client.socket
parsed = @parser.parse(client.buffer)
client.buffer.clear
command = parsed[0]
args = parsed[1..-1]
client_socket.write(@executor.execute(command, args))
@el.delete_file_event(client_socket.fileno, EventType::WRITABLE)
rescue IOError => e
@logger.error("Error writing to client: #{e.message}")
@el.delete_file_event(client_socket.fileno, EventType::WRITABLE)
client.socket.close
end
end
# サーバーの終了処理
def shutdown
@clients.each_value { |client| client.socket.close }
@el.stop
exit(0)
end
end
end
Command
Command
クラスは実際にクライアントから渡ってきて、文字列をパースした結果を受け取り、実行するロジックを書いています。Redisで使われているRESP protocolに関しては、Qiitaに上がっている別記事を参照してみて下さい。とても詳しく書かれております😄
ここで、特徴的?なのは CodeCraftersで出ていた課題で、set
コマンドにpx
オプションが与えられている(キーに有効期限を持たせる)際の実装です。@time_stamp_store
というインスタンス変数を使い、キーごとの有効期限を管理するようにしました。本家のredisでは、time_eventというスケジューリングされたeventがあり、せっかく
event_loopもどきを実装してみたので、time_eventで一定時間経過後に自動的に@store
から該当するキーを削除するコールバックを実行させようとしましたが、なぜか実行しようとするとフリーズしてしまい動かないんですよね。調査をしている最中です。
# Redisのコマンドを管理・実行するクラス
module RedisServer
class Command
# 初期化時にデータストアとタイムスタンプストアを準備
def initialize(_event_loop)
@store = {} # キー・バリューのデータを格納
@time_stamp_store = {} # キーごとの有効期限を管理
end
# コマンドを実行して結果を返す
# - command: コマンド名 (例: "GET", "SET", "PING")
# - args: コマンドの引数
def execute(command, args)
case command.upcase
when 'PING'
"+PONG\r\n" # PINGに対する応答
when 'ECHO'
"+#{args.join("\r\n")}\r\n" # ECHOに対する応答
when 'SET'
set_command(args) # データを設定
when 'GET'
get_command(args) # データを取得
else
"-ERR unknown command '#{command}'\r\n" # 未実装のコマンド
end
end
private
# SETコマンドを処理
# - args: ["key", "value", "PX", "ttl"] の形式
def set_command(args)
@store[args[0]] = args[1]
if args[2]&.upcase == 'PX'
@time_stamp_store[args[0]] = (Time.now.to_f * 1000).to_i + args[3].to_i
else
@time_stamp_store[args[0]] = nil
end
"+OK\r\n"
end
# GETコマンドを処理
# - args: ["key"] の形式
def get_command(args)
value = @store[args[0]]
expired_time = @time_stamp_store[args[0]]
if value.nil? || (!expired_time.nil? && expired_time < (Time.now.to_f * 1000).to_i)
return "$-1\r\n" # 存在しない場合や期限切れの場合
end
"$#{value.length}\r\n#{value}\r\n"
end
end
end
Parser
最後に、クライアントから送られてきた文字列をパースするクラスです。RESPは各パーツが\r\n
(CRLF)で区切られていることが特徴です。
ここで、簡単な例を一つあげますと、redis-cliを使って、ping
を送った際にserver側は以下のような文字列を受け取るが、意味合いとしては、
redis-cli ping
# server側はこんな感じ文字列を受け取る
*1\r\n$4\r\nPING\r\n
-
*1
-> 要素が一つの配列を受け取ったよ - $4 -> それは長さ4文字の文字列だよ
- PING -> PINGという文字列だよ
なので、
*2\r\n$4\r\nPING\r\n$4\r\nPING\r\n
のようなコマンドも考えられます。よって、CRLFによって区切られた先頭の文字をくり返し読み込む必要があるので、繰り返して同じような操作を行うのに適したアルゴリズム、つまり、再帰的なロジックが有効と考えました。
# frozen_string_literal: true
# RESP形式のデータを解析し、コマンドと引数に変換するクラス
# RESP (REdis Serialization Protocol) はRedisプロトコルで使用されるデータフォーマットです。
module RedisServer
class Parser
# データを解析して、コマンドや引数に変換するメインメソッド
# - data: クライアントから送信されたRESP形式の文字列
# 戻り値:
# - コマンドや引数を表す構造化データ
def parse(data)
case data[0] # データの先頭文字で形式を判別
when '+' then parse_simple_string(data[1..-1]) # 単純文字列
when '-' then parse_error(data[1..-1]) # エラーメッセージ
when ':' then parse_integer(data[1..-1]) # 整数
when '$' then parse_bulk_string(data[1..-1]) # バルク文字列
when '*' then parse_array(data[1..-1]) # 配列
else
raise 'Invalid RESP type' # 未知の形式の場合はエラーをスロー
end
end
private
# 単純文字列(Simple String)を解析する
# - data: "+<文字列>\r\n" 形式のデータ
# 戻り値:
# - パースされた文字列
def parse_simple_string(data)
string, rest_string = data.split("\r\n", 2)
return string if rest_string.empty? # データが完全であれば文字列を返す
[string, rest_string] # 残りのデータがある場合は配列で返す
end
# エラーメッセージ(Error)を解析する
# - data: "-<エラーメッセージ>\r\n" 形式のデータ
# 戻り値:
# - パースされたエラーメッセージ
def parse_error(data)
error, rest_string = data.split("\r\n", 2)
return error if rest_string.empty? # 完全なエラーを返す
[error, rest_string] # 残りのデータを含む配列を返す
end
# 整数(Integer)を解析する
# - data: ":<整数>\r\n" 形式のデータ
# 戻り値:
# - パースされた整数
def parse_integer(data)
num_string, rest_string = data.split("\r\n", 2)
num_string.to_i if rest_string.empty? # データが完全であれば整数を返す
[num_string.to_i, rest_string] # 残りのデータを含む配列を返す
end
# バルク文字列(Bulk String)を解析する
# - data: "$<長さ>\r\n<データ>\r\n" 形式のデータ
# 戻り値:
# - パースされた文字列、またはnil(NULLバルク文字列の場合)
def parse_bulk_string(data)
length, rest_string = data.split("\r\n", 2)
length = length.to_i
return nil if length == -1 # NULLバルク文字列の場合
# 長さ分のデータを返し、残りを含む配列を返す
return rest_string[0...length] if rest_string[length + 2..].empty?
[rest_string[0...length], rest_string[length + 2..]]
end
# 配列(Array)を解析する
# - data: "*<要素数>\r\n<要素データ>" 形式のデータ
# 戻り値:
# - 配列内のすべての要素を解析した結果
def parse_array(data)
elements = []
# 配列サイズを取得
split_pos = data.index("\r\n")
num_elements = data[0..split_pos].to_i
return nil if num_elements == -1 # NULL配列の場合
# 残りのデータを繰り返し解析
rest_data = data[split_pos + 2..-1]
num_elements.times do
command, rest_data = parse(rest_data)
elements << command
end
elements
end
end
end
めでたくCodeCraftesのfirst stageを突破🎉
さて、ここまでやってようやくfirset stageを突破しました。
ここからはあと4stage 合計40sectionくらい残っているので、まだまだ終わりませんね😅
もう眠たいので、ここで一旦おさらばしたいと思います。
無駄な実装も多いですし、rubistの皆様から熱いレビューをお待ちしております。
time_eventはなんで固まったんだろうねえ?もうちょいそこは考えます🛏️