LoginSignup
3
1

More than 1 year has passed since last update.

データの監視をして品質の低下を防ぎたい

Last updated at Posted at 2022-03-11

DMMデータ周りの部に所属しているyuuaです
データはときにイレギュラーなデータが紛れ込んでいたり、何かしらの改修をした際に一部データの構造が崩れてしまったり
といった事態が たまによくある と思います。
そういいった事態をできる限り防ぎ、分析者が使用するデータの品質を担保するためにどういったことをやっているのかまとめたいと思います。

データの特徴を把握する

データの品質監視をするためにまず、データの特徴や構造を把握するため、EDA分析を行いました

EDA分析の詳細には触れませんが...

  • EDA分析(探索的データ分析)とはデータの特徴を探求し、構造をようやくすることを目的としたもの

EDA分析は deequ による profilepandas を使った pandas-profiling を使用して行っております。

どちらもDataframeに対しprofileを実行することができます。

実際にprofileを実行してみる

実際の数値の具体的な詳細の説明などは各種ドキュメントを確認してください。

deequ ColumnProfilerRunner

手順としてはREADMEに記載の通りです

deequ.profile.py
result = ColumnProfilerRunner(spark) \
  .onData(df) \
  .run()

for col, profile in result.profiles.items():
    print(profile)

dataframeを渡し実行します。
result を出力する際は result.profiles.foreachprint すれば出力できます

Column 'id':
 	completeness: 1.0
	approximate number of distinct values: 5
	datatype: String

Column 'name':
 	completeness: 0.75
	approximate number of distinct values: 5
	datatype: Fractional

pandas-profiling

こちらも同様ですが、pandasnのdatarrameをprofilingする形になります

pandas.py
profile = ProfileReport(df, title="Pandas Profiling Report")

スクリーンショット 2022-03-11 14.17.36.png

このようなレポートをHTMLで出力することができます。
ここでは表示していませんが、通常は数値などの具体的な情報やカーディナリティ、欠損率、Uniqなどといったデータが出力されています。

deequもpandas-profilingも jupyterzeppelin などの notebook を使用することでnotebook上で展開できるので便利です。


deequやpandas-profilingの結果を加味してデータの構造および特徴を調べそこから監視の対象となりえる特徴を抽出し、閾値などに使用し監視を行っています。

データの特徴から監視項目及び閾値を決める

EDA分析のレポートなどからデータの特徴を捉えて決めていきます。

下記は例ですが...

  • 欠損が基本的にはありえないデータ列に対しては欠損率 <= 0%
  • カーディナリティが High もしくは Low のデータ列に対しては High or Low の閾値をデータのUniq数から算出する
  • 特定の列のレコードは特定の列のレコードと同値の欠損率である (IDとnameの欠損率は同値)
  • 特定の列のレコードは特定の列のレコードに対して欠損率が高い (addressはIDよりも欠損率が高い)
  • 特定の列のレコードは特定の列のレコードに対してカーディナリティが低い
  • 特定の列のレコードは必ず桁数などといった特徴データが統一されている (ip 等の場合は global ip や マイナス値 プラス値のみ など )

品質監視設計

profile結果から閾値などをきめて実際に監視周りの設計を決め、実装しています。
監視の構成自体は下記のようにいくつか考えられます

  • バッチなどで deepu にて品質状態を確認し閾値に対して異常があった際は slack などへ通知する
  • Great Expectationsを使用してデータに対してテストを行い可視化なども合わせて行う。

  • メトリクスとしてデータの状態を飛ばし、アラートを設定する。

etc...

今回はメトリクスとしてデータの状態を datadog へ送り datadog 上で監視する構成を作成しました。
監視周りはすべて datadog で管理していたという点と時間をあまりかけずに監視を取りいれられ、ダッシュボードなどで可視化も可能
という点で現時点ではメリットが高いと判断しました。

構成

実際の構成はdatadogでの品質の時系列データの可視化のみだけではなく、データ利用者向けに社内ドキュメント上に品質情報の可視化まで行っています。

monitoring_batch_system.png

  • Glue TriggerGlue Job を呼び出す
  • Glue Job からSparkで抽出したデータの集計結果を datadog api を使用してカスタムメトリクスとして送信
  • datadog 上のカスタムメトリクスのデータを circle ci で jsonとして取得
  • 取得したメトリクスjsonを GHE page hosting
  • hostingしたjsonデータを社内のドキュメントツールに加工して品質状態を表示

このような形で品質のか監視からデータ利用者向けに品質の結果を可視化まで行っています。

Glue Jobでのデータ送信

Glue Jobでは Spark でデータの集計して datadog へ送っています。

sampleですが下記のように Spark sql で対象となる日付のデータを取得し
カラム毎にカーディナリティーや欠損値などをSparkの filter isNull などで集計し
その結果を送っています。

send.py
def datadogInit():
    options = {
        'api_key': args['DD_API_KEY']
    }
    initialize(**options)

def sendMetrics(name, value, date=time.time()):
    response = api.Metric.send(
        metric=name,
        points=[(date, value)],
    )

# カーディナリティ
def sendCardinality(df, columns, baseMetricsName):
    for column in columns:
        value = df.select(column).distinct().count()
        name = 'メトリックス名'
        sendMetrics(name, value)

# 欠損
def sendMissingCount(df, columns, baseMetricsName):
    for column in columns:
        value = df.filter(col(column).isNull()).count()
        name = 'メトリックス名'
        sendMetrics(name, value)
....
...
..
.

datadog のアラートが設定しやすい形式にデータを加工して送っています。

まとめ

Glue Job で送られてきたカスタムメトリクスのデータをEDAの結果を踏まえた閾値で datadog アラートを設定し、データに対する品質の監視をおこなっています。
大量のカラム列などのすべての列の品質を担保する場合は今回の構成では datadog のカスタムメトリクスの数やアラート設定が膨大になり管理コストはあがる可能性はありますが、そこまで大量でないカラム列やスモールスタートなどに対しては有効に品質の担保が可能です。

DMMではビッグデータ基盤関連の運用を行っており、
様々リソースを活用したプロダクト開発も行っています。
中途採用などもおこなっておりますので、興味のある方は、一度弊社HPなどから
カジュアル面談など、是非ご応募ください。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1