0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SnowflakeをPythonから操作する — snowflake-connectorの基本

0
Posted at

はじめに

Snowflakeの概念を整理したので、次はPythonから実際に操作する。

データ処理案件ではPythonでデータを取得・加工してSnowflakeに書き戻す、というパターンが多い。snowflake-connector-pythonというライブラリを使うのが基本で、pandasとの連携も充実している。


インストール

# 基本のコネクター
pip install snowflake-connector-python

# pandas連携を使う場合
pip install "snowflake-connector-python[pandas]"

# SQLAlchemyと連携する場合
pip install snowflake-sqlalchemy

接続の基本

import snowflake.connector

conn = snowflake.connector.connect(
    account   = "your-account-id",    # xxxx.ap-northeast-1.aws のような形式
    user      = "your-username",
    password  = "your-password",
    warehouse = "my_warehouse",
    database  = "my_database",
    schema    = "my_schema",
)

cursor = conn.cursor()

try:
    cursor.execute("SELECT CURRENT_VERSION()")
    row = cursor.fetchone()
    print(f"Snowflakeのバージョン: {row[0]}")
finally:
    cursor.close()
    conn.close()

接続情報のaccountはSnowflakeのURLから取得できる。https://xxxx.ap-northeast-1.aws.snowflakecomputing.comならxxxx.ap-northeast-1.awsの部分。

環境変数から読む

import os
import snowflake.connector
from dotenv import load_dotenv

load_dotenv()

conn = snowflake.connector.connect(
    account   = os.environ["SNOWFLAKE_ACCOUNT"],
    user      = os.environ["SNOWFLAKE_USER"],
    password  = os.environ["SNOWFLAKE_PASSWORD"],
    warehouse = os.environ["SNOWFLAKE_WAREHOUSE"],
    database  = os.environ["SNOWFLAKE_DATABASE"],
    schema    = os.environ["SNOWFLAKE_SCHEMA"],
)
# .env
SNOWFLAKE_ACCOUNT=xxxx.ap-northeast-1.aws
SNOWFLAKE_USER=myuser
SNOWFLAKE_PASSWORD=mypassword
SNOWFLAKE_WAREHOUSE=my_warehouse
SNOWFLAKE_DATABASE=my_database
SNOWFLAKE_SCHEMA=my_schema

contextmanagerで接続管理

毎回conn.close()を書くのは面倒なので、withブロックで管理する。

import snowflake.connector
from contextlib import contextmanager
from typing     import Generator

@contextmanager
def get_snowflake_connection() -> Generator:
    conn = snowflake.connector.connect(
        account   = os.environ["SNOWFLAKE_ACCOUNT"],
        user      = os.environ["SNOWFLAKE_USER"],
        password  = os.environ["SNOWFLAKE_PASSWORD"],
        warehouse = os.environ["SNOWFLAKE_WAREHOUSE"],
        database  = os.environ["SNOWFLAKE_DATABASE"],
        schema    = os.environ["SNOWFLAKE_SCHEMA"],
    )
    try:
        yield conn
    finally:
        conn.close()

# 使う側
with get_snowflake_connection() as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM orders")
    print(cursor.fetchone()[0])

基本的なクエリ操作

SELECT

with get_snowflake_connection() as conn:
    cursor = conn.cursor()

    # 全件取得
    cursor.execute("SELECT id, name, email FROM users LIMIT 10")
    rows = cursor.fetchall()
    for row in rows:
        print(row)  # (1, '田中', 'tanaka@example.com')

    # 1件取得
    cursor.execute("SELECT * FROM users WHERE id = 1")
    row = cursor.fetchone()

    # 列名付きで取得
    cursor.execute("SELECT id, name, email FROM users")
    columns = [desc[0] for desc in cursor.description]
    rows    = cursor.fetchall()
    for row in rows:
        record = dict(zip(columns, row))
        print(record)  # {'ID': 1, 'NAME': '田中', 'EMAIL': '...'}

cursor.descriptionでカラム名が取れる。Snowflakeはデフォルトでカラム名が大文字になるので注意。

バインド変数でSQLインジェクションを防ぐ

# 悪い例(SQLインジェクションの危険)
user_id = 1
cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")

# 良い例(バインド変数を使う)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))

# 複数のパラメータ
cursor.execute(
    "SELECT * FROM users WHERE name = %s AND is_active = %s",
    ("田中", True)
)

# 辞書形式でも渡せる
cursor.execute(
    "SELECT * FROM users WHERE id = %(user_id)s",
    {"user_id": 1}
)

PHPのPDOでプリペアドステートメントを使うのと同じ理由。文字列結合でSQLを組み立てるのは絶対に避ける。

INSERT / UPDATE / DELETE

with get_snowflake_connection() as conn:
    cursor = conn.cursor()

    # INSERT
    cursor.execute(
        "INSERT INTO users (name, email) VALUES (%s, %s)",
        ("田中", "tanaka@example.com")
    )
    conn.commit()

    # UPDATE
    cursor.execute(
        "UPDATE users SET name = %s WHERE id = %s",
        ("田中太郎", 1)
    )
    conn.commit()

    # DELETE
    cursor.execute("DELETE FROM users WHERE id = %s", (1,))
    conn.commit()

    # 影響件数の確認
    print(f"影響件数: {cursor.rowcount}")

大量データの挿入

1件ずつINSERTするのは非効率。executemany()write_pandas()を使う。

executemany

users = [
    ("田中", "tanaka@example.com", 28),
    ("鈴木", "suzuki@example.com", 35),
    ("佐藤", "sato@example.com",   42),
]

with get_snowflake_connection() as conn:
    cursor = conn.cursor()
    cursor.executemany(
        "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
        users
    )
    conn.commit()
    print(f"挿入件数: {cursor.rowcount}")

write_pandas — pandasとの連携

大量データはpandas DataFrameから直接書き込むのが最も効率的。

import pandas as pd
from snowflake.connector.pandas_tools import write_pandas

df = pd.DataFrame({
    "NAME":  ["田中", "鈴木", "佐藤"],
    "EMAIL": ["tanaka@example.com", "suzuki@example.com", "sato@example.com"],
    "AGE":   [28, 35, 42],
})

with get_snowflake_connection() as conn:
    success, num_chunks, num_rows, output = write_pandas(
        conn,
        df,
        table_name    = "USERS",
        database      = "MY_DATABASE",
        schema        = "MY_SCHEMA",
        overwrite     = False,  # Trueにするとテーブルを再作成
        auto_create_table = False,
    )
    print(f"成功: {success}, 行数: {num_rows}")

write_pandas()はSnowflakeの内部転送機能を使うので、executemany()より数倍高速。大量データを書き込む場合はこちらを使う。


DataFrameとして取得する

クエリ結果をpandas DataFrameで受け取る。

import pandas as pd

with get_snowflake_connection() as conn:
    # fetch_pandas_allで直接DataFrameを取得
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM orders WHERE created_at >= '2024-01-01'")
    df = cursor.fetch_pandas_all()

    print(df.head())
    print(df.dtypes)
# pd.read_sqlでも取得できる
import pandas as pd
import snowflake.connector

conn = snowflake.connector.connect(...)

df = pd.read_sql(
    "SELECT * FROM orders WHERE created_at >= '2024-01-01'",
    conn
)

fetch_pandas_all()は内部でArrowフォーマットを使うので、通常のfetchall()より高速にDataFrameに変換できる。大量データを取得するときは必ずこちらを使う。


SQLAlchemyと組み合わせる

FastAPIやSQLAlchemyと組み合わせる場合はsnowflake-sqlalchemyを使う。

pip install snowflake-sqlalchemy
from sqlalchemy             import create_engine, text
from sqlalchemy.orm         import sessionmaker

# 接続文字列
SNOWFLAKE_URL = (
    "snowflake://{user}:{password}@{account}/{database}/{schema}"
    "?warehouse={warehouse}"
).format(
    user      = os.environ["SNOWFLAKE_USER"],
    password  = os.environ["SNOWFLAKE_PASSWORD"],
    account   = os.environ["SNOWFLAKE_ACCOUNT"],
    database  = os.environ["SNOWFLAKE_DATABASE"],
    schema    = os.environ["SNOWFLAKE_SCHEMA"],
    warehouse = os.environ["SNOWFLAKE_WAREHOUSE"],
)

engine = create_engine(SNOWFLAKE_URL)

# pandasで読み込む
df = pd.read_sql("SELECT * FROM orders LIMIT 100", engine)

# SQLAlchemyのセッションを使う
Session = sessionmaker(bind=engine)
with Session() as session:
    result = session.execute(text("SELECT COUNT(*) FROM orders"))
    print(result.fetchone()[0])

SQLAlchemy経由で接続すると、FastAPIのDependsでDB接続を管理する既存のパターンがそのまま使える。


接続プールの設定

本番環境では接続プールを設定する。

from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool

engine = create_engine(
    SNOWFLAKE_URL,
    poolclass    = QueuePool,
    pool_size    = 5,
    max_overflow = 10,
    pool_timeout = 30,
    pool_recycle = 1800,  # 30分で接続をリサイクル
)

# 接続前にウェアハウスを起動するイベントフック
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("ALTER WAREHOUSE my_warehouse RESUME IF SUSPENDED")
    cursor.close()

よく使うパターン集

大量データを分割して処理する

def process_in_chunks(
    conn,
    query: str,
    chunk_size: int = 10000,
):
    cursor = conn.cursor()
    cursor.execute(query)

    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break

        df = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])
        yield df

# 使う側
with get_snowflake_connection() as conn:
    for chunk_df in process_in_chunks(conn, "SELECT * FROM large_table"):
        # 10,000件ずつ処理
        process(chunk_df)
        print(f"{len(chunk_df)}件処理完了")

メモリに載り切らない大量データを処理するときのパターン。pandasのread_sql(chunksize=...)に相当する。

マルチステートメントの実行

# 複数のSQLをまとめて実行
sql = """
BEGIN;
    UPDATE orders SET status = 'shipped' WHERE id IN (1, 2, 3);
    INSERT INTO order_logs (order_id, action) VALUES (1, 'shipped');
COMMIT;
"""

with get_snowflake_connection() as conn:
    conn.execute_string(sql)

execute_string()でセミコロン区切りの複数SQLを一度に実行できる。

クエリのキャンセル

import threading

def cancel_long_query(cursor, timeout_seconds: int = 30):
    """タイムアウト付きでクエリを実行する"""
    def cancel():
        cursor.cancel()

    timer = threading.Timer(timeout_seconds, cancel)
    timer.start()
    try:
        cursor.execute("SELECT * FROM very_large_table")
        result = cursor.fetchall()
        timer.cancel()
        return result
    except Exception as e:
        print(f"クエリがキャンセルされました: {e}")
        return None

エラーハンドリング

import snowflake.connector
from snowflake.connector.errors import (
    DatabaseError,
    InterfaceError,
    ProgrammingError,
    OperationalError,
)

try:
    with get_snowflake_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM non_existent_table")

except ProgrammingError as e:
    # SQLの構文エラー、テーブルが存在しないなど
    print(f"SQLエラー: {e.msg}")
    print(f"SQLState: {e.sqlstate}")

except OperationalError as e:
    # 接続エラー、ウェアハウスが停止しているなど
    print(f"接続エラー: {e.msg}")

except DatabaseError as e:
    # その他のDBエラー
    print(f"DBエラー: {e.msg}")

実務でよく使うコード集

ETLパイプラインの基本パターン

import pandas as pd
from snowflake.connector.pandas_tools import write_pandas

def run_etl_pipeline():
    """外部APIからデータを取得してSnowflakeに書き込む"""

    # ① Extract — データを取得
    df = fetch_from_api()
    print(f"取得件数: {len(df)}")

    # ② Transform — データを変換・クレンジング
    df = df.rename(columns=str.upper)           # カラム名を大文字に
    df["CREATED_AT"] = pd.Timestamp.now()       # 取得日時を追加
    df = df.drop_duplicates(subset=["USER_ID"]) # 重複を除去
    df = df.dropna(subset=["USER_ID", "NAME"])  # 必須項目のNULLを除去
    print(f"変換後件数: {len(df)}")

    # ③ Load — Snowflakeに書き込む
    with get_snowflake_connection() as conn:
        # 一時テーブルに書き込んでからMERGE
        write_pandas(conn, df, "USERS_STAGING", overwrite=True)

        cursor = conn.cursor()
        cursor.execute("""
            MERGE INTO users AS target
            USING users_staging AS source
            ON target.user_id = source.user_id
            WHEN MATCHED THEN
                UPDATE SET
                    name       = source.name,
                    updated_at = CURRENT_TIMESTAMP()
            WHEN NOT MATCHED THEN
                INSERT (user_id, name, created_at)
                VALUES (source.user_id, source.name, CURRENT_TIMESTAMP())
        """)
        conn.commit()
        print(f"MERGE完了: {cursor.rowcount}")

run_etl_pipeline()

MERGE文(UPSERT)はSnowflakeでデータを取り込むときによく使うパターン。既存データは更新、新規データは挿入というETLの定番処理。


まとめ

操作 メソッド
接続 snowflake.connector.connect()
クエリ実行 cursor.execute()
全件取得 cursor.fetchall()
DataFrame取得 cursor.fetch_pandas_all()
大量INSERT write_pandas()
複数SQL実行 conn.execute_string()
バインド変数 %s または %(key)s
  • カラム名はデフォルトで大文字になる
  • バインド変数で必ずSQLインジェクション対策をする
  • 大量データの書き込みはwrite_pandas()が最速
  • 大量データの読み込みはfetch_pandas_all()を使う
  • 接続管理はcontextmanagerでまとめると楽
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?