15
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

go-mysqlを使ったレプリケーション

Last updated at Posted at 2020-12-17

https://github.com/siddontang/go-mysql/ というライブラリを使ったレプリケーションについて書きます。

go-mysql

ググラビリティがとても悪いのですが https://github.com/siddontang/go-mysql/ というGolangのライブラリがありまして、次のような機能があります。

  • MySQLのクライアント
  • MySQLプロトコルを受け付けるサーバ
  • MySQLのレプリケーションプロトコルのハンドラ
  • ...etc

三つ目の「MySQLのレプリケーションプロトコルのハンドラ」が面白い機能で、Golangのプログラムでbinlogを出力するMySQLサーバに接続してレプリケーションを処理することができます。

プログラム例

READMEのexampleをもう少し実装してROWS_EVENTを処理するプログラムが以下のようになります。

package main

import (
	"context"
	"fmt"

	"github.com/siddontang/go-mysql/mysql"
	"github.com/siddontang/go-mysql/replication"
)

func handleRowsEvent(ev *replication.BinlogEvent) {
	rowsEvent := ev.Event.(*replication.RowsEvent)
	//rowsEvent.Dump(os.Stdout)
	fmt.Printf("rows event: table=%s, values=%v\n",
		rowsEvent.Table.Schema, rowsEvent.Rows)
}

func handleQueryEvent(ev *replication.BinlogEvent) {
	queryEvent := ev.Event.(*replication.QueryEvent)
	//queryEvent.Dump(os.Stdout)
	fmt.Printf("query event: table=%s, query=%v\n",
		queryEvent.Schema, queryEvent.Query)
}

func main() {
	cfg := replication.BinlogSyncerConfig{
		ServerID: 100,
		Flavor:   "mysql",
		Host:     "127.0.0.1",
		Port:     13306,
		User:     "root",
		Password: "",
	}

	syncer := replication.NewBinlogSyncer(cfg)

	binlogFile := "14a3896dc697-bin.000001"
	var binlogPos uint32 = 177
	streamer, _ := syncer.StartSync(mysql.Position{Name: binlogFile, Pos: binlogPos})

	for {
		ev, _ := streamer.GetEvent(context.Background())

		switch ev.Header.EventType {
		case replication.WRITE_ROWS_EVENTv2:
			handleRowsEvent(ev)
		case replication.UPDATE_ROWS_EVENTv2:
			handleRowsEvent(ev)
		case replication.DELETE_ROWS_EVENTv2:
			handleRowsEvent(ev)
		case replication.QUERY_EVENT:
			handleQueryEvent(ev)
		case replication.ROTATE_EVENT:
			rotateEvent := ev.Event.(*replication.RotateEvent)
			binlogFile = string(rotateEvent.NextLogName)
		}

		fmt.Printf("current position: %s:%d\n", binlogFile, ev.Header.LogPos)
	}
}

MySQLサーバへの再接続や、binlogのローテーションなんかはライブラリがうまいこと処理してくれます。
なので上記のプログラムを立ち上げっぱなしにしておくと、流れてくるbinlogを処理し続けます。

利用例

https://github.com/siddontang/go-mysql-elasticsearch というMySQLからElasticsearchにレプリケーションするツールで使われているようです。

あと、手前味噌ですが https://github.com/winebarrel/binrpt という自作のRDS向けの本番→ステージングレプリケーションツールでも使用していて、わりと元気に動いています。

注意点

  • 現在のポジションはメモリ上にしかないので、プログラムの再起動後に前回の続きからレプリケーションを続けたい場合、どこかに保存する必要があります
  • 現在のbinlogのファイル名はreplication.BinlogEventに含まれていないのでROTATE_EVENTをハンドルして、新しいファイル名を取得する必要があります
  • RowsEventの場合、値は取得できるんですがカラム名は取得できないので、ソースのMySQLなどから取得する必要があります
  • TABLE_MAP_EVENTが取得できないポジションから始めようとするとErr: table id XXX: invalid table id, no corresponding table map eventでエラーになります
  • MySQLのクライアントとしては完成度があまり高くないように思いました。メモリを結構消費するのと、深追いできてないのですが複数行をScanしたときたまに値の途中が欠けることがあり…(のでbinrptではクライアントとして
    go-sql-driver/mysqを使っています)

まとめ

以前あったC++のMySQL Binlog EventライブラリはすでにMySQL Labsには無いようなので、レプリケーションを自前でごにょごにょするというのはすでに廃れた技術なのかなと思っていたんですが、go-mysqlはなかなか完成度が高くて、いろいろと応用が利きそうな感じでした。

15
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?