DMMデータ周りの部に所属しているyuuaです
データはときにイレギュラーなデータが紛れ込んでいたり、何かしらの改修をした際に一部データの構造が崩れてしまったり
といった事態が たまによくある
と思います。
そういいった事態をできる限り防ぎ、分析者が使用するデータの品質を担保するためにどういったことをやっているのかまとめたいと思います。
データの特徴を把握する
データの品質監視をするためにまず、データの特徴や構造を把握するため、EDA分析を行いました
EDA分析の詳細には触れませんが...
- EDA分析(探索的データ分析)とはデータの特徴を探求し、構造をようやくすることを目的としたもの
EDA分析は deequ
による profile
やpandas
を使った pandas-profiling
を使用して行っております。
どちらもDataframeに対しprofileを実行することができます。
実際にprofileを実行してみる
実際の数値の具体的な詳細の説明などは各種ドキュメントを確認してください。
deequ ColumnProfilerRunner
手順としてはREADMEに記載の通りです
result = ColumnProfilerRunner(spark) \
.onData(df) \
.run()
for col, profile in result.profiles.items():
print(profile)
dataframeを渡し実行します。
result
を出力する際は result.profiles.foreach
で print
すれば出力できます
Column 'id':
completeness: 1.0
approximate number of distinct values: 5
datatype: String
Column 'name':
completeness: 0.75
approximate number of distinct values: 5
datatype: Fractional
pandas-profiling
こちらも同様ですが、pandasnのdatarrameをprofilingする形になります
profile = ProfileReport(df, title="Pandas Profiling Report")
このようなレポートをHTMLで出力することができます。
ここでは表示していませんが、通常は数値などの具体的な情報やカーディナリティ、欠損率、Uniqなどといったデータが出力されています。
deequもpandas-profilingも jupyter
や zeppelin
などの notebook
を使用することでnotebook上で展開できるので便利です。
deequやpandas-profilingの結果を加味してデータの構造および特徴を調べそこから監視の対象となりえる特徴を抽出し、閾値などに使用し監視を行っています。
データの特徴から監視項目及び閾値を決める
EDA分析のレポートなどからデータの特徴を捉えて決めていきます。
下記は例ですが...
- 欠損が基本的にはありえないデータ列に対しては欠損率 <= 0%
- カーディナリティが
High
もしくはLow
のデータ列に対してはHigh or Low
の閾値をデータのUniq数から算出する - 特定の列のレコードは特定の列のレコードと同値の欠損率である (IDとnameの欠損率は同値)
- 特定の列のレコードは特定の列のレコードに対して欠損率が高い (addressはIDよりも欠損率が高い)
- 特定の列のレコードは特定の列のレコードに対してカーディナリティが低い
- 特定の列のレコードは必ず桁数などといった特徴データが統一されている (ip 等の場合は global ip や マイナス値 プラス値のみ など )
品質監視設計
profile結果から閾値などをきめて実際に監視周りの設計を決め、実装しています。
監視の構成自体は下記のようにいくつか考えられます
- バッチなどで
deepu
にて品質状態を確認し閾値に対して異常があった際はslack
などへ通知する - Great Expectationsを使用してデータに対してテストを行い可視化なども合わせて行う。
- メトリクスとしてデータの状態を飛ばし、アラートを設定する。
etc...
今回はメトリクスとしてデータの状態を datadog
へ送り datadog
上で監視する構成を作成しました。
監視周りはすべて datadog
で管理していたという点と時間をあまりかけずに監視を取りいれられ、ダッシュボードなどで可視化も可能
という点で現時点ではメリットが高いと判断しました。
構成
実際の構成はdatadogでの品質の時系列データの可視化のみだけではなく、データ利用者向けに社内ドキュメント上に品質情報の可視化まで行っています。
-
Glue Trigger
でGlue Job
を呼び出す -
Glue Job
からSparkで抽出したデータの集計結果をdatadog api
を使用してカスタムメトリクスとして送信 -
datadog
上のカスタムメトリクスのデータをcircle ci
で jsonとして取得 - 取得したメトリクスjsonを
GHE page hosting
- hostingしたjsonデータを社内のドキュメントツールに加工して品質状態を表示
このような形で品質のか監視からデータ利用者向けに品質の結果を可視化まで行っています。
Glue Jobでのデータ送信
Glue Jobでは Spark
でデータの集計して datadog
へ送っています。
sampleですが下記のように Spark sql
で対象となる日付のデータを取得し
カラム毎にカーディナリティーや欠損値などをSparkの filter
isNull
などで集計し
その結果を送っています。
def datadogInit():
options = {
'api_key': args['DD_API_KEY']
}
initialize(**options)
def sendMetrics(name, value, date=time.time()):
response = api.Metric.send(
metric=name,
points=[(date, value)],
)
# カーディナリティ
def sendCardinality(df, columns, baseMetricsName):
for column in columns:
value = df.select(column).distinct().count()
name = 'メトリックス名'
sendMetrics(name, value)
# 欠損
def sendMissingCount(df, columns, baseMetricsName):
for column in columns:
value = df.filter(col(column).isNull()).count()
name = 'メトリックス名'
sendMetrics(name, value)
....
...
..
.
datadog
のアラートが設定しやすい形式にデータを加工して送っています。
まとめ
Glue Job
で送られてきたカスタムメトリクスのデータをEDAの結果を踏まえた閾値で datadog
アラートを設定し、データに対する品質の監視をおこなっています。
大量のカラム列などのすべての列の品質を担保する場合は今回の構成では datadog
のカスタムメトリクスの数やアラート設定が膨大になり管理コストはあがる可能性はありますが、そこまで大量でないカラム列やスモールスタートなどに対しては有効に品質の担保が可能です。
DMMではビッグデータ基盤関連の運用を行っており、
様々リソースを活用したプロダクト開発も行っています。
中途採用などもおこなっておりますので、興味のある方は、一度弊社HPなどから
カジュアル面談など、是非ご応募ください。