本記事はMapR 6.0から導入されたChange Data Capture (CDC)機能のイントロになります。
CDCはMapR-DBのテーブルへの変更を捕捉し、変更データをMapR Event Streams(MapR ES)に送信します。
変更データとは具体的にはinsert, update, deleteクエリによる変更となります。
本記事でCDCについてすべてを紹介することは出来ませんので、まずは概要と使い方について紹介したいと思います。
CDCの管理や他の機能との比較については後日行う予定です。
後日を待たずに英語でドキュメントが読みたい!っという方はこちらあたりからどうぞ!
概要
冒頭に説明したようにCDCはMapR-DBのテーブルへの変更を捕捉し、これをMapR ESに送信する仕組みとなります。
概要図は以下のようになります。
一番左のソースとなるテーブルに変更が加えられた際に、これがGateway Serviceに通知され、Gateway Serviceが通知された内容をMapR ESに送信します。
MapR ESに格納された後は、ユーザ側でConsumerを作成し、Databaseに入れるなり解析ツールに食わせるなりなんなり出来る、というわけです。
具体的な想定例には以下のようなものがあります
- MapR-DBテーブルへの変更をトラックし、これをリアルタイムで処理
- ElasticSearchやSolrといった検索インデックス、マテリアライズド・ビューにデータをキャッシュしつつ、MapR-DBとDWHやデータマートとをリアルタイムで同期させる
- トランザクションやレポート目的で異なる用途用にそれぞれMapR-DBインスタンスを管理しつつ、リアルタイム解析用にリアルタイムで同期させておく
- 任意の外部システムからMapR-DBへの変更を包括的にConsumeする機能を提供する
MapR ESへのトピックにデータが送信される順番はgatewayにデータが送信される順番と同じになります。
CDCが送信されるMapR ESはソーステーブルと同一クラスタ上でも異なるクラスタ上でも構いません。
データの流れはソーステーブルからMapR-ESまで片道の一方向となりますが、1つのソーステーブルの変更を1つのトピックに書き出す(例1)だけではなく、複数の異なるトピックに書き出す場合(例2)、異なるソーステーブルからそれぞれ異なるトピックに書き出す場合(例3)など、様々なデータモデルを設計することが出来ます。
ただし、ソーステーブルとトピックのモデルを設定した後にはトピックパーティションの数は固定されますので気をつけましょう。
その他の例も含めたデータモデルの詳細についてはこちらをどうぞ。
使ってみる
CDCの設定
それでは早速CDCを使ってみましょう。
今回は上述の例1のように一つのソーステーブルに対し同一クラスタ内の一つのトピックを設定します。
Consumerの挙動としてはトピックへの書き込み後にその内容を参照するだけのものとします。
テーブルはこれまでも使ってきたJSONテーブルです。
maprdb mapr:> find /user/mapr/db/test
{"_id":"2000","PROP":{"intelligence":72,"lead":80,"power":90},"USERNAME":"信","total_score":242}
{"_id":"2001","PROP":{"intelligence":88,"lead":98,"power":78},"USERNAME":"嬴政","total_score":264}
{"_id":"2002","PROP":{"intelligence":80,"lead":75,"power":78},"USERNAME":"漂","total_score":233}
{"_id":"2003","PROP":{"intelligence":90,"lead":85,"power":80},"USERNAME":"昌文君","total_score":255}
{"_id":"2004","PROP":{"intelligence":88,"lead":85,"power":60},"USERNAME":"河了貂","total_score":233}
{"_id":"2005","PROP":{"intelligence":86,"lead":84,"power":80},"USERNAME":"壁","total_score":250}
{"_id":"2006","PROP":{"intelligence":87,"lead":80,"power":95},"USERNAME":"羌瘣","total_score":262}
{"_id":"2007","PROP":{"intelligence":76,"lead":68,"power":55},"USERNAME":"成蟜","total_score":199}
{"_id":"2008","PROP":{"intelligence":80,"lead":65,"power":50},"USERNAME":"竭氏","total_score":195}
{"_id":"2009","PROP":{"intelligence":62,"lead":61,"power":87},"USERNAME":"左慈","total_score":210}
10 document(s) found.
次はGatewayの設定です。
こちらも今回は同一クラスタ上へデータを送信するので、以前使った設定を使いまわします。
そして、いよいよCDCの設定です。
設定は下の用に'maprcli'コマンド2つで終了です。
ここでは/user/mapr/stream/cdc
パスに存在するストリームのcdcTopic1
というトピックにCDCを書き込みます。
[mapr@ip-172-31-5-104 ~]$ hadoop fs -mkdir /user/mapr/stream
[mapr@ip-172-31-5-104 ~]$ maprcli stream create -ischangelog true -path /user/mapr/stream/cdc
Warning: produce/consume/topic permissions defaulting to creator. To change, execute 'maprcli stream edit -path /user/mapr/stream/cdc -produceperm <ACE> -consumeperm <ACE> -topicperm <ACE>'
[mapr@ip-172-31-5-104 ~]$ maprcli table changelog add -path /user/mapr/db/test -changelog /user/mapr/stream/cdc:cdcTopic1
作成が終わったらmaprcli table changelog
コマンドから内容の確認を行います。
[mapr@ip-172-31-5-104 ~]$ maprcli table changelog list -path /user/mapr/db/test -json
{
"timestamp":1534060345679,
"timeofday":"2018-08-12 07:52:25.679 GMT+0000 AM",
"status":"OK",
"total":1,
"data":[
{
"cluster":"single",
"changelog":"/user/mapr/stream/cdc:cdcTopic1",
"changelogStream":"/user/mapr/stream/cdc",
"replicaState":"REPLICA_STATE_REPLICATING",
"paused":false,
"throttle":false,
"idx":1,
"networkencryption":false,
"synchronous":false,
"networkcompression":"lz4",
"propagateExistingData":true,
"isUptodate":true,
"minPendingTS":0,
"maxPendingTS":0,
"bytesPending":0,
"putsPending":0,
"bucketsPending":0,
"uuid":"d36c3679-b885-32e4-586d-035fe46f5b00",
"copyTableCompletionPercentage":100
}
]
}
changelogカラムが設定されていますね。
Consumerの準備
続いてConsumerの準備になります。
ConsumerはJava OJAI CDC APIを利用して実装する必要があります。
Java OJAI CDC APIで取り扱う主なクラスは以下の3つです
クラス | 役割 |
---|---|
ChangeDataReader | ChangeDataRecordオブジェクト内の個々のnodeの変更をパーサ |
ChangeDataRecord | ソーステーブル内の単一ドキュメントに実行された一連の変更命令のカプセル化 |
ChangeNode | ドキュメント内の単一フィールドへの変更のカプセル化 |
これらを使ったMapRのエンジニアによる実装例では、MapR ESから読み取ったCDCデータを更に別のストリームにpushする方法を紹介しています。
本件ではよりシンプルにMapR-ESから読み込んだデータをJSON形式で標準出力させるデモアプリのDumpCDCを使ってみます。
ただし、こちらのConsumerは本稿で実施するクエリにのみ対応しておりますので、より汎用的に対応させるためには改修が必要となります。
$ git clone https://github.com/tgib23/mapr-db-cdc-sample
$ cd mapr-db-cdc-sample
$ git checkout -b cdc_dump origin/cdc_dump
$ mvn package
ビルドが終わったら以下のように引数にストリームのパスとトピックを指定して実行します
$ java -cp maprdb-cdc-sample-1.0-SNAPSHOT.jar:`mapr clientclasspath` com.mapr.samples.db.cdc.json.DumpCDC <ストリームのパス> <トピック>
この段階で以下のようにテーブルに変更を加えてみます。
maprdb mapr:> insert /user/mapr/db/test --value '{"_id":"3011", "USERNAME":"王騎", "total_score":286, "PROP":{"intellligence":95, "power":98, "lead":93}}'
Document with id: ""3011"" inserted.
maprdb mapr:> update /user/mapr/db/test --id "3011" --m '{ "$set":[{"USERNAME":"王騎(おうき)"},{"total_score":290} ] }'
Document with id: "3011" updated.
maprdb mapr:> delete /user/mapr/db/test --id "3011"
Document with id: "3011" deleted.
DumpCDCを実行中の標準出力には以下のように表示されました
[mapr@ip-172-31-10-111 tmp]$ java -cp maprdb-cdc-sample-1.0-SNAPSHOT.jar:`mapr clientclasspath` com.mapr.samples.db.cdc.json.DumpCDC /user/mapr/stream/cdc cdcTopic1
==== Start DumpCDC with /user/mapr/stream/cdc:cdcTopic1===
{
"type" : RECORD_INSERT,
"documentId : 3011,
"recordOpTime : 1536673675167,
"recordServerOpTime : 1536673675167,
"insertData" : {
"PROP : {"intellligence":95,"lead":93,"power":98},
"USERNAME : 王騎,
"total_score : 286.0 }
}
{
"type" : RECORD_UPDATE,
"documentId : 3011,
"recordOpTime : 1536673999565,
"recordServerOpTime : 1536673999565,
"updateData" : {
USERNAME : 王騎(おうき)
}
"updateData" : {
total_score : 290.0
}
}
{
"type" : RECORD_DELETE,
"documentId : 3011,
"recordOpTime : 1536674058925,
"recordServerOpTime : 1536674058925,
}
出力されましたね。
表示についてはおかしいところがありますが、とりあえずCDCの挙動については確認することが出来ました。
まとめ
以上のようにCDCを使うことで、テーブル単位でのデータの詳細な変更をトラックすることが可能となります。
CDCの管理など今回取り扱えなかった項目については、別エントリにて紹介する予定です。