セキュリティの観点から、AWS Redshiftへは踏み台経由でSSH接続する場合が多いと思います。
そこで、今回はデータサイエンティスト・アナリストが良く使うであろうpandas.read_sql()
や、ORMのSQLAlchemyを利用したpandas.DataFrameのto_sql()
を、踏み台経由でSSH接続したRedshiftで使用する方法を紹介します。
ライブラリ
psycopg2-binary==2.9.1
sqlalchemy==1.4.25
sshtunnel==0.4.0
メソッド定義
新たにread_sql, to_sqlメソッドを定義する。
import psycopg2
from sqlalchemy import create_engine
from sshtunnel import SSHTunnelForwarder
def read_sql(ssh_host, ssh_port, ssh_username, ssh_pkey, dbhost, dbport, dbuser, dbpassword, database, query):
# Redshiftに接続
with SSHTunnelForwarder(
(ssh_host, ssh_port),
ssh_username=ssh_username,
# 秘密鍵
ssh_pkey=ssh_pkey,
remote_bind_address=(dbhost, dbport),
) as server:
conn = {
'host': 'localhost',
'port': server.local_bind_port,
'user': dbuser,
'password': dbpassword,
'database': database
}
with psycopg2.connect(**conn) as conn:
return pd.read_sql(query, con=conn)
def to_sql(ssh_host, ssh_port, ssh_username, ssh_pkey, dbhost, dbport, dbuser, dbpassword, database, schema, table_name, df, if_exists):
# Redshiftに接続
with SSHTunnelForwarder(
(ssh_host, ssh_port),
ssh_username=ssh_username,
# 秘密鍵
ssh_pkey=ssh_pkey,
remote_bind_address=(dbhost, dbport),
) as server:
engine = create_engine(f'postgresql://{user}:{password}@localhost:{server.local_bind_port}/{database}')
df.to_sql(table_name, engine, schema=schema, index=False, if_exists=if_exists)
使い方
接続情報
踏み台サーバー、接続先のDBの情報を記載する。
config = {
# ssh config
'ssh_host': '*****',
'ssh_port': *****,
'ssh_username': '*****',
'ssh_pkey': '*****', # 秘密鍵の絶対パス
# db config
'dbhost': '*****',
'dbport': *****,
'database': '*****',
'dbuser': '*****',
'dbpassword': '*****'
}
read
# read sql from redshift
read_sql(**config, query='select top 10 * from hoge.fuga')
write
# create from pandas DataFrame into redshift
df = pd.DataFrame([{'Name': 'Ram', 'Age': 50},
{'Name': 'Bhim', 'Age': 23},
{'Name': 'Shyam', 'Age': 25}]
)
to_sql(**config, schema='sandbox', table_name='test', df=df, if_exists='replace')