はじめに
Snowflakeの概念を整理したので、次はPythonから実際に操作する。
データ処理案件ではPythonでデータを取得・加工してSnowflakeに書き戻す、というパターンが多い。snowflake-connector-pythonというライブラリを使うのが基本で、pandasとの連携も充実している。
インストール
# 基本のコネクター
pip install snowflake-connector-python
# pandas連携を使う場合
pip install "snowflake-connector-python[pandas]"
# SQLAlchemyと連携する場合
pip install snowflake-sqlalchemy
接続の基本
import snowflake.connector
conn = snowflake.connector.connect(
account = "your-account-id", # xxxx.ap-northeast-1.aws のような形式
user = "your-username",
password = "your-password",
warehouse = "my_warehouse",
database = "my_database",
schema = "my_schema",
)
cursor = conn.cursor()
try:
cursor.execute("SELECT CURRENT_VERSION()")
row = cursor.fetchone()
print(f"Snowflakeのバージョン: {row[0]}")
finally:
cursor.close()
conn.close()
接続情報のaccountはSnowflakeのURLから取得できる。https://xxxx.ap-northeast-1.aws.snowflakecomputing.comならxxxx.ap-northeast-1.awsの部分。
環境変数から読む
import os
import snowflake.connector
from dotenv import load_dotenv
load_dotenv()
conn = snowflake.connector.connect(
account = os.environ["SNOWFLAKE_ACCOUNT"],
user = os.environ["SNOWFLAKE_USER"],
password = os.environ["SNOWFLAKE_PASSWORD"],
warehouse = os.environ["SNOWFLAKE_WAREHOUSE"],
database = os.environ["SNOWFLAKE_DATABASE"],
schema = os.environ["SNOWFLAKE_SCHEMA"],
)
# .env
SNOWFLAKE_ACCOUNT=xxxx.ap-northeast-1.aws
SNOWFLAKE_USER=myuser
SNOWFLAKE_PASSWORD=mypassword
SNOWFLAKE_WAREHOUSE=my_warehouse
SNOWFLAKE_DATABASE=my_database
SNOWFLAKE_SCHEMA=my_schema
contextmanagerで接続管理
毎回conn.close()を書くのは面倒なので、withブロックで管理する。
import snowflake.connector
from contextlib import contextmanager
from typing import Generator
@contextmanager
def get_snowflake_connection() -> Generator:
conn = snowflake.connector.connect(
account = os.environ["SNOWFLAKE_ACCOUNT"],
user = os.environ["SNOWFLAKE_USER"],
password = os.environ["SNOWFLAKE_PASSWORD"],
warehouse = os.environ["SNOWFLAKE_WAREHOUSE"],
database = os.environ["SNOWFLAKE_DATABASE"],
schema = os.environ["SNOWFLAKE_SCHEMA"],
)
try:
yield conn
finally:
conn.close()
# 使う側
with get_snowflake_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM orders")
print(cursor.fetchone()[0])
基本的なクエリ操作
SELECT
with get_snowflake_connection() as conn:
cursor = conn.cursor()
# 全件取得
cursor.execute("SELECT id, name, email FROM users LIMIT 10")
rows = cursor.fetchall()
for row in rows:
print(row) # (1, '田中', 'tanaka@example.com')
# 1件取得
cursor.execute("SELECT * FROM users WHERE id = 1")
row = cursor.fetchone()
# 列名付きで取得
cursor.execute("SELECT id, name, email FROM users")
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
for row in rows:
record = dict(zip(columns, row))
print(record) # {'ID': 1, 'NAME': '田中', 'EMAIL': '...'}
cursor.descriptionでカラム名が取れる。Snowflakeはデフォルトでカラム名が大文字になるので注意。
バインド変数でSQLインジェクションを防ぐ
# 悪い例(SQLインジェクションの危険)
user_id = 1
cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
# 良い例(バインド変数を使う)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
# 複数のパラメータ
cursor.execute(
"SELECT * FROM users WHERE name = %s AND is_active = %s",
("田中", True)
)
# 辞書形式でも渡せる
cursor.execute(
"SELECT * FROM users WHERE id = %(user_id)s",
{"user_id": 1}
)
PHPのPDOでプリペアドステートメントを使うのと同じ理由。文字列結合でSQLを組み立てるのは絶対に避ける。
INSERT / UPDATE / DELETE
with get_snowflake_connection() as conn:
cursor = conn.cursor()
# INSERT
cursor.execute(
"INSERT INTO users (name, email) VALUES (%s, %s)",
("田中", "tanaka@example.com")
)
conn.commit()
# UPDATE
cursor.execute(
"UPDATE users SET name = %s WHERE id = %s",
("田中太郎", 1)
)
conn.commit()
# DELETE
cursor.execute("DELETE FROM users WHERE id = %s", (1,))
conn.commit()
# 影響件数の確認
print(f"影響件数: {cursor.rowcount}")
大量データの挿入
1件ずつINSERTするのは非効率。executemany()かwrite_pandas()を使う。
executemany
users = [
("田中", "tanaka@example.com", 28),
("鈴木", "suzuki@example.com", 35),
("佐藤", "sato@example.com", 42),
]
with get_snowflake_connection() as conn:
cursor = conn.cursor()
cursor.executemany(
"INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
users
)
conn.commit()
print(f"挿入件数: {cursor.rowcount}")
write_pandas — pandasとの連携
大量データはpandas DataFrameから直接書き込むのが最も効率的。
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
df = pd.DataFrame({
"NAME": ["田中", "鈴木", "佐藤"],
"EMAIL": ["tanaka@example.com", "suzuki@example.com", "sato@example.com"],
"AGE": [28, 35, 42],
})
with get_snowflake_connection() as conn:
success, num_chunks, num_rows, output = write_pandas(
conn,
df,
table_name = "USERS",
database = "MY_DATABASE",
schema = "MY_SCHEMA",
overwrite = False, # Trueにするとテーブルを再作成
auto_create_table = False,
)
print(f"成功: {success}, 行数: {num_rows}")
write_pandas()はSnowflakeの内部転送機能を使うので、executemany()より数倍高速。大量データを書き込む場合はこちらを使う。
DataFrameとして取得する
クエリ結果をpandas DataFrameで受け取る。
import pandas as pd
with get_snowflake_connection() as conn:
# fetch_pandas_allで直接DataFrameを取得
cursor = conn.cursor()
cursor.execute("SELECT * FROM orders WHERE created_at >= '2024-01-01'")
df = cursor.fetch_pandas_all()
print(df.head())
print(df.dtypes)
# pd.read_sqlでも取得できる
import pandas as pd
import snowflake.connector
conn = snowflake.connector.connect(...)
df = pd.read_sql(
"SELECT * FROM orders WHERE created_at >= '2024-01-01'",
conn
)
fetch_pandas_all()は内部でArrowフォーマットを使うので、通常のfetchall()より高速にDataFrameに変換できる。大量データを取得するときは必ずこちらを使う。
SQLAlchemyと組み合わせる
FastAPIやSQLAlchemyと組み合わせる場合はsnowflake-sqlalchemyを使う。
pip install snowflake-sqlalchemy
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# 接続文字列
SNOWFLAKE_URL = (
"snowflake://{user}:{password}@{account}/{database}/{schema}"
"?warehouse={warehouse}"
).format(
user = os.environ["SNOWFLAKE_USER"],
password = os.environ["SNOWFLAKE_PASSWORD"],
account = os.environ["SNOWFLAKE_ACCOUNT"],
database = os.environ["SNOWFLAKE_DATABASE"],
schema = os.environ["SNOWFLAKE_SCHEMA"],
warehouse = os.environ["SNOWFLAKE_WAREHOUSE"],
)
engine = create_engine(SNOWFLAKE_URL)
# pandasで読み込む
df = pd.read_sql("SELECT * FROM orders LIMIT 100", engine)
# SQLAlchemyのセッションを使う
Session = sessionmaker(bind=engine)
with Session() as session:
result = session.execute(text("SELECT COUNT(*) FROM orders"))
print(result.fetchone()[0])
SQLAlchemy経由で接続すると、FastAPIのDependsでDB接続を管理する既存のパターンがそのまま使える。
接続プールの設定
本番環境では接続プールを設定する。
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool
engine = create_engine(
SNOWFLAKE_URL,
poolclass = QueuePool,
pool_size = 5,
max_overflow = 10,
pool_timeout = 30,
pool_recycle = 1800, # 30分で接続をリサイクル
)
# 接続前にウェアハウスを起動するイベントフック
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
cursor.execute("ALTER WAREHOUSE my_warehouse RESUME IF SUSPENDED")
cursor.close()
よく使うパターン集
大量データを分割して処理する
def process_in_chunks(
conn,
query: str,
chunk_size: int = 10000,
):
cursor = conn.cursor()
cursor.execute(query)
while True:
rows = cursor.fetchmany(chunk_size)
if not rows:
break
df = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])
yield df
# 使う側
with get_snowflake_connection() as conn:
for chunk_df in process_in_chunks(conn, "SELECT * FROM large_table"):
# 10,000件ずつ処理
process(chunk_df)
print(f"{len(chunk_df)}件処理完了")
メモリに載り切らない大量データを処理するときのパターン。pandasのread_sql(chunksize=...)に相当する。
マルチステートメントの実行
# 複数のSQLをまとめて実行
sql = """
BEGIN;
UPDATE orders SET status = 'shipped' WHERE id IN (1, 2, 3);
INSERT INTO order_logs (order_id, action) VALUES (1, 'shipped');
COMMIT;
"""
with get_snowflake_connection() as conn:
conn.execute_string(sql)
execute_string()でセミコロン区切りの複数SQLを一度に実行できる。
クエリのキャンセル
import threading
def cancel_long_query(cursor, timeout_seconds: int = 30):
"""タイムアウト付きでクエリを実行する"""
def cancel():
cursor.cancel()
timer = threading.Timer(timeout_seconds, cancel)
timer.start()
try:
cursor.execute("SELECT * FROM very_large_table")
result = cursor.fetchall()
timer.cancel()
return result
except Exception as e:
print(f"クエリがキャンセルされました: {e}")
return None
エラーハンドリング
import snowflake.connector
from snowflake.connector.errors import (
DatabaseError,
InterfaceError,
ProgrammingError,
OperationalError,
)
try:
with get_snowflake_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM non_existent_table")
except ProgrammingError as e:
# SQLの構文エラー、テーブルが存在しないなど
print(f"SQLエラー: {e.msg}")
print(f"SQLState: {e.sqlstate}")
except OperationalError as e:
# 接続エラー、ウェアハウスが停止しているなど
print(f"接続エラー: {e.msg}")
except DatabaseError as e:
# その他のDBエラー
print(f"DBエラー: {e.msg}")
実務でよく使うコード集
ETLパイプラインの基本パターン
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
def run_etl_pipeline():
"""外部APIからデータを取得してSnowflakeに書き込む"""
# ① Extract — データを取得
df = fetch_from_api()
print(f"取得件数: {len(df)}")
# ② Transform — データを変換・クレンジング
df = df.rename(columns=str.upper) # カラム名を大文字に
df["CREATED_AT"] = pd.Timestamp.now() # 取得日時を追加
df = df.drop_duplicates(subset=["USER_ID"]) # 重複を除去
df = df.dropna(subset=["USER_ID", "NAME"]) # 必須項目のNULLを除去
print(f"変換後件数: {len(df)}")
# ③ Load — Snowflakeに書き込む
with get_snowflake_connection() as conn:
# 一時テーブルに書き込んでからMERGE
write_pandas(conn, df, "USERS_STAGING", overwrite=True)
cursor = conn.cursor()
cursor.execute("""
MERGE INTO users AS target
USING users_staging AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN
UPDATE SET
name = source.name,
updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
INSERT (user_id, name, created_at)
VALUES (source.user_id, source.name, CURRENT_TIMESTAMP())
""")
conn.commit()
print(f"MERGE完了: {cursor.rowcount}件")
run_etl_pipeline()
MERGE文(UPSERT)はSnowflakeでデータを取り込むときによく使うパターン。既存データは更新、新規データは挿入というETLの定番処理。
まとめ
| 操作 | メソッド |
|---|---|
| 接続 | snowflake.connector.connect() |
| クエリ実行 | cursor.execute() |
| 全件取得 | cursor.fetchall() |
| DataFrame取得 | cursor.fetch_pandas_all() |
| 大量INSERT | write_pandas() |
| 複数SQL実行 | conn.execute_string() |
| バインド変数 |
%s または %(key)s
|
- カラム名はデフォルトで大文字になる
- バインド変数で必ずSQLインジェクション対策をする
- 大量データの書き込みは
write_pandas()が最速 - 大量データの読み込みは
fetch_pandas_all()を使う - 接続管理は
contextmanagerでまとめると楽