1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Timeliness and Reliability in the Transmission of Regulatory Reports - The Databricks Blogの翻訳です。

リスクおよび規制に対するコンプライアンスの管理は、どんどん複雑かつコストのかかる取り組みとなっています。2008年のグローバル金融危機以来、規制の変更は500%に増加しており、規制に対応するためのコストを引き上げています。規制に準拠しないこと、SLA違反による罰金(2019年のAMLにおける罰金は100億ドルにもなっており、銀行は常に高い罰金を科せられています)のため、データが不完全な状態でもレポートの処理は行われなくてはなりません。一方、貧弱なデータ品質の追跡レコードに対しても、「管理不十分」であることから「罰金」が科せられます。結果として、多くの金融サービス機関(FSI)が、データの信頼性と即時性のバランスをとることで、貧弱なデータ品質と厳格なSLAと戦い続けることになります。

この規制レポートに関するソリューションアクセラレータにおいては、規制のSLAに対応するために、Delta Live Tables(DLT)がどのようにリアルタイムでの規制データの獲得、処理を保証するのかをデモします。Delta SharingとDelta Live Tablesを組み合わせることで、アナリストは送信される規制データの品質に対して、リアルタイムで自信を持つことができます。この記事では、金融サービス業界のデータモデルと、高いガバナンス標準を達成するクラウドコンピューティングの柔軟性を少ない開発オーバヘッドで組み合わせる際のレイクハウスアーキテクチャの利点をデモンストレーションします。FIREデータモデルが何であるのか、堅牢なデータパイプラインを構築するためにどのようにDLTがインテグレーションされるのかを説明します。

FIREデータモデル

Financial Regulatory data standard(FIRE)は、金融業界における規制システム間でデータを送信する際の一般的な仕様を定義しています。規制データは、ポリシー、モニタリング、監視目的で使用される規制送信、要件、計算に関するデータとなります。FIREデータ標準は、Horizon 2020ファンディングプログラムを通じて、European CommissionOpen Data Institute、ヨーロッパ向けOpen Data Incubatorによってサポートされています。このソリューションの一部として、我々はFIREデータモデルをApache Spark™のパイプラインに取り込むことができるPySparkモジュールを提供しました。

Delta Live Tables

Databricksは最近、データパイプラインのオーケストレーションに関する新製品であるDelta Live Tablesを発表しました。Delta Live Tablesを用いることで、信頼性のあるデータパイプラインを企業規模で構築、管理が容易になります。複数のエクスペクテーションを管理できる能力によって、リアルタイムで不正なレコードを削除あるいはモニタリングすることができるので、Delta Live TablesとFIREデータモデルを統合することのメリットは明白です。以下のアーキテクチャに示す様に、Delta Live Tablesはクラウドストレージに到着する規制データの塊を取り込み(ingest)、コンテンツに**スキーマを適用(schematize)し、一貫性を保つためにFIREデータ仕様に従ってレコードを検証(validate)**します。安全、スケーラブル、透明性のある方法で規制システムと情報を交換するためにDelta Sharingを用いるデモを見ていきましょう。

スキーマの強制

いくつかのデータフォーマット(例:JSONファイル)は構造化されているように「見える」かもしれませんが、スキーマ強制は単に優れたエンジニアリングのプラクティスではありません。エンタープライズの環境、特に規制コンプライアンスの世界においては、スキーマ強制は期待されるフィールドの欠如、削除すべきフィールド、データタイプの完全な評価(例:日付は文字列ではななくdateオブジェクトとして取り扱われるべき)を保証します。また、これは結果的なデータドリフトに対するシステムの耐性試験にもなります。FIRE pysparkモジュールを用いることで、生のレコードのストリームに適用される指定のFIREエンティティ(この例では担保[collateral]エンティティ)を処理するのに必要なSparkスキーマをプログラムから取得することができます。

Python
from fire.spark import FireModel
fire_model = FireModel().load("collateral")
fire_schema = fire_model.schema

以下の例では、到着するCSVファイルにスキーマを強制しています。@dltアノテーションでこのプロセスをデコレーションすることで、Delta Live Tablesに対するエントリーポイントを定義し、生のCSVファイルをマウントされたディレクトリから読み込み、スキーマが強制されたレコードをブロンズレイヤーに書き込みます。

Python
@dlt.create_table()
def collateral_bronze():
  return (
    spark
      .readStream
      .option("maxFilesPerTrigger", "1")
      .option("badRecordsPath", "/path/to/invalid/collateral")
      .format("csv")
      .schema(fire_schema)
      .load("/path/to/raw/collateral")

エクスペクテーションの評価

スキーマの適用とは別に、制約を適用することができます。FIREエンティティのスキーマ定義(collateralのスキーマ定義例をご覧ください)に基づいて、フィールドが必要か不要かを検知することができます。列挙オブジェクト(例:国コード)を指定することで、値の一貫性を保証することができます。スキーマによる技術的制約に加え、FIREモデルでは最小値、最大値、金額、最大個数などビジネス上の制約を定義しています。プログラムがこれら全ての技術的、ビジネス上の制約をFIREデータモデルから取得し、一連のSpark SQLエクスペクテーションとして解釈します。

Python
from fire.spark import FireModel
fire_model = FireModel().load("collateral")
fire_constraints = fire_model.constraints

Delta Live Tablesを用いることで、ユーザーは複数のエクスペクテーションを一度に評価すすることで不正なレコードを削除することができ、シンプルにデータ品質をモニタリングするか、パイプライン全体を停止することができます。我々のシナリオにおいては、この記事で提供されているノート部区にある様に、エクスペクテーションに違反したレコードは削除したいと考え、削除レコードは検疫テーブルに格納しました。

Python
@dlt.create_table()
@dlt.expect_all_or_drop(fire_constraints)
def collateral_silver():
  return dlt.read_stream("collateral_bronze")

数行のコードで、我々のシルバーテーブルが文法的(適切なスキーマ)にも、意味的(適切なエクスペクテーション)にも正しいことを保証することができます。以下に示す様に、コンプライアンスオフィサーはリアルタイムで処理されているレコード件数に対する完全な可視性を得ることができます。この例では、collateralエンティティは92.2%処理されているを確認しました(残りの7.8%は検疫行きとなっています)。

オペレーションデータストア

Delta Live Tablesは実際のデータをdeltaファイルに格納することに加え、オペレーションメトリクスをdeltaフォーマットでsystem/events配下に格納します。ここでは、レイクハウスの標準的なパターンに従い、AutoLoaderを用いて新規オペレーションメトリクスを「サブスクライブ」し、バッチあるいはリアルタイムでシステムメトリクスを処理します。あらゆるデータの更新を追跡するDelta Lakeのトランザクションログを活用することで、企業は自身のチェックポイントプロセスを構築、管理する必要なしに新規メトリクスにアクセスすることができます。

Python
input_stream = spark \
    .readStream \
    .format("delta") \
    .load("/path/to/pipeline/system/events")
      
output_stream = extract_metrics(input_stream)
    
output_stream \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .table(metrics_table)

全てのメトリクスがオペレーションストアで集中管理されることで、アナリストはシンプルなダッシュボードや、リアルタイムでデータ品質問題を検知するためにより複雑なアラート機構を構築するためにDatabricks SQLを活用することができます。

Delta Live Tablesによって提供されるデータ品質の透明性と、Delta Lakeフォーマットの不変性の特性を組み合わせることで、金融機関は、規制コンプライアンスに求められるデータボリューム、データ品質の両方にマッチする特定バージョンに「タイムトラベル」することが可能となります。この例では、検疫に格納された7.2%の不正レコードのリプレイはシルバーテーブルにアタッチされた別のDeltaバージョンとなり、このバージョンは規制機関と共有することが可能できます。

SQL
DESCRIBE HISTORY fire.collateral_silver

規制データの送信

データの品質とボリュームに対して完全な自信を持つことによって、金融機関は企業データ交換に対するオープンプロトコルであるDelta Sharingを用いることで、規制システム間で安全に情報を交換することができる様になります。ユーザーが同じプラットフォームを使用することや、データを消費するために複雑なETLパイプライン(例えば、SFTPサーバー経由でのデータファイルへのアクセス)に依存することなしに、Delta Lakeの持つオープンソースの特性によって、データの消費者はPython、Spark、あるいはMI/BIダッシュボード(Tableau、PowerBIなど)から、スキーマが適用されたデータに直接アクセスすることができます。

シルバーテーブルをそのままの状態で共有することもできますが、データ品質が事前に定義された基準を達成しているときのみに規制データを共有したいと考えるかもしれません。この例では、シルバーテーブルを別のバージョンとして、内部ネットワークとは隔離された別の場所(非武装地帯:DMZ)にコピーしエンドユーザーによってアクセスできる様にします。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "fire.collateral_silver")
deltaTable.cloneAtVersion(
  approved_version, 
  dmz_path, 
  isShallow=False, 
  replace=True
)

spark.sql(
  "CREATE TABLE fire.colleral_gold USING DELTA LOCATION '{}'"
    .format(dmz_path)
)

Delta Sharingオープンソースソリューションはアクセス権を管理するためにsharingサーバーを利用しますが、Databricksではアクセスコントロールポリシーを強制し、集中管理するためにUnity Catalogを活用することで、完全な監査ログ機能を提供し、SQLインタフェースを通じてアクセス管理をシンプルにします。以下の例では、我々の規制テーブルを含むSHAREと、データを共有するRECIPIENTを作成しています。

Python
-- DEFINE OUR SHARING STRATEGY
CREATE SHARE regulatory_reports;

ALTER SHARE regulatory_reports ADD TABLE fire.collateral_gold;
ALTER SHARE regulatory_reports ADD TABLE fire.loan_gold;
ALTER SHARE regulatory_reports ADD TABLE fire.security_gold;
ALTER SHARE regulatory_reports ADD TABLE fire.derivative_gold;

-- CREATE RECIPIENTS AND GRANT SELECT ACCESS
CREATE RECIPIENT regulatory_body;

GRANT SELECT ON SHARE regulatory_reports TO RECIPIENT regulatory_body;

アクセスが許可された規制機関、ユーザーは、このプロセスを通じて交換されるパーソナルアクセストークンを用いて背後のデータにアクセスすることができます。Delta Sharingの詳細に関しては、製品ページを参照いただくか、Databricks担当にお問い合わせください。

ご自身のコンプライアンスに対する保証試験

一連のノートブックおよびDelta Live Tablesのジョブを通じて、規制データの取り込み、処理、検証、送信におけるレイクハウスアーキテクチャのメリットをデモンストレーションしました。特に、一般的なデータモデル(FIRE)と柔軟性のあるオーケストレーションエンジン(Delta Live Tables)を組み合わせることで容易に実現される規制パイプラインの一貫性、完全性、即時性を保証するという企業のニーズに取り組みました。Delta Sharingの機能を用いることで、報告の要件への適合、運用コストの削減、新規標準への適用を行いつつも、どのようにFSIに様々な規制システム間でのデータ交換に対する完全な透明性、信頼性をもたらすのかを説明しました。

添付のノートブックを用いてFIREデータパイプラインを確認していただき、金融サービス向け最新のソリューションに関してはSolution Accelerators Hubを訪れてください。

Databricks 無料トライアル

Databricks 無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?