0
0

More than 1 year has passed since last update.

Azure Databricks で 高品質データを抽出してみました

Last updated at Posted at 2021-12-24

概要

SEが日々登録している工数データが、そのSEの月々の出勤日数の8割以上であれば、そのSEの工数登録データは正しいとみなす、つまりこのSEが登録しているデータは高品質であると仮定するための ベーステーブルを作成するための手順を説明します。なお、上記8割以上のところを、7割、6割、5割、、、、と判断値を変えることにより最終的な分析データの品質確認も行うことを想定しています。

その高品質データ登録SEを抽出するためのテーブル作成の手順イメージは以下となります。

1.対象となるデータを各々のメタデータからデータを抽出
  ・工数登録データ   : shima
  ・出勤データ      : kintai
  ・等級データ      : grade

2.出勤データを整形
  ・SEの出勤データの必要項目の選択と等級データからのName情報の連結
  ・その後、SE毎の月毎の出勤回数を取得      : ikentai

3.工数登録データのデータ登録日の取得
  ・SE毎の日毎の工数登録回数を取得
  ・その後、SE毎の月毎の工数登録回数を取得   : imonth

4.高品質データを登録しているSEを判断するためのテーブルの新規作成
  ・SE単位で月毎の工数登録回数と出勤回数のテーブルを作成  : trustse

ローカル環境

  • macOS Monterey 12.0.1
  • python 3.8.3
  • Azure CLI 2.28.0

前提条件

  1. Azure環境がすでに用意されていること(テナント/サブスクリプション)。
  2. ローカル環境に「azure cli」がインストールされていること。

事前準備

Azure 環境の準備

### ローカル環境変数の定義(Databricks関連)
export RG_NAME=rg_ituru_bricks01
export SUBS_NAME=PSG2-01
export DB_WORKSPACE_NAME=dbw_ituru_workspace01

### ローカル環境変数の定義(Gen2関連)
export RG_GEN2=rg-DUPdatalake-01
export STORAGE_ACCOUNT=dupdatalake01
export STORAGE_CONT_BRONZE=bronze
export STORAGE_CONT_SILVER=silver
export STORAGE_CONT_GOLD=gold

## 使用するテナントへのログイン
$ az login --tenant <tenant_id>

## 使用サブスクリプションの定義
$ az account set --subscription $SUBS_NAME

## 使用サブスクリプションの確認(IsDefault=True)
$ az account list --output table

## Azure Databricks 用のリソースグループ作成
$ az group create --name $RG_NAME --location japaneast

既存ストレージアカウント情報の確認

## ストレージアカウントの認証情報の取得
$ az storage account keys list --account-name $STORAGE_ACCOUNT --subscription $SUBS_NAME --resource-group $RG_GEN2 --output table
CreationTime                      KeyName    Permissions    Value
--------------------------------  ---------  -------------  ---------------------------------------------
2021-12-10T00:42:19.082575+00:00  key1       FULL           xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
2021-12-10T00:42:19.082575+00:00  key2       FULL           zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz

## ストレージアカウントのコンテナ一覧の取得
$ az storage container list --account-name $STORAGE_ACCOUNT --output table
Name            Lease Status    Last Modified
--------------  --------------  -------------------------
bronze                          2021-12-10T05:42:47+00:00
bronze-parquet                  2021-12-20T06:19:47+00:00
rawdata                         2021-12-10T05:05:02+00:00
silver                          2021-12-13T08:14:08+00:00
silver-parquet                  2021-12-21T07:12:27+00:00

## コンテナー内の BLOB の一覧表示
$ az storage blob list --account-name $STORAGE_ACCOUNT --container-name $STORAGE_CONT_BRONZE --output table

Azure Data Bricks の作成

14日間無料のトライアル版で構成します

## Azure Databricks Workspace の作成・・・完了まで数分かかります
az databricks workspace create --resource-group $RG_NAME --name $DB_WORKSPACE_NAME --location japaneast --sku trial

Spark クラスタの作成

  1. Azure portal で、作成した Databricks サービスに移動し、「ワークスペースの起動」 を選択します
  2. Azure Databricks ポータルにリダイレクトされます。 ポータルで [New Cluster] を選択します
  3. 以下のパラメータ入力後、「Create Cluster」ボタンを押します。クラスターが作成され、起動されます

    項目
    Cluster Name db_ituru_cluster01
    Cluster Mode Single Node
    Databrickes Runtime Version 9.1 LTS (Scala 2.12, Apache Spark 3.1.2)
    AutoPilotOprions Terminate after 45 minutes of inactivity
    Node Type Standard_DS3_v2 (14GB Memory 4Cores)

Notebook の作成

  1. Azure Databricks ポータルで [New Notebook] を選択します
  2. 以下のパラメータ入力後、「Create」ボタンを押します。新規の Notebook 画面が表示されます

    項目
    Name InputTrustSE01
    Default Language Python
    Cluster db_ituru_cluster01

Notebook

Notebook の実装

この記事 の内容を一部ほぼそのまま活用させていただきました。

Blobコンテナーのマウント

cmd_1
# コンテナーのマウント
# 一度マウントすると、Clusterを停止、変更してもマウント状態が維持されます
# マウントされた状態で再度操作を実行するとエラーが発生するため、マウント状態をチェックする

# Blob Storage情報(データ抽出用)
storage = {
  "account": "dupdatalake01",
  "container": "bronze",
  "key": "<storage account key>"
}

# マウント先DBFSディレクトリ(データ抽出用)
mount_point = "/mnt/bronze"

try:
  # マウント状態のチェック
  mount_dir = mount_point
  if mount_dir[-1] == "/":
    mount_dir = mount_dir[:-1]
  if len(list(filter(lambda x: x.mountPoint == mount_dir, dbutils.fs.mounts()))) > 0:
    print("Already mounted.")
    mounted = True
  else:
    mounted = False

  # Blob Storageのマウント
  if not mounted:
    source = "wasbs://{container}@{account}.blob.core.windows.net".format(**storage)
    conf_key = "fs.azure.account.key.{account}.blob.core.windows.net".format(**storage)

    mounted = dbutils.fs.mount(
      source=source,
      mount_point = mount_point,
      extra_configs = {conf_key: storage["key"]}
    ) 

except Exception as e:
  raise e

"mounted: {}".format(mounted)

Blob Storageをマウントした ディレクトリ と ファイル の確認

cmd_2
# ディレクトリの確認
display(dbutils.fs.mounts())

# ファイルの確認(ファイルが存在しないとエラーが発生)
# mount_point = "/mnt/{マウント先ディレクトリ}"
display(dbutils.fs.ls(mount_point))

対象ファイルの読込(データの抽出)

cmd_3
# Blob から 工数登録データファイルの読込 --> PySpark Dataframes 型式
sdf = spark.read\
    .option("header", "true").option("inferSchema", "true")\
    .json('/mnt/bronze/tech_shima.json')

display(sdf)

# クエリとして扱うときのテーブルを作成
sdf.createOrReplaceTempView("shima")

# Blob から 勤怠データファイルの読込 --> PySpark Dataframes 型式
sdf = spark.read\
    .option("header", "true").option("inferSchema", "true")\
    .json('/mnt/bronze/tech_kintai.json')

display(sdf)

# クエリとして扱うときのテーブルを作成
sdf.createOrReplaceTempView("kintai")

# Blob から 等級データファイルの読込 --> PySpark Dataframes 型式
sdf = spark.read\
    .option("header", "true").option("inferSchema", "true")\
    .json('/mnt/bronze/tech_grade.json')

display(sdf)

# クエリとして扱うときのテーブルを作成
sdf.createOrReplaceTempView("grade")

出勤データの整形

cmd_4
# SEの出勤データの必要項目の選択と等級データからのName情報の連結
query_01 = """
SELECT kintai.J_Name, grade.Name, Kintai.K_Year, kintai.K_Month, kintai.K_Days
From kintai
Inner Join grade
On kintai.J_Name = grade.J_Name
"""

qdf = spark.sql(query_01)
display(qdf)

# SE毎の月毎の出勤回数を取得する
# その「月毎」は新たにカラムを追加し毎月1日のdatetime型とする
import pandas as pd
import datetime

# PySpark Dataframes から Pandas への変換
pdf = qdf.toPandas()

# カラム['K_Year', 'K_Month']を連結させて毎月1日のカラムを新たに作成する(datetime型)
pdf['K_Date'] = pd.to_datetime(pdf['K_Year'].astype(str)+'-'+pdf['K_Month'].astype(str)+'-01', format='%Y-%m-%d')

# 不必要なカラムの削除後、Pandas から PySpark Dataframes への変換
df = spark.createDataFrame(pdf.drop(['K_Year', 'K_Month'], axis = 1))
display(df)

# クエリとして扱うときのテーブルを作成
df.createOrReplaceTempView("ikintai")

image.png

工数登録データのデータ登録日の取得

cmd_5
# SE毎の日毎の工数登録回数を取得する
query_01 = """
SELECT shima.userid, shima.date, count(shima.date) as input_cnt
From shima
Group by shima.userid, shima.date
Order by shima.userid, shima.date
"""

qdf = spark.sql(query_01)
display(qdf)

# クエリとして扱うときのテーブルを作成
qdf.createOrReplaceTempView("inputse")

# SE毎の月毎の工数登録回数を取得する
query_02 = """
SELECT inputse.userid, date_trunc('MONTH', inputse.date) as dt_month, count(inputse.date) as input_cnt
From inputse
Group by inputse.userid, dt_month
Order by inputse.userid, dt_month
"""

qdf = spark.sql(query_02)
display(qdf)

# クエリとして扱うときのテーブルを作成
qdf.createOrReplaceTempView("imonth")

image.png

高品質データを登録しているSEを判断するためのテーブルの新規作成

cmd_6
# SE単位で月毎の工数登録回数と出勤回数のテーブルを作成する
query_01 = """
SELECT imonth.userid, ikintai.J_Name, imonth.dt_month, imonth.input_cnt, ikintai.K_Days
From imonth
Inner Join ikintai
On imonth.userid = ikintai.Name and imonth.dt_month = ikintai.K_Date
Order by imonth.userid, imonth.dt_month
"""

qdf = spark.sql(query_01)
display(qdf)

# クエリとして扱うときのテーブルを作成
qdf.createOrReplaceTempView("trustse")

image.png


Azure Data Bricks リソースのクリーンアップ

## リソースをクリーンアップするには、ワークスペースを削除します
az databricks workspace delete --resource-group $RG_NAME --name $DB_WORKSPACE_NAME

## ついでにリソースグループを削除する場合は以下を実行
az group delete --name $RG_NAME

まとめ

データ分析を実施するにあたり、データの品質が大事ということに気が付き、今回の記事を記載。ただ、かなり社内よりの内容なので、忘備録のような扱いになってしまいました、、、、、

参考記事

以下の記事を参考にさせていただきました。感謝申し上げます
Azure Databricks: 3-1. DBFSにBlob Storageをマウント
Spark DataframeのSample Code集

0
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0