LoginSignup
14
10

More than 5 years have passed since last update.

PostgreSQLのLogical Decoding機能についての紹介

Last updated at Posted at 2015-08-20

PostgreSQL 9.4から実装されているLogical Decodingについて調査した結果をまとめます。
個人的にかなりアツい機能なのですが、ネット上に日本語の情報がほとんどない?ので紹介も兼ねています。

Logical Decodingとは

PostgreSQLに対して実施された変更を横から抽出するための機能です。

将来的に、

  • 論理レプリケーション:特定データベース、特定テーブルのみレプリケーションetc.
  • PostgreSQLサーバ間で更新を伝播しあうことで、マルチマスタなDBサーバクラスタを実現
  • PostgreSQLから更新差分を引き出して、他業務のDBやHadoopなどに効率的にデータ連携

といった進化をしていくと期待されています。

変更情報を抜き出してくるのは、PostgreSQLのトランザクションログ(WAL)からです。このトランザクションログは人間で読める形では保存されていないのですが、これを論理的に変換する機能なのでこのような名称がついています。

具体的には、

BEGIN;
INSERT INTO foo VALUES (103, 'inserted');
DELETE FROM foo WHERE a = 82;
UPDATE foo SET b = 'updated' WHERE a = 21;
COMMIT;

のような更新を実施した場合、Logical Decodingを使って変更情報を引き出すと、以下のような情報が得られます。

postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot', NULL, NULL);
 location  | xid  |                            data
-----------+------+-------------------------------------------------------------
 0/1737AE8 | 1896 | BEGIN 1896
 0/1737AE8 | 1896 | table public.foo: INSERT: a[integer]:103 b[text]:'inserted'
 0/1737BB8 | 1896 | table public.foo: DELETE: a[integer]:82
 0/1737C00 | 1896 | table public.foo: UPDATE: a[integer]:21 b[text]:'updated'
 0/1737C98 | 1896 | COMMIT 1896
(5 rows)

どうでしょう。やや出力表現に癖がありますが、実施された変更がちゃんと抜き出せていますね。

出力はプラグイン化されていて、上の例はPostgreSQL本体に付属しているサンプルプラグインであるtest_decodingを利用したものです。

出力プラグイン

ネットを探索したところ、現時点では以下のような出力プラグインが公開されています。

decorder_raw by michaelpq

かなりSQLっぽい形で出力してくれます。上と同じ例を使うと、

# SELECT * FROM pg_logical_slot_get_changes('rep_slot_2', NULL, NULL);
 location  | xid  |                           data
-----------+------+-----------------------------------------------------------
 0/1737F60 | 1897 | INSERT INTO public.foo (a, b) VALUES (104, 'inserted');
 0/1739A80 | 1897 | DELETE FROM public.foo WHERE a = 81;
 0/1739B08 | 1897 | UPDATE public.foo SET a = 23, b = 'updated' WHERE a = 23;
(3 rows)

こんな感じです。ほぼ再現してくれてますね。このまま別のDBにSQLとして流し込めるレベルです。

wal2json by eulerto

JSON形式で出力してくれます。こんな感じ。

[local] 18452 postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot_3',
 NULL, NULL);
 location  | xid  |                           data
-----------+------+-----------------------------------------------------------
 0/1739D20 | 1898 | {                                                        +
           |      |         "xid": 1898,                                     +
           |      |         "change": [
 0/1739D20 | 1898 |                 {                                        +
           |      |                         "kind": "insert",                +
           |      |                         "schema": "public",              +
           |      |                         "table": "foo",                  +
           |      |                         "columnnames": ["a", "b"],       +
           |      |                         "columntypes": ["int4", "text"], +
           |      |                         "columnvalues": [105, "inserted"]+
           |      |                 }
 0/173B8B0 | 1898 |                 ,{                                       +
           |      |                         "kind": "delete",                +
           |      |                         "schema": "public",              +
           |      |                         "table": "foo",                  +
           |      |                         "oldkeys": {                     +
           |      |                                 "keynames": ["a"],       +
           |      |                                 "keytypes": ["int4"],    +
           |      |                                 "keyvalues": [80]        +
           |      |                         }                                +
           |      |                 }
 0/173B8F8 | 1898 |                 ,{                                       +
           |      |                         "kind": "update",                +
           |      |                         "schema": "public",              +
           |      |                         "table": "foo",                  +
           |      |                         "columnnames": ["a", "b"],       +
           |      |                         "columntypes": ["int4", "text"], +
           |      |                         "columnvalues": [24, "updated"], +
           |      |                         "oldkeys": {                     +
           |      |                                 "keynames": ["a"],       +
           |      |                                 "keytypes": ["int4"],    +
           |      |                                 "keyvalues": [24]        +
           |      |                         }                                +
           |      |                 }
 0/173B9D0 | 1898 |         ]                                                +
           |      | }
(5 rows)

decoderbufs by xstevens

Protocol Buffersの形式で出力してくれます。動作未確認。

bottledwater-pg by confluentinc

変更情報を引き出してApache Kafkaに格納してくれるツールです。これまたアツイ!!

紹介ブログを読むと、Kafkaを仲介してキャッシュや分析用のHadoop、監視などにデータを伝播させるアーキテクチャが構想されていて、とても興味深いです。こちらの動作未確認。

Logical Decodingの使い方

難しくないです。

PostgreSQL設定

postgresql.confにて、

  • wal_level = logical
  • max_replication_slot に1以上の値をセット

としておく必要があります。変更反映にはPostgreSQLの再起動が必要です。
後述しますが、変更情報の読み出し時にレプリケーション接続を利用する場合は上記に加えて

  • postgresql.confにてmax_wal_senders に1以上の値をセット
  • pg_hba.confにレプリケーション接続で接続するユーザの情報を追加

が必要です。

出力プラグインのインストール

インストール方法はプラグインによって異なります。
本体付属のtest_decodingの場合、PostgreSQLのソースコードのルートディレクトリから

$ cd contrib/test_decoding
$ make
# make insatll

でインストールできます。

論理レプリケーション・スロットの作成

Logical Decodingで変更情報を読み出すための口を作ります。
読み出す方法は、

  • SQL
  • レプリケーション接続

の2種類があります。ここではSQLを使う場合を説明します。
PostgreSQLにpsqlを使ってDBスーパーユーザで接続してpg_create_logical_replication_slot関数を実行します。

postgres=# SELECT * FROM pg_create_logical_replication_slot('rep_slot', 'test_decoding');

この関数の引数は、

  • 作成する論理レプリケーション・スロットの名前
  • 利用する出力プラグインの名前

です。

変更情報の読み出し

SQLで読み出す場合、変更情報を

  • pg_logical_slot_get_changes
  • pg_logical_slot_peek_changes

の2種類の関数で抽出することができます。2つの違いはgetとpeekとあるように、変更情報をキューと見立てて、キューから情報を取り出す(get)か、取り出さずにチェックする(peek)という違いです。

2つとも引数は同じで

  • 読み出し先の論理レプリケーション・スロットの名前
  • 読み出しを終えるポイント(トランザクションログのIDであるLSN、もしくは行数で指定)
  • 出力プラグインのオプションをkeyとvalueで指定

です。

例えば、

postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot', NULL, 10, 'include-timestamp', 'on');
 location  | xid  |                            data
-----------+------+-------------------------------------------------------------
 0/1737F60 | 1897 | BEGIN 1897
 0/1737F60 | 1897 | table public.foo: INSERT: a[integer]:104 b[text]:'inserted'
 0/1739A80 | 1897 | table public.foo: DELETE: a[integer]:81
 0/1739B08 | 1897 | table public.foo: UPDATE: a[integer]:23 b[text]:'updated'
 0/1739BA0 | 1897 | COMMIT 1897 (at 2015-08-20 22:50:23.85002+09)
 0/1739D20 | 1898 | BEGIN 1898
 0/1739D20 | 1898 | table public.foo: INSERT: a[integer]:105 b[text]:'inserted'
 0/173B8B0 | 1898 | table public.foo: DELETE: a[integer]:80
 0/173B8F8 | 1898 | table public.foo: UPDATE: a[integer]:24 b[text]:'updated'
 0/173B9D0 | 1898 | COMMIT 1898 (at 2015-08-20 22:55:17.529238+09)
(10 rows)

だと、rep_slotという名前のスロットから10行だけ変更行を読み出しています。その際、出力プラグインtest_decodingにinclude-timestamp = onというオプションを与えることで、変更がコミットされた時のタイムスアンプが出力されるようになります。

もっと詳しい使い方

ドキュメントを参照してください。
http://www.postgresql.jp/document/9.4/html/logicaldecoding.html

終わりに

いかがでしょうか。

9.4以前のPostgreSQLでは同様のことをしようとすると、

  • Slony-Iのようなツールを使う
  • APもしくはトリガを使って、更新差分をテーブルとして保管しておく
  • バッチ処理でDBからデータをダンプして、更新差分を作成して連携先に渡す

といった手間がかかっていました。

Logical Decodingは、今までPostgreSQLに閉じていたデータを簡単に、そして効率的に外部に連携するための仕組みとして期待の持てる機能だと思います。

14
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
10