はじめに
Glue DataBrew をつかってデータ加工を行い、Athena でクエリー出来る環境を作成できます。
初期構築時に入れたデータのクエリーを行えますが、日々データは生まれていきます。どのように DataBrew を動かして Athena としてデータを追加できるか不明だったので、検証した手順を載せておきます。
用意するデータ
以下の 3 種類の csv データを準備します。
sales-01.csv
- 1月~3月のデータ
date,shiten_code,revenue
20100101,0001,100000
20100101,0002,200000
20100101,0003,300000
20100101,0004,400000
20100101,0005,500000
20100201,0001,100000
20100201,0002,200000
20100201,0003,300000
20100201,0004,400000
20100201,0005,500000
20100301,0001,100000
20100301,0002,200000
20100301,0003,300000
20100301,0004,400000
20100301,0005,500000
sales-02.csv
- 4月~5月のデータ
date,shiten_code,revenue
20100401,0001,1000
20100401,0002,2000
20100401,0003,3000
20100401,0004,4000
20100401,0005,5000
20100501,0001,1000
20100501,0002,2000
20100501,0003,3000
20100501,0004,4000
20100501,0005,5000
shiten-master.csv
shiten_code,name
0001,東京支店
0002,大阪支店
0003,名古屋支店
0004,福岡支店
0005,札幌支店
S3 にデータ用意
S3 バケットに、2つのファイルを格納します。sales-02.csv は後で追加していきます。
Glue DataBrew でデータセットの作成
Connect new dataset を押します
sales-01.csv
を選択します
同様に shiten-master も追加しました
Glue DataBrew Project 作成
Create project を押します
adddata-sales-01
のデータセットを選択します
Create Project
Project の初期設定で頑張っている様子が見えます
表示されました。
データ加工の Recipe を作成
支店コードで結合
JOIN を押します
adddata-shiten-master
を選択して Next を押します
2 つのデータセットを shiten_code
で結合する設定を入れて、Table B 側の shiten_code は不要なので除外します。そして、Joined table prebiew を押します
結合されました
日付を Date 型に変更
date 列が int になっているので、アイコンをクリック
date を選択
Apply を押す
Recipe の Publish
Publish を押します
Publish
Recipe が作成されました
Recipe から Job の作成
作成した Recipe を選択して、Create job with this recipe を押します
ジョブの名前と、データセットを選択します
ジョブ実行結果の出力先として、S3 を選択します。そして、出力に関する詳細を Settings で変更できます。
Settings の画面は、このまま変更せずに閉じます。一番上のものは、追記型の出力です。
それ以外は Default のまま Create and run job を押します
Job が Running になりました。この記事の環境では、約 4 分ほどで Job の実行が完了しました。
S3 バケットに、あらたに outputs
のディレクトリが作成されています。outputs
の中には、Job ごとのディレクトリが新たに作成され、その下に実行結果の csv が出力されています。
glue-databrew-adddata-test01
├── outputs
│ └── adddata-sales01-job_23Jul2022_1658555829911
│ └── adddata-sales01-job_23Jul2022_1658555829911_part00000.csv
Glue の Crawler でテーブル定義を自動生成
Add crawler
名前を指定
このまま Next を押します
対象の S3 バケットとディレクトリを指定します
Next を押します
IAM Role を選んで Next
オンデマンド実行
Database を選択して Next を押します
Finish を押します
作成した Crawler を Run します
テーブル定義が作成されました
テーブル定義を以下のクエリーで確認可能です
SHOW CREATE TABLE `outputs`;
表示結果
- パーティションが有効になっれえいることがわかります。
CREATE EXTERNAL TABLE `outputs`(
`date` string,
`shiten_code` bigint,
`revenue` bigint,
`name` string)
PARTITIONED BY (
`partition_0` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://glue-databrew-adddata-test01/outputs/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='glue-databrew-adddata-test',
'areColumnsQuoted'='false',
'averageRecordSize'='25',
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'objectCount'='1',
'recordCount'='21',
'sizeKey'='534',
'skip.header.line.count'='1',
'typeOfData'='file')
Athena の動作確認
SELECT 文を投げてみます。
SELECT * FROM "glue-databrew-adddata"."outputs";
ちゃんと結合された状態でデータが見えています。
Data Brew で新たなデータを追加
ここからが本番です。新たなデータセットを追加していきます。4~5 月のデータを S3 にアップロードしました。
date,shiten_code,revenue
20100401,0001,1000
20100401,0002,2000
20100401,0003,3000
20100401,0004,4000
20100401,0005,5000
20100501,0001,1000
20100501,0002,2000
20100501,0003,3000
20100501,0004,4000
20100501,0005,5000
DataBrew 上で、データセットとして作成します
既存のレシピから新たなジョブを実行します。Create job with this recipe を押します。
新たに登録したデータセットを選択します。
出力先の S3 バケットを選択します。ジョブ実行時に新たなディレクトリを生成する Setting を入れている状態です。
Create
Job が Running となりました
Job が完了すると、S3 の outputs
ディレクトリの下に、あらたなディレクトリが作成されて、ジョブの実行結果が格納されています。
> s3-tree glue-databrew-adddata-test01
glue-databrew-adddata-test01
├── outputs
│ ├── adddata-sales01-job_23Jul2022_1658555829911
│ │ └── adddata-sales01-job_23Jul2022_1658555829911_part00000.csv
│ └── adddata-sales02-job_23Jul2022_1658560796182
│ └── adddata-sales02-job_23Jul2022_1658560796182_part00000.csv
Athena で動作確認
この段階では、クロウラーで作成したテーブルではデータが更新されていません。なぜなら、パーティションが有効になっているテーブルなので、パーティションの反映が必要です。MSCK REPAIR TABLE
コマンドで追加してもいいのですが、今回は再度クロウラーを実行してみましょう。
これによって、新たにパーティションが認識されます。
これで Athena 上のクエリーにも反映されるようになりました。
検証を通じてわかったこと
-
時系列データのように、日付ごとにデータが増えていくユースケースの場合は、比較的シンプル。例えば、1日ごとにデータの加工を行い、既存のテーブルに紐づく S3 に追加でデータをアップロードすればよい
- パーティションが有効の場合は、再度クロウラーの実施や、
MSCK REPAIR TABLE
コマンドなどで新しいパーティションを認識させてあげる必要があり
- パーティションが有効の場合は、再度クロウラーの実施や、
-
過去のデータも含めてまるまる変更となるデータの場合は、すべてのデータを再加工する方法がまずは考えられる。(DROP TABLE → CREATE TABLE)
- データ量が少ない場合は、全データの再加工がシンプルで良さそう
- データ量が多い場合。Athena が Iceberg に対応したので、UPDATE クエリーで過去のデータを更新できようになった。この辺りも検討に加えると良いかもしれない
-
Glue DataBrew の Recipe を ジョブとして実行するときに、指定できるデータセットは1個のみ。
- Join や Union するときに結合する 2個目以降のテーブルは、Recipe 内で指定したデータセットで固定となる
-
データセットにアスタリスクを指定して、複数の csv を対象にすることができる。
- Join や Union するときに、外部のデータセットを指定している場合、うまくアスタリスクが利用できそうかも
- もしくは、普通に csv を上書きしてもいいかもしれない
-
Glue DataBrew の Job の Output にある Settings を選ぶことで、追記型にするのか、上書き型にするのか選択可能。
-
Athena では、各種名前にハイフンが利用されていると一部の機能が利用できない。そのため、名前にハイフンではなくアンダーバーを利用するのが良い
-
Glue DataBrew でデータ加工した csv を対象に、Glue Crawler を実行したとき、基本的にはパーティションが有効になる
- DataBrew のジョブ実行時に、新たなディレクトリを生成する Output Setting を選択しているとき、パーティションとして認識される
- デフォルトの動作が新たなディレクトリを生成するので、基本的にはパーティションが有効になりそう
-
Glue Grawler によって作成されたテーブルから CREATE TABLE を生成して、手動でパーティションが無いテーブルを作成することも可能
Appendix : パーティションない CREATE TABLE
CREATE EXTERNAL TABLE `outputs_no_partition`(
`date` string,
`shiten_code` bigint,
`revenue` bigint,
`name` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://glue-databrew-adddata-test01/outputs/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='glue-databrew-adddata-test',
'areColumnsQuoted'='false',
'averageRecordSize'='25',
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'objectCount'='1',
'recordCount'='21',
'sizeKey'='534',
'skip.header.line.count'='1',
'typeOfData'='file')