1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft FabricAdvent Calendar 2024

Day 25

Microsoft Fabric の Spark ノートブックを恐れない SQL ユーザーのためのお作法

Last updated at Posted at 2024-12-23

はじめに

Spark 、難しいと思われがちですが、基本的なデータエンジニアリングをするだけであれば SQL で完遂できますので、SQL ユーザーに馴染みやすいお作法を紹介します。
(私の周りも SQLによる ETL 処理を開発して DWH を構築してきたエンジニアが多数派です)

データエンジニアリング例

image.png

このような3つのテーブルを使ってデータエンジニアリングをする例で紹介します。

導出結果は顧客ごとの注文回数です。

基本のテーブル読み取りと結果の確認

  1. テーブルを読み取ってノートブック上にデータを読みこむには、テーブルのオプションからデータの読み込みを選択します。
    image.png

  2. SELECT 文を実行するため、spark.sql関数が入力されます。SQLの結果が変数dfに格納されます。変数名は任意ですがデータフレームをdfと表現して接頭辞か接尾辞として利用することが多いです。
    image.png

  3. display(df)を実行すると データが表示されます。
    image.png

このように spark.sql と display が対話型の SQL 実行で重要な構文となります

WITH句を使用する

SparkSQLは WITH 句(CTE)に対応しているので、これを実行します。

  1. SQL を実行したいだけの場合は、%%sql をセルの先頭に付与すると、SQLセルとして扱われます。

    sql
    
    %%sql
    
    with orders as (
    
        select
            id as order_id,
            user_id as customer_id,
            order_date,
            status
        from raw_orders
    
    )
    
        select
            customer_id,
            min(order_date) as first_order,
            max(order_date) as most_recent_order,
            count(order_id) as number_of_orders
        from orders
        group by customer_id
    
        
    
  2. 実行すると  display の時と同様に結果が表示されます。
    image.png

SQL での書き込み

テーブルの同時作成(saveAsTable)は CREATE TABLE AS SELECT (CTAS)、
テーブルへの書き込みは INSERT INTO または INSERT OVERWRITE (全件上書き)や、UPDATE, MERGE INTO にて行います。

  1. CTASの場合、以下のように記述可能です。

    sql
    %%sql
    
    create table customer_orders using delta  
    as 
    with orders as (
        select
            id as order_id,
            user_id as customer_id,
            order_date,
            status
        from raw_orders
    )
    select
        customer_id,
        min(order_date) as first_order,
        max(order_date) as most_recent_order,
        count(order_id) as number_of_orders
    from orders
    group by customer_id
    
    
  2. テーブルが存在する場合は、DML が利用可能です。

    sql
    
    %%sql
     
    with orders as (
        select
            id as order_id,
            user_id as customer_id,
            order_date,
            status
        from raw_orders
    )
    
    insert overwrite customer_orders
    select
        customer_id,
        min(order_date) as first_order,
        max(order_date) as most_recent_order,
        count(order_id) as number_of_orders
    from orders
    group by customer_id
    
    

pyspark の処理とSQLでの処理を織り交ぜる

ノートブックの中では二つの言語を行き来することができます。

  1. sqlの処理結果にpython処理を加えたい場合

    pyspark
    df = spark.sql("""
    
    with orders as (
        select
            id as order_id,
            user_id as customer_id,
            order_date,
            status
        from raw_orders
    )
    
    select
        customer_id,
        min(order_date) as first_order,
        max(order_date) as most_recent_order,
        count(order_id) as number_of_orders
    from orders
    group by customer_id
    
    """)
    
    import pyspark.sql.functions as F 
    
    df_add_column =  df.withColumn("add_column1",F.lit("追加列"))
    
    display(df_add_column)
    
    

    image.png

  2. pyspark の処理結果をSQLで読みたい場合, createOrReplaceTempView を使用すると簡単です。

    pyspark
    
    df_add_column.createOrReplaceTempView("sql_tempview")
    
    
    sql
    %%sql
    
    select * from sql_tempview
    
    

    image.png

with句の代わりにcreateOrReplaceTempView をチェーンする

createOrReplaceTempViewを CTEのように使うことができます。

pyspark

ch1 = spark.sql("""
    select
        id as order_id,
        user_id as customer_id,
        order_date,
        status
    from raw_orders
""")
ch1.createOrReplaceTempView("temp_orders")

pyspark
ch2 = spark.sql("""
    select
        customer_id,
        min(order_date) as first_order,
        max(order_date) as most_recent_order,
        count(order_id) as number_of_orders
    from temp_orders
    group by customer_id

""")
ch2.createOrReplaceTempView("v_customer_orders")

pyspark
ch3 = spark.sql("""
    select
        orders.customer_id,
        sum(amount) as total_amount

    from raw_payments as payments 

    left join temp_orders as orders on
         payments.order_id = orders.order_id

    group by orders.customer_id
""")
ch3.createOrReplaceTempView("v_customer_payments")

sparksql

%%sql

select
    customers.id as customer_id,
    customers.first_name,
    customers.last_name,
    customer_orders.first_order,
    customer_orders.most_recent_order,
    customer_orders.number_of_orders,
    customer_payments.total_amount as customer_lifetime_value

from raw_customers as customers

left join v_customer_orders as customer_orders
    on customers.id = customer_orders.customer_id

left join v_customer_payments as customer_payments
    on  customers.id = customer_payments.customer_id

image.png

コードスニペット

pyspark の処理に困ったらコードスニペットの例も確認してみましょう。

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?