9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

fluentdでPostgreSQLで起こった変更を取得する

Last updated at Posted at 2018-02-05

先日、PostgreSQLで発生した変更をfluentdに送る方法としてLogical Decodingが使えないかなと思い、fluentdのINPUTプラグインを作ってみたのでそれの使い方をまとめておきます。

このプラグインを使うと、PostgreSQLのLogical Decoding機能(WALをデコードしてどこかに転送する機能)を使って、PostgreSQL上で起こった全ての変更をfluentd上に流すことができます。

データの流れ
(PostgreSQL)->(logical decodingのプラグイン)->(pg-logicalプラグイン)->(fluentd)

SQL経由でDB上の情報を取ってくるのとは異なり、Logical DecodingではPostgreSQLが出力したトランザクションログ(WAL)から変更情報を取ってくるので、他の処理への影響を少なく抑えることができます(トリガを仕掛けたりもいりません)。また、Logical Decoding側もプラグイン形式なっているので、好きな形式に変更結果を表すことができるので、より柔軟にデータを取得できます。今回は、WALに書かれた変更をJSON形式に変更するwal2jsonを使ってみます(※)。

(※)wal2jsonはAmazon RDS for PostgreSQLでも使えるようになりました

1. PostgreSQL側のセットアップ

1-1. PostgreSQLのインストール
PostgreSQL 9.4以降に対応しています。
最低限以下の項目を設定します。

  • wal_level = logicalに設定します
  • max_wal_sendersmax_repilcation_slotsは1以上に設定します
  • pg_hba.confを修正してレプリケーション接続を許可しておきます

1-2. wal2jsonのインストール
pg_logicalプラグインではLogical Decoding機能を使うため、Logical Decodingのプラグインが必要となります。wal2jsonをダウンロードして、ビルドします。

$ git clone git@github.com:eulerto/wal2json.git
$ cd wal2json
$ make USE_PGXS=1
$ sudo MAKE USE_PGXS=1 install

1-3. PostgtreSQLを起動

2. fluentd側のセットアップ

2-1. pg_logicalプラグインをインストール

$ gem get fluent-plugin-pg-logical

2-2. fluentdの設定ファイルを書く

<source>
  @type pg_logical
  host localhost
  port 5432
  user masahiko
  dbname postgres
  slotname wal2json_slot
  create_slot true
  if_not_exists true
</source>
  • pluginオプションには、Logical Decodingで使用するプラグイン名を指定します。今回はwal2jsonを指定します。
  • fluentdの起動と同時に、必要なレプリケーションスロットも作成して欲しいので、create_slotオプションをtrueに指定します
  • 2回目以降の起動でもレプリケーションスロットを作成しようとして失敗するので、if_not_existsオプションをtrueに指定します

3. fluentd起動

起動すると下記のログと共にWALの受信が始まります

$ fluentd -c ./fluent.conf
:
2018-02-04 22:32:22 +0900 [info]: adding source type="pg_logical"
2018-02-04 22:32:22 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>wal2json_slot :plugin=>wal2json :status_interval=>10
2018-02-04 22:32:22 +0900 [info]: #0 starting fluentd worker pid=31291 ppid=31260 worker=0
2018-02-04 22:32:22 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 22:32:22 +0900 [info]: #0 fluentd worker is now running worker=0
2018-02-04 22:32:22.351157457 +0900 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"}

PostgreSQL上での変更を受け取ってみる

PostgreSQL上にテーブルを作成して変更すると以下のようなログが出ます。

=# CREATE TABLE hoge (c int);
=# INSERT INTO hoge VALUES(2);
=# INSERT INTO hoge VALUES(1);
2018-02-05 23:23:42.157474052 +0900 :  "{\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[2]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[1]}]}"

出力されたJSONデータは整形するとこんな感じです。これはwal2jsonの仕様です。

 {                              
     "change": [                
         {                      
             "kind": "insert",  
             "table": "hoge",   
             "schema": "public",
             "columnnames": [   
                 "c"            
             ],                 
             "columntypes": [   
                 "integer"      
             ],                 
             "columnvalues": [  
                 2              
             ]                  
         },                     
         {                      
             "kind": "insert",  
             "table": "hoge",   
             "schema": "public",
             "columnnames": [   
                 "c"            
             ],                 
             "columntypes": [   
                 "integer"      
             ],                 
             "columnvalues": [  
                 1              
             ]                  
         }                      
     ]                          
 }

違うLogical Decodingプラグインを使ってみる

wal2jsonではなく、PostgreSQLのcontribモジュールとして同梱されているtest_decodingプラグインも一緒に使ってみます。

fluentd側の設定は、以下のようにしました。

<source>
  @type pg_logical
  host localhost
  port 5432
  user masahiko
  dbname postgres
  slotname wal2json_slot
  create_slot true
  if_not_exists true
  tag pg.json
</source>

<source>
  @type pg_logical
  host localhost
  port 5432
  user masahiko
  dbname postgres
  slotname testdecode_slot
  create_slot true
  if_not_exists true
  tag pg.decode
</source>

接続先は同じですが、先程の設定に加えて、test_decodingプラグインを使うレプリケーションスロットを設定しました。
(※)レプリケーションスロットは1つの接続からしか使えないので、今回のような場合は2つのレプリケーションスロットを使用します

fluentdを再起動してみます。

2018-02-04 23:18:41 +0900 [info]: adding source type="pg_logical"
2018-02-04 23:18:41 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>wal2json_slot :plugin=>wal2json :status_interval=>10
2018-02-04 23:18:41 +0900 [info]: adding source type="pg_logical"
2018-02-04 23:18:41 +0900 [info]: #0 :host=>localhost :dbname=>postgres :port=>5101 :user=>masahiko :tag=> :slotname=>testdecode_slot :plugin=>test_decoding :status_interval=>10
2018-02-04 23:18:41 +0900 [info]: #0 starting fluentd worker pid=31895 ppid=31864 worker=0
2018-02-04 23:18:41 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 23:18:41 +0900 [warn]: #0 super was not called in #start: called it forcedly plugin=Fluent::PgLogicalInput
2018-02-04 23:18:41 +0900 [info]: #0 fluentd worker is now running worker=0
2018-02-04 23:18:41 +0900 [info]: #0 pg-logical: could not create replication slot wal2json_slot
2018-02-04 23:18:41.477931908 +0900 fluent.info: {"worker":0,"message":"fluentd worker is now running worker=0"}
2018-02-04 23:18:41.502037085 +0900 fluent.info: {"message":"pg-logical: could not create replication slot wal2json_slot"}

再度、PostgreSQL上のテーブルを変更します。

=# INSERT INTO hoge SELECT generate_series(1,10);

fluentd側では以下のデータを受け取ります。

2018-02-04 15:18:55.184469399 +0900 : "BEGIN 189044"
2018-02-04 15:18:55.184852262 +0900 : "{\"change\":[{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[1]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[2]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[3]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[4]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[5]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[6]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[7]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[8]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[9]},{\"kind\":\"insert\",\"schema\":\"public\",\"table\":\"hoge\",\"columnnames\":[\"c\"],\"columntypes\":[\"integer\"],\"columnvalues\":[10]}]}"
2018-02-04 15:18:55.184961889 +0900 : "table public.hoge: INSERT: c[integer]:1"
2018-02-04 15:18:55.185093425 +0900 : "table public.hoge: INSERT: c[integer]:2"
2018-02-04 15:18:55.185275368 +0900 : "table public.hoge: INSERT: c[integer]:3"
2018-02-04 15:18:55.185367719 +0900 : "table public.hoge: INSERT: c[integer]:4"
2018-02-04 15:18:55.185437581 +0900 : "table public.hoge: INSERT: c[integer]:5"
2018-02-04 15:18:55.185554763 +0900 : "table public.hoge: INSERT: c[integer]:6"
2018-02-04 15:18:55.185648607 +0900 : "table public.hoge: INSERT: c[integer]:7"
2018-02-04 15:18:55.185725397 +0900 : "table public.hoge: INSERT: c[integer]:8"
2018-02-04 15:18:55.185819775 +0900 : "table public.hoge: INSERT: c[integer]:9"
2018-02-04 15:18:55.185897049 +0900 : "table public.hoge: INSERT: c[integer]:10"
2018-02-04 15:18:55.186005774 +0900 : "COMMIT 189044"

上記のように、異なる形式でPostgreSQL上の変更が転送されてきたことがわかります。

9
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?