#この記事のゴール
ちょっとcsvファイルをAthenaでテーブル化したい、集計したいみたいな時にファイルを置くだけで自動でテーブル化してくれるLambda関数を作成します。
##Athenaとは
AthenaはAWSの提供するサービスの一つでcsvなどのテキストファイルをあたかもテーブルのように扱うことができ、そのテーブルに対してSQLなどのDMLを実行できるサービスです。大量のデータを集計するときには大変重宝します。また実行したクエリが読み込むデータ量に対して課金されるため(1TBあたり$5)、非常にお安い点も良いです。
##DDLがめんどくさい
Athenaでは通常のテーブル作成と同様にCREATE TABLE
文で作成することが可能です。
例えばs3バケットに下記のようなcsvがあったとします。
###s3://bucket/test.csv
col1,col2,col3
aaaa,bbbb,cccc
dddd,eeee,ffff
これをAthenaでテーブル化するCREATE TABLE
文は下記の通りです。
CREATE EXTERNAL TABLE IF NOT EXISTS db_name.table_name(
col1 string,
col2 string,
col3 string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'escapeChar' = '\\'
)
LOCATION 's3://bucket/'
TBLPROPERTIES (
'skip.header.line.count'='1'
);
###いや~めんどくさい
もしカラムがもっと多かったら[カラム名 string] みたいな形でつらつら書く必要がありますし、テーブルプロパティでヘッダー行指定しているんだからそのヘッダのカラム名をよしなにつかってくれればいいのに。。。って思います。
慣れれば簡単ですが、シリアライザやプロパティ系も地味に覚えるのがめんどくさいですよね。毎回こんなもの書いていられません。
なのでちょっとこのcsvをテーブル化したいなぁレベルのテーブル作成は自動化してしまおうと思います。
#ざっくり仕様
★対象のs3バケットにcsvファイルが置かれたらputイベントでLambda起動
★Lambdaでは置かれたパスをLocationとしてCREATE TABLE
★おかれたファイルの1行目を読み込み、カラム名として利用 型は全てstring
★テーブルごとにディレクトリを切ってそのディレクトリ名をテーブル名として利用
★データベースは固定
【ディレクトリ構成】
bucket
├table1
├aaa.csv
├table2
├bbb.csv
├bbb2.csv
csvファイルが置かれたらtable1やtable2が配下のファイルを参照したテーブルが作られるイメージです。
#さっそく実装!
###まずは対象のバケットにLambdaからS3のイベントを貼ります。
設定方法についてはLambda関数を設定してS3イベントで動かそう!を参照してみてください。
###Lambda関数実装(コピー用)
1.Lambda関数を作成し対象のバケットにオブジェクト作成イベントを設定、下記のコードの要編集部分を変更
2.バケットにファイルを置いて動作確認!
※注意
ファイルが重い場合などLambdaサーバにファイルのアップロードが追い付かず、DDL作成が失敗する可能性があります。time.sleep(秒数)
コマンドで調整しましょう。
import json
import boto3
import time
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
client = boto3.client('athena')
#keyからファイル名を削除
def remove_file_name(key):
array = key.split('/')
array.pop(-1)
key_without_filename = '/'.join(array)
return key_without_filename
#DDL作成
def create_ddl(db, table, columns, s3_location, partition_flag):
athena_ddl_columns = []
for column in columns:
column = column.replace("\n","") #改行コード削除 改行コードが違う場合要編集
athena_ddl_columns.append('`' + column + '` string')
athena_ddl = "CREATE EXTERNAL TABLE IF NOT EXISTS "+db+"."+table+"(" + athena_ddl_columns.join(",") + ")"
if partition_flag == 1:
athena_ddl = athena_ddl+"PARTITIONED BY (dt string)"
athena_ddl = athena_ddl+"\
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'\
WITH SERDEPROPERTIES ('separatorChar' = ',', 'escapeChar' = '\\\\') \
LOCATION '"+s3_location+"'\
TBLPROPERTIES (\
'skip.header.line.count'='1'\
);"
return athena_ddl
#Athenaクエリ実行
def execute_athena(athena_ddl):
queryid = client.start_query_execution(
QueryString = athena_ddl,
ResultConfiguration ={
'OutputLocation' : 's3://your_location/' #要編集
}
)
return queryid
#約1分間、5秒ごとに結果確認
def check_result(qid):
for i in range(12):
status = client.get_query_execution(
QueryExecutionId = qid
)
if status['QueryExecution']['Status']['State'] == 'SUCCEEDED':
break
elif status['QueryExecution']['Status']['State'] == 'FAILED':
print('クエリ実行に失敗しました。')
raise Exception
else:
time.sleep(5)
def lambda_handler(event, context):
bucket_name = event['Records'][0]['s3']['bucket']['name']
bucket = s3.Bucket(bucket_name)
key = event['Records'][0]['s3']['object']['key']
#Lambdaサーバ上の/tmpディレクトリにファイルをダウンロードをする
tmp_file = '/tmp/tmp.csv'
bucket.download_file(key, tmp_file)
key_without_filename = remove_file_name(key)
s3_location = 's3://' + bucket_name + '/' + key_without_filename + '/'
#ファイルの1行目(ヘッダー行)を読み込んでカラム名とし、カンマ区切りで配列に格納
csvfile = open(tmp_file)
data = csvfile.readlines()
columns = data[0].split(',')
db = db_name #要編集
table = key.split('/')[1] #要編集 テーブル名ディレクトリを参照するようにインデックスを指定
partition_flag = 0 #0 or 1 partitionが必要なら設定
athena_ddl = create_ddl(db, table, columns, s3_location, partition_flag)
queryid = execute_athena(athena_ddl)
check_result(queryid['QueryExecutionId'])
~~~
#終わりに
注意点として、Lambdaの/tmpは最大で512MBまでしかデータを格納できません。(2020/2/28)
それ以上になるとエラー終了するので気を付けましょう。
今回はcsvですがtsv等もセパレータやシリアライザを変えれば簡単にできると思います。