1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

冪等性を実現するデータ基盤のバッチデータ処理パターン

Posted at

はじめに

最近データ基盤について説明をすることが多くあるのですが、バッチデータ処理のパターンについていい感じにまとまっているものがあると便利だなーと思っていました。

過去に私が書いた記事(「1年前の自分が読みたかった、データエンジニアリング入門」で1トピックとして取り上げていたのですが、もう少し加筆して個別にまとめておくと良さそうではということで、改めて記事にすることにしました。

冪等性とは何か

データ処理のパターンについて説明する前に、まずは非常に重要な概念である冪等性について抑えておきましょう。

冪等性(べきとうせい)とは、「ある操作を何度行っても同じ結果になること」を意味しています。データエンジニアリングの文脈で言い換えると、

データ処理を行った際に、

  • データの欠損が生じない
  • データの重複が生じない

処理ができていれば、その処理は何度行っても同じ結果になり、冪等性が担保されると言えます。

なぜこれが大事かというと、データ処理はネットワークやシステムの不具合によって、処理が失敗することがしばしばあるからです。失敗したときに、再実行すればよい形にしておくことで、運用負荷が格段に下がります。

データ処理パターン

では、ここから具体的な処理パターンを取り上げていきます。

以下の記載はBigQueryを念頭においた内容になっています。

洗い替え

はじめに考えるべき方法は、洗い替え処理です。もともとあったデータをなくしてまるっと入れ替える洗い替え処理は、冪等性を担保する最もシンプルなパターンになります。

以下のようなときに利用するといいでしょう。

  • データ量が少なく、ソースシステムに負荷をかけない
  • 処理時間があまりかからず、後続の処理を遅れさせない
  • 処理量やAPIのコール数など、ソースシステムの制限を圧迫しない

具体の処理としては、以下の2つがあります。

Replace

これは既存のテーブルを削除(Drop)して、新たにテーブルを作り替える形です。メリットとしては、取得元のデータのスキーマが変わったとしても、その変化に追従することができます。一方で、テーブルを削除してしまうことで、タイムトラベル(システムで保持される過去データ)を利用したデータの復旧などが難しくなるデメリットもあります。

Truncate Insert

これはテーブルを保持したままデータのみを削除(Truncate)し、スキーマが保持されたテーブルに新たなデータを挿入する形です。Replaceと異なり、テーブルが保持されることでデータの復旧をしやすいことがメリットになります。一方で、スキーマの変更に追従しないため、予期せぬ影響が出ることがあります。

カラムが追加された場合、追記処理がエラーになります。処理方法によっては、テーブルが空の状態になる(冪等でなくなる)可能性があります。また、仮にカラムが削除された場合は、削除されたカラムにNullのレコードが入ることになります。

洗い替えまとめ

私個人としては、それぞれの特性を踏まえて、開発初期ではある程度スキーマが変わることを想定してReplaceで処理を組み、要件が固まってからはTruncate Insertに変更するというなやり方もしばしば行います。

なお、具体の処理はサービスによって差異があることもあるので、技術仕様を踏まえた上で採用するように気をつけましょう。

差分更新

洗い替えでは問題が生じるようになってきたら、次に考えるのが差分更新です。利用を検討する場面は、洗い替えを実施する場合と逆で、以下のいずれかが当てはまるときです。

  • データ量が多く、ソースシステムに負荷をかけてしまう
  • 処理時間が長くなって、後続の処理を遅れさせてしまう
  • 処理量やAPIのコール数など、ソースシステムの制限が圧迫されてきている

差分更新では、処理については効率化されますが、基本的にただ再実行をするのがベースとなる洗い替えより設定ミスのリスクや運用負荷が上がるので、きちんとその点を認識の上で利用するようにしましょう。

次から具体的な処理方法について説明しますが、ポイントとなるのは元データの更新方法です。

Insert

ログデータなど、元データに対して新たなデータが新規作成のみされるとき、Insertの処理が対応する方法になります。このとき差分更新なので、単純なInsertではなく、

  • インクリメンタルなIDをベースに、前回処理をしたよりも大きいものを追記
  • 作成日時をベースに、前回処理をしたものよりも後のものを追記
  • ファイル名をベースに、前回処理をしたものよりも後のものを追記

といった処理になります。「前回処理をしたもの」は処理のタイミングでどこまで処理をしたかを記録しておくことや、取り込み済のデータから読み取ることが考えられます。

取り込み済のデータから読み取る場合、1カラムとはいえフルスキャンをすることになりかねません。そこで、データ量によっては差分のみのテーブルを間におく事で、そのテーブルから前回処理した最後のレコードを特定し、追加分だけ洗い替えで間のテーブルに保管しつつ、それを全量のテーブルに書き込むような形も考えられるでしょう。

Delete & Insert

Google Analyticsのデータが有名ですが、新規作成に加えて一定の新しいデータだけ更新処理がされるものでは、Delete & Insertの処理をすることが多くあります。そのままGoogle Analyticsを例にすると、公式ドキュメントには下記のような記載があります。

データは異なる間隔で処理されるため、更新頻度はさまざまです。たとえば、当日データは 1 日を通して更新されるため、一部のデータにより迅速にアクセスできます。一方、日次データはより包括的(より多くのデータソースを利用)であり、データの処理により長い時間がかかるため 1 日に 1 回のみ更新されます。
データの処理には 24~48 時間かかることがあります。この期間中、レポートのデータに変化が生じる場合があります。

このような仕様になっているため、更新が生じうる部分のデータを一度削除して、その部分のデータを入れ直すということがよく行われます。具体的に言うと、今から3日前以降のデータを削除して、3日前以降のデータを改めて取得して追記すると、更新された分まで確実に取得できることになります。

なお今回はGoogle Analyticsなのでこの形で問題ないですが、一般にN日前からのデータを削除して、再度N日前からのデータを追記し直すというのは有効です。

このとき、N日という数値をハードコードするのではなく、変数として定義しておくとリカバリの容易性を上げることになります。すなわち、デフォルト値としては仮にN=3だとして運用していて、例えば7日前のデータに不備があると判明したときには、N=7として変数を指定して実行する事で、異常レコードを速やかに修正することができます。

また、このDelete & Insertの応用として、update_atの値をもとに処理をするとレコードの更新が直近のみでないときも冪等な処理にすることができます。ただし、update_atにパーティションを張っていない場合、取り込み側のテーブルはフルスキャンすることになりますが。

Upsert(Update & Insert)

前述のようなレコードの新規追加と更新がある場合、Merge処理を実行することでUpdateとInsertに追従することができます。この処理は、DeleteとInsertに分けて書くよりも楽ですしInsertでエラーが起きてデータが異常になるリスクも回避することができます。(Transactionを設定すればよいというのは、今回は割愛します)

なお、このとき注意するのが削除レコードの取扱いです。データベースにおけるレコードの削除は、2つのパターンがあり、

  • 物理削除:レコードをデータとして削除する
  • 論理削除:is_deletedのような削除フラグのカラムがあり、これがfalseからtrueに変更される

というものです。

データベースに慣れていない人にはなじみがないかもしれませんが、誤操作に伴う復旧を考慮したときに、論理削除の形式は便利なものになります。一方で、法令等への準拠にあたり、物理削除が必要になってきたり、データを保持したくないがために物理削除をすることもあるでしょう。

Upsertで対応できるのは、UpdateとInsertということで、論理削除の処理形式をとっているときに限定されるのに注意してください。

なお、Upsert処理をする際にスキャン量を減らすには一工夫必要で、例えば下記の記事が参考になります。

Merge

レコードの新規追加と更新に加えて削除がある場合、それらに追従するためのMerge処理を使うことになります。バッチ処理でこれを行う場合、ソースシステムから全量取得して、書き込み先のデータには存在するが、取得元のデータには存在しないものを削除されたレコードとして認識して削除することになります。

なお、これをそのまま行おうとすると、結局全レコード取得して全レコードに対してデータ比較をかけることになるので、データの処理量を減らすことはできません。何らかの工夫をしない限り、結局洗い替えと何も変わらない形になってしまいます。

そこでデータの処理量を減らすには、ストリーミング処理であるログベースのCDC(Change Data Capture:変更データキャプチャ)が利用されます。これは、データベースの変更のログをソーステーブルから取得して、それをターゲットテーブルに適用することでデータの複製が行えます。

おわりに

とりあえずざっと書き出してみましたが、やや雑なので後でもう少し書き直そうと思います。あと図表がないとイメージわかないですね・・・

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?