0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

「データ指向アプリケーションデザイン」第 III 部 (導出データ) を読んだ

Last updated at Posted at 2022-08-23

読んだので個人的なメモを残す。
exhaustiveなメモではない。


第Ⅲ部では、以下のような複数のデータシステム群を結合し、一貫したアプリケーションアーキテクチャを作成する際の問題を検証する。

  • データストア
  • インデックス
  • キャッシュ
  • 分析システム

データを保存して処理するシステムは2種類に分類できる。

  • 記録のシステム (Systems of record)
    • source of truth
    • 正規化されている。
  • 導入データシステム
    • = 他の既存データを変換・処理して得られた結果。
    • 例えばキャッシュ、非正規化された値、インデックス、マテリアライズドビュー。
    • 例えば読み取りのクエリのパフォーマンスを良くする。

第10章 バッチ処理

本章ではMapReduce及びいくつかのバッチ処理アルゴリズムとフレームワークを見ていく。

[比較参考] UNIXツールによるバッチ処理

📝awk, sed, grep, sort, uniq, xargsといったコマンドを組み合わせる

コマンドの連鎖で例えばログファイルを極めて容易に分析できる。なぜなら

  • 一様なインターフェース
    • 入力、出力ともにファイルディスクリプタ
  • ロジックと結線の分離
    • = プログラムは入力がどこから来てどこへ行くのか気にしない
    • 必要なのはstdinから入力を読み込み、stdoutへ書き出すこと
  • 透過性があり実験しやすい
    • Unixコマンドへの入力ファイルはイミュータブル(書き換えられない)
    • パイプラインを好きな場所で終わらせてlessにパイプして出力を確認できる

MapReduce

Unixのツール群の最大の制約は、それらが単一マシン上でしか動作しないこと。

Hadoopのようなツール群を使うことで、これを複数マシンへスケールさせられる。

2019年時点で最大のHDFS環境は数万台のマシン上で動作し、ストレージ容量合計は数百ペタバイト

  • 1つのMapReduceジョブ = 1つのmapper処理 ->1つのreduce処理
    • mapper処理のタスク数=入力ディレクトリ内のファイルブロック数
    • reducer処理のタスク数=ジョブの作成者が設定
  • コールバック関数(mapper, reducer)は状態を持たず、指定された出力以外に副作用を外部に及ぼさない
    • =MapReduceジョブの実行は、出力生成以外の副作用はない
    • =タスクをリトライしても、ジョブの最終出力はフォールトが生じなかった場合と同じになる
    • 👉️バッチ処理のジョブコードで耐障害性のための仕組みの実装を気にしなくて良い
    • また、バッチ処理のメンテナンスも容易になる
  • MapReduceジョブは、分散ファイルシステム上のファイルを読み書きする
    • Hadoopでは、HDFSのディレクトリがジョブの入力。その各ファイル/ファイルブロックが個別のmapタスクで処理されるパーティション。
    • HDFSでは、はネームノード(中央サーバー)が各マシンのデーモンプロセスを管理
  • 耐障害性のために、ファイルブロックは複数マシンにレプリケーションされる
  • ジョブの実行は
    • 入力ファイル群をレコードに分割する
    • mapper関数を呼び、各入力レコードをキーバリューペア(複数キーも可能)に変換する。
    • reducer関数により、ソート順を保ちながらそれらをソートする。
    • mapper関数の出力がreducer関数の入力に渡される場合、暗黙のうちにソートされる。これは(#mapper, #reducer)=(3, 2)の場合、以下で実現される。
      • mapperの出力はm1, m2, m3ノードのそれぞれのディスク上で、[r1, r2] にパーティショニングされる。これにはキーのハッシュ値を利用。
      • mapperの出力完了後に、スケジューラが各reducer群に出力ファイルのフェチが可能になったことを伝達する。
      • reducerは各m1, m2, m3ノードのそれぞれから、自分のノードのパーティションに相当する出力を取得する。
  • ジョブの依存関係を明示的に指定するために、ワークフロースケジューラー(Luigi, Airflow, Pinball, etc)が使用できる。

🟢ワークフローにおいて、fact table(例、click_events)の各レコードに対し、それに関連するdimension table(例、users)のレコードをJOINして価値ある結果を見いだしたいことがある。

このJOINの方法としては、

  • reducer側で行う
    • ETLを使用してdimensionテーブルのコピーをfact tableと同じ分散ファイルシステム上におく。
    • ソートマージ結合
      • これは「同じキーを持つレコードを同じ場所に持ってくる」パターン🟥
      • mapper1でfact tableのレコードが処理後、JOINしたいキーによってパーティションする
      • mapper2でdimension tableのレコードをmapper1と同じキーでパーティションする
      • 👉️ reducerでは、(keyの順番, それに対応するdimension record -> fact records) の順番にソートされる。=secondary sort。これによりreducerは単一ノード上でJOINのロジックを容易く実行できる。
  • mapper側で行う
    • ブロードキャストハッシュ結合
      • mapperのみ使用、reducerは使わない
      • それぞれのmapperが起動時に、JOIN対象のdimension tableをインメモリのハッシュテーブルにロードする。これを使いmapperの処理のなかでJOINを行う。
      • dimension tableが、各メモリのメモリに収まるくらい小規模な場合に使用可能
    • パーティション化ハッシュ結合
      • 入力の時点でdimension tableのレコード群と、fact tableのレコード群を、同じキーでパーティションする
      • -> joinしたいレコード群は全て同じパーティション内にある
      • いずれかのレコード群をハッシュとしてメモリ上に保持する
    • map側マージ結合
      • ↑で、同一キーによるパーティション化のみならず、そのキー内でのソートも行われる。
      • マージソートを利用できるので、いずれのレコード群もメモリにロードしなくても済む

⚠️「同じキーを持つレコードを同じ場所に持ってくる🟥」パターンでは、スキュー(=ホットスポット)が発生しうる。

この緩和策としては

  • skewed joinメソッド(Pigなど)
  • 2ステージでグループ化を行う(Hiveなど)

🟢バッチワークフローの出力の例は

  • 検索エンジン用のインデックス(昔のGoogle)
  • 機械学習システムのclassifier
  • レコメンデーションシステム
    • これらはバッチジョブで内部作成したデータファイルに書き込み -> アプリケーション用の読み取り専用のDBへバルクロードすべし。

🟢分散DBと比較すると、Hadoopは

  • ストレージの多様性
    • Hadoopでは事前に決めた特定のデータモデルに従う必要はない。
  • 処理モデルの多様性
    • =自分の作成したコードを大規模なデータセットに対して容易に実行できる柔軟性がある。
  • 障害対策に敏感でない
    • 副作用がないため、個々のタスクの粒度で処理のリトライが可能であるため
  • 非常に頻繁にディスク書き込みが行われる
    • データセットがメモリに収めるには大きすぎるため
  • (Javaなど)言語の既存ライブラリを利用し任意のコードで処理を書くことができる

🟢MapReduce は、中間出力を実体化(materialization = ファイルに書き出す)する。先行するジョブの全てのタスクの完了を待つ必要があるため、ワークフローの実行速度は低下する。一方でUnixパイプは、中間状態を完全には実体化せずインメモリバッファを用いて順次ストリーミングしている。

👉map reduceのこれらの問題は、データフローエンジン(Spark, Tez, Flink, etc on Hadoop)で対処されており、高速である。これは

  • mapper, reducerの代わりにoperatorを指定する -> 処理を柔軟に書ける
  • mapとreduceのステージ間で行われていた負荷の高いソートをデフォルトでは行わない
  • ワークフロー内の全てのJOINとデータの依存関係は明示的に示される
  • オペレータ間の中間状態はメモリかローカルのディスクに保存される。HDFSへは書き込まない
    • 障害発生により中間データを喪失した場合には、再計算を行う。(計算結果にランダム性がなく決定的であることが必要)
    • ただし、最初の入力と最終的な出力先はHDFS。

なお、PigやHiveやCascadingで実装されたワークフローはコードを修正することなく単純な設定の変更のみでMapReduceからTezあるいはSparkに切り替えられる。

🟢グラフアルゴリズムの並列化(=再帰的な処理)には、Pregelモデルを使用した Apache Giraph, SparkのGraphX API, FlinkのGelly APIを用いる。

📝Hive、Prestoは、Hadoop上で動作するクエリエンジン。クエリ対象として指定できるのは、カラムナーデータフォーマットの orc, perquetファイル。

第11章 ストリーム処理

バッチ処理は入力データが有限。一方、ストリーム処理は入力データが有限ではない。

👉️ストリーミング処理では、処理を頻繁に(毎秒等)行ったり、イベントが生じるたびに連続的に行ったりする。

ストリーミングでは、複数のProducerに生成され、複数のConsumerに消費されるPublish/Subscribeモデルが適している。

  • Producerのメッセージ送信速度にConsumerの消費速度が追い付かない場合の対策は
    • システムがメッセージをドロップする
    • キューにメッセージをバッファリングする = メッセージブローカー
    • バックプレッシャーによりフロー制御を行う(例えばProducerが送信を行えないようブロックする)
  • 耐障害性が必要かはアプリケーションに強く依存する。必要な場合、
    • プロデューサーからコンシューマーへ直接メッセージングする。方法としては
      • UDPマルチキャストを使用する
      • WebhookとしてProducerがConsumerへのリクエストを行う
      • ConsumerがProsucerへのリクエストを行う
    • メッセージブローカを使用する
      • =メッセージストリームの扱いに最適化されたDB

メッセージブローカーはストリーム処理に有用

メッセージブローカーは、

  • 永続性の問題をアプリケーションから引き剥がし担当する
  • コンシューマの処理を非同期にできる

以下メッセージブローカーの説明。大きく2タイプを紹介していた。

  • 旧来型のJMS/AMQP標準のブローカー
    • 新しいメッセージを受信時、コンシューマへ通知する -> コンシュマーが処理を完了し承認(ack)を送信 -> ブローカーはそのメッセージを削除する
      • = push型?
      • 👉ブローカーはメッセージを自動的に削除するのでキューは短い前提。
    • ◉メモリにメッセージ群が収まらない場合にのみディスクに書き出す
    • 同一トピックを読み取るコンシューマーのパターンを二つ紹介する。
      • ロードバランシング
        • 一つの処理を並列化できる
      • ファンアウト
        • コンシューマーの処理は独立している。
    • ブローカーは承認を得られなかった場合、他のコンシューマにそのメッセージを再送信する
      • 👉ロードバランシングで再送信が行われるとメッセージの順序が入れ替わることがある😢
    • メッセージ処理の負担が大きいかもしれず(なぜ?)、メッセージ単位で処理を並列化したく(ログベースでも可能では?)、メッセージの順番はそれほど重要でない場合に適している
    • e.g. RabbitMQ, ActiveMQ, Google Cloud Pub/Subなどでサポートされている
  • ログベースメッセージブローカー
    • ◉メッセージはディスク上にログとして書き出される
    • ログをパーティション化することでスループットを高められる。(数百万メッセージ毎秒まで)
      • ブローカーが各パーティション内で単調増加するシーケンス番号をメッセージに振り当てる。各コンシューマーはオフセットとしてこれを保持する。(一つのコンシューマクライアントが複数のパーティションから読み取ることもある)
      • 👉単一パーティション内ではメッセージの順序が保証される👍
      • ブローカーは各コンシューマーグループへメッセージをファンアウトし、各グループ内のクライアントたち(1スレッド1クライアント)にメッセージをロードバランシングできる。
    • コンシューマーは処理済みのオフセットを前進させてどこまで読み取ったかを管理する
    • ログはリングバッファ(循環バッファ)として全てディスク上に保存され、通常数日から数週間分のメッセージを保存できる。
    • コンシューマの読み取りが大きく遅れている場合には、ブローカーが警告を発する。これを人間の運用担当者が確認し、低速なコンシューマを修正する。
    • メッセージ処理のスループットが高くなければならず、一つ一つのメッセージ処理は高速に行え(なぜ?)、メッセージの順番が重要である場合に適している
    • e.g. Apache Kafka, Amazon Kinesis Stream

📝2019年時点で、典型的な大型HDは6TBの容量を持ち、シーケンシャルな書き込みのスループットは150MB/s -> 上限スピードで書き込むと11時間でいっぱいになる

ストリーム処理を用いた設計

ここまで、ストリーム処理をアプリケーションを導入するのに有用なメッセージブローカーを紹介した。

このセクションでは、ストリーム処理をアプリケーションに導入する際のアーキテクチャを紹介している。

ヘテロなデータシステム(複数の異なる技術を組み合わせている)で生じる問題は、

  • [並行性の問題] 二重書き込み(dual writes)
    • 記録のレコードを二つのクライアントが順に更新したとする。導出データシステム内の相当レコードの更新が逆順に行われた場合、二つのデータソース間でデータ不整合が発生する。
  • [耐障害性の問題] 記録のレコードは成功するが、導出データシステムの更新は失敗する
    • 要はアトミックに更新されていない。

これらを解決するためにイベントストリームの考え方をシステムに導入する。

  • 変更データキャプチャ(CDC)
    • 記録のレコード(leaderと見なせる)への更新をストリームとして導出データシステム(followerと見なせる)が利用する。
    • 記録のシステムがリーダー、導入データシステムをフォロワーと捉えることができる
    • 実装方法は①データベーストリガーを利用したり、②レプリケーションログを自前でパースしたり、など。
    • Kafkaなどではログコンパクションがサポートされている。これは各レコードの最新バージョン以外を破棄できる。
    • 近年のDBでは第1級のインターフェイスとしてサポートされるようになってきている。
  • イベントソーシング
    • = システムが全ての生のイベントを恒久的に保存でき、その完全なイベントログを必要に応じて再処理できるようにすること。DBのレコードの更新や削除は禁止。
    • この用途に特化したDBの例は、Event Storeなど。

これらは、

  • データを書き込む形式から、データを読み込む形式を分離することができる。(クエリ責務分離)
    • 書き込みはイベントをあるがままに書き込む。
    • 読み取り用に最適化したビューを構築する。用途に合わせてその都度ビューを構築することで、既存のシステムの修正が不要になる。(これはスキーマのマイグレーションより容易である。)
  • メリットは、リカバリーの容易性と過去イベントの分析が可能なこと。
  • 欠点は以下二つ。
    • (1) イベントログのコンシューマが非同期であることにより、イベントログの生成後、導出データシステムの読み取り時に自分の書き込みが反映されていないことがあること。(= read after write が保障されない) 👉対処法は
      • そもそもイベント書き込み時に、ビューの更新を同期的に行う。このためには、イベントログと読み取りのビューを同じストレージシステムに作成するか、分散トランザクションを使用する
      • ビューを使用せず、現在の状態を毎回イベントログから算出する。
    • (2) 複数のイベントログが作成され、両方がビューの同一レコードを更新するとする。この際にビューへの反映の順番が反転し、ビューがイベントログから算出されるものと異なってしまう。
      • あるビューのレコードを更新しうるイベントログをメッセージブローカのただ一つのパーティションにのみ蓄積していく。(シングルスレッドの)ログのコンシューマはこのパーティションを読み取る。
  • イミュータブルにログを保持するが、例えば以下の場合には管理上データを削除する必要が発生する。
    • プライバシーの規定から退会ユーザーの個人情報の削除の必要がある場合
    • データ保護の法律が不正確な情報の削除を求めている場合
    • 誤って流出したセンシティブな情報を封じ込める場合

ストリーム処理の利用例

  • モニタリング、つまり特定の事象が生じた時にアラートを発する
  • 複合イベント処理 (complex event processing)
    • = ストリーム中の特定のパターンのイベントを検索するルールをクエリやGUIでCEPエンジンに指定する。処理エンジンはストリームを消費し、マッチするものが見つかったら、新たなイベント(complex event)を生成する。
    • CEPエンジンの例は、Esper, Apamaなど
  • ストリームでの検索
    • (複合イベント処理との違いが分からん😅)
    • クエリにインデックスを付けてマッチするかもしれないクエリの集合を狭めることができる。
  • ストリーム分析
    • 大量のイベントに対するメトリクスの集計や統計を求める
    • Apache Storm, Spark Streaming, Kafka Streamsなどの分散ストリーム処理フレームワークがサポートしている。ホストされたサービスは、Google Cloud Dataflow, Azure Stream Analytics, Amazon Kinesis Data Analytics。
  • マテリアライズドビューの管理
    • DB、キャッシュ、検索インデックスの構築
    • イベントソーシングにおけるアプリケーションの状態。
    • SamzaやKafka Streamsは、Kafkaのログコンパクション上に構築されているのでサポートしている。

その他

🟢時間に関する考察(event timeとprocessing time)

event timeとprocessing timeを混乱してはいけない。

  • 多くのストリーム処理フレームワークはprocessing timeを利用している。
  • 例えばリクエストレートの計算は、event timeで集計すべき。(ストリームプロセッサをデプロイし直したら、イベントのprocessは再起動後にまとめて行われるため)

「event timeがあるtime window内に含まれるイベントが全て完了した」ことを確認することは難しい。これはwindowの完了時刻後に遅れてやってくるはぐれ(straggler)イベントの存在を考える必要があるから。この対応策は例えば以下。

  • はぐれイベントは無視する。ドロップされたはぐれイベント数をメトリクスとして追跡し、閾値を超えたらアラートを発するようにする。
  • ストリームプロセッサがはぐれイベントを受け取ったら、そのことを関連システムに伝播する訂正(correction)を配信する。
  • ストリームに「今後はt以前のタイムスタンプを持つイベントは発生しない」ことを示す特別なメッセージを流す。

windowの種類

  • tumbling window
    • = 幅固定、開始点が定期的、window同士は重複しない
  • hopping window
    • = 幅固定、開始点が定期的、window同士は重複する
  • sliding window
    • = 幅固定、開始点が連続的、ゆえにwindow同士は重複する
  • session window
    • = 幅固定でない、開始点はまばら、window同士は重複しない、windowに含まれない期間も存在する

🟢ストリームのJOINの実現方法

  • ストリーム - ストリーム結合
    • = window内でのイベント同士の結合
    • [実装方法] ストリームプロセッサ内で「window内にどのイベントが発生したか」という状態を保持しておく。JOINできたらJOIIN結果を表すイベントを生成する。JOINできなかった場合は、そのままのイベントを生成する。
  • ストリーム - テーブル結合
    • = streamのenrich。ストリーム内の各イベント(fact)に対して、それに関連する情報(dimension)をエンリッチする。
    • [実装方法1] ストリームプロセッサは各イベントの処理タイミングで、DBから関連レコードをルックアップする
      • 👎処理速度が低速
      • 👎DBが過負荷に陥る可能性あり
    • [実装方法2] DBのコピーをプロセッサにロードしておく。(メモリ上のハッシュテーブル、大きければローカルディスク内のインデックス)。DBのコピーの更新のためには、元のDBのchangelogを保持するストリームを作成し、これを消費する。
    • ⚠️ slowly changing dimensionに対応するため、各イベントにはevent timeを持たせ、時間によって変化するDBのレコード(税率等)はバージョン違いを保持する。
  • テーブル - テーブル結合
    • = materializd veiwの更新の"タイミング"をイベントストリームで検知する。
    • [実装方法] viewを構成する複数の元データの更新をそれぞれストリームに流す。ストリームプロセッサはいずれかのストリームからイベントを受け取った時点でviewを再計算する。

🟢ストリームプロセッサに耐障害性を持たせる

第9章で述べたMapReduceのバッチ処理はexactly-once(正確にはeffectively-once)である。
入力が無限であるストリームにおいてこれを実現するためには以下の方法がある。

  • マイクロバッチ処理 + チェックポイント処理
    • ストリームを小さなブロック(1s程度)に分割し、それぞれのブロックでバッチ処理を行う
    • ブロック同士の境目のチェックポイントごとに、現在の状態を永続性のあるストレージに書き出す
    • フレームワーク内部ではexactly-onceセマンティクスを提供するが、外部へ副作用が発生する場合はそうでない。
    • 使用例: Spark Streaming
  • アトミックなコミット
    • ストリーム処理のフレームワーク内部で、状態の変更と外部へ向けたメッセージの作成をアトミックに作成する。
    • 使用例: Google Cloud Dataflow, VoltDB, 追加予定: Apache Kafka
  • 冪等性
    • 以下の前提をおいた上で、メタデータを使用することで操作を冪等にできる
      • 処理は決定的である
      • 同じ値を並行に他のノードが更新しない
      • ある処理ノードから他のノードへfail overする際にはフェンシングを使用する
    • 例: Kafkaでメッセージを消費する際には読み取り完了のオフセットを記録し、それを永続化しておく。

障害後に状態を再構築する方法の例は、

  • 個々のメッセージの処理完了のたびに、その状態をリモートのDBに保存し、これをレプリケーションする
  • 個々のメッセージの処理完了のたびに(?)、状態をストリームプロセッサのローカルディスクに保存し、それを定期的にレプリケーションする
  • そもそも状態のレプリケーションが必要がない場合もある。状態を入力ストリームから再構築できる場合など。

第12章 データシステムの未来

TODO: 読む

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?