1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AthenaのMSCK REPAIR TABLEを試してみた

Last updated at Posted at 2021-04-13

やりたいこと

  • 最初に、日本郵政が公開している郵便番号データをAthenaテーブルとしてhive互換パーティションを設定して登録する
  • 次に追加のパーティションにデータを出力して、MSCK REPAIR TABLEを実行して追加のパーティション情報をGlueに登録して追加のパーティションキーでデータが参照できることを確認する

郵便番号データをS3にアップロードする

  • 日本郵政が公開している郵便番号データをS3にアップする
    • s3バケット名は、PySparkコード上ではプレースホルダ{_s3bucket_input}として表記
    • 東京都はtokyo、千葉県はchibaというフォルダを作成し、その下にCSVを配置

郵便番号データをAthenaに登録する

  • 以下のPySparkコードで東京都のデータをParquet形式で出力し、Glueカタログにテーブルを登録する
    • EMRを起動してspark-submitで実行
    • 都道府県名カラムをパーティションキーとした
    • 出力先のバケット名は、プレースホルダ{_s3bucket_output}として表記
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
	StructField('jis_code', StringType(), True, {'comment': "全国地方公共団体コード"}),
	StructField('postal5', StringType(), True, {'comment': "(旧)郵便番号(5桁)"}),
	StructField('postal7', StringType(), True, {'comment': "郵便番号(7桁)"}),
	StructField('pref_kana', StringType(), True, {'comment': "都道府県名カナ"}),
	StructField('city_kana', StringType(), True, {'comment': "市区町村名カナ"}),
	StructField('address_kana', StringType(), True, {'comment': "町域名カナ"}),
	StructField('pref', StringType(), True, {'comment': "都道府県名"}),
	StructField('city', StringType(), True, {'comment': "市区町村"}),
	StructField('address', StringType(), True, {'comment': "町域名"}),
	StructField('flag1', StringType(), True, {'comment': "一町域が二以上の郵便番号で表される場合の表示(「1」は該当、「0」は該当せず)"}),
	StructField('flag2', StringType(), True, {'comment': "小字毎に番地が起番されている町域の表示(「1」は該当、「0」は該当せず)"}),
	StructField('flag3', StringType(), True, {'comment': "丁目を有する町域の場合の表示 (「1」は該当、「0」は該当せず)"}),
	StructField('flag4', StringType(), True, {'comment': "一つの郵便番号で二以上の町域を表す場合の表示(「1」は該当、「0」は該当せず)"}),
	StructField('flag5', StringType(), True, {'comment': "更新の表示(「0」は変更なし、「1」は変更あり、「2」廃止(廃止データのみ使用))"}),
	StructField('flag6', StringType(), True, {'comment': "変更理由 (「0」は変更なし、「1」市政・区政・町政・分区・政令指定都市施行、「2」住居表示の実施、「3」区画整理、「4」郵便区調整等、「5」訂正、「6」廃止(廃止データのみ使用))"})
])

def main():
	spark = SparkSession.builder \
		.config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
		.config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
		.enableHiveSupport() \
		.getOrCreate()
	# s3から東京都のcsvデータを入力
	df = spark.read \
		.format('csv') \
		.option('header','false') \
		.option('encoding','Windows-31J') \
		.schema(schema) \
		.load('{_s3bucket_input}/tokyo/')
	# s3にparquet形式のデータを出力してGlueカタログにテーブルメタデータを登録
	spark.catalog.setCurrentDatabase('test')
	df.write.option('path','{_s3bucket_output}/zipcode') \
		.option('compression','snappy') \
		.mode('overwrite') \
		.partitionBy('pref') \
		.format('parquet') \
		.saveAsTable('zipcode')

if __name__ == '__main__':
    main()
  • s3には東京都のパーティションデータが出力されている

    • {s3_bucket_output}/zipcode/pref=東京都/
  • Glueカタログに以下のスキーマ情報が登録される
    glue_catalog_zipcode.png

  • パーティション情報は東京都のみ
    glue_partition_zipcode_tokyo.png

追加のhive互換パーティションを出力する

  • 今度は千葉県のcsvデータを入力してhive互換パーティションデータを出力する
    • main関数を以下に変更して実行
    • 'spark.sql.sources.partitionOverwriteMode'を指定することで、東京都のパーティションデータは保持され、追加の千葉県のデータがパーティションデータとして追加される
      • {s3_bucket_output}/zipcode/pref=千葉県/
def main():
	spark = SparkSession.builder \
		.config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
		.config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
		.enableHiveSupport() \
		.getOrCreate()
	df = spark.read \
		.format('csv') \
		.option('header','false') \
		.option('encoding','Windows-31J') \
		.schema(schema) \
		.load('{_s3bucket_input}/chiba/')
	df.write.parquet('{_s3bucket_output}/zipcode', mode='overwrite', partitionBy='pref', compression='snappy')
  • この時点では、partition情報がGlueカタログに登録されていないので、千葉県のデータを参照することはできない

追加のパーティション情報をGlueに登録する

  • MSCK REPAIR TABLEを実行して追加のパーティション情報をGlueに登録する
    • main関数を以下に変更して実行
def main():
	spark = SparkSession.builder \
		.config('hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory') \
		.enableHiveSupport() \
		.getOrCreate()
	spark.sql('MSCK REPAIR TABLE test.zipcode')
  • 追加のパーティション(千葉県)がGlueに登録される

glue_partition_zipcode_chiba.png

  • Athenaで追加のパーティション情報を参照できるようになる

Athena_zipcode_chiba.png

さいごに

  • このようなETLジョブの要件として、1つ以上のパーティションを含む増分で登録するという要件が多いかと思います。今回の例ではこのようなケースで対応できる実装例を示してみました。
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?