こんにちは。株式会社 エーティーエルシステムズ 中村です。
Education Data Lake Export を用いることで、Microsoft 365 Education テナントに関する「名簿データ」と「アクティビティデータ」をデータレイク経由で取得することができます。
データは CSV 形式で出力されますが、CSV データにはヘッダ行がなく、スキーマとして出力される Common Data Model (CDM) に沿ってデータを取得する必要があります。
この記事では、Education Data Lake Export で出力されたデータを Synapse Analytics において CDM をもとに読み込む方法について解説します。
環境
本記事では、以下の環境で動作確認をしています。
- Azure Synapse Analytics
- Apache Spark 3.3
- Python 3.10
- Common Data Model (jsonSchemaSemanticVersion) 1.1.0
Common Data Model (CDM) とは
Common Data Model は、Microsoft などが提供するデータモデルです。アプリケーション間で相互に利用できるよう標準化されたデータモデルとなっています。
Common Data Model については、@yomatsum さんによる Common Data Model を紐解く が分かりやすいです。
PySpark でスキーマを付けて読み込むメソッド
詳細な情報は後述しますが、まずは結論から。次のようなメソッドを用意します。
from pyspark.sql.types import *
from pyspark.sql import functions as F
def load_csv_with_cdm(source_csv_path, cdm_schema_path, entity_name):
"""
CDMスキーマをもとに、CSVを読み込みヘッダー付きデータフレームを返す
source_csv_path: 元データとするCSVファイルのパス
cdm_schema_path: CDMスキーマファイルのパス
entity_name: エンティティ名
"""
# CDMのスキーマ
cdm_schema = StructType([
StructField("jsonSchemaSemanticVersion", StringType(), True),
StructField("imports", ArrayType(StructType([
StructField('corpusPath',StringType(), True)
])), True),
StructField("definitions", ArrayType(StructType([
StructField("entityName", StringType(), True),
StructField("hasAttributes", ArrayType(StructType([
StructField("name", StringType(), True),
StructField("dataType", StructType([
StructField("dataTypeReference", StringType(), True)
]), True),
StructField("dataFormat", StringType(), True),
StructField("description", StringType(), True)
])), True)
])), True)
])
# スキーマを付けて、CDM JSONを読み込む
df_schema = spark.read.load(cdm_schema_path, format="json", multiline=True, schema=cdm_schema)
# エンティティ名に合致するスキーマのみとする
df_schema = df_schema.withColumn("definition", F.explode("definitions"))
df_schema = df_schema.withColumn("entityName", F.col("definition.entityName"))
df_schema = df_schema.filter(F.col("entityName") == entity_name)
df_schema = df_schema.limit(1)
# 属性の名前とデータ型を attributeName と attributeType として取り出す
# データ型は、dataType.dataTypeReference か dataFormat のいずれかで記載されている
df_schema = df_schema.withColumn("attribute", F.explode("definition.hasAttributes"))
df_schema = df_schema.withColumn("attributeName", F.col("attribute.name"))
df_schema = df_schema.withColumn("attributeRawType",
F.when(F.col("attribute.dataType.dataTypeReference").isNotNull(), F.col("attribute.dataType.dataTypeReference"))
.when(F.col("attribute.dataFormat").isNotNull(), F.col("attribute.dataFormat"))
)
# [値名, データ型] の配列にする
df_schema = df_schema.withColumn("attributeType",
F.when(F.lower("attributeRawType") == "datetime", F.lit("timestamp"))
.when(F.lower("attributeRawType") == "int", F.lit("integer"))
.when(F.lower("attributeRawType") == "int32", F.lit("integer"))
.when(F.lower("attributeRawType") == "guid", F.lit("string"))
.when(F.lower("attributeRawType") == "time", F.lit("string"))
.otherwise(F.col("attributeRawType"))
)
df_schema = df_schema.select("attributeName", "attributeType")
origin_schema = [[row.attributeName, row.attributeType] for row in df_schema.collect()]
# PySparkで利用できるスキーマに変換する
fields = []
for col_schema in origin_schema:
col_name = col_schema[0]
col_dtype = col_schema[1]
fields.append(StructField(col_name, globals()[col_dtype.lower().capitalize() + "Type"](), True))
spark_schema = StructType(fields)
# CDMをもとに作成したスキーマを用いて、CSVを読み込んでヘッダー付きデータフレームを作成
df = spark.read.load(source_csv_path, format="csv", header=False, schema=spark_schema)
return df
このメソッドは以下のように利用します。
date = "yyyy-MM-dd" # 対象の日付に変更
roster_datetime = "yyyy-MM-ddThh-mm-ss" # 対象の日時に変更
activity_schema_version = "vX.Y.Z" # アクティビティスキーマのファイルバージョンに変更
activedirectory_schema_version = "vX.Y.Z" # ADスキーマのファイルバージョンに変更
roster_schema_version = "vX.Y.Z" # Rosterスキーマのファイルバージョンに変更
activity_schema_path = "abfss://*****@***********.dfs.core.windows.net/M365/schema/activity." + activity_schema_version + ".cdm.json"
activedirectory_schema_path = "abfss://*****@***********.dfs.core.windows.net/M365/schema/activedirectory." + activedirectory_schema_version + ".cdm.json"
roster_schema_path = "abfss://*****@***********.dfs.core.windows.net/M365/schema/roster." + roster_schema_version + ".cdm.json"
# Activityの場合
activity_entity_name = "TechActivity"
activity_abfss_path = "abfss://*****@***********.dfs.core.windows.net/M365/activity/" + date + "/*.csv"
df_activity = load_csv_with_cdm(activity_abfss_path, activity_schema_path, activity_entity_name)
display(df_activity)
# Roster AadGroupの場合
aadgroup_entity_name = "AadGroup"
aadgroup_abfss_path = "abfss://*****@***********.dfs.core.windows.net/M365/roster/" + roster_datetime + "/" + aadgroup_entity_name + "/*.csv"
df_aadgroup = load_csv_with_cdm(aadgroup_abfss_path, aadgroup_schema_path, aadgroup_entity_name)
display(df_aadgroup)
# Roster Personの場合
person_entity_name = "Person"
person_abfss_path = "abfss://*****@***********.dfs.core.windows.net/M365/roster/" + roster_datetime + "/" + person_entity_name + "/*.csv"
df_person = load_csv_with_cdm(person_abfss_path, person_schema_path, person_entity_name)
display(df_person)
これを実行すると、Education Data Lake Export のデータをスキーマ付きで読み込むことができます。
Education Data Lake Export におけるデータ構造
Education Data Lake Export でデータレイクに出力させると、以下のディレクトリ構造で格納されます。
M365
| current.manifest.cdm.json
|
+--- activity
| | activity.manifest.cdm.json
| |
| \--- {yyyy-MM-dd}
| ApplicationUsage.Part001.csv
| ApplicationUsage.Part002.csv
| ApplicationUsage.Part003.csv
+--- roster
| | current.roster.manifest.cdm.json
| |
| \--- {yyyy-MM-dd'T'hh-mm-ss}
| | snapshot.roster.manifest.cdm.json
| |
| +--- AadGroup
| | part-00000-*.csv
| |
| +--- AadGroupMembership
| | part-00000-*.csv
| |
| \--- AadUser
| part-00000-*.csv
\--- schema
activedirectory.{vX.Y.Z}.cdm.json
activity.{vX.Y.Z}.cdm.json
roster.{vX.Y.Z}.cdm.json
このうち、「名簿データ」が roster ディレクトリに格納されるデータ、「アクティビティデータ」が activity ディレクトリに格納されるデータとなります。
それぞれ、「名簿データ」は snapshot.roster.manifest.cdm.json
、「アクティビティデータ」は activity.manifest.cdm.json
にデータ構造が記載されており、それらをもとにスキーマを選択します。
実際にこれらを読むと、以下の関連性を確認できます。
エンティティディレクトリ | エンティティ名 | スキーマファイル名 |
---|---|---|
activity | TechActivity | activity.{vX.Y.Z}.cdm.json |
roster/AadGroup | AadGroup | activedirectory.{vX.Y.Z}.cdm.json |
roster/AadGroupMembership | AadGroupMembership | activedirectory.{vX.Y.Z}.cdm.json |
roster/AadUser | AadUser | activedirectory.{vX.Y.Z}.cdm.json |
roster/AadUserPersonMapping | AadUserPersonMapping | roster.{vX.Y.Z}.cdm.json |
roster/Course | Course | roster.{vX.Y.Z}.cdm.json |
roster/CourseGradeLevel | CourseGradeLevel | roster.{vX.Y.Z}.cdm.json |
roster/CourseSubject | CourseSubject | roster.{vX.Y.Z}.cdm.json |
roster/Enrollment | Enrollment | roster.{vX.Y.Z}.cdm.json |
roster/Organization | Organization | roster.{vX.Y.Z}.cdm.json |
roster/Person | Person | roster.{vX.Y.Z}.cdm.json |
roster/PersonDemographic | PersonDemographic | roster.{vX.Y.Z}.cdm.json |
roster/PersonDemographicEthnicity | PersonDemographicEthnicity | roster.{vX.Y.Z}.cdm.json |
roster/PersonDemographicPersonFlag | PersonDemographicPersonFlag | roster.{vX.Y.Z}.cdm.json |
roster/PersonDemographicRace | PersonDemographicRace | roster.{vX.Y.Z}.cdm.json |
roster/PersonEmailAddress | PersonEmailAddress | roster.{vX.Y.Z}.cdm.json |
roster/PersonIdentifier | PersonIdentifier | roster.{vX.Y.Z}.cdm.json |
roster/PersonOrganizationRole | PersonOrganizationRole | roster.{vX.Y.Z}.cdm.json |
roster/PersonPhoneNumber | PersonPhoneNumber | roster.{vX.Y.Z}.cdm.json |
roster/PersonRelationship | PersonRelationship | roster.{vX.Y.Z}.cdm.json |
roster/RefDefinition | RefDefinition | roster.{vX.Y.Z}.cdm.json |
roster/Section | Section | roster.{vX.Y.Z}.cdm.json |
roster/SectionGradeLevel | SectionGradeLevel | roster.{vX.Y.Z}.cdm.json |
roster/SectionSession | SectionSession | roster.{vX.Y.Z}.cdm.json |
roster/SectionSubject | SectionSubject | roster.{vX.Y.Z}.cdm.json |
roster/Session | Session | roster.{vX.Y.Z}.cdm.json |
roster/SourceSystem | SourceSystem | roster.{vX.Y.Z}.cdm.json |
この関連性をもとに、load_csv_with_cdm メソッドの引数にエンティティ名やスキーマパスを渡し、読み込むという流れになります。
さいごに
Education Data Lake Export で出力された CSV データを Common Data Model を用いてスキーマ付きで読み込むことができました。
Education Data Lake Export は日本での利用者数が少なく、情報もあまり多くないため手探りで試し実装することが多いですが、その分うまくいったときは嬉しいものです。
この記事の内容が参考になったら「いいね」をお願いします