LoginSignup
4
6

More than 5 years have passed since last update.

データベースの全ての変更履歴をログテーブルに残す

Last updated at Posted at 2019-02-07

新しいシステムを作るとき、数年ごとに毎回検索している気がする。
今まではORMのsaveメソッドでhstoreに保存したり、独自のログ書き込み処理をアプリ側で書いて重要な処理のときだけ呼び出すようにしたり、とか。

今度はDB側のトリガーで全ての処理を終わらせる版でやってみる。

これがあれば論理削除フラグで悩んだりしなくて良くなる。

create table foo(
  id int,
  body text
);
create table bar(
  bar_id serial primary key,
  name text not null,
  created_at timestamp not null default current_timestamp
);

create table all_log(
  created_at timestamp not null default current_timestamp,
  clock_time timestamp not null default clock_timestamp(),
  type int not null,
  table_name text not null,
  column_data jsonb
);

取り敢えずサンプルテーブルfoo bar とログテーブルall_logを用意する。
データはjsonb型。
時刻はトランザクションの時刻と実際の時刻を分ける。トランザクションの時刻でグループ化して実際の時刻で並び替えするので。
アプリの時刻、例えばWebのリクエスト時刻とかに統一するのも良さそうだけど、受け取るのが大変だしそういうのは各テーブルの中に作って関数インデックスを張れば良いという感じで。

そしてトリガーファンクションを書く。

create or replace function insert_log() returns trigger as $insert_log$
  begin
    if (tg_op = 'DELETE') then
      insert into all_log(type, table_name, column_data)
        select 3, tg_table_name, to_json(old);
    elsif (tg_op = 'UPDATE') then
      insert into all_log(type, table_name, column_data)
        select 2, tg_table_name, to_json(new);
    elsif (tg_op = 'INSERT') then
      insert into all_log(type, table_name, column_data)
        select 1, tg_table_name, to_json(new);
    end if;
    return null;
  end;
$insert_log$ language plpgsql;

to_jsonで変更するデータを全てまとめて一行で登録する。
既にデータがある状態で後からトリガーを設定する場合は、'UPDATE' で oldとnewの両方を保存した方が安全かもしれない。

あとはポチポチと各テーブルにトリガーを設定していく。

create trigger insert_log_foo
  after insert or update or delete on foo
    for each row execute procedure insert_log();

create trigger insert_log_bar
  after insert or update or delete on bar
    for each row execute procedure insert_log();

ログを取らないテーブルに対してはトリガーを設定しなければ良い。テーブル単位で決める。あと、見ての通りアカウントテーブルの削除ログだけ残そうということもすぐ出来そう。

取り敢えずサンプルデータ登録、いつものgenerate_seriesを実行。

insert into bar(name) values('bar1');
insert into bar(name) values('bar2');

insert into foo select c, 'foo ' || c from generate_series(1, 100000) as t(c);
insert into foo select c, 'foo ' || c from generate_series(1, 100) as t(c), generate_series(1, 1000);

この時点でログが入っているはずである。

select * from all_log where table_name = 'bar';
         created_at         |         clock_time         | type | table_name |                                column_data                                
----------------------------+----------------------------+------+------------+---------------------------------------------------------------------------
 2019-02-07 20:40:23.019615 | 2019-02-07 20:40:23.021567 |    1 | bar        | {"name": "bar1", "bar_id": 1, "created_at": "2019-02-07T20:40:23.019615"}
 2019-02-07 20:40:23.024225 | 2019-02-07 20:40:23.024563 |    1 | bar        | {"name": "bar2", "bar_id": 2, "created_at": "2019-02-07T20:40:23.024225"}

select count(*) from all_log where table_name = 'foo';
 count  
--------
 300000

fooテーブルのidカラムがキーとして、ログを調べる。アカウントテーブルのアカウントID 3000 のユーザーのログを調べる感じで。

update foo set body = 'changed' where id = 3000;

select count(*) from all_log where table_name = 'foo' and column_data->'id' = '3000';
 count 
-------
     2

select * from all_log where table_name = 'foo' and column_data->'id' = '3000' order by clock_time;
         created_at         |         clock_time         | type | table_name |           column_data            
----------------------------+----------------------------+------+------------+----------------------------------
 2019-02-07 20:33:03.382874 | 2019-02-07 20:33:03.543161 |    1 | foo        | {"id": 3000, "body": "foo 3000"}
 2019-02-07 20:46:57.83346  | 2019-02-07 20:46:57.864389 |    2 | foo        | {"id": 3000, "body": "changed"}

で、機能的にはこれで良いんだけど速度的にはまったく良くない。

explain analyze select * from all_log where table_name = 'foo' and column_data->'id' = '3000' order by clock_time;
                                                          QUERY PLAN                                                          
------------------------------------------------------------------------------------------------------------------------------
 Gather Merge  (cost=6920.55..7066.39 rows=1250 width=68) (actual time=68.150..70.988 rows=2 loops=1)
   Workers Planned: 2
   Workers Launched: 2
   ->  Sort  (cost=5920.52..5922.09 rows=625 width=68) (actual time=61.171..61.172 rows=1 loops=3)
         Sort Key: clock_time
         Sort Method: quicksort  Memory: 25kB
         Worker 0:  Sort Method: quicksort  Memory: 25kB
         Worker 1:  Sort Method: quicksort  Memory: 25kB
         ->  Parallel Seq Scan on all_log  (cost=0.00..5891.50 rows=625 width=68) (actual time=39.386..61.117 rows=1 loops=3)
               Filter: ((table_name = 'foo'::text) AND ((column_data -> 'id'::text) = '3000'::jsonb))
               Rows Removed by Filter: 100000
 Planning Time: 0.079 ms
 Execution Time: 71.017 ms

こんな単純なクエリでパラレルクエリが頑張っているのを初めて見た。
indexを設定する。

create index log_foo_btree
  on all_log(table_name, (column_data->'id'), clock_time desc)
  where column_data->'id' is not null;


explain analyze select * from all_log where table_name = 'foo' and column_data->'id' = '3000' order by clock_time;
                                                              QUERY PLAN                                                               
---------------------------------------------------------------------------------------------------------------------------------------
 Index Scan Backward using log_foo_btree on all_log  (cost=0.42..2266.80 rows=1500 width=68) (actual time=0.042..0.044 rows=2 loops=1)
   Index Cond: ((table_name = 'foo'::text) AND ((column_data -> 'id'::text) = '3000'::jsonb))
 Planning Time: 0.227 ms
 Execution Time: 0.065 ms

1000倍になった。
さくっと書いたけど実は部分インデックスのwheretable_name = 'foo'にしたらプランが大きく変わって少し遅くなったとかしたので、利用状況に合わせて色々試す必要があるかもしれない。

テーブル変更時はどうなるのかと言うと

alter table foo rename body to content;
alter table foo add note text;
update foo set note = 'comment...' where id = 3000;
select * from all_log where table_name = 'foo' and column_data->'id' = '3000' order by clock_time desc;
         created_at         |         clock_time         | type | table_name |                       column_data                        
----------------------------+----------------------------+------+------------+----------------------------------------------------------
 2019-02-07 21:00:53.176525 | 2019-02-07 21:00:53.205109 |    2 | foo        | {"id": 3000, "note": "comment...", "content": "changed"}
 2019-02-07 20:46:57.83346  | 2019-02-07 20:46:57.864389 |    2 | foo        | {"id": 3000, "body": "changed"}
 2019-02-07 20:33:03.382874 | 2019-02-07 20:33:03.543161 |    1 | foo        | {"id": 3000, "body": "foo 3000"}

まあそうなるよねという感じ。

更新のときに差分だけ登録した方が良いのかとも考えたけど、そういう複雑なことをするのはアプリ側で、jsonだしJavaScriptとかフロント側が担当するのが良い気がしている。

一応このログは何かトラブルがあった時に見る用で、アプリの機能としては何もするつもりが無いんだけど、jsonb_to_record を使えばjsonbを簡単に行に展開できるので、以前のバージョンに戻したり削除を取り消したりするのも楽にできそうだ。

ここに書いたトリガーは
https://www.postgresql.jp/document/10/html/plpgsql-trigger.html
ここの公式マニュアルを参考にした。

4
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
6