先日、PostgreSQLで発生した変更をfluentdに送る方法としてLogical Decodingが使えないかなと思い、fluentdのINPUTプラグインを作ってみたのでそれの使い方をまとめておきます。
このプラグインを使うと、PostgreSQLのLogical Decoding機能(WALをデコードしてどこかに転送する機能)を使って、PostgreSQL上で起こった全ての変更をfluentd上に流すことができます。
(PostgreSQL)->(logical decodingのプラグイン)->(pg-logicalプラグイン)->(fluentd)
SQL経由でDB上の情報を取ってくるのとは異なり、Logical DecodingではPostgreSQLが出力したトランザクションログ(WAL)から変更情報を取ってくるので、他の処理への影響を少なく抑えることができます(トリガを仕掛けたりもいりません)。また、Logical Decoding側もプラグイン形式なっているので、好きな形式に変更結果を表すことができるので、より柔軟にデータを取得できます。今回は、WALに書かれた変更をJSON形式に変更するwal2jsonを使ってみます(※)。
(※)wal2jsonはAmazon RDS for PostgreSQLでも使えるようになりました
1. PostgreSQL側のセットアップ
1-1. PostgreSQLのインストール
PostgreSQL 9.4以降に対応しています。
最低限以下の項目を設定します。
-
wal_level = logical
に設定します -
max_wal_senders
とmax_repilcation_slots
は1以上に設定します - pg_hba.confを修正してレプリケーション接続を許可しておきます
1-2. wal2jsonのインストール
pg_logicalプラグインではLogical Decoding機能を使うため、Logical Decodingのプラグインが必要となります。wal2jsonをダウンロードして、ビルドします。
$ git clone git@github.com:eulerto/wal2json.git
$ cd wal2json
$ make USE_PGXS=1
$ sudo MAKE USE_PGXS=1 install
1-3. PostgtreSQLを起動
2. fluentd側のセットアップ
2-1. pg_logicalプラグインをインストール
$ gem get fluent-plugin-pg-logical
2-2. fluentdの設定ファイルを書く
<source>
@type pg_logical
host localhost
port 5432
user masahiko
dbname postgres
slotname wal2json_slot
create_slot true
if_not_exists true
</source>
-
plugin
オプションには、Logical Decodingで使用するプラグイン名を指定します。今回はwal2jsonを指定します。 - fluentdの起動と同時に、必要なレプリケーションスロットも作成して欲しいので、
create_slot
オプションをtrue
に指定します - 2回目以降の起動でもレプリケーションスロットを作成しようとして失敗するので、
if_not_exists
オプションをtrue
に指定します
3. fluentd起動
起動すると下記のログと共にWALの受信が始まります
$ fluentd -c ./fluent.conf
:
2018-02-04 22:32:22 +0900 [info]: adding source type="pg_logical"
2018-02-04 22:32:22 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>wal2json_slot :plugin=>wal2json :status_interval=>10
2018-02-04 22:32:22 +0900 [info]: #0 starting fluentd worker pid=31291 ppid=31260 worker=0
2018-02-04 22:32:22 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 22:32:22 +0900 [info]: #0 fluentd worker is now running worker=0
2018-02-04 22:32:22.351157457 +0900 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"}
PostgreSQL上での変更を受け取ってみる
PostgreSQL上にテーブルを作成して変更すると以下のようなログが出ます。
=# CREATE TABLE hoge (c int);
=# INSERT INTO hoge VALUES(2);
=# INSERT INTO hoge VALUES(1);
2018-02-05 23:23:42.157474052 +0900 : "{\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[2]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[1]}]}"
出力されたJSONデータは整形するとこんな感じです。これはwal2jsonの仕様です。
{
"change": [
{
"kind": "insert",
"table": "hoge",
"schema": "public",
"columnnames": [
"c"
],
"columntypes": [
"integer"
],
"columnvalues": [
2
]
},
{
"kind": "insert",
"table": "hoge",
"schema": "public",
"columnnames": [
"c"
],
"columntypes": [
"integer"
],
"columnvalues": [
1
]
}
]
}
違うLogical Decodingプラグインを使ってみる
wal2jsonではなく、PostgreSQLのcontribモジュールとして同梱されているtest_decoding
プラグインも一緒に使ってみます。
fluentd側の設定は、以下のようにしました。
<source>
@type pg_logical
host localhost
port 5432
user masahiko
dbname postgres
slotname wal2json_slot
create_slot true
if_not_exists true
tag pg.json
</source>
<source>
@type pg_logical
host localhost
port 5432
user masahiko
dbname postgres
slotname testdecode_slot
create_slot true
if_not_exists true
tag pg.decode
</source>
接続先は同じですが、先程の設定に加えて、test_decodingプラグインを使うレプリケーションスロットを設定しました。
(※)レプリケーションスロットは1つの接続からしか使えないので、今回のような場合は2つのレプリケーションスロットを使用します
fluentdを再起動してみます。
2018-02-04 23:18:41 +0900 [info]: adding source type="pg_logical"
2018-02-04 23:18:41 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>wal2json_slot :plugin=>wal2json :status_interval=>10
2018-02-04 23:18:41 +0900 [info]: adding source type="pg_logical"
2018-02-04 23:18:41 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>testdecode_slot :plugin=>test_decoding :status_interval=>10
2018-02-04 23:18:41 +0900 [info]: #0 starting fluentd worker pid=31895 ppid=31864 worker=0
2018-02-04 23:18:41 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 23:18:41 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 23:18:41 +0900 [info]: #0 fluentd worker is now running worker=0
2018-02-04 23:18:41 +0900 [info]: #0 pg-logical: could not create replication slot wal2json_slot
2018-02-04 23:18:41.477931908 +0900 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"}
2018-02-04 23:18:41.502037085 +0900 fluent.info: {"message":"pg-logical: could not create replication slot wal2json_slot"}
再度、PostgreSQL上のテーブルを変更します。
=# INSERT INTO hoge SELECT generate_series(1,10);
fluentd側では以下のデータを受け取ります。
2018-02-04 15:18:55.184469399 +0900 : "BEGIN 189044"
2018-02-04 15:18:55.184852262 +0900 : "{\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[1]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[2]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[3]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[4]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[5]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[6]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[7]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[8]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[9]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[10]}]}"
2018-02-04 15:18:55.184961889 +0900 : "table public.hoge: INSERT: c[integer]:1"
2018-02-04 15:18:55.185093425 +0900 : "table public.hoge: INSERT: c[integer]:2"
2018-02-04 15:18:55.185275368 +0900 : "table public.hoge: INSERT: c[integer]:3"
2018-02-04 15:18:55.185367719 +0900 : "table public.hoge: INSERT: c[integer]:4"
2018-02-04 15:18:55.185437581 +0900 : "table public.hoge: INSERT: c[integer]:5"
2018-02-04 15:18:55.185554763 +0900 : "table public.hoge: INSERT: c[integer]:6"
2018-02-04 15:18:55.185648607 +0900 : "table public.hoge: INSERT: c[integer]:7"
2018-02-04 15:18:55.185725397 +0900 : "table public.hoge: INSERT: c[integer]:8"
2018-02-04 15:18:55.185819775 +0900 : "table public.hoge: INSERT: c[integer]:9"
2018-02-04 15:18:55.185897049 +0900 : "table public.hoge: INSERT: c[integer]:10"
2018-02-04 15:18:55.186005774 +0900 : "COMMIT 189044"
上記のように、異なる形式でPostgreSQL上の変更が転送されてきたことがわかります。