0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflake + S3 + pandas — データパイプラインの基本構成

0
Posted at

はじめに

前の記事でSnowflakeのSTAGE + COPY INTOを触った。

今回はS3・Snowflake・pandasを組み合わせた実務でよく見るデータパイプラインの基本構成をまとめる。「S3にあるCSVを取り込んでSnowflakeで集計してS3に返す」というパターンが実際の案件で出てきたので、その構成を整理した。


全体構成

[データソース]
    ↓
[S3バケット]  ← CSVやJSONが蓄積される
    ↓ COPY INTO
[Snowflake]   ← 生データを格納・集計
    ↓ snowflake-connector
[Python/pandas] ← 加工・分析
    ↓ boto3
[S3バケット]  ← 集計結果を保存

これを実現するために使うライブラリ:

pip install snowflake-connector-python
pip install "snowflake-connector-python[pandas]"
pip install boto3
pip install pandas
pip install python-dotenv

環境設定

# config.py
import os
from dataclasses import dataclass, field
from dotenv      import load_dotenv

# Load variables from a local .env file (if any) into the process environment
# before the config classes below read os.environ.
load_dotenv()

@dataclass
class SnowflakeConfig:
    """Snowflake connection settings read from environment variables.

    Each field defaults to the value of the matching ``SNOWFLAKE_*``
    environment variable. The lookup happens when an instance is created
    (not when this module is imported), so importing the module never fails
    on a missing variable and later changes to the environment are picked up.

    Raises:
        KeyError: on instantiation, if a required variable is unset and no
            explicit value was passed for that field.
    """

    account:   str = field(default_factory=lambda: os.environ["SNOWFLAKE_ACCOUNT"])
    user:      str = field(default_factory=lambda: os.environ["SNOWFLAKE_USER"])
    # Excluded from repr() so the secret does not end up in logs/tracebacks.
    password:  str = field(
        default_factory=lambda: os.environ["SNOWFLAKE_PASSWORD"],
        repr=False,
    )
    warehouse: str = field(default_factory=lambda: os.environ["SNOWFLAKE_WAREHOUSE"])
    database:  str = field(default_factory=lambda: os.environ["SNOWFLAKE_DATABASE"])
    schema:    str = field(default_factory=lambda: os.environ["SNOWFLAKE_SCHEMA"])

@dataclass
class S3Config:
    """S3 settings read from environment variables.

    Defaults are resolved when an instance is created (not at import time).
    ``S3_BUCKET`` is required; the region falls back to ``ap-northeast-1``
    and the two credential fields fall back to an empty string.

    Raises:
        KeyError: on instantiation, if ``S3_BUCKET`` is unset and no explicit
            ``bucket`` was passed.
    """

    bucket:           str = field(default_factory=lambda: os.environ["S3_BUCKET"])
    region:           str = field(
        default_factory=lambda: os.environ.get("AWS_REGION", "ap-northeast-1")
    )
    aws_access_key:   str = field(
        default_factory=lambda: os.environ.get("AWS_ACCESS_KEY_ID", "")
    )
    # Excluded from repr() so the secret does not end up in logs/tracebacks.
    aws_secret_key:   str = field(
        default_factory=lambda: os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
        repr=False,
    )
# .env
SNOWFLAKE_ACCOUNT=xxxx.ap-northeast-1.aws
SNOWFLAKE_USER=myuser
SNOWFLAKE_PASSWORD=mypassword
SNOWFLAKE_WAREHOUSE=my_warehouse
SNOWFLAKE_DATABASE=my_database
SNOWFLAKE_SCHEMA=my_schema

S3_BUCKET=my-data-bucket
AWS_REGION=ap-northeast-1
AWS_ACCESS_KEY_ID=your-key
AWS_SECRET_ACCESS_KEY=your-secret

S3クライアントの構築

# s3_client.py
import boto3
import pandas as pd
import io
from pathlib     import Path
from contextlib  import contextmanager
from config      import S3Config

class S3Client:
    """Small wrapper around a boto3 S3 client bound to a single bucket.

    Provides helpers to list keys, read/write pandas DataFrames as CSV or
    Parquet, and transfer local files.
    """

    def __init__(self, config: "S3Config"):
        """Create the underlying boto3 client from ``config``."""
        self.config = config
        # Empty credential strings are passed as None so that boto3 falls back
        # to its own credential resolution (e.g. an IAM role).
        self.client = boto3.client(
            "s3",
            region_name=config.region,
            aws_access_key_id=config.aws_access_key or None,
            aws_secret_access_key=config.aws_secret_key or None,
        )
        self.bucket = config.bucket

    def list_files(self, prefix: str) -> list[str]:
        """Return every object key under ``prefix``.

        A single ``list_objects_v2`` call returns only one page of results,
        so the paginator is used to walk all pages. Returns an empty list
        when nothing matches.
        """
        paginator = self.client.get_paginator("list_objects_v2")
        keys: list[str] = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            # Pages without matches carry no "Contents" entry.
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
        return keys

    def read_csv(self, key: str, **kwargs) -> pd.DataFrame:
        """Read the CSV object at ``key`` into a DataFrame.

        Extra keyword arguments are forwarded to ``pandas.read_csv``.
        """
        response = self.client.get_object(Bucket=self.bucket, Key=key)
        return pd.read_csv(io.BytesIO(response["Body"].read()), **kwargs)

    def read_parquet(self, key: str) -> pd.DataFrame:
        """Read the Parquet object at ``key`` into a DataFrame."""
        response = self.client.get_object(Bucket=self.bucket, Key=key)
        return pd.read_parquet(io.BytesIO(response["Body"].read()))

    def write_csv(
        self,
        df:  pd.DataFrame,
        key: str,
        index: bool = False,
    ) -> None:
        """Serialize ``df`` as UTF-8 CSV and upload it to ``key``.

        Args:
            df: Data to write.
            key: Destination object key inside the configured bucket.
            index: Whether to include the DataFrame index as a column.
        """
        buffer = io.StringIO()
        df.to_csv(buffer, index=index, encoding="utf-8")
        self.client.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=buffer.getvalue().encode("utf-8"),
            ContentType="text/csv",
        )
        print(f"S3に書き込み完了: s3://{self.bucket}/{key}")

    def write_parquet(self, df: pd.DataFrame, key: str) -> None:
        """Serialize ``df`` as Parquet (pyarrow engine, no index) and upload it to ``key``."""
        buffer = io.BytesIO()
        df.to_parquet(buffer, index=False, engine="pyarrow")
        self.client.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=buffer.getvalue(),
            ContentType="application/octet-stream",
        )
        print(f"S3に書き込み完了: s3://{self.bucket}/{key}")

    def download_file(self, key: str, local_path: str) -> None:
        """Download the object at ``key`` to ``local_path``."""
        self.client.download_file(self.bucket, key, local_path)

    def upload_file(self, local_path: str, key: str) -> None:
        """Upload the local file at ``local_path`` to ``key``."""
        self.client.upload_file(local_path, self.bucket, key)

Snowflakeクライアントの構築

# snowflake_client.py
import snowflake.connector
import pandas as pd
from contextlib  import contextmanager
from typing      import Generator
from config      import SnowflakeConfig
from snowflake.connector.pandas_tools import write_pandas

class SnowflakeClient:
    """Helper around ``snowflake.connector`` that opens one connection per call.

    Every public method opens a connection via :meth:`get_connection`, does
    its work and closes the connection again.
    """

    def __init__(self, config: "SnowflakeConfig"):
        """Store the connection settings; no connection is opened here."""
        self.config = config

    @contextmanager
    def get_connection(self) -> Generator:
        """Yield a new Snowflake connection and always close it afterwards."""
        conn = snowflake.connector.connect(
            account=self.config.account,
            user=self.config.user,
            password=self.config.password,
            warehouse=self.config.warehouse,
            database=self.config.database,
            schema=self.config.schema,
        )
        try:
            yield conn
        finally:
            conn.close()

    def query(self, sql: str, params: tuple = ()) -> pd.DataFrame:
        """Run a SELECT statement and return the result as a DataFrame.

        Args:
            sql: Statement to execute.
            params: Bind parameters passed to ``cursor.execute``.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetch_pandas_all()

    def execute(self, sql: str, params: tuple = ()) -> int:
        """Run an INSERT/UPDATE/DELETE-style statement, commit, and return ``cursor.rowcount``."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            conn.commit()
            return cursor.rowcount

    def write_df(
        self,
        df:           pd.DataFrame,
        table_name:   str,
        overwrite:    bool = False,
        auto_create:  bool = False,
    ) -> int:
        """Write ``df`` into a Snowflake table via ``write_pandas``.

        Column names and the table name are upper-cased before writing. The
        upper-casing is applied to a renamed copy, so the caller's DataFrame
        is left untouched.

        Args:
            df: Data to load.
            table_name: Target table (upper-cased before use).
            overwrite: Forwarded to ``write_pandas(overwrite=...)``.
            auto_create: Forwarded to ``write_pandas(auto_create_table=...)``.

        Returns:
            The row count reported by ``write_pandas``.
        """
        # rename() returns a new DataFrame instead of mutating the argument.
        upper_df = df.rename(columns=lambda col: col.upper())
        with self.get_connection() as conn:
            _success, _chunks, rows, _ = write_pandas(
                conn,
                upper_df,
                table_name=table_name.upper(),
                overwrite=overwrite,
                auto_create_table=auto_create,
            )
        print(f"書き込み完了: {rows}件 → {table_name}")
        return rows

    def copy_from_s3(
        self,
        table_name:   str,
        s3_stage:     str,
        s3_prefix:    str,
        file_format:  str = "csv_format",
        pattern:      str = ".*\\.csv",
    ) -> list:
        """Load staged S3 files into ``table_name`` with ``COPY INTO``.

        Args:
            table_name: Target table.
            s3_stage: Name of the stage pointing at the bucket.
            s3_prefix: Path below the stage; leading/trailing slashes are
                ignored so ``"raw/orders"`` and ``"raw/orders/"`` are equivalent.
            file_format: Name of the file format object to use.
            pattern: Regex restricting which staged files are loaded.

        Returns:
            The rows returned by the COPY statement (``cursor.fetchall()``).

        Note:
            The statement is built by string interpolation, so every argument
            must come from trusted configuration, never from user input.
            ``ON_ERROR = 'CONTINUE'`` is requested, so inspect the returned
            rows rather than assuming every record was loaded.
        """
        # Normalize slashes so a prefix that already ends with "/" does not
        # yield a "//" segment in the stage path.
        stage_path = "/".join(
            part for part in (s3_stage, s3_prefix.strip("/")) if part
        )
        sql = f"""
            COPY INTO {table_name}
            FROM @{stage_path}/
            FILE_FORMAT = (FORMAT_NAME = '{file_format}')
            PATTERN     = '{pattern}'
            ON_ERROR    = 'CONTINUE'
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql)
            return cursor.fetchall()

パイプラインの実装

パターン① S3 → Snowflake(データ取り込み)

# pipelines/ingest.py
from datetime import date, timedelta
from pathlib  import Path
import pandas as pd
from s3_client        import S3Client
from snowflake_client import SnowflakeClient
from config           import S3Config, SnowflakeConfig

# Module-level clients shared by the pipeline function below. Both are
# constructed from environment-derived config when this module is imported.
s3  = S3Client(S3Config())
sf  = SnowflakeClient(SnowflakeConfig())

def ingest_daily_orders(target_date: date):
    """Load one day's order CSVs from S3 into Snowflake.

    Reads every object under ``raw/orders/YYYY/MM/DD/``, concatenates them,
    drops duplicate ``order_id`` rows (keeping the first occurrence), writes
    the result to ``ORDERS_STAGING`` and merges the staging table into
    ``orders``.

    Args:
        target_date: The day whose files should be ingested.

    Returns:
        None. Returns early (after logging) when no files exist for the day.
    """
    prefix = f"raw/orders/{target_date.strftime('%Y/%m/%d')}/"

    # Keys ending in "/" are zero-byte "folder" placeholder objects; passing
    # them to read_csv would fail on the empty body, so they are skipped.
    files = [key for key in s3.list_files(prefix) if not key.endswith("/")]

    if not files:
        print(f"ファイルなし: {prefix}")
        return

    print(f"{len(files)}件のファイルを取り込みます")

    all_dfs = []
    for key in files:
        print(f"読み込み中: {key}")
        df = s3.read_csv(
            key,
            # Keep identifiers as strings so leading zeros are not lost.
            dtype={"order_id": str, "user_id": str},
            parse_dates=["created_at"],
        )
        # Lineage columns: originating file name and load time.
        df["_source_file"] = Path(key).name
        df["_loaded_at"] = pd.Timestamp.now()
        all_dfs.append(df)

    merged_df = pd.concat(all_dfs, ignore_index=True)
    print(f"合計: {len(merged_df)}")

    # Duplicate check on the business key.
    dupes = merged_df.duplicated(subset=["order_id"]).sum()
    if dupes > 0:
        print(f"警告: {dupes}件の重複を検出 → 除外します")
        merged_df = merged_df.drop_duplicates(subset=["order_id"])

    # Replace the staging table contents, then upsert into the target table.
    sf.write_df(merged_df, "ORDERS_STAGING", overwrite=True)

    # NOTE(review): the INSERT branch does not populate `status` although the
    # UPDATE branch does — confirm this is intended for newly inserted rows.
    sf.execute("""
        MERGE INTO orders AS target
        USING orders_staging AS source
        ON target.order_id = source.order_id
        WHEN MATCHED THEN UPDATE SET
            status     = source.status,
            updated_at = CURRENT_TIMESTAMP()
        WHEN NOT MATCHED THEN INSERT
            (order_id, user_id, product, amount, created_at)
        VALUES
            (source.order_id, source.user_id, source.product,
             source.amount,   source.created_at)
    """)

    print(f"取り込み完了: {target_date}")

# Run only when this file is executed as a script. Importing the module
# (for example from the scheduler) must not trigger an ingestion run.
if __name__ == "__main__":
    ingest_daily_orders(date.today() - timedelta(days=1))

パターン② Snowflake → pandas → 集計

# pipelines/aggregate.py
from datetime import date
import pandas as pd
from snowflake_client import SnowflakeClient
from s3_client        import S3Client
from config           import SnowflakeConfig, S3Config

# Module-level clients shared by the report function below. Both are
# constructed from environment-derived config when this module is imported.
sf = SnowflakeClient(SnowflakeConfig())
s3 = S3Client(S3Config())

def generate_monthly_report(year: int, month: int):
    """Build the monthly sales report and store it in Snowflake and S3.

    Steps:
        1. Fetch the month's orders (joined with users and products).
        2. Aggregate with pandas: region x category, age group x category
           (pivoted), and a daily trend.
        3. Write the region x category table back to Snowflake.
        4. Save all three aggregates as CSV and the raw rows as Parquet
           under ``reports/monthly/YYYY/MM`` in S3.

    Args:
        year: Report year (formatted as four digits).
        month: Report month (formatted as two digits).

    Returns:
        Tuple ``(region_category, age_trend, daily_trend)`` of DataFrames.

    Note:
        pandas ``groupby`` excludes rows whose grouping key is null by
        default, so orders without a matching user/product row from the
        LEFT JOINs do not appear in the grouped outputs.
    """
    # (1) Fetch raw rows from Snowflake. year/month are rendered through
    # integer format specs, so non-integer values raise before any SQL runs.
    raw_df = sf.query(f"""
        SELECT
            o.order_id,
            o.user_id,
            o.product,
            o.amount,
            o.created_at,
            u.region,
            u.age_group,
            p.category
        FROM orders o
        LEFT JOIN users    u ON o.user_id    = u.user_id
        LEFT JOIN products p ON o.product    = p.product_id
        WHERE DATE_TRUNC('month', o.created_at) = '{year:04d}-{month:02d}-01'::DATE
    """)

    print(f"取得件数: {len(raw_df)}")

    # (2) Aggregate with pandas.
    # Sales by region x category.
    region_category = (
        raw_df
        .groupby(["REGION", "CATEGORY"])
        .agg(
            件数=("ORDER_ID", "count"),
            売上合計=("AMOUNT", "sum"),
            平均単価=("AMOUNT", "mean"),
        )
        .round({"平均単価": 0})
        .reset_index()
        .sort_values("売上合計", ascending=False)
    )

    # Purchasing trend by age group (one column per category).
    age_trend = (
        raw_df
        .groupby(["AGE_GROUP", "CATEGORY"])
        .agg(
            件数=("ORDER_ID", "count"),
            売上合計=("AMOUNT", "sum"),
        )
        .reset_index()
        .pivot_table(
            values="売上合計",
            index="AGE_GROUP",
            columns="CATEGORY",
            aggfunc="sum",
            fill_value=0,
        )
        .reset_index()
    )

    # Daily trend. to_datetime() is a no-op for datetime columns and keeps
    # the .dt accessor working when the column arrives with another dtype
    # (for example an empty result set).
    order_dates = pd.to_datetime(raw_df["CREATED_AT"]).dt.date
    daily_trend = (
        raw_df
        .assign(DATE=order_dates)
        .groupby("DATE")
        .agg(
            件数=("ORDER_ID", "count"),
            売上合計=("AMOUNT", "sum"),
        )
        .reset_index()
        .sort_values("DATE")
    )

    # NOTE(review): the column is named "前月比" (month-over-month) but the
    # value is the ratio to the previous row, i.e. the previous day that had
    # orders. The name is kept for compatibility with existing consumers —
    # confirm whether it should be renamed.
    daily_trend["前月比"] = (
        daily_trend["売上合計"] / daily_trend["売上合計"].shift(1)
    ).round(2)

    # (3) Write the aggregate back to Snowflake.
    sf.write_df(region_category, "MONTHLY_REPORT_BY_REGION", overwrite=True)

    # (4) Save the results to S3.
    report_prefix = f"reports/monthly/{year:04d}/{month:02d}"

    s3.write_csv(region_category, f"{report_prefix}/region_category.csv")
    s3.write_csv(age_trend, f"{report_prefix}/age_trend.csv")
    s3.write_csv(daily_trend, f"{report_prefix}/daily_trend.csv")
    s3.write_parquet(raw_df, f"{report_prefix}/raw_data.parquet")

    # Year and month are separated so the log reads "2024-03", not "20243".
    print(f"レポート生成完了: {year:04d}-{month:02d}")
    print(f"S3保存先: s3://{s3.bucket}/{report_prefix}/")

    return region_category, age_trend, daily_trend

# Run only when this file is executed as a script. Importing the module
# (for example from the scheduler) must not generate a report as a side effect.
if __name__ == "__main__":
    generate_monthly_report(2024, 3)

パイプラインの実行管理

シンプルなスケジューラー

# scheduler.py
import schedule
import time
from datetime       import date, timedelta
from pipelines.ingest     import ingest_daily_orders
from pipelines.aggregate  import generate_monthly_report

def daily_job():
    """Ingest yesterday's orders; log start, success or failure, and re-raise on failure."""
    target_day = date.today() - timedelta(days=1)
    print(f"=== 日次バッチ開始: {target_day} ===")
    try:
        ingest_daily_orders(target_day)
    except Exception as exc:
        # Log the failure, then let it propagate to the caller.
        print(f"日次バッチ失敗: {exc}")
        raise
    else:
        print("日次バッチ完了")

def monthly_job():
    """Generate the previous month's report, but only on the 1st of the month."""
    today = date.today()
    if today.day != 1:
        # Not the first day of the month: nothing to do.
        return
    # Today is the 1st, so stepping back one day lands in the previous month.
    previous_month_day = today - timedelta(days=1)
    generate_monthly_report(previous_month_day.year, previous_month_day.month)

# Schedule registration: daily_job is registered for 06:00 and monthly_job
# for 07:00 every day (monthly_job itself only acts on the 1st of the month).
schedule.every().day.at("06:00").do(daily_job)
schedule.every().day.at("07:00").do(monthly_job)

if __name__ == "__main__":
    print("スケジューラー起動")
    # Poll once a minute and run whichever jobs are due.
    # NOTE(review): daily_job re-raises on failure and nothing here catches
    # it, so a failing job appears to end this loop — confirm that whatever
    # supervises the process restarts it.
    while True:
        schedule.run_pending()
        time.sleep(60)

Dockerで動かす

# Dockerfile
# Image that runs scheduler.py on the python:3.12-slim base.
FROM python:3.12-slim

WORKDIR /app

# Dependencies are copied and installed before the application code
# (presumably so this layer can be reused when only the code changes).
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# NOTE(review): this copies the whole build context; make sure a local .env
# file with credentials is excluded (e.g. via .dockerignore).
COPY . .

# The scheduler is the container's main process.
CMD ["python", "scheduler.py"]

エラー通知

# notifier.py
import boto3
import json
import os

def notify_slack(message: str, channel: str = "#data-alerts"):
    """Post ``message`` to Slack through the webhook in ``SLACK_WEBHOOK_URL``.

    Args:
        message: Text to send.
        channel: Channel name included in the JSON payload.

    Raises:
        KeyError: if ``SLACK_WEBHOOK_URL`` is not set.
        urllib.error.URLError: if the request fails or times out.
    """
    import urllib.request

    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    payload = json.dumps({"text": message, "channel": channel})
    req = urllib.request.Request(
        webhook_url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    # A timeout keeps a hung webhook from blocking the pipeline forever, and
    # the context manager closes the HTTP response instead of leaking it.
    with urllib.request.urlopen(req, timeout=10):
        pass

def _sns_safe_subject(subject: str, max_length: int = 99) -> str:
    """Collapse ``subject`` to one line without control characters, capped at ``max_length``."""
    # Non-printable characters (line breaks, tabs, other control characters)
    # become spaces; runs of whitespace are then collapsed.
    printable = "".join(ch if ch.isprintable() else " " for ch in subject)
    return " ".join(printable.split())[:max_length]


def notify_sns(message: str, subject: str = "パイプラインエラー"):
    """Publish ``message`` to the SNS topic named by ``SNS_TOPIC_ARN``.

    The subject is sanitized first, because SNS rejects subjects that contain
    line breaks or control characters or that are 100 characters or longer.
    A rejected publish would lose the alert. If nothing is left after
    sanitizing, the subject is omitted.

    Args:
        message: Body of the notification.
        subject: Subject line.

    Raises:
        KeyError: if ``SNS_TOPIC_ARN`` is not set.
    """
    sns = boto3.client("sns", region_name=os.environ.get("AWS_REGION", "ap-northeast-1"))
    publish_args = {
        "TopicArn": os.environ["SNS_TOPIC_ARN"],
        "Message": message,
    }
    safe_subject = _sns_safe_subject(subject)
    if safe_subject:
        publish_args["Subject"] = safe_subject
    sns.publish(**publish_args)

# Wrap a pipeline function so that failures are reported to Slack.
def run_with_notification(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``; on failure notify Slack and re-raise.

    Args:
        func: Callable to run.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns.

    Raises:
        Exception: the original exception raised by ``func``. A failure of
            the notification itself is logged and never replaces it.
    """
    try:
        return func(*args, **kwargs)
    except Exception as e:
        # Callables such as functools.partial objects have no __name__.
        func_name = getattr(func, "__name__", repr(func))
        error_msg = f"パイプラインエラー\n関数: {func_name}\nエラー: {str(e)}"
        try:
            notify_slack(error_msg)
        except Exception as notify_error:
            # Best effort: a broken notifier must not hide the pipeline error.
            print(f"Slack通知に失敗しました: {notify_error}")
        raise

パフォーマンスのポイント

# ① 大量データはchunkで処理
def process_large_dataset(sf: "SnowflakeClient", chunk_size: int = 50000):
    """Stream ``large_orders`` in chunks and hand each chunk to ``process_chunk``.

    Args:
        sf: Client whose ``get_connection()`` yields a Snowflake connection.
        chunk_size: Maximum number of rows fetched per chunk.
    """
    with sf.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM large_orders")

        # The column names are the same for every chunk, so read them once
        # instead of rebuilding the list on each iteration.
        cols = [desc[0] for desc in cursor.description]

        # fetchmany() returns an empty sequence once the result is exhausted.
        while rows := cursor.fetchmany(chunk_size):
            process_chunk(pd.DataFrame(rows, columns=cols))

# ② S3への書き込みはParquetが効率的
# CSV vs Parquet の比較(サイズは目安)
# CSV:     100MB, 読み込み速度 遅い
# Parquet: 20MB,  読み込み速度 速い(列指向フォーマット+圧縮)

# (3) Aggregate inside Snowflake before fetching.
# Bad: pull every row into Python and aggregate there.
# Column names are upper-case here, matching how the report pipeline reads
# query results ("REGION", "AMOUNT"); the lower-case names would not be found.
df = sf.query("SELECT * FROM orders")
result = df.groupby("REGION")["AMOUNT"].sum()

# Good: aggregate in Snowflake and fetch only the aggregated rows.
result = sf.query("""
    SELECT region, SUM(amount) AS total
    FROM orders
    GROUP BY region
""")

まとめ

今回構築したパイプラインの構成:

① S3のCSVをpandasで読み込む(S3Client.read_csv)
② データをクレンジング・変換する(pandas)
③ Snowflakeに書き込む(SnowflakeClient.write_df)
④ Snowflakeで集計クエリを実行する(SnowflakeClient.query)
⑤ 結果をpandasで加工する
⑥ 加工結果をS3に保存する(S3Client.write_csv / write_parquet)
  • S3・Snowflake・pandasそれぞれをクライアントクラスに切り出すと再利用しやすい
  • 大量データはchunk処理とParquet形式で効率化する
  • MERGE文でUPSERTすることでデータの重複を防ぐ
  • エラー通知をパイプラインに組み込んでおくと運用が楽

実務のデータパイプラインはこのパターンの応用が多い。次はSnowflakeのTASKとSTREAMを使った差分処理の自動化を整理する。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?