概要
SEが日々登録している工数データが、そのSEの月々の出勤日数の8割以上であれば、そのSEの工数登録データは正しいとみなす、つまりこのSEが登録しているデータは高品質であると仮定するための ベーステーブルを作成するための手順を説明します。なお、上記8割以上のところを、7割、6割、5割、、、、と判断値を変えることにより最終的な分析データの品質確認も行うことを想定しています。
その高品質データ登録SEを抽出するためのテーブル作成の手順イメージは以下となります。
1.対象となるデータを各々のメタデータからデータを抽出
・工数登録データ : shima
・出勤データ : kintai
・等級データ : grade
2.出勤データを整形
・SEの出勤データの必要項目の選択と等級データからのName情報の連結
・その後、SE毎の月毎の出勤回数を取得 : ikentai
3.工数登録データのデータ登録日の取得
・SE毎の日毎の工数登録回数を取得
・その後、SE毎の月毎の工数登録回数を取得 : imonth
4.高品質データを登録しているSEを判断するためのテーブルの新規作成
・SE単位で月毎の工数登録回数と出勤回数のテーブルを作成 : trustse
ローカル環境
- macOS Monterey 12.0.1
- python 3.8.3
- Azure CLI 2.28.0
前提条件
- Azure環境がすでに用意されていること(テナント/サブスクリプション)。
- ローカル環境に「azure cli」がインストールされていること。
事前準備
Azure 環境の準備
### ローカル環境変数の定義(Databricks関連)
export RG_NAME=rg_ituru_bricks01
export SUBS_NAME=PSG2-01
export DB_WORKSPACE_NAME=dbw_ituru_workspace01
### ローカル環境変数の定義(Gen2関連)
export RG_GEN2=rg-DUPdatalake-01
export STORAGE_ACCOUNT=dupdatalake01
export STORAGE_CONT_BRONZE=bronze
export STORAGE_CONT_SILVER=silver
export STORAGE_CONT_GOLD=gold
## 使用するテナントへのログイン
$ az login --tenant <tenant_id>
## 使用サブスクリプションの定義
$ az account set --subscription $SUBS_NAME
## 使用サブスクリプションの確認(IsDefault=True)
$ az account list --output table
## Azure Databricks 用のリソースグループ作成
$ az group create --name $RG_NAME --location japaneast
既存ストレージアカウント情報の確認
## ストレージアカウントの認証情報の取得
$ az storage account keys list --account-name $STORAGE_ACCOUNT --subscription $SUBS_NAME --resource-group $RG_GEN2 --output table
CreationTime KeyName Permissions Value
-------------------------------- --------- ------------- ---------------------------------------------
2021-12-10T00:42:19.082575+00:00 key1 FULL xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
2021-12-10T00:42:19.082575+00:00 key2 FULL zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
## ストレージアカウントのコンテナ一覧の取得
$ az storage container list --account-name $STORAGE_ACCOUNT --output table
Name Lease Status Last Modified
-------------- -------------- -------------------------
bronze 2021-12-10T05:42:47+00:00
bronze-parquet 2021-12-20T06:19:47+00:00
rawdata 2021-12-10T05:05:02+00:00
silver 2021-12-13T08:14:08+00:00
silver-parquet 2021-12-21T07:12:27+00:00
## コンテナー内の BLOB の一覧表示
$ az storage blob list --account-name $STORAGE_ACCOUNT --container-name $STORAGE_CONT_BRONZE --output table
Azure Data Bricks の作成
14日間無料のトライアル版で構成します
## Azure Databricks Workspace の作成・・・完了まで数分かかります
az databricks workspace create --resource-group $RG_NAME --name $DB_WORKSPACE_NAME --location japaneast --sku trial
Spark クラスタの作成
-
Azure portal で、作成した Databricks サービスに移動し、「ワークスペースの起動」 を選択します
-
Azure Databricks ポータルにリダイレクトされます。 ポータルで [New Cluster] を選択します
-
以下のパラメータ入力後、「Create Cluster」ボタンを押します。クラスターが作成され、起動されます
項目 値 Cluster Name db_ituru_cluster01 Cluster Mode Single Node Databrickes Runtime Version 9.1 LTS (Scala 2.12, Apache Spark 3.1.2) AutoPilotOprions Terminate after 45 minutes of inactivity Node Type Standard_DS3_v2 (14GB Memory 4Cores)
Notebook の作成
-
Azure Databricks ポータルで [New Notebook] を選択します
-
以下のパラメータ入力後、「Create」ボタンを押します。新規の Notebook 画面が表示されます
項目 値 Name InputTrustSE01 Default Language Python Cluster db_ituru_cluster01
Notebook
Notebook の実装
この記事 の内容を一部ほぼそのまま活用させていただきました。
Blobコンテナーのマウント
# コンテナーのマウント
# 一度マウントすると、Clusterを停止、変更してもマウント状態が維持されます
# マウントされた状態で再度操作を実行するとエラーが発生するため、マウント状態をチェックする
# Blob Storage情報(データ抽出用)
storage = {
"account": "dupdatalake01",
"container": "bronze",
"key": "<storage account key>"
}
# マウント先DBFSディレクトリ(データ抽出用)
mount_point = "/mnt/bronze"
try:
# マウント状態のチェック
mount_dir = mount_point
if mount_dir[-1] == "/":
mount_dir = mount_dir[:-1]
if len(list(filter(lambda x: x.mountPoint == mount_dir, dbutils.fs.mounts()))) > 0:
print("Already mounted.")
mounted = True
else:
mounted = False
# Blob Storageのマウント
if not mounted:
source = "wasbs://{container}@{account}.blob.core.windows.net".format(**storage)
conf_key = "fs.azure.account.key.{account}.blob.core.windows.net".format(**storage)
mounted = dbutils.fs.mount(
source=source,
mount_point = mount_point,
extra_configs = {conf_key: storage["key"]}
)
except Exception as e:
raise e
"mounted: {}".format(mounted)
Blob Storageをマウントした ディレクトリ と ファイル の確認
# ディレクトリの確認
display(dbutils.fs.mounts())
# ファイルの確認(ファイルが存在しないとエラーが発生)
# mount_point = "/mnt/{マウント先ディレクトリ}"
display(dbutils.fs.ls(mount_point))
対象ファイルの読込(データの抽出)
# Blob から 工数登録データファイルの読込 --> PySpark Dataframes 型式
sdf = spark.read\
.option("header", "true").option("inferSchema", "true")\
.json('/mnt/bronze/tech_shima.json')
display(sdf)
# クエリとして扱うときのテーブルを作成
sdf.createOrReplaceTempView("shima")
# Blob から 勤怠データファイルの読込 --> PySpark Dataframes 型式
sdf = spark.read\
.option("header", "true").option("inferSchema", "true")\
.json('/mnt/bronze/tech_kintai.json')
display(sdf)
# クエリとして扱うときのテーブルを作成
sdf.createOrReplaceTempView("kintai")
# Blob から 等級データファイルの読込 --> PySpark Dataframes 型式
sdf = spark.read\
.option("header", "true").option("inferSchema", "true")\
.json('/mnt/bronze/tech_grade.json')
display(sdf)
# クエリとして扱うときのテーブルを作成
sdf.createOrReplaceTempView("grade")
出勤データの整形
# SEの出勤データの必要項目の選択と等級データからのName情報の連結
query_01 = """
SELECT kintai.J_Name, grade.Name, Kintai.K_Year, kintai.K_Month, kintai.K_Days
From kintai
Inner Join grade
On kintai.J_Name = grade.J_Name
"""
qdf = spark.sql(query_01)
display(qdf)
# SE毎の月毎の出勤回数を取得する
# その「月毎」は新たにカラムを追加し毎月1日のdatetime型とする
import pandas as pd
import datetime
# PySpark Dataframes から Pandas への変換
pdf = qdf.toPandas()
# カラム['K_Year', 'K_Month']を連結させて毎月1日のカラムを新たに作成する(datetime型)
pdf['K_Date'] = pd.to_datetime(pdf['K_Year'].astype(str)+'-'+pdf['K_Month'].astype(str)+'-01', format='%Y-%m-%d')
# 不必要なカラムの削除後、Pandas から PySpark Dataframes への変換
df = spark.createDataFrame(pdf.drop(['K_Year', 'K_Month'], axis = 1))
display(df)
# クエリとして扱うときのテーブルを作成
df.createOrReplaceTempView("ikintai")
工数登録データのデータ登録日の取得
# SE毎の日毎の工数登録回数を取得する
query_01 = """
SELECT shima.userid, shima.date, count(shima.date) as input_cnt
From shima
Group by shima.userid, shima.date
Order by shima.userid, shima.date
"""
qdf = spark.sql(query_01)
display(qdf)
# クエリとして扱うときのテーブルを作成
qdf.createOrReplaceTempView("inputse")
# SE毎の月毎の工数登録回数を取得する
query_02 = """
SELECT inputse.userid, date_trunc('MONTH', inputse.date) as dt_month, count(inputse.date) as input_cnt
From inputse
Group by inputse.userid, dt_month
Order by inputse.userid, dt_month
"""
qdf = spark.sql(query_02)
display(qdf)
# クエリとして扱うときのテーブルを作成
qdf.createOrReplaceTempView("imonth")
高品質データを登録しているSEを判断するためのテーブルの新規作成
# SE単位で月毎の工数登録回数と出勤回数のテーブルを作成する
query_01 = """
SELECT imonth.userid, ikintai.J_Name, imonth.dt_month, imonth.input_cnt, ikintai.K_Days
From imonth
Inner Join ikintai
On imonth.userid = ikintai.Name and imonth.dt_month = ikintai.K_Date
Order by imonth.userid, imonth.dt_month
"""
qdf = spark.sql(query_01)
display(qdf)
# クエリとして扱うときのテーブルを作成
qdf.createOrReplaceTempView("trustse")
Azure Data Bricks リソースのクリーンアップ
## リソースをクリーンアップするには、ワークスペースを削除します
az databricks workspace delete --resource-group $RG_NAME --name $DB_WORKSPACE_NAME
## ついでにリソースグループを削除する場合は以下を実行
az group delete --name $RG_NAME
まとめ
データ分析を実施するにあたり、データの品質が大事ということに気が付き、今回の記事を記載。ただ、かなり社内よりの内容なので、忘備録のような扱いになってしまいました、、、、、
参考記事
以下の記事を参考にさせていただきました。感謝申し上げます
Azure Databricks: 3-1. DBFSにBlob Storageをマウント
Spark DataframeのSample Code集