7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Amazon Redshift Data API 使ってみた

Posted at

はじめに

本記事は Japan AWS Jr. Champions Advent Calendar 2024 シリーズ2 の22日目の記事となります。

先日 Redshift Data API を利用する機会がありましたので、その内容を整理してお伝えできればと思います。

Redshift Data API とは

Redshift Data API は、Redshift への接続方法のひとつとして提供されている機能です。本機能を利用することで、データベースドライバやネットワーク設定などの管理が不要となり、Redshift への接続を簡素化することができます。Software Development Kit (SDK) と統合されており、C++, Go, Java, PHP, Python, Rubyなど様々な言語のアプリケーションから利用することができます。

以下の図のように、Lambda, SageMaker, カスタムアプリケーションなどから Redshift へ接続する際に利用できます。

image003-2.png

本記事では、ユースケースのひとつである SageMaker から Redshift への接続を試してみようと思います。

前提条件

  • SageMaker Studio をセットアップ済みであること。
  • Redshift Serverless のワークグループと名前空間を作成済みであること。
  • Redshift Serverless において、データベース, スキーマ, テーブルを作成済みであり、テーブルにデータをロード済みであること。
  • SageMaker の IAM ロールに対して、Redshift Serverless のテーブル操作権限を許可 (GRANT) 済みであること。
    ※ ちなみに私はこの GRANT を忘れており、IAM ロールに権限付与しているのになぜ権限不足のエラーになるのかわからず時間を浪費しました...
    Redshift Data API を用いた SQL 実行で以下のようなエラーが出た方は GRANT を疑ってみましょう。
    ERROR: permission denied for relation {テーブル名}
  • 言語は Python を利用します。

使ってみよう

では SageMaker Studio から Redshift Serverless へ接続し、テーブルデータを取得してみましょう。

接続先情報を定義

Redshift Serverless のワークグループ名とデータベース名を定義しました。

workgroup = {ワークグループ名}
database = {データベース名}

Redshift Data API を利用した関数 run_sql を定義

Redshift Data API の ExecuteStatement を利用して、SQL クエリを実行し、クエリ結果を pandas データフレームとして取得する関数 run_sql を定義しました。

import boto3
import time
import pandas as pd
import numpy as np

def run_sql(sql_text):
    client = boto3.client("redshift-data")
    res = client.execute_statement(Database=database, WorkgroupName=workgroup, Sql=sql_text)
    query_id = res["Id"]
    done = False
    while not done:
        time.sleep(1)
        status_description = client.describe_statement(Id=query_id)
        status = status_description["Status"]
        if status == "FAILED":
            raise Exception('SQL query failed:' + query_id + ": " + status_description["Error"])
        elif status == "FINISHED":
            if status_description['ResultRows']>0:
                results = client.get_statement_result(Id=query_id)
                column_labels = []
                for i in range(len(results["ColumnMetadata"])): column_labels.append(results["ColumnMetadata"][i]['label'])
                records = []
                for record in results.get('Records'):
                    records.append([list(rec.values())[0] for rec in record])
                df = pd.DataFrame(np.array(records), columns=column_labels)
                return df
            else:
                return query_id

関数 run_sql を実行

いざ実行!

df = run_sql("select * from {スキーマ名}.{テーブル名}")
df.head(10)

実行結果

Redshift Serverless からテーブルデータを取得できました!

image 2024-12-22 163643.png

参考文献

おわりに

Redshift Data API の基礎的な内容をまとめてみました。
最後までご覧いただきありがとうございました!

7
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?