Posted at

Azure Data Lake AnalyticsでJSONデータを加工

More than 1 year has passed since last update.


やりたいこと


  • ADLSにCSVまたはJSONを配置

  • U-SQLで動的なカラム (e.g., ユーザーidなど) ごとにCSVファイルを出力

  • BlobにCSVファイルを保存

これだけ!!


必要なAzureのサービス


  • Data Lake Analytics

  • ストレージアカウント


    • Blob



  • Data Lake Store


全体の流れ


  • 事前準備

  • Azure Data Lake Storeからデータを吸い上げ、Blobにデータを吐き出せるようにする

  • JSONをU-SQLにて読み込めるようにする

  • 動的なカラムごとにCSVファイルを出力


事前準備


Azureのサービスの作成

Azure Data Lake を使用したスケーラブルなデータ サイエンス: エンドツーエンド チュートリアル | Microsoft Docs

これをやれば一通りの準備はできる

なぜかHDInsightクラスタを作成するという手順があるがやらないほうがよい

HDInsightクラスタは起動させておくと、めちゃお金かかるから速攻無料分がなくなる

image.png


Azure Data Lake Storeからデータを吸い上げ、Blobにデータを吐き出す

BlobをData Lake AnalyticsのData sourceに追加する必要がある

すべてのリソース > Data Lake Analytics > SETTINGS > Data sources > Add data source

で追加できる

作業中

image.png

作業後

image.png


JSONをCSVに変換

このサイトをそのままやった

Querying Multi-Structured JSON Files with U-SQL in Azure Data Lake - SQL Chick


参考

Azure Data Lake のための U-SQL プログラミング ガイド | Microsoft Docs

Azure Data Lake Analytics で100GBくらいのCSVファイルを処理 - Qiita

以下、手順


ユーザー定義エクストラクターのアセンブリの作成


1. GitHubからUSQL repositoryをクローンしてくる


ローカル

git clone https://github.com/Azure/usql.git



2. Visual StudioでMicrosoft.Analytics.Samples.Formats projectをビルド

Visual Studio > ファイル > 追加 > 既存のプロジェクト

でクローンした中のusql\Examples\DataFormats\Microsoft.Analytics.Samples.Formatsフォルダ内のMicrosoft.Analytics.Samples.Formats.csprojを指定する

image.png

ソリューション エクスプローラーにてMicrosoft.Analytics.Samples.Formatsを右クリック、ビルドを選択


3. Microsoft.Analytics.Samples.Formats\bin\Debugにファイルが生成されたことを確認

Newtonsoft.Json.dll

Microsoft.Analytics.Samples.Formats.dll

が生成されていることを確認する

image.png


テーブル作成


AzureDataAnalytics

CREATE DATABASE IF NOT EXISTS PoCADLDB;


image.png


Data Lake Storeに生成したアセンブリファイルの配置

生成した上記2つのDDLファイルをAzure Data Lake Storeに配置する

今回は\assembiles\JSONに配置した

image.png


アセンブリファイルをAzure Data Lake Analyticsに登録する


AzureDataAnalytics

USE DATABASE [PoCADLDB];

CREATE ASSEMBLY [Newtonsoft.Json] FROM @"assemblies/JSON/Newtonsoft.Json.dll";
CREATE ASSEMBLY [Microsoft.Analytics.Samples.Formats] FROM @"assemblies/JSON/Microsoft.Analytics.Samples.Formats.dll";

image.png


JSONファイルをADLSにアップロード

image.png


LogCapture201703.json

[

{
"AID":"Document1",
"Timestamp":"2017-03-14T20:58:13.3896042Z",
"Data": {
"Val": 67,
"PrevVal": 50,
"Descr": "ValueA"
}
},
{
"AID":"Document2",
"Timestamp":"2017-03-14T20:04:12.9693345Z",
"Data": {
"Val": 92,
"Descr": "ValueB"
}
}
]


ADLAからU-SQLを実行


AzureDataAnalytics

REFERENCE ASSEMBLY PoCADLDB.[Newtonsoft.Json];

REFERENCE ASSEMBLY PoCADLDB.[Microsoft.Analytics.Samples.Formats];

USING Microsoft.Analytics.Samples.Formats.Json;

DECLARE @InputPath string = "/poc/data/LogCapture201703.json";

DECLARE @OutputFile string = "/poc/output/LogCapture.csv";

@RawData =
EXTRACT [AID] string,
[Timestamp] DateTime,
[Data] string
FROM @InputPath
USING new JsonExtractor();

@CreateJSONTuple =
SELECT [AID] AS AssignedID,
[Timestamp] AS TimestampUtc,
JsonFunctions.JsonTuple([Data]) AS EventData
FROM @RawData;

@Dataset =
SELECT AssignedID,
TimestampUtc,
EventData["Val"] ?? "0" AS DataValue,
EventData["PrevVal"] ?? "0" AS PreviousDataValue,
EventData["Descr"] ?? "N/A" AS Description
FROM @CreateJSONTuple;

OUTPUT @Dataset
TO @OutputFile
USING Outputters.Csv();


image.png


JSONファイルをADLSにアップロード

image.png


trip_data_1.json

[

{
"medallion": "89D227B655E5C82AECF13C3F540D4CF4",
"hack_license": "BA96DE419E711691B9445D6A6307C170",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-01 15:11:48",
"dropoff_datetime": "2013-01-01 15:18:10",
"passenger_count": "4",
"trip_time_in_secs": "382",
"trip_distance": "1.00",
"pickup_longitude": "-73.978165",
"pickup_latitude": "40.757977",
"dropoff_longitude": "-73.989838",
"dropoff_latitude": "40.751171"
},
{
"medallion": "0BD7C8F5BA12B88E0B67BED28BEA73D8",
"hack_license": "9FD8F69F0804BDB5549F40E9DA1BE472",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-06 00:18:35",
"dropoff_datetime": "2013-01-06 00:22:54",
"passenger_count": "1",
"trip_time_in_secs": "259",
"trip_distance": "1.50",
"pickup_longitude": "-74.006683",
"pickup_latitude": "40.731781",
"dropoff_longitude": "-73.994499",
"dropoff_latitude": "40.75066"
},
{
"medallion": "0BD7C8F5BA12B88E0B67BED28BEA73D8",
"hack_license": "9FD8F69F0804BDB5549F40E9DA1BE472",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-05 18:49:41",
"dropoff_datetime": "2013-01-05 18:54:23",
"passenger_count": "1",
"trip_time_in_secs": "282",
"trip_distance": "1.10",
"pickup_longitude": "-74.004707",
"pickup_latitude": "40.73777",
"dropoff_longitude": "-74.009834",
"dropoff_latitude": "40.726002"
},
{
"medallion": "DFD2202EE08F7A8DC9A57B02ACB81FE2",
"hack_license": "51EE87E3205C985EF8431D850C786310",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-07 23:54:15",
"dropoff_datetime": "2013-01-07 23:58:20",
"passenger_count": "2",
"trip_time_in_secs": "244",
"trip_distance": ".70",
"pickup_longitude": "-73.974602",
"pickup_latitude": "40.759945",
"dropoff_longitude": "-73.984734",
"dropoff_latitude": "40.759388"
},
{
"medallion": "DFD2202EE08F7A8DC9A57B02ACB81FE2",
"hack_license": "51EE87E3205C985EF8431D850C786310",
"vendor_id": "CMT",
"rate_code": "1",
"store_and_fwd_flag": "N",
"pickup_datetime": "2013-01-07 23:25:03",
"dropoff_datetime": "2013-01-07 23:34:24",
"passenger_count": "1",
"trip_time_in_secs": "560",
"trip_distance": "2.10",
"pickup_longitude": "-73.97625",
"pickup_latitude": "40.748528",
"dropoff_longitude": "-74.002586",
"dropoff_latitude": "40.747868"
}
]


ADLAからU-SQLを実行


AzureDataAnalytics

REFERENCE ASSEMBLY PoCADLDB.[Newtonsoft.Json];

REFERENCE ASSEMBLY PoCADLDB.[Microsoft.Analytics.Samples.Formats];

USING Microsoft.Analytics.Samples.Formats.Json;

DECLARE @InputPath string = "adl://pocadls4preprocess.azuredatalakestore.net/poc/data/trip_data_1.json";

DECLARE @OutputFile string = "wasb://blob1@pocblob4preprocess.blob.core.windows.net/demo_trip_json.csv";

@RawData =
EXTRACT [medallion] string,
[hack_license] string,
[vendor_id] string,
[rate_code] string,
[store_and_fwd_flag] string,
[pickup_datetime] DateTime,
[dropoff_datetime] DateTime,
[passenger_count] int,
[trip_time_in_secs] double?,
[trip_distance] double?,
[pickup_longitude] float?,
[pickup_latitude] float?,
[dropoff_longitude] float?,
[dropoff_latitude] float?
FROM @InputPath
USING new JsonExtractor();

@Dataset =
SELECT [medallion],
[hack_license],
[vendor_id],
[rate_code],
[store_and_fwd_flag],
[pickup_datetime],
[dropoff_datetime],
[passenger_count],
[trip_time_in_secs],
[trip_distance],
[pickup_longitude],
[pickup_latitude],
[dropoff_longitude],
[dropoff_latitude]
FROM @RawData;

OUTPUT @Dataset
TO @OutputFile
USING Outputters.Csv();



Tagごとに集計

参考

Azure DataLake 大全 - SlideShare


JSONファイルをADLSにアップロード

image.png


random_tags_12.json

[

{
"id": 0,
"tag": 1
},
{
"id": 1,
"tag": 0
},
{
"id": 2,
"tag": 2
},
{
"id": 3,
"tag": 1
},
{
"id": 4,
"tag": 0
},
{
"id": 5,
"tag": 1
},
{
"id": 6,
"tag": 1
},
{
"id": 7,
"tag": 0
},
{
"id": 8,
"tag": 2
},
{
"id": 9,
"tag": 0
},
{
"id": 10,
"tag": 1
},
{
"id": 11,
"tag": 2
}
]


ADLAからU-SQLを実行


AzureDataAnalytics

REFERENCE ASSEMBLY PoCADLDB.[Newtonsoft.Json];

REFERENCE ASSEMBLY PoCADLDB.[Microsoft.Analytics.Samples.Formats];

USING Microsoft.Analytics.Samples.Formats.Json;

DECLARE @InputPath string = "adl://pocadls4preprocess.azuredatalakestore.net/poc/data/random_tags_12.json";

DECLARE @OutputDir string = "wasb://blob1@pocblob4preprocess.blob.core.windows.net/";
DECLARE @OutputFilePref string = "demo_tally_json";

// Read JSON file
@RawData =
EXTRACT [id] string,
[tag] string
FROM @InputPath
USING new JsonExtractor();

// Tags set
@TagSet =
SELECT tag,
id
FROM @RawData
GROUP BY tag, id;

DECLARE @OutputFileTags string = @OutputDir + @OutputFilePref + "_tags.csv";

OUTPUT @TagSet
TO @OutputFileTags
USING Outputters.Csv();