目的
日毎のバッチで、S3にある ParquetファイルをQueryし、その結果をRedshift(DWH)へ入れたかった。
背景
- QueryしたいParquetファイルは Data Lake 上にすべて存在していた。
- 必要なParquetファイルはすべて AWS Glue Datacatalog でテーブル構造が定義されていた。
- Queryした結果をRedshift上にテーブル化し、他のアプリケーションで使いたかった。
- Embulkは本番環境はすでにバリバリ使っていた。
ということで
これだけ揃ってんなら、AthenaでQueryして、結果をEmbulkでRedshiftに入れればいいんでない。
作る
AthenaでQueryして、結果のCSVファイルを確認する
- AWS CLI でAthenaにQueryを投げる。
- Queryを投げるコマンドはQuery要求を出すだけですぐ終了するので終了を待つ。
- jqコマンドを予め yum か何かでインストールしておく
#!/bin/bash
export RESULT_BACKET="result-bucket"
export RESULT_PATH="result-files"
# 結果待ち
wait_query(){
max_retry=100
check_interval=10
echo -n "sleep ${check_interval} seconds"
for((i = 0; i <= ${max_retry}; i++)); do
sleep ${check_interval}
resultJson=$(aws --profile athena_query athena get-query-execution --query-execution-id $1)
runningStatus=$(echo ${resultJson} | jq -r '.QueryExecution.Status.State')
echo -n "."
if [ "${runningStatus}" == "SUCCEEDED" ]; then
break
fi
if [ "${runningStatus}" == "FAILED" -o "${runningStatus}" == "CANCELLED" ]; then
echo "query error !!"
echo "${resultJson}"
exit -1
fi
done
}
### 結果の確認
check_result(){
json=$(aws --profile athena_query athena get-query-execution --query-execution-id $1 )
status=$(echo ${json} | jq -r '.QueryExecution.Status.State')
if [ "$status" == "SUCCEEDED" ]; then
outputLocation=$(echo ${json} | jq -r '.QueryExecution.ResultConfiguration.OutputLocation')
echo "query finished!! to ${outputLocation}"
else
echo "query result is not found !!"
echo "${json}"
exit -1
fi
}
########## START
### Queryを実行する QueryId を取得する
export QUERY_ID=$(aws --profile athena_query athena start-query-execution \
--query-string \
"
SELECT *
FROM
test_schema.table_a
FULL JOIN
test_schema.table_b on test_schema.table_a.id = test_schema.table_b = id
" \
--result-configuration OutputLocation="s3://${RESULT_BACKET}/${RESULT_PATH}" \
| jq -r '.QueryExecutionId')
### Query 実行待ち
wait_until_end ${QUERY_ID}
### resulr file を取得
check_result ${QUERY_ID}
ResultのCSVファイルをEmbulkでRedshiftへInsertする
- source となるCSVファイルのファイル名は ${QUERY_ID}.csv
- embulk の定義ファイルはこちら
insert.yml
in:
type: s3
access_key: {{env.ACCESS_KEY}}
secret_key: {{env.SECRET_KEY}}
default_timezone: Asia/Tokyo
bucket: {{env.RESULT_BACKET}}
path_prefix: {{env.RESULT_PATH}}/{{env.QUERY_ID}}
parser:
type: csv
delimiter: ","
skip_header_lines: 1
columns:
- {name: id, type: long }
- {name: point, type: long }
- {name: count, type: long }
- {name: regist_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N' }
out:
type: redshift
host: {{ env.REDSHIFT_HOST }}
port: 5439
user: {{ env.REDSHIFT_USER }}
password: {{ env.REDSHIFT_PASSWORD }}
database: {{ env.REDSHIFT_DATABASE }}
table: redshift_test_table
mode: truncate_insert
aws_auth_method: profile
aws_profile_name: embulk
iam_user_name: IAM-Embulk-S3-Accsess
s3_bucket: embulk-s3-to-redshift
s3_key_prefix: embulk
delete_s3_temp_file: false
default_timezone: "Japan"
column_options:
id: {type: 'bigint'}
point: {type: 'int'}
count: {type: 'int'}
regist_date: {type: 'timestamp'}
- embulk 実行コマンドライン
- AthenaからQueryしたshに追加する
/home/embulk/.embulk/bin/embulk run ./insert.yml
注意点
- AWSの都合でAthenaへのQueryの実行が待たされる場合があるので、日毎のバッチ時実行したいときは実行時間に注意
その他
- jqコマンド超便利。AWS Cli のresult値取得に超便利。