0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MapRAdvent Calendar 2017

Day 15

MapR-DBのChangeDataCapture(CDC)機能を使ってみる

Last updated at Posted at 2018-09-11

本記事はMapR 6.0から導入されたChange Data Capture (CDC)機能のイントロになります。
CDCはMapR-DBのテーブルへの変更を捕捉し、変更データをMapR Event Streams(MapR ES)に送信します。
変更データとは具体的にはinsert, update, deleteクエリによる変更となります。
本記事でCDCについてすべてを紹介することは出来ませんので、まずは概要と使い方について紹介したいと思います。
CDCの管理や他の機能との比較については後日行う予定です。
後日を待たずに英語でドキュメントが読みたい!っという方はこちらあたりからどうぞ!

概要

冒頭に説明したようにCDCはMapR-DBのテーブルへの変更を捕捉し、これをMapR ESに送信する仕組みとなります。
概要図は以下のようになります。

architecture

一番左のソースとなるテーブルに変更が加えられた際に、これがGateway Serviceに通知され、Gateway Serviceが通知された内容をMapR ESに送信します。
MapR ESに格納された後は、ユーザ側でConsumerを作成し、Databaseに入れるなり解析ツールに食わせるなりなんなり出来る、というわけです。
具体的な想定例には以下のようなものがあります

  • MapR-DBテーブルへの変更をトラックし、これをリアルタイムで処理
  • ElasticSearchやSolrといった検索インデックス、マテリアライズド・ビューにデータをキャッシュしつつ、MapR-DBとDWHやデータマートとをリアルタイムで同期させる
  • トランザクションやレポート目的で異なる用途用にそれぞれMapR-DBインスタンスを管理しつつ、リアルタイム解析用にリアルタイムで同期させておく
  • 任意の外部システムからMapR-DBへの変更を包括的にConsumeする機能を提供する

MapR ESへのトピックにデータが送信される順番はgatewayにデータが送信される順番と同じになります。
CDCが送信されるMapR ESはソーステーブルと同一クラスタ上でも異なるクラスタ上でも構いません。
データの流れはソーステーブルからMapR-ESまで片道の一方向となりますが、1つのソーステーブルの変更を1つのトピックに書き出す(例1)だけではなく、複数の異なるトピックに書き出す場合(例2)、異なるソーステーブルからそれぞれ異なるトピックに書き出す場合(例3)など、様々なデータモデルを設計することが出来ます。

  • 例1. 1つのソーステーブルの変更を1つのトピックに書き出す
    例1

  • 例2. 1つのソーステーブルの変更を複数のトピックに書き出す
    例2

  • 例3. 複数のソーステーブルの変更を複数のトピックに書き出す
    例3

ただし、ソーステーブルとトピックのモデルを設定した後にはトピックパーティションの数は固定されますので気をつけましょう。
その他の例も含めたデータモデルの詳細についてはこちらをどうぞ。

使ってみる

CDCの設定

それでは早速CDCを使ってみましょう。
今回は上述の例1のように一つのソーステーブルに対し同一クラスタ内の一つのトピックを設定します。
Consumerの挙動としてはトピックへの書き込み後にその内容を参照するだけのものとします。
テーブルはこれまでも使ってきたJSONテーブルです。

maprdb mapr:> find /user/mapr/db/test
{"_id":"2000","PROP":{"intelligence":72,"lead":80,"power":90},"USERNAME":"信","total_score":242}
{"_id":"2001","PROP":{"intelligence":88,"lead":98,"power":78},"USERNAME":"嬴政","total_score":264}
{"_id":"2002","PROP":{"intelligence":80,"lead":75,"power":78},"USERNAME":"漂","total_score":233}
{"_id":"2003","PROP":{"intelligence":90,"lead":85,"power":80},"USERNAME":"昌文君","total_score":255}
{"_id":"2004","PROP":{"intelligence":88,"lead":85,"power":60},"USERNAME":"河了貂","total_score":233}
{"_id":"2005","PROP":{"intelligence":86,"lead":84,"power":80},"USERNAME":"壁","total_score":250}
{"_id":"2006","PROP":{"intelligence":87,"lead":80,"power":95},"USERNAME":"羌瘣","total_score":262}
{"_id":"2007","PROP":{"intelligence":76,"lead":68,"power":55},"USERNAME":"成蟜","total_score":199}
{"_id":"2008","PROP":{"intelligence":80,"lead":65,"power":50},"USERNAME":"竭氏","total_score":195}
{"_id":"2009","PROP":{"intelligence":62,"lead":61,"power":87},"USERNAME":"左慈","total_score":210}
10 document(s) found.

次はGatewayの設定です。
こちらも今回は同一クラスタ上へデータを送信するので、以前使った設定を使いまわします。

そして、いよいよCDCの設定です。
設定は下の用に'maprcli'コマンド2つで終了です。
ここでは/user/mapr/stream/cdcパスに存在するストリームのcdcTopic1というトピックにCDCを書き込みます。

[mapr@ip-172-31-5-104 ~]$ hadoop fs -mkdir /user/mapr/stream
[mapr@ip-172-31-5-104 ~]$ maprcli stream create -ischangelog true -path /user/mapr/stream/cdc
Warning: produce/consume/topic permissions defaulting to creator. To change, execute 'maprcli stream edit -path /user/mapr/stream/cdc -produceperm <ACE> -consumeperm <ACE> -topicperm <ACE>'
[mapr@ip-172-31-5-104 ~]$ maprcli table changelog add -path /user/mapr/db/test -changelog /user/mapr/stream/cdc:cdcTopic1

作成が終わったらmaprcli table changelogコマンドから内容の確認を行います。

[mapr@ip-172-31-5-104 ~]$ maprcli table changelog list -path /user/mapr/db/test -json
{
        "timestamp":1534060345679,
        "timeofday":"2018-08-12 07:52:25.679 GMT+0000 AM",
        "status":"OK",
        "total":1,
        "data":[
                {
                        "cluster":"single",
                        "changelog":"/user/mapr/stream/cdc:cdcTopic1",
                        "changelogStream":"/user/mapr/stream/cdc",
                        "replicaState":"REPLICA_STATE_REPLICATING",
                        "paused":false,
                        "throttle":false,
                        "idx":1,
                        "networkencryption":false,
                        "synchronous":false,
                        "networkcompression":"lz4",
                        "propagateExistingData":true,
                        "isUptodate":true,
                        "minPendingTS":0,
                        "maxPendingTS":0,
                        "bytesPending":0,
                        "putsPending":0,
                        "bucketsPending":0,
                        "uuid":"d36c3679-b885-32e4-586d-035fe46f5b00",
                        "copyTableCompletionPercentage":100
                }
        ]
}

changelogカラムが設定されていますね。

Consumerの準備

続いてConsumerの準備になります。
ConsumerはJava OJAI CDC APIを利用して実装する必要があります。
Java OJAI CDC APIで取り扱う主なクラスは以下の3つです

クラス 役割
ChangeDataReader ChangeDataRecordオブジェクト内の個々のnodeの変更をパーサ
ChangeDataRecord ソーステーブル内の単一ドキュメントに実行された一連の変更命令のカプセル化
ChangeNode ドキュメント内の単一フィールドへの変更のカプセル化

これらを使ったMapRのエンジニアによる実装例では、MapR ESから読み取ったCDCデータを更に別のストリームにpushする方法を紹介しています。
demo

本件ではよりシンプルにMapR-ESから読み込んだデータをJSON形式で標準出力させるデモアプリのDumpCDCを使ってみます。
ただし、こちらのConsumerは本稿で実施するクエリにのみ対応しておりますので、より汎用的に対応させるためには改修が必要となります。

$ git clone https://github.com/tgib23/mapr-db-cdc-sample
$ cd mapr-db-cdc-sample
$ git checkout -b cdc_dump origin/cdc_dump
$ mvn package

ビルドが終わったら以下のように引数にストリームのパスとトピックを指定して実行します

$ java -cp maprdb-cdc-sample-1.0-SNAPSHOT.jar:`mapr clientclasspath` com.mapr.samples.db.cdc.json.DumpCDC <ストリームのパス> <トピック>

この段階で以下のようにテーブルに変更を加えてみます。

maprdb mapr:> insert /user/mapr/db/test --value '{"_id":"3011", "USERNAME":"王騎", "total_score":286, "PROP":{"intellligence":95, "power":98, "lead":93}}'
Document with id: ""3011"" inserted.

maprdb mapr:> update /user/mapr/db/test --id "3011" --m '{ "$set":[{"USERNAME":"王騎(おうき)"},{"total_score":290} ] }'
Document with id: "3011" updated.

maprdb mapr:> delete /user/mapr/db/test --id "3011"
Document with id: "3011" deleted.

DumpCDCを実行中の標準出力には以下のように表示されました

[mapr@ip-172-31-10-111 tmp]$ java -cp maprdb-cdc-sample-1.0-SNAPSHOT.jar:`mapr clientclasspath` com.mapr.samples.db.cdc.json.DumpCDC /user/mapr/stream/cdc cdcTopic1
==== Start DumpCDC with /user/mapr/stream/cdc:cdcTopic1===
{
  "type" : RECORD_INSERT,
  "documentId : 3011,
  "recordOpTime : 1536673675167,
  "recordServerOpTime : 1536673675167,
  "insertData" : {
    "PROP : {"intellligence":95,"lead":93,"power":98},
    "USERNAME : 王騎,
    "total_score : 286.0  }
}
{
  "type" : RECORD_UPDATE,
  "documentId : 3011,
  "recordOpTime : 1536673999565,
  "recordServerOpTime : 1536673999565,
  "updateData" : {
    USERNAME : 王騎(おうき)
 }
  "updateData" : {
    total_score : 290.0
 }
}
{
  "type" : RECORD_DELETE,
  "documentId : 3011,
  "recordOpTime : 1536674058925,
  "recordServerOpTime : 1536674058925,
}

出力されましたね。
表示についてはおかしいところがありますが、とりあえずCDCの挙動については確認することが出来ました。

まとめ

以上のようにCDCを使うことで、テーブル単位でのデータの詳細な変更をトラックすることが可能となります。
CDCの管理など今回取り扱えなかった項目については、別エントリにて紹介する予定です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?