4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

これは Python + asyncio で実装した MySQL プロトコル対応の自作プロキシの解説記事です。
本記事では実際のコードとともに、どのように MySQL プロトコルを扱い、どこでルーティングしているかを説明します。

この記事の前提は「MySQLクライアント(mysql / アプリ)に対してプロキシが MySQLサーバっぽく振る舞い、
受け取ったクエリを master / slave に振り分けて中継する」ことです。

目的は 教材(プロトコル理解) であり、プロダクション品質の完成形(互換性や安全性が十分)ではありません。

この記事は「コード全文を記事に同梱しない」前提で、要点となる部分だけを抜粋・疑似コード化して説明します。

動作前提(ざっくり)

実装例はPythonの非同期MySQLクライアントを利用して upstream(master/slave)へ接続し、
クライアント側にはMySQLサーバとして振る舞います。

  • Python(3.10+ を想定)
  • asyncio
  • upstream接続: aiomysql(内部的に pymysql の定義を参照する箇所もあります)

※ この記事は環境構築の手順を主目的にしないため、詳細な手順は省略します。

全体構成

クライアント → プロキシ → MySQL(master / slave)

プロキシは TCP レベルでクライアントと MySQL の間に入り、プロトコルを中継・解析します。

起動フロー

  1. クライアント接続を待ち受ける
  2. ハンドシェイクを返す
  3. 認証
  4. クエリ受信と振り分け

本記事の実装例では、プロトコル上の状態遷移をざっくり次のように扱います。

  • Handshake(サーバ→クライアント)
  • Handshake Response(クライアント→サーバ)
  • (必要なら)Auth Switch Request(サーバ→クライアント)→ Auth Switch Response(クライアント→サーバ)
  • OK / ERR
  • COM_QUERY / COM_INIT_DB(クライアント→サーバ)
  • 結果セット(サーバ→クライアント、最後に EOF / OK)

※ MySQLプロトコルはバージョン差分が大きいので、ここでは実装が対応している範囲(主に mysql_native_password)に絞ります。

サーバ起動

import asyncio

async def main():
    server = await asyncio.start_server(handle_client, "0.0.0.0", 3307)
    async with server:
        await server.serve_forever()

非同期で複数接続を同時に処理できます。

asyncio.start_server(handle_client, host, port) で受け取る reader/write は、
MySQL用に“フレーミング”してくれるわけではないため、プロキシ側で パケット境界 を意識する必要があります。

この記事の実装では簡略化のため reader.read(1024) のような読み方をしていますが、
プロトコル実装の本筋としては「4バイトヘッダ(3バイト長 + seq)を読んで、その長さ分のpayloadを読む」が基本になります。

(教材としての説明を優先し、完全な read_exact 実装は省略します)

本来の読み取り(疑似コード)

TCPはストリームなので、パケットは「ちょうどいい長さ」で届きません。
プロトコル実装としては、次のように読むのが基本です。

async def read_packet(reader):
    header = await reader.readexactly(4)
    payload_len = header[0] | (header[1] << 8) | (header[2] << 16)
    seq = header[3]
    payload = await reader.readexactly(payload_len)
    return seq, payload

MySQLパケットの基本(ここを押さえると読める)

MySQLのパケットは、先頭にヘッダが付きます。

  • payload_length: 3 bytes(リトルエンディアン)
  • sequence_id: 1 byte
  • payload: payload_length bytes

つまりバイト列としては次の形です。

[len0][len1][len2][seq][payload...]

次の create_packet(sequence_id, payload) が、この形に再構築します。

def create_packet(sequence_id, payload):
    packet_length = len(payload)
    header = struct.pack('<I', packet_length)[:3] + struct.pack('B', sequence_id)
    return header + payload

重要ポイント:

  • 1クエリの結果は「複数パケット」で返る
  • 最後のパケット(EOF/OK)を検出して、そこで転送を止める必要がある
  • 大きなクエリや結果は 16MB 近辺で分割される(MAX_PACKET_LEN

ハンドシェイク生成

import os

def create_handshake_packet():
    protocol_version = b'\x0a'
    server_version = b'5.7-proxy\x00'
    connection_id = b'\x01\x00\x00\x00'
    auth_seed = os.urandom(20)
    return protocol_version + server_version + connection_id + auth_seed

MySQL クライアントに対して正規サーバのふりをするための初期パケットを構築しています。

本記事の実装例では、Protocol41のハンドシェイクを組み立てて返しています。

  • protocol_version=10
  • server_version
  • connection_id
  • auth_plugin_data_part1(8 bytes)
  • capability flags(CLIENT.PROTOCOL_41 など)
  • character_set / status_flags
  • auth_plugin_data_part2(残りのseed)
  • auth_plugin_name(例: mysql_native_password

ここで生成した seed(auth_plugin_data_part1 + part2)が、
クライアントの認証レスポンス検証に使われます。

認証解析

def extract_auth_info(data):
    parts = data.split(b'\x00')
    user = parts[0].decode()
    return user

上の例は超簡略です。
実際の Handshake Response は capability flags や auth response の長さ表現(lenenc)などが絡むので、
プロダクションに寄せるほど“ちゃんとパースする”必要が出ます。

クライアントが送ってきた認証パケットからユーザー名を抽出します。

実装(extract_auth_info)は簡略化しつつも、次を扱っています。

  • username
  • auth response(scramble)
  • db(CLIENT_CONNECT_WITH_DB が立っている場合)
  • auth plugin name

Auth Switch Request とは

クライアント側の送信した auth response が空だったり、
指定された plugin が未対応の場合、サーバは 0xFEAuthentication Switch Request を送って
「このプラグインで、こういうseedでやり直して」と促します。

本記事の実装例では authentication_switch_request がこれを生成し、クライアント応答を受け取ります。

クエリルーティング

def route_query(query):
    q = query.strip().lower()
    if q.startswith("select") or q.startswith("show"):
        return "slave"
    return "master"

読み取り系はスレーブへ、それ以外はマスターへ送信します。

実装では select/show を slave に寄せていますが、実務では次の注意が必要です。

  • トランザクション中の読み取り(BEGINCOMMIT)はslaveに流すと整合性が壊れやすい
  • セッション変数/一時テーブル/ロックなど、接続状態に依存するものは分岐が難しい

教材としては、まず「read-onlyクエリだけ分ける」から始めるのが良いです。

中継処理

async def relay(src, dst):
    while True:
        data = await src.read(4096)
        if not data:
            break
        dst.write(data)
        await dst.drain()

クライアントと MySQL の間で双方向にデータを転送します。

ただし本記事の実装例は「バイト列をそのまま流す」だけではなく、

  • クライアント→プロキシ: COM_QUERY を解析してSQL文字列を抜き出す
  • プロキシ→MySQL: aiomysql.connect でサーバへ接続し、内部API(_write_bytes / write_packet)でクエリを送る
  • MySQL→プロキシ: MySQLパケット列を読み、終了条件を検出してまとめて返す

という形で、プロトコルを“部分的に解釈”しています。

クエリ送信:COM_QUERYとパケット分割

MySQLのクエリは COM_QUERY (0x03) で送られます。
クエリが長い場合は MAX_PACKET_LEN を超えるので分割が必要です。

本記事の実装例の execute_query は概ね次の方針です。

  • 先頭パケット: [command byte][query...]
  • 残り: 追加パケットで継続

教材として重要なのは「MySQLは“メッセージ”ではなく“パケット列”」だという点です。

結果セットの終端検出(EOF/OK)

クエリの応答は複数パケットで返ります。
最後に EOF または OK が来るので、そこまで読んでクライアントへ返します。

本記事の実装例は簡略化した終端判定をしています(最後のバイトが 0x00 かつ長さ5など)。
このあたりはMySQLのバージョンや CLIENT.DEPRECATE_EOF の有無で揺れるため、
プロダクションにするなら「OK/ERR/EOFパケットの厳密パース」が必要です。

終端判定(イメージ)

厳密化する場合は「先頭バイト(フィールド数やステータス)とパケット長」などを見て分岐します。
ここでは雰囲気の疑似コードだけ載せます。

def is_err_packet(payload: bytes) -> bool:
    return payload[:1] == b'\xff'

def is_ok_packet(payload: bytes) -> bool:
    return payload[:1] == b'\x00'

def is_eof_packet(payload: bytes) -> bool:
    # EOFは0xFEだが、OKと混同しないために長さ等の条件も見る
    return payload[:1] == b'\xfe' and len(payload) < 9

COM_INIT_DB(USE db)の扱い

クライアントが USE db 相当を送ると COM_INIT_DB (0x02) が飛ぶことがあります。
本記事の実装例は data[4] == 0x02 でこれを検出し、以降の接続に config["db"] を付与して反映しています。

実装して分かったこと

・プロトコル実装は境界条件が難しい
・asyncio は I/O 待ちが多い処理と相性が良い
・フレームワークに頼らないと挙動がよく見える

追加で、実務目線での落とし穴も挙げておきます。

  • パケット境界: read(1024) では分割/結合され得るので、本来は read_exact が必要
  • 終端判定: EOF/OK/ERR の扱いは capability で変わる(厳密パースが必要)
  • セッション整合: 1接続の中で master/slave を切り替えると、セッション変数やトランザクションで破綻する
  • 認証: plugin種類が多く、互換性が一番ハマる
  • セキュリティ: プロキシが平文パスワード相当を扱うのでログ出力やメモリ管理に注意

まとめ

本プロキシは MySQL プロトコル理解のための教材として有用です。

このドキュメントは本記事の実装例の構造に合わせて、

  • どのパケットを作って
  • どこで解析して
  • どこで master/slave に振り分け
  • どこで終端を判断して返すか

を追えるようにしています。

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?