はじめに
本記事は Japan AWS Jr. Champions Advent Calendar 2024 シリーズ2 の22日目の記事となります。
先日 Redshift Data API を利用する機会がありましたので、その内容を整理してお伝えできればと思います。
Redshift Data API とは
Redshift Data API は、Redshift への接続方法のひとつとして提供されている機能です。本機能を利用することで、データベースドライバやネットワーク設定などの管理が不要となり、Redshift への接続を簡素化することができます。Software Development Kit (SDK) と統合されており、C++, Go, Java, PHP, Python, Rubyなど様々な言語のアプリケーションから利用することができます。
以下の図のように、Lambda, SageMaker, カスタムアプリケーションなどから Redshift へ接続する際に利用できます。
本記事では、ユースケースのひとつである SageMaker から Redshift への接続を試してみようと思います。
前提条件
- SageMaker Studio をセットアップ済みであること。
- Redshift Serverless のワークグループと名前空間を作成済みであること。
- Redshift Serverless において、データベース, スキーマ, テーブルを作成済みであり、テーブルにデータをロード済みであること。
- SageMaker の IAM ロールに対して、Redshift Serverless のテーブル操作権限を許可 (GRANT) 済みであること。
※ ちなみに私はこの GRANT を忘れており、IAM ロールに権限付与しているのになぜ権限不足のエラーになるのかわからず時間を浪費しました...
Redshift Data API を用いた SQL 実行で以下のようなエラーが出た方は GRANT を疑ってみましょう。
ERROR: permission denied for relation {テーブル名}
- 言語は Python を利用します。
使ってみよう
では SageMaker Studio から Redshift Serverless へ接続し、テーブルデータを取得してみましょう。
接続先情報を定義
Redshift Serverless のワークグループ名とデータベース名を定義しました。
workgroup = {ワークグループ名}
database = {データベース名}
Redshift Data API を利用した関数 run_sql を定義
Redshift Data API の ExecuteStatement を利用して、SQL クエリを実行し、クエリ結果を pandas データフレームとして取得する関数 run_sql を定義しました。
import boto3
import time
import pandas as pd
import numpy as np
def run_sql(sql_text):
client = boto3.client("redshift-data")
res = client.execute_statement(Database=database, WorkgroupName=workgroup, Sql=sql_text)
query_id = res["Id"]
done = False
while not done:
time.sleep(1)
status_description = client.describe_statement(Id=query_id)
status = status_description["Status"]
if status == "FAILED":
raise Exception('SQL query failed:' + query_id + ": " + status_description["Error"])
elif status == "FINISHED":
if status_description['ResultRows']>0:
results = client.get_statement_result(Id=query_id)
column_labels = []
for i in range(len(results["ColumnMetadata"])): column_labels.append(results["ColumnMetadata"][i]['label'])
records = []
for record in results.get('Records'):
records.append([list(rec.values())[0] for rec in record])
df = pd.DataFrame(np.array(records), columns=column_labels)
return df
else:
return query_id
関数 run_sql を実行
いざ実行!
df = run_sql("select * from {スキーマ名}.{テーブル名}")
df.head(10)
実行結果
Redshift Serverless からテーブルデータを取得できました!
参考文献
おわりに
Redshift Data API の基礎的な内容をまとめてみました。
最後までご覧いただきありがとうございました!