はじめに
前の記事でSnowflakeのSTAGE + COPY INTOを触った。
今回はS3・Snowflake・pandasを組み合わせた実務でよく見るデータパイプラインの基本構成をまとめる。「S3にあるCSVを取り込んでSnowflakeで集計してS3に返す」というパターンが実際の案件で出てきたので、その構成を整理した。
全体構成
[データソース]
↓
[S3バケット] ← CSVやJSONが蓄積される
↓ COPY INTO
[Snowflake] ← 生データを格納・集計
↓ snowflake-connector
[Python/pandas] ← 加工・分析
↓ boto3
[S3バケット] ← 集計結果を保存
これを実現するために使うライブラリ:
pip install snowflake-connector-python
pip install "snowflake-connector-python[pandas]"
pip install boto3
pip install pandas
pip install python-dotenv
pip install schedule
環境設定
# config.py
import os
from dataclasses import dataclass, field

from dotenv import load_dotenv
load_dotenv()
@dataclass
class SnowflakeConfig:
    """Snowflake connection settings populated from environment variables.

    Every field defaults to the value of the matching ``SNOWFLAKE_*``
    variable, looked up when an instance is created (not when the module is
    imported), so importing this module never fails on a missing variable and
    values set after import are honoured. Any field can still be passed
    explicitly to the constructor.

    Raises:
        KeyError: at instantiation, if a field is not passed explicitly and
            its environment variable is not set.
    """

    account: str = field(default_factory=lambda: os.environ["SNOWFLAKE_ACCOUNT"])
    user: str = field(default_factory=lambda: os.environ["SNOWFLAKE_USER"])
    # repr=False keeps the secret out of any log line or traceback that
    # prints the config object.
    password: str = field(
        default_factory=lambda: os.environ["SNOWFLAKE_PASSWORD"],
        repr=False,
    )
    warehouse: str = field(default_factory=lambda: os.environ["SNOWFLAKE_WAREHOUSE"])
    database: str = field(default_factory=lambda: os.environ["SNOWFLAKE_DATABASE"])
    schema: str = field(default_factory=lambda: os.environ["SNOWFLAKE_SCHEMA"])
@dataclass
class S3Config:
    """S3 settings populated from environment variables.

    Values are looked up when an instance is created rather than at import
    time. ``S3_BUCKET`` is required; the region falls back to
    ``ap-northeast-1`` and the access keys fall back to empty strings.

    Raises:
        KeyError: at instantiation, if ``bucket`` is not passed explicitly
            and ``S3_BUCKET`` is not set.
    """

    bucket: str = field(default_factory=lambda: os.environ["S3_BUCKET"])
    region: str = field(
        default_factory=lambda: os.environ.get("AWS_REGION", "ap-northeast-1")
    )
    aws_access_key: str = field(
        default_factory=lambda: os.environ.get("AWS_ACCESS_KEY_ID", "")
    )
    # repr=False keeps the secret out of printed/logged config objects.
    aws_secret_key: str = field(
        default_factory=lambda: os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
        repr=False,
    )
# .env
SNOWFLAKE_ACCOUNT=xxxx.ap-northeast-1.aws
SNOWFLAKE_USER=myuser
SNOWFLAKE_PASSWORD=mypassword
SNOWFLAKE_WAREHOUSE=my_warehouse
SNOWFLAKE_DATABASE=my_database
SNOWFLAKE_SCHEMA=my_schema
S3_BUCKET=my-data-bucket
AWS_REGION=ap-northeast-1
AWS_ACCESS_KEY_ID=your-key
AWS_SECRET_ACCESS_KEY=your-secret
S3クライアントの構築
# s3_client.py
import boto3
import pandas as pd
import io
from pathlib import Path
from contextlib import contextmanager
from config import S3Config
class S3Client:
    """Thin wrapper around a boto3 S3 client bound to a single bucket.

    Provides listing plus DataFrame-oriented read/write helpers for CSV and
    Parquet objects, and plain file upload/download.
    """

    def __init__(self, config: "S3Config"):
        """Create the boto3 client from *config* and remember the bucket."""
        self.config = config
        self.client = boto3.client(
            "s3",
            region_name=config.region,
            # Empty key strings are passed as None so that no explicit
            # credentials are supplied (per the original note, an IAM role is
            # used in that case).
            aws_access_key_id=config.aws_access_key or None,
            aws_secret_access_key=config.aws_secret_key or None,
        )
        self.bucket = config.bucket

    def list_files(self, prefix: str) -> list[str]:
        """Return every object key in the bucket that starts with *prefix*.

        Uses the ``list_objects_v2`` paginator so that listings spanning
        several response pages are returned in full instead of being cut off
        after the first page. Returns an empty list when nothing matches.
        """
        paginator = self.client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=self.bucket, Prefix=prefix)
        # A page without matches has no "Contents" entry at all.
        return [obj["Key"] for page in pages for obj in page.get("Contents", [])]

    def read_csv(self, key: str, **kwargs) -> pd.DataFrame:
        """Download the object at *key* and parse it as CSV.

        Extra keyword arguments are forwarded to ``pandas.read_csv``.
        """
        response = self.client.get_object(Bucket=self.bucket, Key=key)
        return pd.read_csv(io.BytesIO(response["Body"].read()), **kwargs)

    def read_parquet(self, key: str) -> pd.DataFrame:
        """Download the object at *key* and parse it as Parquet."""
        response = self.client.get_object(Bucket=self.bucket, Key=key)
        return pd.read_parquet(io.BytesIO(response["Body"].read()))

    def write_csv(
        self,
        df: pd.DataFrame,
        key: str,
        index: bool = False,
    ) -> None:
        """Serialize *df* as UTF-8 CSV and upload it to *key*.

        Args:
            df: Frame to write.
            key: Destination object key inside the bucket.
            index: Whether to include the DataFrame index as a column.
        """
        buffer = io.StringIO()
        df.to_csv(buffer, index=index, encoding="utf-8")
        self.client.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=buffer.getvalue().encode("utf-8"),
            ContentType="text/csv",
        )
        print(f"S3に書き込み完了: s3://{self.bucket}/{key}")

    def write_parquet(self, df: pd.DataFrame, key: str) -> None:
        """Serialize *df* as Parquet (pyarrow engine, no index) and upload it."""
        buffer = io.BytesIO()
        df.to_parquet(buffer, index=False, engine="pyarrow")
        # getvalue() returns the whole buffer regardless of the current
        # position, so no seek is needed before uploading.
        self.client.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=buffer.getvalue(),
            ContentType="application/octet-stream",
        )
        print(f"S3に書き込み完了: s3://{self.bucket}/{key}")

    def download_file(self, key: str, local_path: str) -> None:
        """Download the object at *key* to *local_path*."""
        self.client.download_file(self.bucket, key, local_path)

    def upload_file(self, local_path: str, key: str) -> None:
        """Upload the local file at *local_path* to *key*."""
        self.client.upload_file(local_path, self.bucket, key)
Snowflakeクライアントの構築
# snowflake_client.py
import snowflake.connector
import pandas as pd
from contextlib import contextmanager
from typing import Generator
from config import SnowflakeConfig
from snowflake.connector.pandas_tools import write_pandas
class SnowflakeClient:
    """Helper around snowflake-connector that opens one connection per call.

    Every public method opens a connection through :meth:`get_connection`,
    does its work and closes the connection again.
    """

    def __init__(self, config: "SnowflakeConfig"):
        """Store the connection settings; no connection is opened here."""
        self.config = config

    @contextmanager
    def get_connection(self) -> Generator:
        """Yield a new Snowflake connection and always close it afterwards."""
        conn = snowflake.connector.connect(
            account=self.config.account,
            user=self.config.user,
            password=self.config.password,
            warehouse=self.config.warehouse,
            database=self.config.database,
            schema=self.config.schema,
        )
        try:
            yield conn
        finally:
            conn.close()

    def query(self, sql: str, params: tuple = ()) -> pd.DataFrame:
        """Run a SELECT statement and return the result as a DataFrame.

        Args:
            sql: Statement text.
            params: Values passed to ``cursor.execute`` for binding.
        """
        # The cursor is used as a context manager so it is closed explicitly.
        with self.get_connection() as conn, conn.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetch_pandas_all()

    def execute(self, sql: str, params: tuple = ()) -> int:
        """Run an INSERT/UPDATE/DELETE statement, commit, and return ``rowcount``."""
        with self.get_connection() as conn, conn.cursor() as cursor:
            cursor.execute(sql, params)
            conn.commit()
            return cursor.rowcount

    @staticmethod
    def _with_upper_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Return a new frame whose column names are upper-cased.

        The input frame is left untouched.
        """
        return df.rename(columns=lambda col: col.upper())

    def write_df(
        self,
        df: pd.DataFrame,
        table_name: str,
        overwrite: bool = False,
        auto_create: bool = False,
    ) -> int:
        """Write *df* to a Snowflake table via ``write_pandas``.

        Column names and the table name are upper-cased before writing. The
        caller's DataFrame is not modified.

        Args:
            df: Frame to upload.
            table_name: Target table (upper-cased before use).
            overwrite: Forwarded to ``write_pandas(overwrite=...)``.
            auto_create: Forwarded to ``write_pandas(auto_create_table=...)``.

        Returns:
            The row count reported by ``write_pandas``.
        """
        upper_df = self._with_upper_columns(df)
        with self.get_connection() as conn:
            _success, _chunks, rows, _ = write_pandas(
                conn,
                upper_df,
                table_name=table_name.upper(),
                overwrite=overwrite,
                auto_create_table=auto_create,
            )
        print(f"書き込み完了: {rows}件 → {table_name}")
        return rows

    def copy_from_s3(
        self,
        table_name: str,
        s3_stage: str,
        s3_prefix: str,
        file_format: str = "csv_format",
        pattern: str = ".*\\.csv",
    ) -> list:
        """Load staged files into *table_name* with ``COPY INTO``.

        Args:
            table_name: Target table.
            s3_stage: Name of the stage (without the leading ``@``).
            s3_prefix: Path below the stage; leading/trailing slashes are
                ignored so ``"raw/orders"`` and ``"raw/orders/"`` behave the
                same and no ``//`` ends up in the stage path.
            file_format: Name of the file format object.
            pattern: Regex used for ``PATTERN``.

        Returns:
            The rows returned by the COPY statement (``cursor.fetchall()``).

        Note:
            All arguments are interpolated into the statement text, so they
            must come from trusted configuration, never from user input.
        """
        prefix = s3_prefix.strip("/")
        location = f"@{s3_stage}/{prefix}/" if prefix else f"@{s3_stage}/"
        sql = f"""
            COPY INTO {table_name}
            FROM {location}
            FILE_FORMAT = (FORMAT_NAME = '{file_format}')
            PATTERN = '{pattern}'
            ON_ERROR = 'CONTINUE'
        """
        with self.get_connection() as conn, conn.cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()
パイプラインの実装
パターン① S3 → Snowflake(データ取り込み)
# pipelines/ingest.py
from datetime import date, timedelta
from pathlib import Path
import pandas as pd
from s3_client import S3Client
from snowflake_client import SnowflakeClient
from config import S3Config, SnowflakeConfig
# Module-level clients shared by the functions in this file. Both are built
# when the module is imported, from default-constructed config objects.
s3 = S3Client(S3Config())
sf = SnowflakeClient(SnowflakeConfig())
def ingest_daily_orders(target_date: date):
    """Load one day's order CSVs from S3 and upsert them into ``orders``.

    Reads every object under ``raw/orders/YYYY/MM/DD/``, tags each row with
    its source file name and load time, removes duplicate ``order_id`` rows,
    overwrites ``ORDERS_STAGING`` with the result and finally MERGEs the
    staging table into ``orders``.

    Args:
        target_date: The day whose files should be ingested.

    Returns:
        None. When no files exist for the day the function only prints a
        message and returns.
    """
    prefix = f"raw/orders/{target_date.strftime('%Y/%m/%d')}/"
    # Keys ending in "/" are zero-byte "folder" placeholders, not data files;
    # passing one to read_csv would fail on the empty body.
    files = [key for key in s3.list_files(prefix) if not key.endswith("/")]
    if not files:
        print(f"ファイルなし: {prefix}")
        return
    print(f"{len(files)}件のファイルを取り込みます")

    all_dfs = []
    for key in files:
        print(f"読み込み中: {key}")
        df = s3.read_csv(
            key,
            # Read IDs as strings so they are not parsed as numbers.
            dtype={"order_id": str, "user_id": str},
            parse_dates=["created_at"],
        )
        # Lineage columns: which file the row came from and when it was read.
        df["_source_file"] = Path(key).name
        df["_loaded_at"] = pd.Timestamp.now()
        all_dfs.append(df)

    merged_df = pd.concat(all_dfs, ignore_index=True)
    print(f"合計: {len(merged_df)}件")

    # Duplicate check: keep the first occurrence of each order_id.
    dupes = merged_df.duplicated(subset=["order_id"]).sum()
    if dupes > 0:
        print(f"警告: {dupes}件の重複を検出 → 除外します")
        # ignore_index keeps a contiguous 0..n-1 index after rows are dropped.
        merged_df = merged_df.drop_duplicates(subset=["order_id"], ignore_index=True)

    # Load into the staging table, then upsert into the target with MERGE.
    sf.write_df(merged_df, "ORDERS_STAGING", overwrite=True)
    # NOTE(review): the INSERT branch does not populate status/updated_at
    # although the UPDATE branch maintains them - confirm this is intended.
    sf.execute("""
        MERGE INTO orders AS target
        USING orders_staging AS source
        ON target.order_id = source.order_id
        WHEN MATCHED THEN UPDATE SET
        status = source.status,
        updated_at = CURRENT_TIMESTAMP()
        WHEN NOT MATCHED THEN INSERT
        (order_id, user_id, product, amount, created_at)
        VALUES
        (source.order_id, source.user_id, source.product,
        source.amount, source.created_at)
    """)
    print(f"取り込み完了: {target_date}")
# Run for yesterday only when this file is executed as a script. Without the
# guard, merely importing the module (as scheduler.py does) would start an
# ingest as a side effect.
if __name__ == "__main__":
    ingest_daily_orders(date.today() - timedelta(days=1))
パターン② Snowflake → pandas → 集計
# pipelines/aggregate.py
from datetime import date
import pandas as pd
from snowflake_client import SnowflakeClient
from s3_client import S3Client
from config import SnowflakeConfig, S3Config
# Module-level clients shared by the functions in this file. Both are built
# when the module is imported, from default-constructed config objects.
sf = SnowflakeClient(SnowflakeConfig())
s3 = S3Client(S3Config())
# Label substituted for grouping keys that are NULL after the LEFT JOINs, so
# that those orders stay in the aggregates instead of being silently dropped.
_UNKNOWN_LABEL = "不明"


def _summarize_by_region_category(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Return order count, total and mean amount per (REGION, CATEGORY)."""
    keyed = raw_df.fillna({"REGION": _UNKNOWN_LABEL, "CATEGORY": _UNKNOWN_LABEL})
    return (
        keyed
        .groupby(["REGION", "CATEGORY"])
        .agg(
            件数=("ORDER_ID", "count"),
            売上合計=("AMOUNT", "sum"),
            平均単価=("AMOUNT", "mean"),
        )
        .round({"平均単価": 0})
        .reset_index()
        .sort_values("売上合計", ascending=False)
    )


def _pivot_age_group_sales(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Return total sales as an AGE_GROUP x CATEGORY table (missing cells = 0)."""
    keyed = raw_df.fillna({"AGE_GROUP": _UNKNOWN_LABEL, "CATEGORY": _UNKNOWN_LABEL})
    return (
        keyed
        .groupby(["AGE_GROUP", "CATEGORY"])
        .agg(
            件数=("ORDER_ID", "count"),
            売上合計=("AMOUNT", "sum"),
        )
        .reset_index()
        .pivot_table(
            values="売上合計",
            index="AGE_GROUP",
            columns="CATEGORY",
            aggfunc="sum",
            fill_value=0,
        )
        .reset_index()
    )


def _build_daily_trend(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Return per-day order count and total sales, sorted by date.

    Requires ``CREATED_AT`` to be a datetime column (``.dt`` accessor).
    """
    daily = (
        raw_df
        .assign(DATE=raw_df["CREATED_AT"].dt.date)
        .groupby("DATE")
        .agg(
            件数=("ORDER_ID", "count"),
            売上合計=("AMOUNT", "sum"),
        )
        .reset_index()
        .sort_values("DATE")
    )
    # NOTE(review): the column is named "前月比" (month-over-month) but
    # shift(1) on daily rows yields the ratio to the previous *day* in the
    # data. The name is kept so existing consumers of the CSV keep working.
    daily["前月比"] = (daily["売上合計"] / daily["売上合計"].shift(1)).round(2)
    return daily


def generate_monthly_report(year: int, month: int):
    """Build the monthly sales report and store it in Snowflake and S3.

    Steps: fetch the month's orders joined with users and products, aggregate
    them with pandas, overwrite ``MONTHLY_REPORT_BY_REGION`` with the
    region/category summary, and write three CSVs plus the raw rows as
    Parquet under ``reports/monthly/YYYY/MM``.

    Orders whose region, age group or category is NULL after the LEFT JOINs
    are reported under the label "不明" rather than being dropped.

    Args:
        year: Calendar year of the report.
        month: Calendar month (1-12).

    Returns:
        Tuple ``(region_category, age_trend, daily_trend)`` of DataFrames.

    Raises:
        ValueError: if *year*/*month* do not form a valid calendar month.
    """
    # Building a date validates the inputs before any query is sent.
    month_start = date(year, month, 1)

    # 1. Fetch the raw rows; the month is bound as a query parameter.
    raw_df = sf.query(
        """
        SELECT
        o.order_id,
        o.user_id,
        o.product,
        o.amount,
        o.created_at,
        u.region,
        u.age_group,
        p.category
        FROM orders o
        LEFT JOIN users u ON o.user_id = u.user_id
        LEFT JOIN products p ON o.product = p.product_id
        WHERE DATE_TRUNC('month', o.created_at) = %s::DATE
        """,
        (month_start.isoformat(),),
    )
    print(f"取得件数: {len(raw_df)}")

    # 2. Aggregate with pandas.
    region_category = _summarize_by_region_category(raw_df)
    age_trend = _pivot_age_group_sales(raw_df)
    daily_trend = _build_daily_trend(raw_df)

    # 3. Write the region/category summary back to Snowflake.
    sf.write_df(region_category, "MONTHLY_REPORT_BY_REGION", overwrite=True)

    # 4. Save all outputs to S3.
    report_prefix = f"reports/monthly/{year:04d}/{month:02d}"
    s3.write_csv(region_category, f"{report_prefix}/region_category.csv")
    s3.write_csv(age_trend, f"{report_prefix}/age_trend.csv")
    s3.write_csv(daily_trend, f"{report_prefix}/daily_trend.csv")
    s3.write_parquet(raw_df, f"{report_prefix}/raw_data.parquet")

    print(f"レポート生成完了: {year}年{month}月")
    print(f"S3保存先: s3://{s3.bucket}/{report_prefix}/")
    return region_category, age_trend, daily_trend
# Generate the sample report only when this file is executed as a script.
# Without the guard, importing the module (as scheduler.py does) would run a
# full report as a side effect.
if __name__ == "__main__":
    generate_monthly_report(2024, 3)
パイプラインの実行管理
シンプルなスケジューラー
# scheduler.py
import schedule
import time
from datetime import date, timedelta
from pipelines.ingest import ingest_daily_orders
from pipelines.aggregate import generate_monthly_report
def daily_job():
    """Ingest yesterday's orders, printing start, success and failure messages.

    Any exception from the ingest is printed and then re-raised.
    """
    target_day = date.today() - timedelta(days=1)
    print(f"=== 日次バッチ開始: {target_day} ===")
    try:
        ingest_daily_orders(target_day)
    except Exception as exc:
        print(f"日次バッチ失敗: {exc}")
        raise
    else:
        print("日次バッチ完了")
def monthly_job():
    """On the first day of a month, generate the report for the previous month.

    Does nothing on any other day.
    """
    today = date.today()
    if today.day != 1:
        return
    # Stepping back one day from the 1st lands in the previous month.
    previous_month_end = today.replace(day=1) - timedelta(days=1)
    generate_monthly_report(previous_month_end.year, previous_month_end.month)
# Schedule registration: the daily ingest is registered for 06:00 and the
# monthly job (which itself checks for the 1st of the month) for 07:00.
# NOTE(review): these times are presumably interpreted in the process's local
# timezone - confirm against the timezone of the container/host.
schedule.every().day.at("06:00").do(daily_job)
schedule.every().day.at("07:00").do(monthly_job)
if __name__ == "__main__":
    print("スケジューラー起動")
    # Poll for due jobs once a minute, forever.
    poll_interval_seconds = 60
    while True:
        schedule.run_pending()
        time.sleep(poll_interval_seconds)
Dockerで動かす
# Dockerfile
FROM python:3.12-slim
# The pipeline reports progress with print(); unbuffered stdout makes those
# lines appear in the container logs immediately instead of being held back.
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# Install dependencies before copying the sources so this layer is reused
# until requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# NOTE(review): this copies the whole build context - make sure .env (which
# holds credentials) is excluded via .dockerignore.
COPY . .
CMD ["python", "scheduler.py"]
エラー通知
# notifier.py
import boto3
import json
import os
def notify_slack(message: str, channel: str = "#data-alerts", timeout: float = 10.0):
    """Post *message* to Slack through the incoming webhook.

    Args:
        message: Text of the notification.
        channel: Channel name sent in the payload.
        timeout: Seconds to wait for the webhook before giving up, so a hung
            endpoint cannot block the pipeline indefinitely.

    Raises:
        KeyError: if ``SLACK_WEBHOOK_URL`` is not set.
        urllib.error.URLError: if the request fails or times out.
    """
    import urllib.request

    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    payload = json.dumps({"text": message, "channel": channel})
    req = urllib.request.Request(
        webhook_url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    # The context manager closes the HTTP response (and its socket).
    with urllib.request.urlopen(req, timeout=timeout) as response:
        response.read()
def notify_sns(message: str, subject: str = "パイプラインエラー"):
    """Publish *message* with *subject* to the SNS topic in ``SNS_TOPIC_ARN``.

    The client region comes from ``AWS_REGION`` (default ``ap-northeast-1``).
    """
    region = os.environ.get("AWS_REGION", "ap-northeast-1")
    sns_client = boto3.client("sns", region_name=region)
    publish_args = {
        "TopicArn": os.environ["SNS_TOPIC_ARN"],
        "Message": message,
        "Subject": subject,
    }
    sns_client.publish(**publish_args)
# パイプラインにエラー通知を組み込む
def run_with_notification(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and send a Slack alert if it raises.

    The original exception is always re-raised. A failure while sending the
    alert is printed and otherwise ignored so that it can never replace the
    pipeline error the caller needs to see.

    Returns:
        Whatever *func* returns.
    """
    try:
        return func(*args, **kwargs)
    except Exception as e:
        # Not every callable has __name__ (e.g. functools.partial objects).
        func_name = getattr(func, "__name__", repr(func))
        error_msg = f"パイプラインエラー\n関数: {func_name}\nエラー: {str(e)}"
        try:
            notify_slack(error_msg)
        except Exception as notify_error:
            # Best effort only: report the failed alert and carry on.
            print(f"Slack通知失敗: {notify_error}")
        raise
パフォーマンスのポイント
# ① 大量データはchunkで処理
def process_large_dataset(sf: "SnowflakeClient", chunk_size: int = 50000):
    """Stream ``large_orders`` in chunks and hand each chunk to ``process_chunk``.

    Args:
        sf: Client used to open the connection.
        chunk_size: Maximum number of rows fetched and processed at a time.
    """
    with sf.get_connection() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT * FROM large_orders")
            # Column names are the same for every chunk, so read them once.
            cols = [desc[0] for desc in cursor.description]
            while rows := cursor.fetchmany(chunk_size):
                process_chunk(pd.DataFrame(rows, columns=cols))
        finally:
            cursor.close()
# ② S3への書き込みはParquetが効率的
# CSV vs Parquet の比較
# CSV: 100MB, 読み込み速度 遅い
# Parquet: 20MB, 読み込み速度 速い(列指向フォーマット+圧縮)
# (3) Aggregate in Snowflake before fetching the data.
# Bad: pull every row into Python and aggregate there.
# Query results use upper-case column names (compare the REGION / AMOUNT
# lookups in the monthly report), so the lookups must be upper-case too.
df = sf.query("SELECT * FROM orders")
result = df.groupby("REGION")["AMOUNT"].sum()
# Good: let Snowflake aggregate and fetch only the summarized rows.
result = sf.query("""
    SELECT region, SUM(amount) AS total
    FROM orders
    GROUP BY region
""")
まとめ
今回構築したパイプラインの構成:
① S3のCSVをpandasで読み込む(S3Client.read_csv)
② データをクレンジング・変換する(pandas)
③ Snowflakeに書き込む(SnowflakeClient.write_df)
④ Snowflakeで集計クエリを実行する(SnowflakeClient.query)
⑤ 結果をpandasで加工する
⑥ 加工結果をS3に保存する(S3Client.write_csv / write_parquet)
- S3・Snowflake・pandasそれぞれをクライアントクラスに切り出すと再利用しやすい
- 大量データはchunk処理とParquet形式で効率化する
- MERGE文でUPSERTすることでデータの重複を防ぐ
- エラー通知をパイプラインに組み込んでおくと運用が楽
実務のデータパイプラインはこのパターンの応用が多い。次はSnowflakeのTASKとSTREAMを使った差分処理の自動化を整理する。