https://github.com/siddontang/go-mysql/ というライブラリを使ったレプリケーションについて書きます。
go-mysql
ググラビリティがとても悪いのですが https://github.com/siddontang/go-mysql/ というGolangのライブラリがありまして、次のような機能があります。
- MySQLのクライアント
- MySQLプロトコルを受け付けるサーバ
- MySQLのレプリケーションプロトコルのハンドラ
- ...etc
三つ目の「MySQLのレプリケーションプロトコルのハンドラ」が面白い機能で、Golangのプログラムでbinlogを出力するMySQLサーバに接続してレプリケーションを処理することができます。
プログラム例
READMEのexampleをもう少し実装してROWS_EVENTを処理するプログラムが以下のようになります。
package main
import (
"context"
"fmt"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
)
func handleRowsEvent(ev *replication.BinlogEvent) {
rowsEvent := ev.Event.(*replication.RowsEvent)
//rowsEvent.Dump(os.Stdout)
fmt.Printf("rows event: table=%s, values=%v\n",
rowsEvent.Table.Schema, rowsEvent.Rows)
}
func handleQueryEvent(ev *replication.BinlogEvent) {
queryEvent := ev.Event.(*replication.QueryEvent)
//queryEvent.Dump(os.Stdout)
fmt.Printf("query event: table=%s, query=%v\n",
queryEvent.Schema, queryEvent.Query)
}
func main() {
cfg := replication.BinlogSyncerConfig{
ServerID: 100,
Flavor: "mysql",
Host: "127.0.0.1",
Port: 13306,
User: "root",
Password: "",
}
syncer := replication.NewBinlogSyncer(cfg)
binlogFile := "14a3896dc697-bin.000001"
var binlogPos uint32 = 177
streamer, _ := syncer.StartSync(mysql.Position{Name: binlogFile, Pos: binlogPos})
for {
ev, _ := streamer.GetEvent(context.Background())
switch ev.Header.EventType {
case replication.WRITE_ROWS_EVENTv2:
handleRowsEvent(ev)
case replication.UPDATE_ROWS_EVENTv2:
handleRowsEvent(ev)
case replication.DELETE_ROWS_EVENTv2:
handleRowsEvent(ev)
case replication.QUERY_EVENT:
handleQueryEvent(ev)
case replication.ROTATE_EVENT:
rotateEvent := ev.Event.(*replication.RotateEvent)
binlogFile = string(rotateEvent.NextLogName)
}
fmt.Printf("current position: %s:%d\n", binlogFile, ev.Header.LogPos)
}
}
MySQLサーバへの再接続や、binlogのローテーションなんかはライブラリがうまいこと処理してくれます。
なので上記のプログラムを立ち上げっぱなしにしておくと、流れてくるbinlogを処理し続けます。
利用例
https://github.com/siddontang/go-mysql-elasticsearch というMySQLからElasticsearchにレプリケーションするツールで使われているようです。
あと、手前味噌ですが https://github.com/winebarrel/binrpt という自作のRDS向けの本番→ステージングレプリケーションツールでも使用していて、わりと元気に動いています。
注意点
- 現在のポジションはメモリ上にしかないので、プログラムの再起動後に前回の続きからレプリケーションを続けたい場合、どこかに保存する必要があります
- 現在のbinlogのファイル名はreplication.BinlogEventに含まれていないのでROTATE_EVENTをハンドルして、新しいファイル名を取得する必要があります
- RowsEventの場合、値は取得できるんですがカラム名は取得できないので、ソースのMySQLなどから取得する必要があります
- TABLE_MAP_EVENTが取得できないポジションから始めようとすると
Err: table id XXX: invalid table id, no corresponding table map event
でエラーになります - MySQLのクライアントとしては完成度があまり高くないように思いました。メモリを結構消費するのと、深追いできてないのですが複数行をScanしたときたまに値の途中が欠けることがあり…(のでbinrptではクライアントとして
go-sql-driver/mysqを使っています)
まとめ
以前あったC++のMySQL Binlog EventライブラリはすでにMySQL Labsには無いようなので、レプリケーションを自前でごにょごにょするというのはすでに廃れた技術なのかなと思っていたんですが、go-mysqlはなかなか完成度が高くて、いろいろと応用が利きそうな感じでした。