やりたいこと
- 最初に、日本郵政が公開している郵便番号データをAthenaテーブルとしてhive互換パーティションを設定して登録する
- 次に追加のパーティションにデータを出力して、MSCK REPAIR TABLEを実行して追加のパーティション情報をGlueに登録して追加のパーティションキーでデータが参照できることを確認する
郵便番号データをS3にアップロードする
- 日本郵政が公開している郵便番号データをS3にアップする
- s3バケット名は、PySparkコード上ではプレースホルダ{_s3bucket_input}として表記
- 東京都はtokyo、千葉県はchibaというフォルダを作成し、その下にCSVを配置
郵便番号データをAthenaに登録する
- 以下のPySparkコードで東京都のデータをParquet形式で出力し、Glueカタログにテーブルを登録する
- EMRを起動してspark-submitで実行
- 都道府県名カラムをパーティションキーとした
- 出力先のバケット名は、プレースホルダ{_s3bucket_output}として表記
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
StructField('jis_code', StringType(), True, {'comment': "全国地方公共団体コード"}),
StructField('postal5', StringType(), True, {'comment': "(旧)郵便番号(5桁)"}),
StructField('postal7', StringType(), True, {'comment': "郵便番号(7桁)"}),
StructField('pref_kana', StringType(), True, {'comment': "都道府県名カナ"}),
StructField('city_kana', StringType(), True, {'comment': "市区町村名カナ"}),
StructField('address_kana', StringType(), True, {'comment': "町域名カナ"}),
StructField('pref', StringType(), True, {'comment': "都道府県名"}),
StructField('city', StringType(), True, {'comment': "市区町村"}),
StructField('address', StringType(), True, {'comment': "町域名"}),
StructField('flag1', StringType(), True, {'comment': "一町域が二以上の郵便番号で表される場合の表示(「1」は該当、「0」は該当せず)"}),
StructField('flag2', StringType(), True, {'comment': "小字毎に番地が起番されている町域の表示(「1」は該当、「0」は該当せず)"}),
StructField('flag3', StringType(), True, {'comment': "丁目を有する町域の場合の表示 (「1」は該当、「0」は該当せず)"}),
StructField('flag4', StringType(), True, {'comment': "一つの郵便番号で二以上の町域を表す場合の表示(「1」は該当、「0」は該当せず)"}),
StructField('flag5', StringType(), True, {'comment': "更新の表示(「0」は変更なし、「1」は変更あり、「2」廃止(廃止データのみ使用))"}),
StructField('flag6', StringType(), True, {'comment': "変更理由 (「0」は変更なし、「1」市政・区政・町政・分区・政令指定都市施行、「2」住居表示の実施、「3」区画整理、「4」郵便区調整等、「5」訂正、「6」廃止(廃止データのみ使用))"})
])
def main():
spark = SparkSession.builder \
.config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
.config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
.enableHiveSupport() \
.getOrCreate()
# s3から東京都のcsvデータを入力
df = spark.read \
.format('csv') \
.option('header','false') \
.option('encoding','Windows-31J') \
.schema(schema) \
.load('{_s3bucket_input}/tokyo/')
# s3にparquet形式のデータを出力してGlueカタログにテーブルメタデータを登録
spark.catalog.setCurrentDatabase('test')
df.write.option('path','{_s3bucket_output}/zipcode') \
.option('compression','snappy') \
.mode('overwrite') \
.partitionBy('pref') \
.format('parquet') \
.saveAsTable('zipcode')
if __name__ == '__main__':
main()
-
s3には東京都のパーティションデータが出力されている
- {s3_bucket_output}/zipcode/pref=東京都/
追加のhive互換パーティションを出力する
- 今度は千葉県のcsvデータを入力してhive互換パーティションデータを出力する
- main関数を以下に変更して実行
- 'spark.sql.sources.partitionOverwriteMode'を指定することで、東京都のパーティションデータは保持され、追加の千葉県のデータがパーティションデータとして追加される
- {s3_bucket_output}/zipcode/pref=千葉県/
def main():
spark = SparkSession.builder \
.config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
.config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
.enableHiveSupport() \
.getOrCreate()
df = spark.read \
.format('csv') \
.option('header','false') \
.option('encoding','Windows-31J') \
.schema(schema) \
.load('{_s3bucket_input}/chiba/')
df.write.parquet('{_s3bucket_output}/zipcode', mode='overwrite', partitionBy='pref', compression='snappy')
- この時点では、partition情報がGlueカタログに登録されていないので、千葉県のデータを参照することはできない
追加のパーティション情報をGlueに登録する
- MSCK REPAIR TABLEを実行して追加のパーティション情報をGlueに登録する
- main関数を以下に変更して実行
def main():
spark = SparkSession.builder \
.config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
.enableHiveSupport() \
.getOrCreate()
spark.sql('MSCK REPAIR TABLE test.zipcode')
- 追加のパーティション(千葉県)がGlueに登録される
- Athenaで追加のパーティション情報を参照できるようになる
さいごに
- このようなETLジョブの要件として、1つ以上のパーティションを含む増分で登録するという要件が多いかと思います。今回の例ではこのようなケースで対応できる実装例を示してみました。