Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
9
Help us understand the problem. What is going on with this article?
@winebarrel

go-mysqlを使ったレプリケーション

https://github.com/siddontang/go-mysql/ というライブラリを使ったレプリケーションについて書きます。

go-mysql

ググラビリティがとても悪いのですが https://github.com/siddontang/go-mysql/ というGolangのライブラリがありまして、次のような機能があります。

  • MySQLのクライアント
  • MySQLプロトコルを受け付けるサーバ
  • MySQLのレプリケーションプロトコルのハンドラ
  • ...etc

三つ目の「MySQLのレプリケーションプロトコルのハンドラ」が面白い機能で、Golangのプログラムでbinlogを出力するMySQLサーバに接続してレプリケーションを処理することができます。

プログラム例

READMEのexampleをもう少し実装してROWS_EVENTを処理するプログラムが以下のようになります。

package main

import (
    "context"
    "fmt"

    "github.com/siddontang/go-mysql/mysql"
    "github.com/siddontang/go-mysql/replication"
)

func handleRowsEvent(ev *replication.BinlogEvent) {
    rowsEvent := ev.Event.(*replication.RowsEvent)
    //rowsEvent.Dump(os.Stdout)
    fmt.Printf("rows event: table=%s, values=%v\n",
        rowsEvent.Table.Schema, rowsEvent.Rows)
}

func handleQueryEvent(ev *replication.BinlogEvent) {
    queryEvent := ev.Event.(*replication.QueryEvent)
    //queryEvent.Dump(os.Stdout)
    fmt.Printf("query event: table=%s, query=%v\n",
        queryEvent.Schema, queryEvent.Query)
}

func main() {
    cfg := replication.BinlogSyncerConfig{
        ServerID: 100,
        Flavor:   "mysql",
        Host:     "127.0.0.1",
        Port:     13306,
        User:     "root",
        Password: "",
    }

    syncer := replication.NewBinlogSyncer(cfg)

    binlogFile := "14a3896dc697-bin.000001"
    var binlogPos uint32 = 177
    streamer, _ := syncer.StartSync(mysql.Position{Name: binlogFile, Pos: binlogPos})

    for {
        ev, _ := streamer.GetEvent(context.Background())

        switch ev.Header.EventType {
        case replication.WRITE_ROWS_EVENTv2:
            handleRowsEvent(ev)
        case replication.UPDATE_ROWS_EVENTv2:
            handleRowsEvent(ev)
        case replication.DELETE_ROWS_EVENTv2:
            handleRowsEvent(ev)
        case replication.QUERY_EVENT:
            handleQueryEvent(ev)
        case replication.ROTATE_EVENT:
            rotateEvent := ev.Event.(*replication.RotateEvent)
            binlogFile = string(rotateEvent.NextLogName)
        }

        fmt.Printf("current position: %s:%d\n", binlogFile, ev.Header.LogPos)
    }
}

MySQLサーバへの再接続や、binlogのローテーションなんかはライブラリがうまいこと処理してくれます。
なので上記のプログラムを立ち上げっぱなしにしておくと、流れてくるbinlogを処理し続けます。

利用例

https://github.com/siddontang/go-mysql-elasticsearch というMySQLからElasticsearchにレプリケーションするツールで使われているようです。

あと、手前味噌ですが https://github.com/winebarrel/binrpt という自作のRDS向けの本番→ステージングレプリケーションツールでも使用していて、わりと元気に動いています。

注意点

  • 現在のポジションはメモリ上にしかないので、プログラムの再起動後に前回の続きからレプリケーションを続けたい場合、どこかに保存する必要があります
  • 現在のbinlogのファイル名はreplication.BinlogEventに含まれていないのでROTATE_EVENTをハンドルして、新しいファイル名を取得する必要があります
  • RowsEventの場合、値は取得できるんですがカラム名は取得できないので、ソースのMySQLなどから取得する必要があります
  • TABLE_MAP_EVENTが取得できないポジションから始めようとするとErr: table id XXX: invalid table id, no corresponding table map eventでエラーになります
  • MySQLのクライアントとしては完成度があまり高くないように思いました。メモリを結構消費するのと、深追いできてないのですが複数行をScanしたときたまに値の途中が欠けることがあり…(のでbinrptではクライアントとして go-sql-driver/mysqを使っています)

まとめ

以前あったC++のMySQL Binlog EventライブラリはすでにMySQL Labsには無いようなので、レプリケーションを自前でごにょごにょするというのはすでに廃れた技術なのかなと思っていたんですが、go-mysqlはなかなか完成度が高くて、いろいろと応用が利きそうな感じでした。

9
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
9
Help us understand the problem. What is going on with this article?