はじめに
統計解析や複雑な処理を施した結果をデータベースに取り込み、それを Yellowfin などの BI ツールで可視化したい要件も多々発生します。例えば、こちらの記事にあるベイズ推定を用いた計算などは、事前に Python を用いてプログラム処理しておく必要があります。
本記事では、AWS EC2 上で Python 処理した結果を、 CSV ファイルに出力し、CSV ファイルの内容を S3 経由で Redshift のテーブルにコピーする一連の処理の流れを紹介したいと思います。
主な流れは以下の通りです。
- Python 処理結果を CSV 出力 (EC2 上)
- CSV ファイルを S3 に転送
- S3 から Redshiftテーブルにデータコピー
特にデータ量が大きい場合、1 行ずつデータを Insert する処理は現実的ではないため、ファイルからテーブルにデータをコピーする処理は必須となります。
環境準備と前提
Python に以下のパッケージをインストールします。
パッケージ | 用途 | インストールコマンド |
---|---|---|
pandas | データフレームの扱い | pip install pandas |
boto3 | S3 操作 | pip install boto3 |
psycog2 | Redshift 接続 | pip install psycopg2 |
Python 処理結果を CSV 出力
ベイズ推定やクラスター解析の記事を参考に、Python で処理した結果を CSV ファイルに出力します。データフレームの内容を CSV ファイルに出力する際には、Python で以下のような記述をするだけです。
df.to_csv('result.csv', header=False, index=False)
上記例では、既存ファイルを上書きします。既存ファイルにデータを追加する場合は、オプションとして mode=’a’ を指定します。
index=True を指定すると、第一列にデータフレームの index が入ってきてしまいます。データベースにデータ連携する時は、index=False を指定しましょう。
原則として、データフレームの内容を print で確認しながらコーディングするのが確実です。
CSV ファイルを S3 に転送
boto3 ライブラリを活用して、S3 連携します。具体的なコーディング例は以下の通りです。
#モジュール読み込み
import boto3
#変数
file_name = '/usr/local/python/result.csv'
bucket_name = 'bucket-name-here'
key = 'result.txt'
#アップロード処理
client = boto3.client('s3',
aws_access_key_id='AWSACCESSKEYIDAWSACC',
aws_secret_access_key='AWSSECRETACCESSKEYAWSSECRETACCESSKEYAWSS',
region_name='region-name-here')
client.upload_file(file_name, bucket_name, key)
上記例のように、コードの中に S3 にアクセスするためのアクセスキーやシークレットキーを記述することは、セキュリティ上適切ではありません。
そのため、通常は以下の例のように、アクセスキーやシークレットキーは環境変数として別ファイルに記述します。
#モジュール読み込み
import boto3
from dotenv import load_dotenv
#環境変数
load_dotenv(verbose=True, dotenv_path='/usr/local/python/s3.env')
#変数
file_name = '/usr/local/python/result.csv'
bucket_name = 'bucket-name-here'
key = 'result.csv'
#アップロード処理
client = boto3.client('s3')
client.upload_file(file_name, bucket_name, key)
AWS_ACCESS_KEY_ID='AWSACCESSKEYIDAWSACC'
AWS_SECRET_ACCESS_KEY=' AWSSECRETACCESSKEYAWSSECRETACCESSKEYAWSS '
AWS_DEFAULT_REGION='region-name-here'
bucket_name='bucket-name-here'
環境変数として、AWS_ACCESS_KEY_ID
、AWS_SECRET_ACCESS_KEY
、AWS_DEFAULT_REGION
という名称を指定しておくだけで、Python プログラムがこれら値を環境変数から読み込みます。
環境変数に関するファイルのみならず、Python プログラム内のパスの指定は絶対パスで指定しておくべきです。理由は、cron や systemd timer でスケジュール実行する際、相対パスで指定すると、スケジューラーがファイルのパスを理解できなくなってしまうからです。
なお、EC2 インスタンス上で Python プログラムを稼働させる場合は、アクセスキーやシークレットキーを使うのではなく、ロールで権限を管理することが推奨されるようです。ロールによる権限管理を再優先しましょう。
アップロード処理が完了したら、AWS コマンドを使って、ファイルが無事アップロードされたか否かを確認します。
aws s3 ls s3://bucket-name-here
S3 から Redshift テーブルへのデータコピー
先の手順で S3 バケットにアップロードしたファイルの中身を、Redshift のテーブルにコピーします。下記例では、Redshift のアクセスするための情報を、環境変数として別ファイルに記述しています。
#!/usr/bin/python
#モジュール読み出し
import os
import psycopg2
import boto3
from dotenv import load_dotenv
#環境変数
load_dotenv(verbose=True, dotenv_path='/usr/local/python/redshift.env')
client = boto3.client('s3')
key_name = os.getenv('key_name')
secret_name = os.getenv('secret_name')
bucket_name=os.getenv('bucket_name')
#変数
target = scheme_name.table_name
source = file_name_on_s3
#Redshiftへのコピー処理
def redshift():
conn = psycopg2.connect(dbname=os.getenv('database'),port=os.getenv('port'),user=os.getenv('user'),password=os.getenv('password'),host=os.getenv('hostname'))
cur = conn.cursor();
cur.execute("begin;")
cur.execute("copy " + target + " from 's3://" + bucket_name + '/' + source + "' credentials 'aws_access_key_id=" + key_name + ";aws_secret_access_key=" + secret_name + "' delimiter ',';")
cur.execute("commit;")
redshift();
region_name='region-name-here'
hostname='hostname-here.redshift.amazonaws.com'
port='5432'
database='python-db'
user='admin_user'
password='password'
コピーを行う場合、テーブル構造とファイル構造はきっちりと合わせる必要があります。列数が合わない場合や、データが列の型と合わない場合などは、Python が STL_LOAD_ERROR を出力します。
上記エラーが出力されたときは、Redshift 上で下記クエリーを実行すると、問題の個所が明示されます。
SELECT * FROM stl_load_errors ORDER BY starttime DESC LIMIT 10;
Yellowfin からのデータ参照
アップロードされたデータを、Yellowfin から参照してチャートやダッシュボードを作成します。
Yellowfin コンテンツの作成に関する詳細は過去の記事をご確認ください。
最後に
予め統計解析など行った結果を Redshift に集約する際には、上記の方法が最も適切ではないかなとは思っています。
ちなみに、もし他にもっと良い方法があれば、是非ご教授ください。
では皆様、良いデータ分析を!