やりたいこと
- ADLSにCSVまたはJSONを配置
- U-SQLで動的なカラム (e.g., ユーザーidなど) ごとにCSVファイルを出力
- BlobにCSVファイルを保存
これだけ!!
必要なAzureのサービス
- Data Lake Analytics
- ストレージアカウント
- Blob
- Data Lake Store
全体の流れ
- 事前準備
- Azure Data Lake Storeからデータを吸い上げ、Blobにデータを吐き出せるようにする
- JSONをU-SQLにて読み込めるようにする
- 動的なカラムごとにCSVファイルを出力
事前準備
Azureのサービスの作成
Azure Data Lake を使用したスケーラブルなデータ サイエンス: エンドツーエンド チュートリアル | Microsoft Docs
これをやれば一通りの準備はできる
なぜかHDInsightクラスタを作成するという手順があるがやらないほうがよい
HDInsightクラスタは起動させておくと、めちゃお金かかるから速攻無料分がなくなる
Azure Data Lake Storeからデータを吸い上げ、Blobにデータを吐き出す
BlobをData Lake AnalyticsのData sourceに追加する必要がある
すべてのリソース > Data Lake Analytics > SETTINGS > Data sources > Add data source
で追加できる
JSONをCSVに変換
このサイトをそのままやった
Querying Multi-Structured JSON Files with U-SQL in Azure Data Lake - SQL Chick
参考
Azure Data Lake のための U-SQL プログラミング ガイド | Microsoft Docs
Azure Data Lake Analytics で100GBくらいのCSVファイルを処理 - Qiita
以下、手順
ユーザー定義エクストラクターのアセンブリの作成
1. GitHubからUSQL repositoryをクローンしてくる
git clone https://github.com/Azure/usql.git
2. Visual StudioでMicrosoft.Analytics.Samples.Formats project
をビルド
Visual Studio > ファイル > 追加 > 既存のプロジェクト
でクローンした中のusql\Examples\DataFormats\Microsoft.Analytics.Samples.Formats
フォルダ内のMicrosoft.Analytics.Samples.Formats.csproj
を指定する
ソリューション エクスプローラーにてMicrosoft.Analytics.Samples.Formats
を右クリック、ビルドを選択
3. Microsoft.Analytics.Samples.Formats\bin\Debug
にファイルが生成されたことを確認
Newtonsoft.Json.dll
Microsoft.Analytics.Samples.Formats.dll
が生成されていることを確認する
テーブル作成
CREATE DATABASE IF NOT EXISTS PoCADLDB;
Data Lake Storeに生成したアセンブリファイルの配置
生成した上記2つのDDLファイルをAzure Data Lake Storeに配置する
今回は\assembiles\JSON
に配置した
アセンブリファイルをAzure Data Lake Analyticsに登録する
USE DATABASE [PoCADLDB];
CREATE ASSEMBLY [Newtonsoft.Json] FROM @"assemblies/JSON/Newtonsoft.Json.dll";
CREATE ASSEMBLY [Microsoft.Analytics.Samples.Formats] FROM @"assemblies/JSON/Microsoft.Analytics.Samples.Formats.dll";
JSONファイルをADLSにアップロード
[
{
"AID":"Document1",
"Timestamp":"2017-03-14T20:58:13.3896042Z",
"Data": {
"Val": 67,
"PrevVal": 50,
"Descr": "ValueA"
}
},
{
"AID":"Document2",
"Timestamp":"2017-03-14T20:04:12.9693345Z",
"Data": {
"Val": 92,
"Descr": "ValueB"
}
}
]
ADLAからU-SQLを実行
REFERENCE ASSEMBLY PoCADLDB.[Newtonsoft.Json];
REFERENCE ASSEMBLY PoCADLDB.[Microsoft.Analytics.Samples.Formats];
USING Microsoft.Analytics.Samples.Formats.Json;
DECLARE @InputPath string = "/poc/data/LogCapture201703.json";
DECLARE @OutputFile string = "/poc/output/LogCapture.csv";
@RawData =
EXTRACT [AID] string,
[Timestamp] DateTime,
[Data] string
FROM @InputPath
USING new JsonExtractor();
@CreateJSONTuple =
SELECT [AID] AS AssignedID,
[Timestamp] AS TimestampUtc,
JsonFunctions.JsonTuple([Data]) AS EventData
FROM @RawData;
@Dataset =
SELECT AssignedID,
TimestampUtc,
EventData["Val"] ?? "0" AS DataValue,
EventData["PrevVal"] ?? "0" AS PreviousDataValue,
EventData["Descr"] ?? "N/A" AS Description
FROM @CreateJSONTuple;
OUTPUT @Dataset
TO @OutputFile
USING Outputters.Csv();
JSONファイルをADLSにアップロード
[
{
"medallion": "89D227B655E5C82AECF13C3F540D4CF4",
"hack_license": "BA96DE419E711691B9445D6A6307C170",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-01 15:11:48",
"dropoff_datetime": "2013-01-01 15:18:10",
"passenger_count": "4",
"trip_time_in_secs": "382",
"trip_distance": "1.00",
"pickup_longitude": "-73.978165",
"pickup_latitude": "40.757977",
"dropoff_longitude": "-73.989838",
"dropoff_latitude": "40.751171"
},
{
"medallion": "0BD7C8F5BA12B88E0B67BED28BEA73D8",
"hack_license": "9FD8F69F0804BDB5549F40E9DA1BE472",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-06 00:18:35",
"dropoff_datetime": "2013-01-06 00:22:54",
"passenger_count": "1",
"trip_time_in_secs": "259",
"trip_distance": "1.50",
"pickup_longitude": "-74.006683",
"pickup_latitude": "40.731781",
"dropoff_longitude": "-73.994499",
"dropoff_latitude": "40.75066"
},
{
"medallion": "0BD7C8F5BA12B88E0B67BED28BEA73D8",
"hack_license": "9FD8F69F0804BDB5549F40E9DA1BE472",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-05 18:49:41",
"dropoff_datetime": "2013-01-05 18:54:23",
"passenger_count": "1",
"trip_time_in_secs": "282",
"trip_distance": "1.10",
"pickup_longitude": "-74.004707",
"pickup_latitude": "40.73777",
"dropoff_longitude": "-74.009834",
"dropoff_latitude": "40.726002"
},
{
"medallion": "DFD2202EE08F7A8DC9A57B02ACB81FE2",
"hack_license": "51EE87E3205C985EF8431D850C786310",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-07 23:54:15",
"dropoff_datetime": "2013-01-07 23:58:20",
"passenger_count": "2",
"trip_time_in_secs": "244",
"trip_distance": ".70",
"pickup_longitude": "-73.974602",
"pickup_latitude": "40.759945",
"dropoff_longitude": "-73.984734",
"dropoff_latitude": "40.759388"
},
{
"medallion": "DFD2202EE08F7A8DC9A57B02ACB81FE2",
"hack_license": "51EE87E3205C985EF8431D850C786310",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-07 23:25:03",
"dropoff_datetime": "2013-01-07 23:34:24",
"passenger_count": "1",
"trip_time_in_secs": "560",
"trip_distance": "2.10",
"pickup_longitude": "-73.97625",
"pickup_latitude": "40.748528",
"dropoff_longitude": "-74.002586",
"dropoff_latitude": "40.747868"
}
]
ADLAからU-SQLを実行
REFERENCE ASSEMBLY PoCADLDB.[Newtonsoft.Json];
REFERENCE ASSEMBLY PoCADLDB.[Microsoft.Analytics.Samples.Formats];
USING Microsoft.Analytics.Samples.Formats.Json;
DECLARE @InputPath string = "adl://pocadls4preprocess.azuredatalakestore.net/poc/data/trip_data_1.json";
DECLARE @OutputFile string = "wasb://blob1@pocblob4preprocess.blob.core.windows.net/demo_trip_json.csv";
@RawData =
EXTRACT [medallion] string,
[hack_license] string,
[vendor_id] string,
[rate_code] string,
[store_and_fwd_flag] string,
[pickup_datetime] DateTime,
[dropoff_datetime] DateTime,
[passenger_count] int,
[trip_time_in_secs] double?,
[trip_distance] double?,
[pickup_longitude] float?,
[pickup_latitude] float?,
[dropoff_longitude] float?,
[dropoff_latitude] float?
FROM @InputPath
USING new JsonExtractor();
@Dataset =
SELECT [medallion],
[hack_license],
[vendor_id],
[rate_code],
[store_and_fwd_flag],
[pickup_datetime],
[dropoff_datetime],
[passenger_count],
[trip_time_in_secs],
[trip_distance],
[pickup_longitude],
[pickup_latitude],
[dropoff_longitude],
[dropoff_latitude]
FROM @RawData;
OUTPUT @Dataset
TO @OutputFile
USING Outputters.Csv();
Tagごとに集計
参考
Azure DataLake 大全 - SlideShare
JSONファイルをADLSにアップロード
[
{
"id": 0,
"tag": 1
},
{
"id": 1,
"tag": 0
},
{
"id": 2,
"tag": 2
},
{
"id": 3,
"tag": 1
},
{
"id": 4,
"tag": 0
},
{
"id": 5,
"tag": 1
},
{
"id": 6,
"tag": 1
},
{
"id": 7,
"tag": 0
},
{
"id": 8,
"tag": 2
},
{
"id": 9,
"tag": 0
},
{
"id": 10,
"tag": 1
},
{
"id": 11,
"tag": 2
}
]
ADLAからU-SQLを実行
REFERENCE ASSEMBLY PoCADLDB.[Newtonsoft.Json];
REFERENCE ASSEMBLY PoCADLDB.[Microsoft.Analytics.Samples.Formats];
USING Microsoft.Analytics.Samples.Formats.Json;
DECLARE @InputPath string = "adl://pocadls4preprocess.azuredatalakestore.net/poc/data/random_tags_12.json";
DECLARE @OutputDir string = "wasb://blob1@pocblob4preprocess.blob.core.windows.net/";
DECLARE @OutputFilePref string = "demo_tally_json";
// Read JSON file
@RawData =
EXTRACT [id] string,
[tag] string
FROM @InputPath
USING new JsonExtractor();
// Tags set
@TagSet =
SELECT tag,
id
FROM @RawData
GROUP BY tag, id;
DECLARE @OutputFileTags string = @OutputDir + @OutputFilePref + "_tags.csv";
OUTPUT @TagSet
TO @OutputFileTags
USING Outputters.Csv();