LoginSignup
4
3

More than 5 years have passed since last update.

mysql binlog api

Posted at

introduction

 あるモジュールで、ユニークキー <-> セッションリンク情報をルックアップし、そのデータ(ステータス含む)でネットワーク経路決定、トラフィック制御、コントロールパケット返送を行う処理が一つのリクエストパケット毎に必要である
 以下のイメージのとおり、Counter[Ingress] , Counter[Egress] , Translater[Decap], Translator[Encap] の各モジュールがあり、1機能=1スレッド=1CPU占有 で実装している

 各機能モジュールは、1筐体で40Gbps以上のスループットを目標としており、1スレッド=1CPUあたり:2〜4Mpps(秒間:2〜4メガパケットプロセッシング)以上の高パフォーマンスモジュールとして定義している
 4Mpps=0.25μ秒=250nano秒で1パケットを処理するパフォーマンスレベルであり、当該サービスにおける想定平均パケットサイズ512Byteで換算して、1CPUで8〜16Gbps相当を処理できる数値である、論理的には4コア割当+RSS@NICで目標である40Gbps到達が可能

2〜4Mpps x 512Byte = 8〜16Gbps

w0056.png

binlog - api

<悪い例>

セッションリンク情報をルックアップする処理でパケット到着の都度データベースへ問い合わせクエリを発行するとlocal-cache(memcached等)を採用したとしても、一つのルックアップが50μ秒程度のコスト(秒間20Kクエリ)となり250nsと比較し200倍〜も遅く、ルックアップ処理がボトルネックとなってしまう

<本提案>

セッションリンク情報はシンプルにmysqlデータベースに保存し、binlog - apiを利用し本処理とは分離したスレッドをスレーブデータベース相当とし、binlogデータを各スレッド占有キャッシュ領域に分配する。この仕組みによってCounterx2, Translaterx2は、1コア辺り:2〜4Mpps(Packet Per Second)での高速処理を可能としている

スレッド間データ同期

さらにはスレッド間のデータ送受信を非ロック、データ領域をスレッド独立(占有)として、スレッド間のデータ同期・アクセスボトルネックを除去している

w0049.png

非ロックキュー

1方向非ロックキューは、inline template ヘッダファイルのみで実装しており、c++ with boost 環境に簡易に取り込める

(github のリンク)

 実験評価で10K のトンネルデータの一括更新、差分更新を理論値で各々のサービススレッドへ配信、準リアルタイムで各スレッドローカル領域への反映が完了できる

 従来実装設計では、10sec間隔等でデータベースをポーリングし、変更があったレレコードを各サービススレッドキャッシュに配信する仕組みを用いていた、それでは新データ適用まで、最悪値で、20sec の遅延が発生することが実装評価でわかっていた

 この遅延は本サービスモジュールとしては、以下の理由から致命的な弱点であった

  • トラフィック制限をオーバーしたセッションが、その後20sec程度トラフィックを継続できてしまい、公平性の観点から問題
  • 例えば、プリペイドSIMをイメージすると、料金を支払ってもすぐにセッションが利用可能とならない(ディレイが必要)、さらに料金が不足したとき、すぐにセッションを無効とできない

開発環境

  • OS X El Capitan
  • mysql --version

本番環境

本環境は、NUMA CPU + RSS-NICで構築されるXeon X86 server

mysql  Ver 14.14 Distrib 5.7.18, for osx10.11 (x86_64) using  EditLine wrapper
  • brew search mysql
automysqlbackup                mysql++                        mysql-connector-c ✔            mysql-sandbox                  mysql-utilities ✔              mysql@5.6
mysql ✔                        mysql-cluster                  mysql-connector-c++            mysql-search-replace           mysql@5.5 

mysqld
+ /usr/local/etc/my.cnf

log-bin=mysql-bin
server-id=1001
binlog_format=ROW
  • レプリケーション:master
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      154 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+

mysqlクライアントライブラリ tar をダウンロード

cd mysql-connector-c-6.1.10-src
mkdir ./build
cd ./build
cmake ../
make

binlog api tarをダウンロード

mysql-binary-log-events-1.0.2-labs

  • そのままコンパイルしてもエラー

また、mysql-connectorはローカルビルドしたものを利用したいので、make installしない
mysql-binary-log-events-1.0.2-labs/CMakeLIsts.txt の以下を変更

削除して

find_package(MySQL REQUIRED) 
if(MYSQL_SOURCE_INCLUDE_DIR)
  include_directories(${MYSQL_SOURCE_INCLUDE_DIR})
endif()

これに変更

INCLUDE_DIRECTORIES("../mysql-connector-c-6.1.10-src/include")
INCLUDE_DIRECTORIES("../mysql-connector-c-6.1.10-src/build/include")
LINK_DIRECTORIES(   "/Users/xxxx/xxxx/mysql-connector-c-6.1.10-src/build/libmysql")
SET(MYSQL_LIBRARIES "mysqlclient")
https://answers.launchpad.net/mydumper/+question/239895
mysql5.6 から publicから削除されたとのこと

/usr/local/include/mysql/private
mkdir build
cd build
cmake ../
make

binlog - api イベント処理サンプル

labのサンプルソースはそのままでは動かないのと、主に、iterateの処理が読み取りにくかった。以下は、INSERT/UPDATE/DELETEのイベントから変更が発生したユニークキー値を参照するソース


#include "binlog.h"
//
#include <stdlib.h>
#include <errno.h>
#include <string.h>
// stl
#include <iostream>
#include <ostream>
#include <iomanip>
#include <map>
#include <sstream>
#include <algorithm>


using binary_log::Binary_log;
using binary_log::system::create_transport;
using binary_log::system::Binary_log_driver;

// 変化通知の対象テーブル
#define TARGET_TABLENM  ("log_gy")
// このテーブルで、PKEYが設定されているカラムインデクス
#define PKEY_COLID      (1)
//
int main(int argc, char** argv){
    std::map<int, std::string> tid2tname;
    std::map<int, std::string>::iterator tb_it;
    std::string uri;
    int err;
    // 引数チェック
    if (argc != 2){
        std::cerr << "invalid arguments :ex. program mysql://root:<password>@localhost:3306" << std::endl;
        return(1);
    }
    uri= argv[1];
    // インスタンス準備
    std::auto_ptr<Binary_log_driver> drv(create_transport(uri.c_str()));
    Binary_log          binlog(drv.get());
    Decoder             decode;
    Converter           converter;
    Binary_log_event    *tmpevent = NULL;
    // サーバへ接続
    if ((err = binlog.connect()) != ERR_OK){
        std::cerr << "Unable to setup connection:" <<  str_error(err) << std::endl;
        return(1);
    }
    if (binlog.set_position(4) != ERR_OK){
        std::cerr << "binlog.set_position " << std::endl;
        return(1);
    }
    //
    while (true){
        std::pair<unsigned char *, size_t> buffer_buflen;
        Binary_log_event *event = NULL;
        const char *msg= NULL;
        // 次のイベントを待機
        if ((err = drv->get_next_event(&buffer_buflen)) != ERR_OK){
            std::cerr << str_error(err) << std::endl;
            break;
        }
        // デコード
        if (!(event = decode.decode_event((char*)buffer_buflen.first, buffer_buflen.second, &msg, 1))){
            std::cerr << msg << std::endl;
            break;
        }
        // イベントタイプ毎の処理
        // データ挿入、更新、削除 のみ
        switch(event->get_event_type()){
            case TABLE_MAP_EVENT:
                if (tmpevent != NULL){
                    delete tmpevent;
                }
                tmpevent = NULL;
                // 監視対象テーブルのみ
                if (strncasecmp(TARGET_TABLENM, ((Table_map_event*)event)->m_tblnam.c_str(), strlen(TARGET_TABLENM))== 0){
                    // 直前のTABLE_MAPイベントバッファを、WRITE/UPDATE/DELETE ROWS イベントで利用するため
                    // (メタ情報として利用される)
                    // 一時バッファに移動、1処理前のtableがセットされている場合、一旦解放する
                    tmpevent = event;
                    event = NULL;
                }
                break;
            // mysql5.7のINSERT/UPDATE/DELETEイベントをhook
            // 最初のカラムを、ユニークidで定義しておくことで
            // 変更通知対象が特定できるようになる
            case WRITE_ROWS_EVENT:
            case UPDATE_ROWS_EVENT:
            case DELETE_ROWS_EVENT:
                // 注意:c++ stlに慣れていると、習慣で以下のように書いてしまう
                //      Row_event_set::iteratorは、レコード数が1の時に、rows.begin() == rows.end()
                //      がtrueとなるので、いつもの書き方だと処理されないことになる
                //
                // // 慣例のiterate処理
                // for (auto itr = rows.begin();itr != rows.end();++itr){
                //     ....
                // }
                {   Row_event_set rows((Rows_event*)event, (Table_map_event*)tmpevent);
                    Row_event_set::iterator itr = rows.begin();
                    do{
                        Row_of_fields   rof((*itr));
                        int             colid = 0;
                        // col
                        for(Row_of_fields::iterator itf = rof.begin();itf != rof.end();++itf,colid++){
                            if (colid == PKEY_COLID){
                                std::string str;
                                converter.to(str, (*itf));
                                if (event->get_event_type() == WRITE_ROWS_EVENT){
                                    // 監視対象となる新たなデータが挿入された
                                    // >>> 業務ロジックスレッドへ変更通知(新規)を発行する
                                    std::cout << TARGET_TABLENM << ":INSERT[" << str << "]" << std::endl;
                                }else if (event->get_event_type() == UPDATE_ROWS_EVENT){
                                    // [UPDATE] データに変更があった
                                    // >>> 業務ロジックスレッドへ変更通知(変更)を発行する
                                    std::cout << TARGET_TABLENM << ":UPDATE[" << str << "]" << std::endl;
                                }else if (event->get_event_type() == DELETE_ROWS_EVENT){
                                    // [DELETE] データが削除された
                                    // >>> 業務ロジックスレッドへ変更通知(削除)を発行する
                                    std::cout << TARGET_TABLENM << ":DELETE[" << str << "]" << std::endl;
                                }
                                break;
                            }
                        }
                    }while (++itr != rows.end());
                }
                break;
            default:
                // dont care.
                break;
        }
        // 解放が必要なイベント
        if (event != NULL){
            delete event;
        }
        event = NULL;
    }
    return(0);
}

まとめ

 NUMA, RSS, 非ロックキュー,binlog api を最適配置することで例えば、フルカスタマイズドSDN、リアルタイムオンラインゲーム、チャットエンジン、KeyValue Store キャッシュデータベース等のシステムに適用することが可能である

 いろいろなオンラインサービス(Photonサーバ等)では、基本的にプロトコルスタック上位に実装されている高機能サービスで、本提案で述べているサービスとはレイヤ:スループットレベルが異なります

 NUMA, RSS, 非ロックキュー、binlog apiの最適設計でハードウェアプロダクトレベルのオンラインサービスが可能なことを示すことができたと思います

補遺

 
 プロトコルスタックを介さないこの仕組みは、UHFT等の低レイテンシシステムへの応用も可能だと考えています(一般的にプトロコルスタックを経由することのレイテンシ増加が支配的だと考えているため):未評価

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3