概要
プロジェクトではtd-agentを利用してBigQueryに対してStreamingInsertしている。
なお、使わせて頂いてるgemはこちら
https://github.com/fluent-plugins-nursery/fluent-plugin-bigquery
BigQueryのStreamingInsertではデータの整合性を維持するために、insertIdというものを指定することによりレコードの重複などを防いでいる。
しかし、このinsertIdって何を指定するのがいいんだ?
insert予定のレコードを全部引っ付けてぶん投げる
元々取っていた手法はこれ
設定例
<filter hoge>
@type record_transformer
<record>
bq_insert_id ${record.values.map(&:to_s).join)}
</record>
</filter>
<match hoge>
@type bigquery_insert
insert_id_field $.bq_insert_id
...
</match>
ログなどの取り扱いをする際はそうそう同じ行のログなんざでないものである。ミリ秒単位で同じログが出力されるならそれはログ出力部分の実装を修正したほうがいいとさえ思う。
なので、理論的には上の設定で重複は防げるはずである。
ただ、この方法を使うと通信の際1つのレコードにつき2倍のデータ容量を要してしまう。
出力されるログが微量であるなら問題ないが、高トラフィックのサーバーで1つのログの長さが長くなるとこの容量というのは意外と馬鹿にできない物になる。
それに余り過度にStreamingInsertしようとするとBigQuery側からエラーとしてリクエストを突き返される事になりそれがbufferとしてたまり続けてしまうという問題も引き起こしてしまう。
td-agentのbufferはたまり続けると設定値を超えた分を破棄してしまうのでログの欠損にも繋がってしまうのでなるべく避けたい。
そもそも論公式には insertId フィールドの長さ: 128
という制限があったりする。(ただ、運用上明らかにこれより長い文字列を付与して送信したことがあったがエラーにはならなかった。td-agent側で文字列をぶった切ってる?)
なのでInsertIdをなるべく短くする
プロジェクト仲間にこの事を相談したら下記のような方法を提案してくれた
設定例
<filter hoge>
@type record_transformer
enable_ruby true
<record>
bq_insert_id ${Digest::SHA1.hexdigest(([Time.now.to_f] + record.values).map(&:to_s).join)}
</record>
</filter>
<match hoge>
@type bigquery_insert
insert_id_field $.bq_insert_id
...
</match>
Digest::SHA1
はrequireしないと呼び出せないと勘違いしていたが、どうもtd-agent(fluentd)内ではそのまま呼び出せるらしい。これによってハッシュ化できてしまうので文字列の長さを気にする必要がなくなった。
この方法では厳密にはユニークとは言えないが、BigQueryのStreamingBufferの中ログが存在してる時間だけユニークとみなされればいいだけなので要件はほぼほぼ満たせたと思う。