1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Education Data Lake Export のデータをスキーマ付きで読み込む

Last updated at Posted at 2024-05-22

こんにちは。株式会社 エーティーエルシステムズ 中村です。

Education Data Lake Export を用いることで、Microsoft 365 Education テナントに関する「名簿データ」と「アクティビティデータ」をデータレイク経由で取得することができます。
データは CSV 形式で出力されますが、CSV データにはヘッダ行がなく、スキーマとして出力される Common Data Model (CDM) に沿ってデータを取得する必要があります。

この記事では、Education Data Lake Export で出力されたデータを Synapse Analytics において CDM をもとに読み込む方法について解説します。

環境

本記事では、以下の環境で動作確認をしています。

Common Data Model (CDM) とは

Common Data Model は、Microsoft などが提供するデータモデルです。アプリケーション間で相互に利用できるよう標準化されたデータモデルとなっています。

Common Data Model については、@yomatsum さんによる Common Data Model を紐解く が分かりやすいです。

PySpark でスキーマを付けて読み込むメソッド

詳細な情報は後述しますが、まずは結論から。次のようなメソッドを用意します。

from pyspark.sql.types import *
from pyspark.sql import functions as F

def load_csv_with_cdm(source_csv_path, cdm_schema_path, entity_name):
      """
      CDMスキーマをもとに、CSVを読み込みヘッダー付きデータフレームを返す

      source_csv_path: 元データとするCSVファイルのパス
      cdm_schema_path: CDMスキーマファイルのパス
      entity_name: エンティティ名
      """
      # CDMのスキーマ
      cdm_schema = StructType([
          StructField("jsonSchemaSemanticVersion", StringType(), True),
          StructField("imports", ArrayType(StructType([
                  StructField('corpusPath',StringType(), True)
          ])), True),
          StructField("definitions", ArrayType(StructType([
              StructField("entityName", StringType(), True),
              StructField("hasAttributes", ArrayType(StructType([
                  StructField("name", StringType(), True),
                  StructField("dataType", StructType([
                      StructField("dataTypeReference", StringType(), True)
                  ]), True),
                  StructField("dataFormat", StringType(), True),
                  StructField("description", StringType(), True)
              ])), True)
          ])), True)
      ])

      # スキーマを付けて、CDM JSONを読み込む
      df_schema = spark.read.load(cdm_schema_path, format="json", multiline=True, schema=cdm_schema)

      # エンティティ名に合致するスキーマのみとする
      df_schema = df_schema.withColumn("definition", F.explode("definitions"))
      df_schema = df_schema.withColumn("entityName", F.col("definition.entityName"))
      df_schema = df_schema.filter(F.col("entityName") == entity_name)
      df_schema = df_schema.limit(1)

      # 属性の名前とデータ型を attributeName と attributeType として取り出す
      # データ型は、dataType.dataTypeReference か dataFormat のいずれかで記載されている
      df_schema = df_schema.withColumn("attribute", F.explode("definition.hasAttributes"))
      df_schema = df_schema.withColumn("attributeName", F.col("attribute.name"))
      df_schema = df_schema.withColumn("attributeRawType",
          F.when(F.col("attribute.dataType.dataTypeReference").isNotNull(), F.col("attribute.dataType.dataTypeReference"))
          .when(F.col("attribute.dataFormat").isNotNull(), F.col("attribute.dataFormat"))
      )

      # [値名, データ型] の配列にする
      df_schema = df_schema.withColumn("attributeType", 
          F.when(F.lower("attributeRawType") == "datetime", F.lit("timestamp"))
          .when(F.lower("attributeRawType") == "int", F.lit("integer"))
          .when(F.lower("attributeRawType") == "int32", F.lit("integer"))
          .when(F.lower("attributeRawType") == "guid", F.lit("string"))
          .when(F.lower("attributeRawType") == "time", F.lit("string"))
          .otherwise(F.col("attributeRawType"))
      )
      df_schema = df_schema.select("attributeName", "attributeType")
      origin_schema = [[row.attributeName, row.attributeType] for row in df_schema.collect()]

      # PySparkで利用できるスキーマに変換する
      fields = []
      for col_schema in origin_schema:
          col_name = col_schema[0]
          col_dtype = col_schema[1]
          fields.append(StructField(col_name, globals()[col_dtype.lower().capitalize() + "Type"](), True))
      spark_schema = StructType(fields)

      # CDMをもとに作成したスキーマを用いて、CSVを読み込んでヘッダー付きデータフレームを作成
      df = spark.read.load(source_csv_path, format="csv", header=False, schema=spark_schema)

      return df

このメソッドは以下のように利用します。

date = "yyyy-MM-dd" # 対象の日付に変更
roster_datetime = "yyyy-MM-ddThh-mm-ss" # 対象の日時に変更

activity_schema_version = "vX.Y.Z" # アクティビティスキーマのファイルバージョンに変更
activedirectory_schema_version = "vX.Y.Z" # ADスキーマのファイルバージョンに変更
roster_schema_version = "vX.Y.Z"  # Rosterスキーマのファイルバージョンに変更

activity_schema_path = "abfss://*****@***********.dfs.core.windows.net/M365/schema/activity." + activity_schema_version + ".cdm.json"
activedirectory_schema_path = "abfss://*****@***********.dfs.core.windows.net/M365/schema/activedirectory." + activedirectory_schema_version + ".cdm.json"
roster_schema_path = "abfss://*****@***********.dfs.core.windows.net/M365/schema/roster." + roster_schema_version + ".cdm.json"

# Activityの場合
activity_entity_name = "TechActivity"
activity_abfss_path = "abfss://*****@***********.dfs.core.windows.net/M365/activity/" + date + "/*.csv"

df_activity = load_csv_with_cdm(activity_abfss_path, activity_schema_path, activity_entity_name)
display(df_activity)

# Roster AadGroupの場合
aadgroup_entity_name = "AadGroup"
aadgroup_abfss_path = "abfss://*****@***********.dfs.core.windows.net/M365/roster/" + roster_datetime + "/" + aadgroup_entity_name + "/*.csv"

df_aadgroup = load_csv_with_cdm(aadgroup_abfss_path, aadgroup_schema_path, aadgroup_entity_name)
display(df_aadgroup)

# Roster Personの場合
person_entity_name = "Person"
person_abfss_path = "abfss://*****@***********.dfs.core.windows.net/M365/roster/" + roster_datetime + "/" + person_entity_name + "/*.csv"

df_person = load_csv_with_cdm(person_abfss_path, person_schema_path, person_entity_name)
display(df_person)

これを実行すると、Education Data Lake Export のデータをスキーマ付きで読み込むことができます。

Education Data Lake Export におけるデータ構造

Education Data Lake Export でデータレイクに出力させると、以下のディレクトリ構造で格納されます。

M365
|   current.manifest.cdm.json
|
+--- activity
|   |   activity.manifest.cdm.json
|   |
|   \--- {yyyy-MM-dd}
|           ApplicationUsage.Part001.csv
|           ApplicationUsage.Part002.csv
|           ApplicationUsage.Part003.csv
+--- roster
|   |   current.roster.manifest.cdm.json
|   |
|   \--- {yyyy-MM-dd'T'hh-mm-ss}
|       |   snapshot.roster.manifest.cdm.json
|       |
|       +--- AadGroup
|       |       part-00000-*.csv
|       |
|       +--- AadGroupMembership
|       |       part-00000-*.csv
|       |
|       \--- AadUser
|               part-00000-*.csv
\--- schema
        activedirectory.{vX.Y.Z}.cdm.json
        activity.{vX.Y.Z}.cdm.json
        roster.{vX.Y.Z}.cdm.json

このうち、「名簿データ」が roster ディレクトリに格納されるデータ、「アクティビティデータ」が activity ディレクトリに格納されるデータとなります。
それぞれ、「名簿データ」は snapshot.roster.manifest.cdm.json、「アクティビティデータ」は activity.manifest.cdm.json にデータ構造が記載されており、それらをもとにスキーマを選択します。

実際にこれらを読むと、以下の関連性を確認できます。

エンティティディレクトリ エンティティ名 スキーマファイル名
activity TechActivity activity.{vX.Y.Z}.cdm.json
roster/AadGroup AadGroup activedirectory.{vX.Y.Z}.cdm.json
roster/AadGroupMembership AadGroupMembership activedirectory.{vX.Y.Z}.cdm.json
roster/AadUser AadUser activedirectory.{vX.Y.Z}.cdm.json
roster/AadUserPersonMapping AadUserPersonMapping roster.{vX.Y.Z}.cdm.json
roster/Course Course roster.{vX.Y.Z}.cdm.json
roster/CourseGradeLevel CourseGradeLevel roster.{vX.Y.Z}.cdm.json
roster/CourseSubject CourseSubject roster.{vX.Y.Z}.cdm.json
roster/Enrollment Enrollment roster.{vX.Y.Z}.cdm.json
roster/Organization Organization roster.{vX.Y.Z}.cdm.json
roster/Person Person roster.{vX.Y.Z}.cdm.json
roster/PersonDemographic PersonDemographic roster.{vX.Y.Z}.cdm.json
roster/PersonDemographicEthnicity PersonDemographicEthnicity roster.{vX.Y.Z}.cdm.json
roster/PersonDemographicPersonFlag PersonDemographicPersonFlag roster.{vX.Y.Z}.cdm.json
roster/PersonDemographicRace PersonDemographicRace roster.{vX.Y.Z}.cdm.json
roster/PersonEmailAddress PersonEmailAddress roster.{vX.Y.Z}.cdm.json
roster/PersonIdentifier PersonIdentifier roster.{vX.Y.Z}.cdm.json
roster/PersonOrganizationRole PersonOrganizationRole roster.{vX.Y.Z}.cdm.json
roster/PersonPhoneNumber PersonPhoneNumber roster.{vX.Y.Z}.cdm.json
roster/PersonRelationship PersonRelationship roster.{vX.Y.Z}.cdm.json
roster/RefDefinition RefDefinition roster.{vX.Y.Z}.cdm.json
roster/Section Section roster.{vX.Y.Z}.cdm.json
roster/SectionGradeLevel SectionGradeLevel roster.{vX.Y.Z}.cdm.json
roster/SectionSession SectionSession roster.{vX.Y.Z}.cdm.json
roster/SectionSubject SectionSubject roster.{vX.Y.Z}.cdm.json
roster/Session Session roster.{vX.Y.Z}.cdm.json
roster/SourceSystem SourceSystem roster.{vX.Y.Z}.cdm.json

この関連性をもとに、load_csv_with_cdm メソッドの引数にエンティティ名やスキーマパスを渡し、読み込むという流れになります。

さいごに

Education Data Lake Export で出力された CSV データを Common Data Model を用いてスキーマ付きで読み込むことができました。
Education Data Lake Export は日本での利用者数が少なく、情報もあまり多くないため手探りで試し実装することが多いですが、その分うまくいったときは嬉しいものです。

この記事の内容が参考になったら「いいね」をお願いします:star:

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?