LoginSignup
2
1

More than 3 years have passed since last update.

Athena+embulk で ETL

Last updated at Posted at 2019-12-02

目的

日毎のバッチで、S3にある ParquetファイルをQueryし、その結果をRedshift(DWH)へ入れたかった。

背景

  • QueryしたいParquetファイルは Data Lake 上にすべて存在していた。
  • 必要なParquetファイルはすべて AWS Glue Datacatalog でテーブル構造が定義されていた。
  • Queryした結果をRedshift上にテーブル化し、他のアプリケーションで使いたかった。
  • Embulkは本番環境はすでにバリバリ使っていた。

ということで

これだけ揃ってんなら、AthenaでQueryして、結果をEmbulkでRedshiftに入れればいいんでない。

作る

AthenaでQueryして、結果のCSVファイルを確認する

  • AWS CLI でAthenaにQueryを投げる。
  • Queryを投げるコマンドはQuery要求を出すだけですぐ終了するので終了を待つ。
  • jqコマンドを予め yum か何かでインストールしておく
#!/bin/bash

export RESULT_BACKET="result-bucket"
export RESULT_PATH="result-files"

# 結果待ち
wait_query(){
  max_retry=100
  check_interval=10

  echo -n "sleep ${check_interval} seconds"
  for((i = 0; i <= ${max_retry}; i++)); do
    sleep ${check_interval}

    resultJson=$(aws --profile athena_query athena get-query-execution --query-execution-id $1)
    runningStatus=$(echo ${resultJson} | jq -r '.QueryExecution.Status.State')
    echo -n "."
    if [ "${runningStatus}" == "SUCCEEDED" ]; then
      break
    fi

    if [ "${runningStatus}" == "FAILED" -o "${runningStatus}" == "CANCELLED" ]; then
      echo "query error !!"
      echo "${resultJson}"
      exit -1
    fi
  done
}

### 結果の確認
check_result(){
  json=$(aws --profile athena_query athena get-query-execution --query-execution-id $1 )
  status=$(echo ${json} | jq -r '.QueryExecution.Status.State')

  if [ "$status" == "SUCCEEDED" ]; then
    outputLocation=$(echo ${json} | jq -r '.QueryExecution.ResultConfiguration.OutputLocation')
    echo "query finished!! to ${outputLocation}"
  else
    echo "query result is not found !!"
    echo "${json}"
    exit -1
  fi
}


########## START
### Queryを実行する QueryId を取得する
export QUERY_ID=$(aws --profile athena_query athena start-query-execution \
    --query-string \
      "
  SELECT *
  FROM
    test_schema.table_a
  FULL JOIN
    test_schema.table_b on test_schema.table_a.id = test_schema.table_b = id
      " \
    --result-configuration OutputLocation="s3://${RESULT_BACKET}/${RESULT_PATH}" \
    | jq -r '.QueryExecutionId')

### Query 実行待ち
wait_until_end ${QUERY_ID}

### resulr file を取得
check_result ${QUERY_ID}

ResultのCSVファイルをEmbulkでRedshiftへInsertする

  • source となるCSVファイルのファイル名は ${QUERY_ID}.csv
  • embulk の定義ファイルはこちら
insert.yml

in:
  type: s3
  access_key: {{env.ACCESS_KEY}}
  secret_key: {{env.SECRET_KEY}}
  default_timezone: Asia/Tokyo
  bucket: {{env.RESULT_BACKET}}
  path_prefix: {{env.RESULT_PATH}}/{{env.QUERY_ID}}
  parser:
    type: csv
    delimiter: ","
    skip_header_lines: 1
    columns:
      - {name: id, type: long }
      - {name: point, type: long }
      - {name: count, type: long }
      - {name: regist_date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N' }  

out:
  type: redshift
  host: {{ env.REDSHIFT_HOST }}
  port: 5439
  user: {{ env.REDSHIFT_USER }}
  password: {{ env.REDSHIFT_PASSWORD }}
  database: {{ env.REDSHIFT_DATABASE }}
  table: redshift_test_table
  mode: truncate_insert
  aws_auth_method: profile
  aws_profile_name: embulk
  iam_user_name: IAM-Embulk-S3-Accsess
  s3_bucket: embulk-s3-to-redshift
  s3_key_prefix: embulk
  delete_s3_temp_file: false
  default_timezone: "Japan"
  column_options:
    id: {type: 'bigint'}
    point: {type: 'int'}
    count: {type: 'int'}
    regist_date: {type: 'timestamp'}
  • embulk 実行コマンドライン
  • AthenaからQueryしたshに追加する
/home/embulk/.embulk/bin/embulk run ./insert.yml

注意点

  • AWSの都合でAthenaへのQueryの実行が待たされる場合があるので、日毎のバッチ時実行したいときは実行時間に注意

その他

  • jqコマンド超便利。AWS Cli のresult値取得に超便利。
2
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1