はじめに
QuickSightでS3に置いたcsvデータを可視化にするまでに当たって、Athena周りに初めて触れたので、クエリできるようになるまでの過程をまとめる。boto3から機械的にテーブル生成もできるようにする。
準備
- S3バケット:
s3://bd8z-temporary/data-for-athena/
- 使用するデータ:
ニュージーランドの統計データ
International trade: June 2023 quarter – CSV
https://www.stats.govt.nz/large-datasets/csv-files-for-download/
成果物:
1.データの確認
選んだデータセットを確認。csvファイル全部で大体100MBある。
2. Athenaデータベースを作る
色々言葉が厄介だが、ポイントは3つ。
- Athenaの構成要素はデータベースと、テーブル
- テーブルの単位は同じカラムのデータ
- データベースには複数のテーブルを作れる
設定を編集から、クエリ結果を保存するS3バケットを選択しておく
クエリ入力画面からデータベースを作成する。
CREATE DATABASE trial_db;
3. テーブルを作成する
3.1 マネコンから作成する場合
クエリによりテーブルを作成する。色々気を付けることがあった。
- テーブル名に使える記号は、
_
と-
で、.
は使えない。 - csvのカラムと同じキーを型ありで入力する。sql予約後はバッククウォート`を使用する。
- カラムはcsvに対して少ない場合は問題ないが不一致の場合はテーブル作成時にエラーが発生する
CREATE EXTERNAL TABLE `table_name` (
time_ref int,
account string,
code string,
country_code string,
product_type string,
`value` string,
status string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\\'
LINES TERMINATED BY '\n'
LOCATION
's3://bd8z-temporary/data-for-athena/'
TBLPROPERTIES (
'has_encrypted_data'='false',
'skip.header.line.count'='1'
);
3.2 boto3から作成する
データのカラムと、元のcsvデータのカラム単位でのデータ型が分かればクエリできるので、もう少し賢くテーブルを作成してみる。
3.2.1 ベタ書きでboto3でテーブル作成処理を再現
まずはboto3でSQLクエリでテーブルを作成する処理を再現する。
start_query_execution
でクエリを送れる。
import boto3
athenaclient = boto3.client("athena")
SQLString = "CREATE EXTERNAL TABLE IF NOT EXISTS \
table_name21 (time_ref int,\
account string, code string, \
country_code string, product_type string,\
value string, status string) \
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' \
ESCAPED BY '\'LINES TERMINATED BY '\n' \
LOCATION 's3://bd8z-temporary/data-for-athena/' \
TBLPROPERTIES ('has_encrypted_data'='false', \
'skip.header.line.count'='1');"
response = athenaclient.start_query_execution(
QueryString=SQLString,
QueryExecutionContext={
'Database': 'trial_db',
},
ResultConfiguration={
'OutputLocation': 's3://bd8z-temporary/',
},
WorkGroup='primary'
)
3.2.2 csvファイルの読み取りからテーブル作成までを機械的に
前章で作成したSQLString
を機械的に作る。SQLのデータ型に合わせる処理も入れておく。
import boto3
import pandas as pd
import re
class createAthenaTalbe:
def __init__(self, csvFilePath, tableName, s3QueryLogPath):
self.df = pd.read_csv(csvFilePath)
self.tableName = tableName
self.s3QueryLogPath = s3QueryLogPath
def createSQLString(self) ->None:
colList = self.df.columns.to_list()
SQLString = []
for col in colList:
col_ = "`" + col + "`"
if "float" in str(self.df[col].dtype):
SQLString.append(col_ +" float,")
elif "int" in str(self.df[col].dtype):
SQLString.append(col_ +" int,")
elif re.match(r'([0-9]{4}-[0-9]{2}-[0-9]{2}.\
[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2})', self.df["status"][1]) :
SQLString.append( + col_ + " date,")
else:
SQLString.append(col_ + " string,")
self.sqlMEssate = "CREATE EXTERNAL TABLE `" + self.tableName + "` ( " + " ".join(SQLString)[:-1] \
+ ") ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' LOCATION '" \
+ self.s3QueryLogPath + "' TBLPROPERTIES ( 'has_encrypted_data'='false', 'skip.header.line.count'='1' )"
def startQuery(self) ->str:
athenaclient = boto3.client("athena")
response = athenaclient.start_query_execution(
QueryString=self.sqlMEssate,
QueryExecutionContext={
'Database': 'trial_db',
},
ResultConfiguration={
'OutputLocation': 's3://bd8z-temporary/',
},
WorkGroup='primary'
)
return response["QueryExecutionId"]
if __name__ == "__main__":
athObj = createAthenaTalbe("output_csv_full.csv","new_talbe_a","s3://bd8z-temporary/data-for-athena/")
athObj.createSQLString()
athObj.startQuery()
終わりに
今回は真面目に変換処理をベタ書きしたが、pythonのデータ型とSQLのデータ型を相互に変換してくれるライブラリがあると楽だと思った。いい感じのものが無いものか・・・。欠損値処理なども併せてしてくれるとありがたい。
SQLとPythonのデータ型マッピング・・・これを自動化したい
https://learn.microsoft.com/ja-jp/sql/machine-learning/python/python-libraries-and-data-types?view=sql-server-ver16