Edited at

fluentdでPostgreSQLで起こった変更を取得する

More than 1 year has passed since last update.

先日、PostgreSQLで発生した変更をfluentdに送る方法としてLogical Decodingが使えないかなと思い、fluentdのINPUTプラグインを作ってみたのでそれの使い方をまとめておきます。

このプラグインを使うと、PostgreSQLのLogical Decoding機能(WALをデコードしてどこかに転送する機能)を使って、PostgreSQL上で起こった全ての変更をfluentd上に流すことができます。


データの流れ

(PostgreSQL)->(logical decodingのプラグイン)->(pg-logicalプラグイン)->(fluentd)


SQL経由でDB上の情報を取ってくるのとは異なり、Logical DecodingではPostgreSQLが出力したトランザクションログ(WAL)から変更情報を取ってくるので、他の処理への影響を少なく抑えることができます(トリガを仕掛けたりもいりません)。また、Logical Decoding側もプラグイン形式なっているので、好きな形式に変更結果を表すことができるので、より柔軟にデータを取得できます。今回は、WALに書かれた変更をJSON形式に変更するwal2jsonを使ってみます(※)。

(※)wal2jsonはAmazon RDS for PostgreSQLでも使えるようになりました


1. PostgreSQL側のセットアップ

1-1. PostgreSQLのインストール

PostgreSQL 9.4以降に対応しています。

最低限以下の項目を設定します。



  • wal_level = logicalに設定します


  • max_wal_sendersmax_repilcation_slotsは1以上に設定します

  • pg_hba.confを修正してレプリケーション接続を許可しておきます

1-2. wal2jsonのインストール

pg_logicalプラグインではLogical Decoding機能を使うため、Logical Decodingのプラグインが必要となります。wal2jsonをダウンロードして、ビルドします。

$ git clone git@github.com:eulerto/wal2json.git

$ cd wal2json
$ make USE_PGXS=1
$ sudo MAKE USE_PGXS=1 install

1-3. PostgtreSQLを起動


2. fluentd側のセットアップ

2-1. pg_logicalプラグインをインストール

$ gem get fluent-plugin-pg-logical

2-2. fluentdの設定ファイルを書く

<source>

@type pg_logical
host localhost
port 5432
user masahiko
dbname postgres
slotname wal2json_slot
create_slot true
if_not_exists true
</source>



  • pluginオプションには、Logical Decodingで使用するプラグイン名を指定します。今回はwal2jsonを指定します。

  • fluentdの起動と同時に、必要なレプリケーションスロットも作成して欲しいので、create_slotオプションをtrueに指定します

  • 2回目以降の起動でもレプリケーションスロットを作成しようとして失敗するので、if_not_existsオプションをtrueに指定します


3. fluentd起動

起動すると下記のログと共にWALの受信が始まります

$ fluentd -c ./fluent.conf

:
2018-02-04 22:32:22 +0900 [info]: adding source type="pg_logical"
2018-02-04 22:32:22 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>wal2json_slot :plugin=>wal2json :status_interval=>10
2018-02-04 22:32:22 +0900 [info]: #0 starting fluentd worker pid=31291 ppid=31260 worker=0
2018-02-04 22:32:22 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 22:32:22 +0900 [info]: #0 fluentd worker is now running worker=0
2018-02-04 22:32:22.351157457 +0900 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"}


PostgreSQL上での変更を受け取ってみる

PostgreSQL上にテーブルを作成して変更すると以下のようなログが出ます。

=# CREATE TABLE hoge (c int);

=# INSERT INTO hoge VALUES(2);
=# INSERT INTO hoge VALUES(1);

2018-02-05 23:23:42.157474052 +0900 :  "{\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[2]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[1]}]}"

出力されたJSONデータは整形するとこんな感じです。これはwal2jsonの仕様です。

 {                              

"change": [
{
"kind": "insert",
"table": "hoge",
"schema": "public",
"columnnames": [
"c"
],
"columntypes": [
"integer"
],
"columnvalues": [
2
]
},
{
"kind": "insert",
"table": "hoge",
"schema": "public",
"columnnames": [
"c"
],
"columntypes": [
"integer"
],
"columnvalues": [
1
]
}
]
}


違うLogical Decodingプラグインを使ってみる

wal2jsonではなく、PostgreSQLのcontribモジュールとして同梱されているtest_decodingプラグインも一緒に使ってみます。

fluentd側の設定は、以下のようにしました。

<source>

@type pg_logical
host localhost
port 5432
user masahiko
dbname postgres
slotname wal2json_slot
create_slot true
if_not_exists true
tag pg.json
</source>

<source>
@type pg_logical
host localhost
port 5432
user masahiko
dbname postgres
slotname testdecode_slot
create_slot true
if_not_exists true
tag pg.decode
</source>

接続先は同じですが、先程の設定に加えて、test_decodingプラグインを使うレプリケーションスロットを設定しました。

(※)レプリケーションスロットは1つの接続からしか使えないので、今回のような場合は2つのレプリケーションスロットを使用します

fluentdを再起動してみます。

2018-02-04 23:18:41 +0900 [info]: adding source type="pg_logical"

2018-02-04 23:18:41 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>wal2json_slot :plugin=>wal2json :status_interval=>10
2018-02-04 23:18:41 +0900 [info]: adding source type="pg_logical"
2018-02-04 23:18:41 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>testdecode_slot :plugin=>test_decoding :status_interval=>10
2018-02-04 23:18:41 +0900 [info]: #0 starting fluentd worker pid=31895 ppid=31864 worker=0
2018-02-04 23:18:41 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 23:18:41 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 23:18:41 +0900 [info]: #0 fluentd worker is now running worker=0
2018-02-04 23:18:41 +0900 [info]: #0 pg-logical: could not create replication slot wal2json_slot
2018-02-04 23:18:41.477931908 +0900 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"}
2018-02-04 23:18:41.502037085 +0900 fluent.info: {"message":"pg-logical: could not create replication slot wal2json_slot"}

再度、PostgreSQL上のテーブルを変更します。

=# INSERT INTO hoge SELECT generate_series(1,10);

fluentd側では以下のデータを受け取ります。

2018-02-04 15:18:55.184469399 +0900 : "BEGIN 189044"

2018-02-04 15:18:55.184852262 +0900 : "{\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[1]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[2]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[3]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[4]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[5]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[6]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[7]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[8]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[9]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[10]}]}"
2018-02-04 15:18:55.184961889 +0900 : "table public.hoge: INSERT: c[integer]:1"
2018-02-04 15:18:55.185093425 +0900 : "table public.hoge: INSERT: c[integer]:2"
2018-02-04 15:18:55.185275368 +0900 : "table public.hoge: INSERT: c[integer]:3"
2018-02-04 15:18:55.185367719 +0900 : "table public.hoge: INSERT: c[integer]:4"
2018-02-04 15:18:55.185437581 +0900 : "table public.hoge: INSERT: c[integer]:5"
2018-02-04 15:18:55.185554763 +0900 : "table public.hoge: INSERT: c[integer]:6"
2018-02-04 15:18:55.185648607 +0900 : "table public.hoge: INSERT: c[integer]:7"
2018-02-04 15:18:55.185725397 +0900 : "table public.hoge: INSERT: c[integer]:8"
2018-02-04 15:18:55.185819775 +0900 : "table public.hoge: INSERT: c[integer]:9"
2018-02-04 15:18:55.185897049 +0900 : "table public.hoge: INSERT: c[integer]:10"
2018-02-04 15:18:55.186005774 +0900 : "COMMIT 189044"

上記のように、異なる形式でPostgreSQL上の変更が転送されてきたことがわかります。