Help us understand the problem. What is going on with this article?

AWS Athenaで結果を強引にJSONで受け取る方法

More than 1 year has passed since last update.

追記

微妙に最新の情報を見逃していてAthenaはCTAS(CREATE TABLE AS)をサポートできるようになってたらしいです。
https://dev.classmethod.jp/cloud/aws/amazon-athena-support-ctas/

ということで以下のワークアラウンドはほぼ不要です。

tl;dr

  • Athenaの実行結果をJSONで受け取りたかった
  • SQLですべてのカラムを一つのJSONにキャストすればJSONで受け取れる
  • 本格的にETLするならGlueジョブを使いましょう

はじめに

AWS AthenaはマネージドPrestoとも言えるサービスでクラスタを管理しなくても手軽にS3のデータにクエリを投げれる良いサービスです。
一方でAthenaはINSERT文に対応していなかったり、クエリの出力形式がCSVかつ無圧縮しか選択できなかったりと、なんらかETLをするには不向きなサービスです(2018/10/15時点)。本格的なETLにはGlueを使うのがいいんですが、整形されたデータをチョロチョロっとサマって他のプログラムの入力にしたいみたいケースでは手軽にAthenaで済ませたくなります。そういった際に出力をプログラムで扱いやすいJSON形式でできるとなお嬉しいです。

ということで以下のようなクエリを書いてみました。

クエリ

こんな感じで出力結果をMAP経由でJSONにキャストすると結果を一つのJSON形式にできます。(サブクエリの中は適当なサンプルデータです。)

SELECT
  CAST( MAP(
    ARRAY[
      'name',
      'age',
      'skils',
      'account'
    ],
    ARRAY[
      CAST(name as JSON),
      CAST(age as JSON),
      CAST(skils as JSON),
      CAST(account as JSON)
    ]
  ) AS JSON) AS json_data
FROM (
  SELECT
    'kanga' as name,
    100 as age,
    ARRAY['sql','aws'] as skils,
    MAP(
      ARRAY['qiita', 'twitter'],
      ARRAY['kanga', 'kanga333']
    ) as account
);

こんな感じに結果が受け取れます。

json.csv
"json_data"
"{""account"":{""qiita"":""kanga"",""twitter"":""kanga333""},""age"":100,""name"":""kanga"",""skils"":[""sql"",""aws""]}"

厳密には出力はJSON形式では無く、1カラムのCSVの文字列の中にJSONが入ってる感じになります。

利用する際は文字列からデコードする必要があります。

直接csvをロードする場合はこんな感じです。

load.py
import csv
import json

with open("json.csv") as f:
    csv_records = csv.reader(f)
    # Skip header
    next(csv_records, None)
    for csv_record in csv_records:
        json_record = json.loads(csv_record[0])
        # Output: kanga
        print(json_record['name'])
        # Output: {'qiita': 'kanga', 'twitter': 'kanga333'}
        print(json_record['account'])

AWS APIのget_query_resultsから利用する際も同じ感じです。

get_query_results.py
import json
import boto3

athena = boto3.client('athena')
response = athena.get_query_results(
    QueryExecutionId='15299086-c4e6-425a-9b61-d360a875225d'
)
rows = response['ResultSet']['Rows']

for row in rows[1:]:
    data = row['Data'][0]['VarCharValue']
    json_record = json.loads(data)
    # Output: kanga
    print(json_record['name'])
    # Output: kanga
    print(json_record['account'])

おわりに

以上、Athenaの結果をJSONで受け取る方法でした。とりあえず現状、データをサマって抽出した上で何らかプログラムの入力にする、とかそういったケースで使えなくもないかな?と思います。
一方で、この方法で出力されるファイルは無圧縮のJSONになるので、大量のレコードを出力したい場合には使わない方が良いでしょう。そういったケースでは素直にGlueを使うかEMRでクラスタを立てましょう。

Athenaが正式にJSON形式での結果出力をサポートする日を待ってます、あるいはINSERT文対応。あとは結果を圧縮して格納するオプションも出ると嬉しいなぁ。。。

kanga
うさんくさいインフラエンジニア
http://kangaat.hatenablog.com/
speee
株式会社Speeeは「解き尽くす。未来を引きよせる。」というミッションを実現すべく、中長期的な目線で企業価値を最大化させていくため、組織・事業のStyleを大切にした永続的な価値創造を目指しています。
https://www.speee.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした