本記事のテーマ
タイトルの通り、Azure Data Factory の Mapping Data Flow で CSV ファイルの重複行を削除する方法について記載します。サンプルのユースケースは以下の通りです。
- 複数に分かれた顧客情報 (customers) の CSV ファイルがあり、それらを 1 つのファイルにマージする
- ただし、重複する
ID
を持つレコードが存在する可能性がある -
ID
が重複する場合、更新日時が新しいレコードを残して他のレコードは削除したい
入出力ファイルのイメージ
本記事では Input: Azure Data Lake Storage Gen2 (ADLS Gen2) -> Process: Mapping Data Flow -> Output: ADLS Gen2
という流れを例にしたいと思います。
入力ファイルのイメージ
あるディレクトリの配下に同じレイアウトの CSV ファイルが複数ある想定にしています。
-
customers1.csv
- ストレージアカウント: stnakazaxqiitadf (ADLS Gen2)
- コンテナ: customers-container
- ファイルパス: /input/customers1.csv
"ID","Email","Name","Updated At"
"1","yamada.hanako@example.com","Yamada Hanako","2019-08-31 00:00:00 +0900"
"2","tanaka.taro@example.com","Tanaka Taro","2019-08-31 00:00:00 +0900"
-
customers2.csv
- ストレージアカウント: stnakazaxqiitadf (ADLS Gen2)
- コンテナ: customers-container
- ファイルパス: /input/customers2.csv
"ID","Email","Name","Updated At"
"3","adam@contoso.com","Adam","2019-09-01 00:00:00 +0900"
"4","ben@contoso.com","Ben","2019-10-01 00:00:00 +0900"
-
customers3.csv
- ストレージアカウント: stnakazaxqiitadf (ADLS Gen2)
- コンテナ: customers-container
- ファイルパス: /input/customers3.csv
"ID","Email","Name","Updated At"
"5","chris@contoso.com","Chris","2020-01-01 00:00:00 +0900"
"6","david@contoso.com","David","2020-02-01 00:00:00 +0900"
-
customers4.csv
- ストレージアカウント: stnakazaxqiitadf (ADLS Gen2)
- コンテナ: customers-container
- ファイルパス: /input/customers4.csv
"ID","Email","Name","Updated At"
"1","tanaka.hanako@example.com","Tanaka Hanako","2020-04-15 00:00:00 +0900"
"3","adam@fabricam.com","Adam","2020-04-15 00:00:00 +0900"
"5","chris@fabricam.com","Chris","2020-04-15 00:00:00 +0900"
"7","elvis@fabricam.com","Elvis","2020-04-15 00:00:00 +0900"
出力ファイルのイメージ
- ストレージアカウント: stnakazaxqiitadf (ADLS Gen2)
- コンテナ: customers-container
- ファイルパス: /output/merged_customers.csv
"ID","Email","Name","Updated At"
"1","tanaka.hanako@example.com","Tanaka Hanako","2020-04-15 00:00:00 +0900"
"2","tanaka.taro@example.com","Tanaka Taro","2019-08-31 00:00:00 +0900"
"3","adam@fabricam.com","Adam","2020-04-15 00:00:00 +0900"
"4","ben@contoso.com","Ben","2019-10-01 00:00:00 +0900"
"5","chris@fabricam.com","Chris","2020-04-15 00:00:00 +0900"
"6","david@contoso.com","David","2020-02-01 00:00:00 +0900"
"7","elvis@fabricam.com","Elvis","2020-04-15 00:00:00 +0900"
Mapping Data Flow での処理フロー例
Mapping Data Flow の処理フロー例は以下の通りです。色々なやり方があると思いますので、これが絶対の正解という訳ではなく方法の一つだと捉えて頂ければと思います。
No. | 処理内容 | 利用する組み込み機能 |
---|---|---|
1 | 入力ファイルを読み込み | ソース変換 |
2 |
ID と Updated At の最大値を出力 |
集計変換 |
3 | 1 と 2 のストリームを ID と Updated At をキーにしてジョイン |
結合変換 |
4 | 結果ファイルを出力 | シンク変換 |
以下のキャプチャは Mapping Data Flow で処理フローを組み上げたときの完成形のイメージです。
以降では各フローの簡単なポイントについて紹介します。
ソース変換
ソース変換で入力ファイルを指定する際にワイルドカード *
が使えます。
以下のように指定すると /input
ディレクトリの customers1.csv
customers2.csv
... customersN.csv
のようにまとめて読み込めます。
ワイルドカードパスはオプションなので省略可能です。省略した場合はデータセットで指定したディレクトリ直下の全ファイル、またはデータセットで指定したファイルが読み込まれます。
集計変換
[Aggregate setting] タブで [Group by] の [Columns] に ID
を指定します。
[Aggregates] では first()
last()
min()
max()
などの集計関数が使えます。ここでは max({Updated At})
で最新の更新日時を選択しています。なお、 {}
はカラム名が空白を含む場合に指定する必要があります。空白を含まない場合は不要です。
式の左側のテキストボックスで集計関数の結果を格納するカラム名を指定できるのですが、元々のカラム名と同じ Updated At
を指定している理由は後述します。
Data flow debug をオンにして [Data preview] タブを見ると以下のように一意な ID
と Updated At
の最大値のペアが出力されているはずです。
結合変換
1 と 2のストリームを入力として ID
と Updated At
が一致するレコード同士をジョインします。ジョインタイプは Inner
を選択します。
Data flow debug をオンにして [Data preview] タブを見ると以下のように一意な ID
かつ最新の Updated At
を持つレコードのみが表示されるはずです。なお、 ID
列と Updated At
列が二重で表示されていますが、シンク変換で一列ずつに絞り込めるためここでは無視して OK です。
シンク変換
[Inspect] タブの [Input] のカラムを見ると ID
と Update At
が二重で表示されています。
[Mapping] タブで [Skip duplicate input columns] にチェックを入れると、入力ストリームに重複するカラムが存在する場合、1 つのカラムのみに絞り込むことができます。
Data flow debug をオンにして [Data preview] タブを見ると以下のような結果が出力されているはずです。基本はこれで問題ないと思いますが、例えば ID
昇順で並べ替えたい場合は 並べ替え変換、カラムの並び順を変更したい場合は 選択変換 をシンク変換の前に行うと良いでしょう。