7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Azure Synapse Analytics の Mapping Data Flow で CSV を Parquet & Delta に変換する

Last updated at Posted at 2020-07-26

はじめに

GUI ベースで ETL 処理が行える Mapping Data Flow は Azure Data Factory だけでなく Azure Synapse Analytics で使うことができます。
Mapping Data Flow は元々 Parquet 形式をサポートしていましたが、最近 Delta 形式もサポート対象に加わりました。
また、Azure Synapse Analytics では SQL オンデマンド というサーバーレスなクエリ実行エンジンを利用できます。

本記事では、Azure Blob Storage に格納された CSV ファイルを Mapping Data Flow で Parquet 形式 & Delta 形式に変換して Azure Data Lake Storage Gen2 (ADLS) に格納し、SQL オンデマンドでクエリを実行するまでの一連の流れを簡単にまとめたいと思います。

目次

  1. Synapse Workspace 作成
  2. 入出力ファイル & フォルダ準備
  3. Linked services & Datasets 設定
  4. Data flows 作成
  5. Pipelines 作成 & 実行
  6. SQL オンデマンドでのクエリ実行

1. Synapse Workspace 作成

Azure ポータルで [synapse] と検索し [Azure Synapse Analytics (workspaces preview)] を選択します。
image.png

Workspace 作成画面です。20207/26 時点では東日本 & 西日本リージョンは選択できません。任意のリージョンを指定します。
image.png

SQL オンデマンドで ADLS のデータに対してクエリを実行する際に権限的に必要なためチェックしておきます。
image.png

次画面では SQL 管理者のユーザー名とパスワードを入力します。他のチェックボックスについては必要に応じてオン/オフを切り替えましょう。
特定の IP アドレスからのアクセスのみを許可したい場合は、一番下の [Allow connection from all IP address] のチェックを外しておきます。Workspace 作成後に Firewall 画面から接続を許可する IP アドレスを編集します。
image.png

問題なければ作成します。数分後にリソース作成が完了します。
image.png

2. 入出力ファイル & フォルダ準備

入力用の準備として、ダミーデータを準備して Blob Storage にアップロードします。
出力用の準備として、ADLS にフォルダーを作成しておきます。

2-1. 入力用 - ダミーデータ作成

まずは Blob Storage に格納する CSV データを準備します。ここでは mockaroo というダミーデータ生成用の Web サイトでダミーデータを作成します。mockaroo では 1000 件 / 回のダミーデータを無料で作成できます。
https://www.mockaroo.com/

こんな感じでフィールド名と型とオプションを入れると良い感じのダミーデータが作成されます。
image.png

作成したデータは以下のような感じです。

order_number,created_at,customer_id,stock_name,quantity
1,2020-07-07T16:58:56Z,cb500cf5-4786-4bdc-b1a8-93652f31cfae,"American Eagle Outfitters, Inc.",82
2,2020-07-07T10:22:17Z,6c57e361-d511-4893-a2a1-af1579f3d15c,"Meridian Waste Solutions, Inc",24
...
999,2020-07-07T02:11:19Z,87f29c2d-30b0-4714-a2db-efe47e28831c,LKQ Corporation,45
1000,2020-07-05T04:34:07Z,58a4d463-9fc9-4f0c-9745-797e0427383f,Valvoline Inc.,29

2-2. 入力用 - ダミーデータを Blob Storage にアップロード

あらかじめ 作っておいた Blob Storage に Storage Explorer でダミーデータをアップロードします。
image.png

2-3. 出力用 - フォルダ作成

この後 Datasets を設定する際に ADLS にフォルダが存在する必要があるため、ADLS の Storage Explorer でフォルダを作成しておきます。
image.png

3. Linked services & Datasets 設定

ここから先は Synapse Workspace で作業を行います。Synapse Workspace の Overview 画面の赤枠で囲んだリンクをクリックすると Workspace が別タブで起動します。
image.png

3-1. Linked services - Blob

(Name: BlobStorageNakazaxQiitaDf)
入力 Blob Storage 用の Linked service です。
[Manage] ハブ > [Linked services] で [New] をクリックして新規 Linked service を作成します。
image.png

3-2. Datasets - Blob

(Name: OrdersCsv)
入力データ用の Dataset です。[Data] ハブの [+] アイコンをクリックして [Datasets] をクリックします。[Azure Blob Storage] > [Delimited Text] を選んだ後、入力データが格納されているパスなどを指定して Dataset を作成します。作成後の状態がこちらです。
image.png

3-3. Datasets - ADLS

(Name: OrdersParquetByDate)
Parquet 出力データ用の Dataset です。[Data] ハブの [+] アイコンをクリックして [Datasets] をクリックします。[Azure Data Lake Storage Gen2] > [Parquet] を選んだ後、出力先のフォルダパスを指定して Dataset を作成します。

image.png
image.png

これで Datasets の設定は完了です。Delta 用の Dataset については後続の手順で設定していきます。

4. Data flows 作成

CSV > Parquet 変換用の Data flow と、CSV > Delta 変換用の Data flow を作成していきます。

4-1. CSV > Parquet 変換用 Data flow

(Name: DataflowOrdersCsvToParquet)
[Develop] ハブの [+] アイコンをクリックして [Data flow] をクリックします。Data flow ではそれぞれの箱を「変換」(transformation) と呼びます。以下、各変換のキャプチャを貼付します。

4-1-1. Source 変換

(Name: SourceOrdersCsv)
Dataset に OrdersCsv を指定します。
image.png

各カラムの型はデフォルトでは String で解釈されるため、order_numberquantity の Type を Integer に変更します。
image.png

4-1-2. Derived column 変換

(Name: AddDateColumns)
パーティションに利用する日時カラムを追加します。
image.png

4-1-3. Sink 変換

(Name: SinkOrdersParquetByDate)

image.png

ここでは出力先フォルダーを毎回の処理実行時にクリアするように設定しています。
image.png

pt_year/pt_month/pt_day というパーティションを指定します。
image.png

以下のカラムが出力ファイルに書き込まれます。
image.png

4-2. CSV > Delta 変換用 Data flow

(Name: DataflowOrdersCsvToDelta)

4-2-1. Source 変換

(Name: SourceOrdersCsv)
CSV > Parquet 変換用 Data flow と同じ内容のため割愛

4-2-2. Derived column 変換

(Name: AddDateColumns)
CSV > Parquet 変換用 Data flow と同じ内容のため割愛

4-2-3. Sink 変換

(Name: SinkOrdersDeltaByDate)

Delta を指定する場合、[Sink type] から [Delta] を指定します。
image.png

出力先 ADLS の Linked service を指定します。
image.png

[Settings] タブでフォルダーパス以外にも諸々の設定ができますが、今回は特に変更せずにデフォルトのままにしておきます。
image.png

pt_year/pt_month/pt_day というパーティションを指定します。
image.png

5. Pipelines 作成 & 実行

先ほど作成した Data flow を呼び出す Pipeline を作成し、デバッグ実行して結果を出力します。

5-1. CSV > Parquet 変換用 Data flow 用 Pipeline

(Name: PipelineDataflowOrdersCsvToParquet)

[Orchestrate] ハブの [+] アイコンをクリックして [Pipeline] をクリックします。[Move & transform] > [Data flow] を右側の空白部分にドラッグ & ドロップして先ほど作成したCSV > Parquet 変換用 Data flow を指定します。[Publish all] をクリックします。
image.png

[Add trigger] > [Trigger now] をクリックし、Pipeline を実行します。
image.png

5-2. CSV > Delta 変換用 Data flow 用 Pipeline

(Name: PipelineDataflowOrdersCsvToDelta)
上記と同様の手順のため割愛します。

5-3. Pipeline 実行結果確認

[Monitor] ハブ > [Pipeline runs] から Pipeline の実行結果が確認できます。[STATUS] が [Succeeded] になっていれば OK です。
image.png

Parquet 出力結果

実際にどのようなデータが出力されたかを見てみましょう。
[Data] ハブ > [Linked] の Storage accounts に ADLS のコンテナー一覧が表示されているはずなので、その中で出力先として利用しているコンテナーを指定します。
Parquet の方はこのような出力になるはずです。

image.png

pt_year=2020 フォルダー (年のパーティション) と _SUCCESS ファイル (処理成功を表す) がある
image.png

pt_month=07 フォルダー (月のパーティション) がある
image.png

pt_day=01 ~ pt_day=07 フォルダー (日のパーティション) がある
image.png

各フォルダーの配下に part-00000-${GUID}.c0000.snappy.parquet といったネーミングのファイルがある (これがデータの実体)
image.png

Delta 出力結果

image.png

pt_year=2020 フォルダー以下の構造は Parquet と同じですが一点だけ違いとして _delta_log フォルダーがある
image.png

_delta_log フォルダーには以下のような JSON ファイルがある
image.png

JSON の中には以下のようなメタデータが格納されている。

{"commitInfo":{"timestamp":1595757668111,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"pt_year\",\"pt_month\",\"pt_day\"]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"7","numOutputBytes":"84256","numOutputRows":"1000"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"04e98468-c8ca-41e7-a01f-f589ab5f533b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"order_number\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"created_at\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"customer_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"stock_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"quantity\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"pt_year\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"pt_month\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"pt_day\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["pt_year","pt_month","pt_day"],"configuration":{},"createdTime":1595757649086}}
{"add":{"path":"pt_year=2020/pt_month=07/pt_day=01/part-00000-4dfe08ac-252a-4cee-a797-e7bdd4b48fa5.c000.snappy.parquet","partitionValues":{"pt_year":"2020","pt_month":"07","pt_day":"01"},"size":14098,"modificationTime":1595757657000,"dataChange":true}}
{"add":{"path":"pt_year=2020/pt_month=07/pt_day=02/part-00000-e12e27d8-e651-4bd2-8e45-6dddac0665ac.c000.snappy.parquet","partitionValues":{"pt_year":"2020","pt_month":"07","pt_day":"02"},"size":12294,"modificationTime":1595757658000,"dataChange":true}}
{"add":{"path":"pt_year=2020/pt_month=07/pt_day=03/part-00000-bb8e1936-e220-4200-a241-57d1663b00e4.c000.snappy.parquet","partitionValues":{"pt_year":"2020","pt_month":"07","pt_day":"03"},"size":11243,"modificationTime":1595757660000,"dataChange":true}}
{"add":{"path":"pt_year=2020/pt_month=07/pt_day=04/part-00000-5968555c-de58-4c29-b1b0-d433870d7976.c000.snappy.parquet","partitionValues":{"pt_year":"2020","pt_month":"07","pt_day":"04"},"size":11288,"modificationTime":1595757661000,"dataChange":true}}
{"add":{"path":"pt_year=2020/pt_month=07/pt_day=05/part-00000-b19c8e6b-1853-46c6-9d34-5b8b70f50d8f.c000.snappy.parquet","partitionValues":{"pt_year":"2020","pt_month":"07","pt_day":"05"},"size":11764,"modificationTime":1595757662000,"dataChange":true}}
{"add":{"path":"pt_year=2020/pt_month=07/pt_day=06/part-00000-725b1fcd-5c6c-49bf-a194-56710d5766fc.c000.snappy.parquet","partitionValues":{"pt_year":"2020","pt_month":"07","pt_day":"06"},"size":11898,"modificationTime":1595757664000,"dataChange":true}}
{"add":{"path":"pt_year=2020/pt_month=07/pt_day=07/part-00000-27b18250-ae7e-42de-8f76-37c69fac7d07.c000.snappy.parquet","partitionValues":{"pt_year":"2020","pt_month":"07","pt_day":"07"},"size":11671,"modificationTime":1595757665000,"dataChange":true}}

6. SQL オンデマンドでのクエリ実行

以下、Delta 形式のデータに対するクエリの例をいくつか記載します。Parquet 形式についても同様のクエリでアクセスできます。他のパターンについては以下ドキュメントを参考頂くのが良いと思います。

データ アナリスト チュートリアル: SQL オンデマンド (プレビュー) を使用して Azure Synapse Studio (プレビュー) で Azure Open Datasets を分析する - Azure Synapse Analytics | Microsoft Docs

単一ファイル指定 & TOP 100

SELECT
    TOP 100 *
FROM
    OPENROWSET(
        BULK 'https://stnakazaxqiitadf.dfs.core.windows.net/orders-container/orders_delta_by_date/pt_year=2020/pt_month=07/pt_day=01/part-00000-4dfe08ac-252a-4cee-a797-e7bdd4b48fa5.c000.snappy.parquet',
        FORMAT='PARQUET'
    ) AS [r];

image.png

単一ファイル指定 & COUNT

SELECT
    COUNT(*)
FROM
    OPENROWSET(
        BULK 'https://stnakazaxqiitadf.dfs.core.windows.net/orders-container/orders_delta_by_date/pt_year=2020/pt_month=07/pt_day=01/part-00000-4dfe08ac-252a-4cee-a797-e7bdd4b48fa5.c000.snappy.parquet',
        FORMAT='PARQUET'
    ) AS [r];

image.png

全ファイル指定 & COUNT

SELECT
    COUNT(*)
FROM
    OPENROWSET(
        BULK 'https://stnakazaxqiitadf.dfs.core.windows.net/orders-container/orders_delta_by_date/pt_year=*/pt_month=*/pt_day=*/*.parquet',
        FORMAT='PARQUET'
    ) AS [r];

image.png

WHERE 句で絞り込んだ日付別の件数

SELECT
    SUBSTRING(r.created_at, 0, 11) AS created_at_date,
    COUNT(*) AS count
FROM
    OPENROWSET(
        BULK 'https://stnakazaxqiitadf.dfs.core.windows.net/orders-container/orders_delta_by_date/pt_year=2020/pt_month=07/pt_day=*/*.parquet',
        FORMAT='PARQUET'
    ) AS [r]
WHERE r.filepath(1) >= '01' AND r.filepath(1) <= '04'
GROUP BY SUBSTRING(r.created_at, 0, 11)
ORDER BY created_at_date ASC;

image.png

以上です。

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?