9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

生成AIによるデータ加工でメダリオン・アーキテクチャを構築できるOracle AI Data Platform (AIDP) を使ってデータ分析してみた

Last updated at Posted at 2025-12-08

本記事はOracle Cloud Infrastructure Advent Calendar 2025 シリーズ1の8日目の記事です。
7日目は tkote さんの「Object Storage 上の Iceberg テーブルを Autonomous AI Database の外部表にする」でした。

目次

はじめに

2025年10月に開催されたOracle AI World 2025で AI Data Platform というOCIの新サービスが発表されました。

主な特徴は以下になります。

  • データレイクの用途に便利なDelta Lakeによるデータ管理機能を提供
  • Apache Sparkによる分散処理のコンピュート環境をマネージドで提供
  • 生成AIを使ったデータ加工機能を内包

これら機能がすぐ使える状態でサービス提供されており、PySparkによるデータ加工や、メダリオン・アーキテクチャによる段階的なデータ管理をサービス立ち上げ直後から実行可能です。
ちなみにメダリオン・アーキテクチャとは Bronze、Silver、Gold の3レイヤーで以下のように段階的なデータ管理を行うコンセプトです。

  • Bronze: 外部データソースから取得した生データ
  • Silver: 生データから欠損地や外れ値などを除去して品質を向上させたもの
  • Gold: Silverのデータに対して、さらに用途に合わせて追加データを付加したもの
    • ex) 既存列の平均値など集計した値、既存データをコンテキストとした生成AIによる出力、など

今回はこうしたDelta Lakeによるメダリオン・アーキテクチャの構築、加工したデータの分析といった作業を試してみました。
なお作業内容はOracle LiveLabsで公開されている以下ハンズオン教材を利用しました。

注意 (利用するOCIリージョンについて)
もしAIDPとOCIの生成AIサービスを組み合わせて利用する場合、生成AIサービスが提供されているリージョンを選択してください。
生成AIサービスが未提供のリージョンでは、OCI生成サービスが提供するLLMの呼び出しが出来ませんでした (何か方法があるのかもしれませんが...)

外部データソース (Autonomous AI Lakehouse) の準備

まずはAIDPに読み込む外部データソースとして Autonomous AI Lakehouse を準備します。
Autonomous AI Lakehouse もOracle AI Worldで発表されたサービスですが、本記事では詳細を割愛いたします。
ご興味ありましたら以下をご参照ください。

Autonomous AI Databaseのページから作成していきます。

スクリーンショット 2025-12-05 144827.png

DBバージョンは 26ai を選びます。

スクリーンショット 2025-12-05 144841.png

検証用データを格納するスキーマを作ります。

CREATE USER gold IDENTIFIED BY "strong_password";

-- Data privileges
GRANT CONNECT, RESOURCE TO gold;

-- Allow creation of tables, views, and other objects
GRANT CREATE SESSION TO gold;
GRANT CREATE TABLE TO gold;
GRANT CREATE VIEW TO gold;
GRANT CREATE SEQUENCE TO gold;
GRANT CREATE PROCEDURE TO gold;
GRANT UNLIMITED TABLESPACE TO gold;

-- Enable DBMS_CLOUD 
GRANT EXECUTE ON DBMS_CLOUD TO gold;

-- Grant access to data_pump_dir (used for saveAsTable operation in spark)
GRANT READ, WRITE ON DIRECTORY DATA_PUMP_DIR TO gold;

スクリーンショット 2025-12-05 145543.png

最後に実行しているディレクトリ・オブジェクトの権限ですが、PySparkを使って加工したDelta Lake上のデータをADBへ保存する際に必要となります。

作成したDBユーザ GOLD のRESTを有効化し、SQL Developer Webが使えるようにします。

スクリーンショット 2025-12-05 145640.png

GOLD スキーマ上にテーブルを作成し、検証用のデータを追加します。
今回利用する検証用データは各航空会社毎のフライト実績になります。

CREATE TABLE AIRLINE_SAMPLE (
  FLIGHT_ID   NUMBER,
  AIRLINE     VARCHAR2(20),
  ORIGIN      VARCHAR2(3),
  DEST        VARCHAR2(3),
  DEP_DELAY   NUMBER,
  ARR_DELAY   NUMBER,
  DISTANCE    NUMBER
);

INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1001, 'Skynet Airways', 'JFK', 'LAX', 10, 5, 2475);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1002, 'Sunwind Lines', 'ORD', 'SFO', -3, -5, 1846);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1003, 'BlueJet', 'ATL', 'SEA', 0, 15, 2182);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1004, 'Quantum Flyers', 'DFW', 'MIA', 5, 20, 1121);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1005, 'Nebula Express', 'BOS', 'DEN', 12, 8, 1754);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1006, 'Skynet Airways', 'SEA', 'ORD', -5, -2, 1721);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1007, 'Sunwind Lines', 'MIA', 'ATL', 7, 4, 595);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1008, 'BlueJet', 'SFO', 'BOS', 22, 18, 2704);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1009, 'Quantum Flyers', 'LAX', 'JFK', -1, 0, 2475);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1010, 'Nebula Express', 'DEN', 'DFW', 14, 20, 641);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1011, 'Skynet Airways', 'PHX', 'SEA', 3, -2, 1107);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1012, 'BlueJet', 'ORD', 'ATL', -7, -10, 606);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1013, 'Quantum Flyers', 'BOS', 'JFK', 9, 11, 187);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1014, 'Sunwind Lines', 'LAX', 'DFW', 13, 15, 1235);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1015, 'Nebula Express', 'SFO', 'SEA', 0, 3, 679);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1016, 'Skynet Airways', 'ATL', 'DEN', 6, 5, 1199);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1017, 'BlueJet', 'DFW', 'PHX', -2, 1, 868);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1018, 'Quantum Flyers', 'ORD', 'BOS', 8, -1, 867);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1019, 'Sunwind Lines', 'JFK', 'MIA', 10, 16, 1090);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1020, 'Nebula Express', 'DEN', 'ORD', -4, 0, 888);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1021, 'Skynet Airways', 'SEA', 'ATL', 16, 12, 2182);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1022, 'BlueJet', 'MIA', 'LAX', 5, 7, 2342);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1023, 'Quantum Flyers', 'DEN', 'BOS', 2, -2, 1754);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1024, 'Sunwind Lines', 'SFO', 'JFK', -6, -8, 2586);
INSERT INTO AIRLINE_SAMPLE (FLIGHT_ID, AIRLINE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE) VALUES (1025, 'Nebula Express', 'ORD', 'MIA', 11, 13, 1197);

スクリーンショット 2025-12-05 150223.png

もう一つテーブルを作成します。
こちらには最後にOracle Analytics Cloudから参照する分析用データを格納します。
中身のデータはPySparkで加工したデータを入れるため、一旦テーブルだけ作ります。

CREATE TABLE GOLD.AIRLINE_SAMPLE_GOLD (
  FLIGHT_ID   NUMBER,
  AIRLINE     VARCHAR2(20),
  ORIGIN      VARCHAR2(3),
  DEST        VARCHAR2(3),
  DEP_DELAY   NUMBER,
  ARR_DELAY   NUMBER,
  DISTANCE    NUMBER,
  AVG_DEP_DELAY   NUMBER,
  AVG_ARR_DELAY   NUMBER,
  AVG_DISTANCE    NUMBER,
  REVIEW      VARCHAR2(4000),
  SENTIMENT VARCHAR2(200)
);

スクリーンショット 2025-12-05 150236.png

AIDPの作成

AIDPの環境をプロビジョニングします。
メニューから「AIデータ・プラットフォーム」を選択し、「Create AI Data Platform」を選択します。

スクリーンショット 2025-12-05 150304.png

スクリーンショット 2025-12-05 150350.png

AI Data Platform名、ワークスペース名を入力します。

スクリーンショット 2025-12-05 150507.png

アクセス・レベルに「Standard」を選び、要件となっているポリシーが提示された場合は「Add」を押下してポリシーを適用します。

スクリーンショット 2025-12-05 150817.png

オプションとなっているポリシーも必要に応じて適用します。
本検証では「Enable object deletion」が必要となるため、それだけ適用します。

スクリーンショット 2025-12-05 150910.png

作成ボタンを押下して数分ほどで利用可能な状態になります。
StateActive になり次第、AIDP名の横にあるリンクを押下します。

スクリーンショット 2025-12-05 152622.png

AIDPのサービス・コンソールが開いたらカタログ作成へ進みます。

スクリーンショット 2025-12-05 152920.png

ADB上に作成した GOLD スキーマのデータをカタログに登録し、先ほど作成したデータを読み込めるようにします。
入力フォームへ以下画像の通りに入力します。

スクリーンショット 2025-12-05 153228.png

スクリーンショット 2025-12-05 153244.png

Password にはADB上に作成した GOLD スキーマのパスワードを入力します。
作成ボタンを押下してカタログを作ります。

サービス・コンソールのメニューから Master catalog を選択して更新ボタンを押下すると、作成した新しいカタログが表示されます。

スクリーンショット 2025-12-05 153414.png

Delta Lakeで利用するObject Storageバケットの準備

次にDelta Lakeとして利用するObject Storageバケットを作成します。

スクリーンショット 2025-12-05 153557.png

作成したバケットに delta という名前のフォルダを作成します。

スクリーンショット 2025-12-05 153636.png

スクリーンショット 2025-12-05 153701.png

上記フォルダ内にDeltaテーブルの本体 (Parquetファイル) とメタデータ (jsonファイル) が保存されていきます。

ワークスペースの作成

AIDP上にワークスペースを作成し、PySparkを使ったデータ加工ができるようにします。
サービス・コンソールのメニューから Workspace を選び、右上の Create を押下します。

スクリーンショット 2025-12-05 154229.png

フォームに入力します。
Default catalog には先ほど作成したカタログ aidp_external_gold_catalog を選び、ADBの GOLD スキーマのデータを読み取れるようにします。
Create を押下してワークスペースを作成します。

スクリーンショット 2025-12-05 154338.png

作成したワークスペース名を選択し、 ボタンからフォルダーを追加します。

スクリーンショット 2025-12-05 154438.png

スクリーンショット 2025-12-05 154507.png

フォームに入力して Create を押下します。

スクリーンショット 2025-12-05 154529.png

作成した demo フォルダを選び、 ボタンから Notebook を作成します。

スクリーンショット 2025-12-05 154620.png

作成された Notebook 右上のメニューから Create cluster を選び、Spark環境として利用するクラスタ環境を作ります。

スクリーンショット 2025-12-05 154732.png

クラスタ名だけ入力して後は初期値のまま Create を押下します。

スクリーンショット 2025-12-05 154824.png

これでPySparkを使ったデータ加工の準備が整いました。

メダリオン・アーキテクチャの構築

外部データソースからの読み込み

Spark環境上でデータ加工していくにあたり、まずは先ほど作成したクラスタをアタッチします。

スクリーンショット 2025-12-05 155941.png

ここから Notebook 上でPySparkを使って作業していきます。
まずは aidp_external_gold_catalog 経由でADB上のデータを読み込みます。

airlines_sample_table = "aidp_external_gold_catalog.gold.AIRLINE_SAMPLE"

# Confirm AIRLINE_SAMPLE table is reflected in spark
spark.sql("SHOW TABLES IN aidp_external_gold_catalog.gold").show(truncate=False)

df = spark.table(airlines_sample_table)

df.show()

スクリーンショット 2025-12-05 155959.png

ADBから読み込んだデータをDeltaテーブルとしてObject Storageに書き出します。

delta_path = "oci://oci-bucket@os-namespace/delta/airline_sample"
df.write.format("delta").mode("overwrite").save(delta_path)

スクリーンショット 2025-12-05 160604.png

Object Storageバケットを確認するとDeltaテーブルのファイルが作成されています。

スクリーンショット 2025-12-05 160620.png

Bronzeレイヤー

Spark上に新規のカタログを作成し、Bronzeレイヤー用のスキーマを作成します。
そこに先ほどObject Storage上に保存したDeltaテーブルを参照先とする新しいテーブル airline_sample_delta を構築します。

bronze_table = "airlines_data_catalog.bronze.airline_sample_delta"

# Create New Internal Catalog & Schema to store data
spark.sql("CREATE CATALOG IF NOT EXISTS airlines_data_catalog")
spark.sql("CREATE SCHEMA IF NOT EXISTS airlines_data_catalog.bronze")

# Drop the table if it exists, to avoid conflicts
spark.sql(f"DROP TABLE IF EXISTS {bronze_table}")

# Create new bronze table
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS {bronze_table}
  USING DELTA
  LOCATION '{delta_path}'
""")

スクリーンショット 2025-12-05 161026.png

上記を実行するとカタログ一覧に airlines_data_catalog が追加され、Bronze スキーマも確認できます。

スクリーンショット 2025-12-06 133834.png

続いてDeltaテーブルのタイムトラベル機能を試すため、Bronzeレイヤーの airline_sample_delta テーブルを加工します。
本来はSilverレイヤーに対して行うクレンジング作業ですが、LiveLabsの手順では上記都合でBronzeレイヤーを対象に行っています。

spark.sql(f"""
    DELETE FROM {bronze_table}
    WHERE DISTANCE IS NULL OR DISTANCE < 0
""")

スクリーンショット 2025-12-05 161212.png

という訳でタイムトラベル機能を試してみます。
Deltaテーブルを読み込む際に versionAsOf オプションを使うことで過去の状態のデータを参照できます。

df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
df_v0.show()

スクリーンショット 2025-12-05 161346.png

Silverレイヤー

先ほどクレンジングしたデータをSilverレイヤーにも改めて保存します。
主な処理の流れは以下の通りです。

  • airlines_data_catalog にSilverレイヤー用のスキーマを作成
  • Silverレイヤーの airline_sample_delta テーブルに紐づける予定のObject Storageパスに、クレンジングしたデータをDeltaテーブルとして保存
  • Object Storage上に保存したDeltaテーブルを参照先とするSilverレイヤー用テーブル airline_sample_delta を作成
df_clean = spark.table(bronze_table)

silver_path = "oci://os-bucket@os-namespace/delta/silver/airline_sample"
silver_table = "airlines_data_catalog.silver.airline_sample_delta"

# Create Silver Schema to store data
spark.sql("CREATE SCHEMA IF NOT EXISTS airlines_data_catalog.silver")

# Write cleaned DataFrame to object storage as Delta
df_clean.write.format("delta").mode("overwrite").save(silver_path)

# Remove table registration if it already exists
spark.sql(f"DROP TABLE IF EXISTS {silver_table}")

# Register cleaned data as new Silver table
spark.sql(f"""
  CREATE TABLE {silver_table}
  USING DELTA
  LOCATION '{silver_path}'
""")

# Check table to make sure it's cleaned 
spark.sql(f"SELECT * FROM {silver_table}").show()

スクリーンショット 2025-12-05 162531.png

上記を実行するとカタログ一覧に Silver スキーマが追加されます。

スクリーンショット 2025-12-06 133848.png

Goldレイヤー

クレンジングしたSilverレイヤーのデータに情報を追加します。
まずは既存の数値データ3種について平均値を追加します。
一旦はPySparkのDataFrame上で加工します。

# Enrich data by adding aggregates/average delays and distance 
from pyspark.sql import functions as F

df = spark.table("airlines_data_catalog.silver.airline_sample_delta")

# Calculate averages by airline
avg_df = df.groupBy("AIRLINE").agg(
    F.avg("DEP_DELAY").alias("AVG_DEP_DELAY"),
    F.avg("ARR_DELAY").alias("AVG_ARR_DELAY"),
    F.avg("DISTANCE").alias("AVG_DISTANCE")
)

# Join with the detail table
enhanced_df = df.join(avg_df, on="AIRLINE", how="left")

enhanced_df.show()

スクリーンショット 2025-12-05 162713.png

既存データに各航空会社のレビュー結果を追加します。
こちらも一旦はPySparkのDataFrame上で加工します。

# Add New Review Column for Sentiment Analysis 
import random

sample_reviews = [
    "The flight was on time and comfortable.",
    "Long delay and unfriendly staff.",
    "Quick boarding and smooth flight.",
    "Lost my luggage, not happy.",
    "Great service and tasty snacks."
]

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

random_review_udf = udf(lambda: random.choice(sample_reviews), StringType())
df_with_review = enhanced_df.withColumn("REVIEW", random_review_udf())
df_with_review.show()

スクリーンショット 2025-12-05 162932.png

追加したレビューに対して生成AIを使ったセンチメント分析の結果も追加します。
こちらも一旦はPySparkのDataFrame上で加工します。
また今回はLLMとしてOCI生成AIサービスで使える Cohre Commnad R の最新バージョンを使っています。

# test model 
spark.sql("select query_model('cohere.command-latest','What is Intelligent Data Lake Service in Oracle?') as questions").show(truncate=False)

# Run Sentiment Analysis Against Review with LLM 
from pyspark.sql.functions import expr
enhanced_df = df_with_review.withColumn("SENTIMENT",\
                     expr("query_model('cohere.command-latest', concat('What is the sentiment for this review: ', review))"))\
#.show(10, False)

enhanced_df.show(10, False)

スクリーンショット 2025-12-06 134015.png

実行した結果、以下の通り SENTIMENT 列にセンチメント分析の結果が格納されました。

スクリーンショット 2025-12-06 134040.png

情報を追加したDataFrame内のデータを、GoldレイヤーのDeltaテーブルへ保存します。
処理の流れは以下の通りで、Silverレイヤーと同様です。

  • airlines_data_catalog にGoldレイヤー用のスキーマを作成
  • Goldレイヤーの airline_sample_avg テーブルに紐づける予定のObject Storageパスに、情報を追加したデータをDeltaテーブルとして保存
  • Object Storage上に保存したDeltaテーブルを参照先とするGoldレイヤー用テーブル airline_sample_avg を作成
# Save Averaged Data to Gold Schema 

gold_path = "oci://os-bucket@os-namespace/delta/gold/airline_sample_avg"
gold_table = "airlines_data_catalog.gold.airline_sample_avg"

# Create Gold Schema 
spark.sql("CREATE SCHEMA IF NOT EXISTS airlines_data_catalog.gold")

enhanced_df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(gold_path)

spark.sql(f"DROP TABLE IF EXISTS {gold_table}")

spark.sql(f"""
  CREATE TABLE {gold_table}
  USING DELTA
  LOCATION '{gold_path}'
""")

df_gold = spark.table(gold_table) 
df_gold.show()

スクリーンショット 2025-12-06 134251.png

上記を実行するとカタログ一覧に Gold スキーマが追加されます。

スクリーンショット 2025-12-06 134316.png

Oracle Analytics Cloud (OAC) が参照するためのデータを外部データソースへ保存

OAC用データを外部データソースへ保存するにあたり事前準備を行います。
まずはGoldレイヤーのテーブル airline_sample_avg 向けに利用したDataFrameの全カラム名が、大文字となっているか確認します。
OACに読み込むテーブルのカラム名はすべて大文字である必要があります。

for col_name in df_gold.columns:
    df_gold = df_gold.withColumnRenamed(col_name, col_name.upper())

df_gold.show()

スクリーンショット 2025-12-06 134405.png

次に保存先である外部データソースの Autonomous AI Lakehouse に適したデータ型へキャスト処理を行います。

from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType, StringType

# Cast columns in the DataFrame to the exact types expected by the Oracle table.
# Use DecimalType for NUMBER fields, StringType for VARCHAR2/text.

df_gold_typed = (
    df_gold
    # Cast numeric columns to DecimalType (matches NUMBER in Oracle)
    .withColumn("FLIGHT_ID", col("FLIGHT_ID").cast(DecimalType(38,10)))
    .withColumn("DEP_DELAY", col("DEP_DELAY").cast(DecimalType(38,10)))
    .withColumn("ARR_DELAY", col("ARR_DELAY").cast(DecimalType(38,10)))
    .withColumn("DISTANCE", col("DISTANCE").cast(DecimalType(38,10)))
    .withColumn("AVG_DEP_DELAY", col("AVG_DEP_DELAY").cast(DecimalType(38,10)))
    .withColumn("AVG_ARR_DELAY", col("AVG_ARR_DELAY").cast(DecimalType(38,10)))
    .withColumn("AVG_DISTANCE", col("AVG_DISTANCE").cast(DecimalType(38,10)))
    # Cast text columns to StringType (matches VARCHAR2 in Oracle)
    .withColumn("AIRLINE", col("AIRLINE").cast(StringType()))
    .withColumn("ORIGIN", col("ORIGIN").cast(StringType()))
    .withColumn("DEST", col("DEST").cast(StringType()))
    .withColumn("REVIEW", col("REVIEW").cast(StringType()))
    .withColumn("SENTIMENT", col("SENTIMENT").cast(StringType()))
)

# Specify the desired column order to match the target Oracle table
col_order = [
    "FLIGHT_ID", "AIRLINE", "ORIGIN", "DEST", "DEP_DELAY", "ARR_DELAY", "DISTANCE",
    "AVG_DEP_DELAY", "AVG_ARR_DELAY", "AVG_DISTANCE", "REVIEW", "SENTIMENT"
]

# Select only these columns, in this order, to create a clean DataFrame for insertion
df_gold_typed = df_gold_typed.select(col_order)

# Print the final DataFrame schema for validation (should match the Oracle table exactly)
print(df_gold_typed.printSchema())

# Register the DataFrame as a temp view for Spark SQL use (for INSERT INTO ... or further queries)
df_gold_typed.createOrReplaceTempView("df_gold")

スクリーンショット 2025-12-06 134500.png

スクリーンショット 2025-12-06 134513.png

加工したDataFrame df_gold の値を Autonomous AI Lakehouse へ保存します。
保存先のテーブルは冒頭で Autonomous AI Lakehouse の準備作業をした際に作った空のテーブル AIRLINE_SAMPLE_GOLD になります。

%sql
INSERT into aidp_external_gold_catalog.gold.airline_sample_gold select * from df_gold

スクリーンショット 2025-12-06 134726.png

GoldレイヤーのデータをOACで分析

Autonomous AI Lakehouseのテーブル AIRLINE_SAMPLE_GOLD をOACのデータソースとして登録し、分析してみます。
まずはOAC用にAutonomous AI Lakehouse詳細画面からウォレットを取得します。

スクリーンショット 2025-12-05 173254.png

次にOACインスタンスを作成します。

スクリーンショット 2025-12-05 173357.png

スクリーンショット 2025-12-05 173507.png

OACのトップ画面から接続の作成へ進みます。

スクリーンショット 2025-12-05 174939.png

ADW (Autonomous AI Lakehouseの旧称) を選択し、先ほど取得したウォレットをアップロードしつつ接続を作成します。

スクリーンショット 2025-12-05 174954.png

スクリーンショット 2025-12-05 175102.png

再度OACのトップ画面へ行き、データセットの作成へ進みます。

スクリーンショット 2025-12-05 175141.png

作成した接続を選びます。

スクリーンショット 2025-12-05 175157.png

AIRLINE_SAMPLE_GOLD を空白のエリアにドラッグ&ドロップし、右上の保存ボタンからデータセットを保存します。

スクリーンショット 2025-12-05 175239.png

スクリーンショット 2025-12-05 175321.png

スクリーンショット 2025-12-05 175353.png

データセットを使った分析を行うため、OACトップ画面からワークブックの作成を選択します。

スクリーンショット 2025-12-05 175614.png

作成したデータセットを選択します。

スクリーンショット 2025-12-05 175628.png

これで AIRLINE_SAMPLE_GOLD テーブルを通してGoldレイヤーのデータに対する分析を行えます。
例として、以下は航空会社毎の出発遅延時間の平均値に対するグラフとなります。

スクリーンショット 2025-12-05 180048.png

おわりに

以上がAIDPのApache SparkおよびDelta Lake、生成AI連携機能を使ったデータ加工とメダリオン・アーキテクチャの構築でした。
機械学習や分析用のデータ、あるいは生成AIのコンテキストデータを準備するための環境を、AIDPによって簡単に用意できる点が今回の検証で分かりました。
AIDPであれば分散処理のためのクラスター環境やデータレイク用のファイルシステム、メタデータ管理用のカタログを手組で構築する必要がなくなり、すぐにデータエンジニアリングを始められますね。

9
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?