0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Python 処理結果を S3 経由で Redshift にコピー

Last updated at Posted at 2024-08-05

はじめに

統計解析や複雑な処理を施した結果をデータベースに取り込み、それを Yellowfin などの BI ツールで可視化したい要件も多々発生します。例えば、こちらの記事にあるベイズ推定を用いた計算などは、事前に Python を用いてプログラム処理しておく必要があります。

本記事では、AWS EC2 上で Python 処理した結果を、 CSV ファイルに出力し、CSV ファイルの内容を S3 経由で Redshift のテーブルにコピーする一連の処理の流れを紹介したいと思います。

主な流れは以下の通りです。

  1. Python 処理結果を CSV 出力 (EC2 上)
  2. CSV ファイルを S3 に転送
  3. S3 から Redshiftテーブルにデータコピー

特にデータ量が大きい場合、1 行ずつデータを Insert する処理は現実的ではないため、ファイルからテーブルにデータをコピーする処理は必須となります。

環境準備と前提

Python に以下のパッケージをインストールします。

パッケージ 用途 インストールコマンド
pandas データフレームの扱い pip install pandas
boto3 S3 操作 pip install boto3
psycog2 Redshift 接続 pip install psycopg2

Python 処理結果を CSV 出力

ベイズ推定クラスター解析の記事を参考に、Python で処理した結果を CSV ファイルに出力します。データフレームの内容を CSV ファイルに出力する際には、Python で以下のような記述をするだけです。

CSV 書き出し
df.to_csv('result.csv', header=False, index=False)

上記例では、既存ファイルを上書きします。既存ファイルにデータを追加する場合は、オプションとして mode=’a’ を指定します。
index=True を指定すると、第一列にデータフレームの index が入ってきてしまいます。データベースにデータ連携する時は、index=False を指定しましょう。

原則として、データフレームの内容を print で確認しながらコーディングするのが確実です。

CSV ファイルを S3 に転送

boto3 ライブラリを活用して、S3 連携します。具体的なコーディング例は以下の通りです。

S3 アップロード
#モジュール読み込み
import boto3

#変数
file_name = '/usr/local/python/result.csv'
bucket_name = 'bucket-name-here'
key = 'result.txt'

#アップロード処理
client = boto3.client('s3',
                 aws_access_key_id='AWSACCESSKEYIDAWSACC',
                 aws_secret_access_key='AWSSECRETACCESSKEYAWSSECRETACCESSKEYAWSS',
                 region_name='region-name-here')
client.upload_file(file_name, bucket_name, key)

上記例のように、コードの中に S3 にアクセスするためのアクセスキーやシークレットキーを記述することは、セキュリティ上適切ではありません。
そのため、通常は以下の例のように、アクセスキーやシークレットキーは環境変数として別ファイルに記述します。

S3 アップロード
#モジュール読み込み
import boto3
from dotenv import load_dotenv

#環境変数
load_dotenv(verbose=True, dotenv_path='/usr/local/python/s3.env')

#変数
file_name = '/usr/local/python/result.csv'
bucket_name = 'bucket-name-here'
key = 'result.csv'

#アップロード処理
client = boto3.client('s3')
client.upload_file(file_name, bucket_name, key)
s3.env
AWS_ACCESS_KEY_ID='AWSACCESSKEYIDAWSACC'
AWS_SECRET_ACCESS_KEY=' AWSSECRETACCESSKEYAWSSECRETACCESSKEYAWSS '
AWS_DEFAULT_REGION='region-name-here'
bucket_name='bucket-name-here'

環境変数として、AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYAWS_DEFAULT_REGIONという名称を指定しておくだけで、Python プログラムがこれら値を環境変数から読み込みます。

環境変数に関するファイルのみならず、Python プログラム内のパスの指定は絶対パスで指定しておくべきです。理由は、cron や systemd timer でスケジュール実行する際、相対パスで指定すると、スケジューラーがファイルのパスを理解できなくなってしまうからです。

なお、EC2 インスタンス上で Python プログラムを稼働させる場合は、アクセスキーやシークレットキーを使うのではなく、ロールで権限を管理することが推奨されるようです。ロールによる権限管理を再優先しましょう。

アップロード処理が完了したら、AWS コマンドを使って、ファイルが無事アップロードされたか否かを確認します。

S3 バケット
aws s3 ls s3://bucket-name-here

S3 から Redshift テーブルへのデータコピー

先の手順で S3 バケットにアップロードしたファイルの中身を、Redshift のテーブルにコピーします。下記例では、Redshift のアクセスするための情報を、環境変数として別ファイルに記述しています。

Redshift
#!/usr/bin/python

#モジュール読み出し
import os
import psycopg2
import boto3
from dotenv import load_dotenv

#環境変数
load_dotenv(verbose=True, dotenv_path='/usr/local/python/redshift.env')
client = boto3.client('s3')
key_name = os.getenv('key_name')
secret_name = os.getenv('secret_name')
bucket_name=os.getenv('bucket_name')

#変数
target = scheme_name.table_name
source = file_name_on_s3

#Redshiftへのコピー処理
def redshift():
    conn = psycopg2.connect(dbname=os.getenv('database'),port=os.getenv('port'),user=os.getenv('user'),password=os.getenv('password'),host=os.getenv('hostname'))
    cur = conn.cursor();
    cur.execute("begin;")
    cur.execute("copy " + target + " from 's3://" + bucket_name + '/'  + source + "' credentials 'aws_access_key_id=" + key_name + ";aws_secret_access_key=" + secret_name + "' delimiter ',';")
    cur.execute("commit;")

redshift();
redshift.env
region_name='region-name-here'
hostname='hostname-here.redshift.amazonaws.com'
port='5432'
database='python-db'
user='admin_user'
password='password'

コピーを行う場合、テーブル構造とファイル構造はきっちりと合わせる必要があります。列数が合わない場合や、データが列の型と合わない場合などは、Python が STL_LOAD_ERROR を出力します。
上記エラーが出力されたときは、Redshift 上で下記クエリーを実行すると、問題の個所が明示されます。

stl_load_errors
SELECT * FROM stl_load_errors ORDER BY starttime DESC LIMIT 10;

Yellowfin からのデータ参照

アップロードされたデータを、Yellowfin から参照してチャートやダッシュボードを作成します。
Yellowfin コンテンツの作成に関する詳細は過去の記事をご確認ください。

最後に

予め統計解析など行った結果を Redshift に集約する際には、上記の方法が最も適切ではないかなとは思っています。
ちなみに、もし他にもっと良い方法があれば、是非ご教授ください。

では皆様、良いデータ分析を!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?