はじめに
これは Python + asyncio で実装した MySQL プロトコル対応の自作プロキシの解説記事です。
本記事では実際のコードとともに、どのように MySQL プロトコルを扱い、どこでルーティングしているかを説明します。
この記事の前提は「MySQLクライアント(mysql / アプリ)に対してプロキシが MySQLサーバっぽく振る舞い、
受け取ったクエリを master / slave に振り分けて中継する」ことです。
目的は 教材(プロトコル理解) であり、プロダクション品質の完成形(互換性や安全性が十分)ではありません。
この記事は「コード全文を記事に同梱しない」前提で、要点となる部分だけを抜粋・疑似コード化して説明します。
動作前提(ざっくり)
実装例はPythonの非同期MySQLクライアントを利用して upstream(master/slave)へ接続し、
クライアント側にはMySQLサーバとして振る舞います。
- Python(3.10+ を想定)
asyncio- upstream接続:
aiomysql(内部的にpymysqlの定義を参照する箇所もあります)
※ この記事は環境構築の手順を主目的にしないため、詳細な手順は省略します。
全体構成
クライアント → プロキシ → MySQL(master / slave)
プロキシは TCP レベルでクライアントと MySQL の間に入り、プロトコルを中継・解析します。
起動フロー
- クライアント接続を待ち受ける
- ハンドシェイクを返す
- 認証
- クエリ受信と振り分け
本記事の実装例では、プロトコル上の状態遷移をざっくり次のように扱います。
-
Handshake(サーバ→クライアント) -
Handshake Response(クライアント→サーバ) - (必要なら)
Auth Switch Request(サーバ→クライアント)→Auth Switch Response(クライアント→サーバ) -
OK/ERR -
COM_QUERY/COM_INIT_DB(クライアント→サーバ) - 結果セット(サーバ→クライアント、最後に EOF / OK)
※ MySQLプロトコルはバージョン差分が大きいので、ここでは実装が対応している範囲(主に mysql_native_password)に絞ります。
サーバ起動
import asyncio
async def main():
server = await asyncio.start_server(handle_client, "0.0.0.0", 3307)
async with server:
await server.serve_forever()
非同期で複数接続を同時に処理できます。
asyncio.start_server(handle_client, host, port) で受け取る reader/write は、
MySQL用に“フレーミング”してくれるわけではないため、プロキシ側で パケット境界 を意識する必要があります。
この記事の実装では簡略化のため reader.read(1024) のような読み方をしていますが、
プロトコル実装の本筋としては「4バイトヘッダ(3バイト長 + seq)を読んで、その長さ分のpayloadを読む」が基本になります。
(教材としての説明を優先し、完全な read_exact 実装は省略します)
本来の読み取り(疑似コード)
TCPはストリームなので、パケットは「ちょうどいい長さ」で届きません。
プロトコル実装としては、次のように読むのが基本です。
async def read_packet(reader):
header = await reader.readexactly(4)
payload_len = header[0] | (header[1] << 8) | (header[2] << 16)
seq = header[3]
payload = await reader.readexactly(payload_len)
return seq, payload
MySQLパケットの基本(ここを押さえると読める)
MySQLのパケットは、先頭にヘッダが付きます。
-
payload_length: 3 bytes(リトルエンディアン) -
sequence_id: 1 byte -
payload:payload_lengthbytes
つまりバイト列としては次の形です。
[len0][len1][len2][seq][payload...]
次の create_packet(sequence_id, payload) が、この形に再構築します。
def create_packet(sequence_id, payload):
packet_length = len(payload)
header = struct.pack('<I', packet_length)[:3] + struct.pack('B', sequence_id)
return header + payload
重要ポイント:
- 1クエリの結果は「複数パケット」で返る
- 最後のパケット(EOF/OK)を検出して、そこで転送を止める必要がある
- 大きなクエリや結果は 16MB 近辺で分割される(
MAX_PACKET_LEN)
ハンドシェイク生成
import os
def create_handshake_packet():
protocol_version = b'\x0a'
server_version = b'5.7-proxy\x00'
connection_id = b'\x01\x00\x00\x00'
auth_seed = os.urandom(20)
return protocol_version + server_version + connection_id + auth_seed
MySQL クライアントに対して正規サーバのふりをするための初期パケットを構築しています。
本記事の実装例では、Protocol41のハンドシェイクを組み立てて返しています。
protocol_version=10server_versionconnection_id-
auth_plugin_data_part1(8 bytes) - capability flags(
CLIENT.PROTOCOL_41など) -
character_set/status_flags -
auth_plugin_data_part2(残りのseed) -
auth_plugin_name(例:mysql_native_password)
ここで生成した seed(auth_plugin_data_part1 + part2)が、
クライアントの認証レスポンス検証に使われます。
認証解析
def extract_auth_info(data):
parts = data.split(b'\x00')
user = parts[0].decode()
return user
上の例は超簡略です。
実際の Handshake Response は capability flags や auth response の長さ表現(lenenc)などが絡むので、
プロダクションに寄せるほど“ちゃんとパースする”必要が出ます。
クライアントが送ってきた認証パケットからユーザー名を抽出します。
実装(extract_auth_info)は簡略化しつつも、次を扱っています。
- username
- auth response(scramble)
- db(
CLIENT_CONNECT_WITH_DBが立っている場合) - auth plugin name
Auth Switch Request とは
クライアント側の送信した auth response が空だったり、
指定された plugin が未対応の場合、サーバは 0xFE の Authentication Switch Request を送って
「このプラグインで、こういうseedでやり直して」と促します。
本記事の実装例では authentication_switch_request がこれを生成し、クライアント応答を受け取ります。
クエリルーティング
def route_query(query):
q = query.strip().lower()
if q.startswith("select") or q.startswith("show"):
return "slave"
return "master"
読み取り系はスレーブへ、それ以外はマスターへ送信します。
実装では select/show を slave に寄せていますが、実務では次の注意が必要です。
- トランザクション中の読み取り(
BEGIN〜COMMIT)はslaveに流すと整合性が壊れやすい - セッション変数/一時テーブル/ロックなど、接続状態に依存するものは分岐が難しい
教材としては、まず「read-onlyクエリだけ分ける」から始めるのが良いです。
中継処理
async def relay(src, dst):
while True:
data = await src.read(4096)
if not data:
break
dst.write(data)
await dst.drain()
クライアントと MySQL の間で双方向にデータを転送します。
ただし本記事の実装例は「バイト列をそのまま流す」だけではなく、
- クライアント→プロキシ:
COM_QUERYを解析してSQL文字列を抜き出す - プロキシ→MySQL:
aiomysql.connectでサーバへ接続し、内部API(_write_bytes/write_packet)でクエリを送る - MySQL→プロキシ: MySQLパケット列を読み、終了条件を検出してまとめて返す
という形で、プロトコルを“部分的に解釈”しています。
クエリ送信:COM_QUERYとパケット分割
MySQLのクエリは COM_QUERY (0x03) で送られます。
クエリが長い場合は MAX_PACKET_LEN を超えるので分割が必要です。
本記事の実装例の execute_query は概ね次の方針です。
- 先頭パケット:
[command byte][query...] - 残り: 追加パケットで継続
教材として重要なのは「MySQLは“メッセージ”ではなく“パケット列”」だという点です。
結果セットの終端検出(EOF/OK)
クエリの応答は複数パケットで返ります。
最後に EOF または OK が来るので、そこまで読んでクライアントへ返します。
本記事の実装例は簡略化した終端判定をしています(最後のバイトが 0x00 かつ長さ5など)。
このあたりはMySQLのバージョンや CLIENT.DEPRECATE_EOF の有無で揺れるため、
プロダクションにするなら「OK/ERR/EOFパケットの厳密パース」が必要です。
終端判定(イメージ)
厳密化する場合は「先頭バイト(フィールド数やステータス)とパケット長」などを見て分岐します。
ここでは雰囲気の疑似コードだけ載せます。
def is_err_packet(payload: bytes) -> bool:
return payload[:1] == b'\xff'
def is_ok_packet(payload: bytes) -> bool:
return payload[:1] == b'\x00'
def is_eof_packet(payload: bytes) -> bool:
# EOFは0xFEだが、OKと混同しないために長さ等の条件も見る
return payload[:1] == b'\xfe' and len(payload) < 9
COM_INIT_DB(USE db)の扱い
クライアントが USE db 相当を送ると COM_INIT_DB (0x02) が飛ぶことがあります。
本記事の実装例は data[4] == 0x02 でこれを検出し、以降の接続に config["db"] を付与して反映しています。
実装して分かったこと
・プロトコル実装は境界条件が難しい
・asyncio は I/O 待ちが多い処理と相性が良い
・フレームワークに頼らないと挙動がよく見える
追加で、実務目線での落とし穴も挙げておきます。
-
パケット境界:
read(1024)では分割/結合され得るので、本来は read_exact が必要 - 終端判定: EOF/OK/ERR の扱いは capability で変わる(厳密パースが必要)
- セッション整合: 1接続の中で master/slave を切り替えると、セッション変数やトランザクションで破綻する
- 認証: plugin種類が多く、互換性が一番ハマる
- セキュリティ: プロキシが平文パスワード相当を扱うのでログ出力やメモリ管理に注意
まとめ
本プロキシは MySQL プロトコル理解のための教材として有用です。
このドキュメントは本記事の実装例の構造に合わせて、
- どのパケットを作って
- どこで解析して
- どこで master/slave に振り分け
- どこで終端を判断して返すか
を追えるようにしています。