今回は、バッチ処理です。
(翌日にバッチ処理がうまくいってなかったときの悲しみたるや...)
一言サマリー
一括処理で便利に効率的に!
バッチ処理とは何か
バッチ処理(Batch Processing)は、一定量のデータをまとめて処理する方式です。「バッチ」とは「ひとまとまり」という意味で、データを束にして一括で処理します。
例えば、ECサイトで1日の売上を集計する場合、営業終了後に1日分のデータをまとめて処理します。リアルタイムで注文ごとに集計するのではなく、決まった時間に一括処理を行うのがバッチ処理の特徴です。
バッチ処理の特徴
バッチ処理には、いくつかの明確な特徴があります。
定期実行
決まった時間やトリガーで実行されます。毎日深夜2時、毎週月曜日、月末など、スケジュールに基づいて動作します。
大量データの処理
蓄積されたデータをまとめて処理できるため、大量データの効率的な処理に適しています。個別に処理するより、まとめて処理する方がオーバーヘッドが少なくなります。
非リアルタイム
処理結果が即座に必要でない場合に向いています。数時間から数日の遅延が許容される分析やレポート作成に最適です。
バッチ処理のユースケース
実際のビジネスでよく使われるバッチ処理のパターンを見ていきましょう。
日次レポート生成
最も一般的なユースケースです。前日の売上、アクセス数、会員登録数などを集計し、レポートを作成します。
from datetime import datetime, timedelta
import pandas as pd
def generate_daily_report(date):
"""日次売上レポートの生成"""
# 前日のデータを抽出
target_date = date - timedelta(days=1)
query = f"""
SELECT
category,
COUNT(*) as order_count,
SUM(amount) as total_sales,
AVG(amount) as avg_order_value
FROM orders
WHERE DATE(order_date) = '{target_date}'
GROUP BY category
"""
df = pd.read_sql(query, con=engine)
# レポート保存
report_path = f'reports/sales_{target_date}.csv'
df.to_csv(report_path, index=False)
print(f"Report generated: {report_path}")
return df
# 毎朝実行
report = generate_daily_report(datetime.now())
データ集約とサマリー
詳細なトランザクションデータから、分析用のサマリーテーブルを作成します。
def create_customer_summary():
"""顧客別サマリーテーブルの作成"""
query = """
SELECT
customer_id,
COUNT(DISTINCT order_id) as total_orders,
SUM(amount) as lifetime_value,
MAX(order_date) as last_order_date,
MIN(order_date) as first_order_date,
AVG(amount) as avg_order_value
FROM orders
GROUP BY customer_id
"""
df = pd.read_sql(query, con=engine)
# RFM分析用の指標を追加
df['days_since_last_order'] = (
pd.Timestamp.now() - pd.to_datetime(df['last_order_date'])
).dt.days
# サマリーテーブルに保存
df.to_sql('customer_summary', con=engine,
if_exists='replace', index=False)
return df
データクレンジングとマスタ更新
日々蓄積されるデータをクレンジングし、マスターデータを更新します。
def clean_and_update_customer_master():
"""顧客マスタのクレンジングと更新"""
# 最新データを取得
df = pd.read_sql("SELECT * FROM customer_raw", con=engine)
# クレンジング処理
df['email'] = df['email'].str.lower().str.strip()
df['phone'] = df['phone'].str.replace('-', '')
df = df.drop_duplicates(subset=['email'])
# 住所の正規化
df['prefecture'] = df['address'].str.extract(r'(東京都|大阪府|.*県)')
# マスタテーブルを更新(UPSERT)
temp_table = 'temp_customer_master'
df.to_sql(temp_table, con=engine, if_exists='replace', index=False)
with engine.begin() as conn:
conn.execute(text(f"""
INSERT INTO customer_master
SELECT * FROM {temp_table}
ON CONFLICT (customer_id)
DO UPDATE SET
email = EXCLUDED.email,
phone = EXCLUDED.phone,
prefecture = EXCLUDED.prefecture,
updated_at = CURRENT_TIMESTAMP
"""))
バッチジョブの設計パターン
効率的なバッチ処理を設計するための基本パターンを紹介します。
チャンク処理(分割処理)
大量データを一度に処理するとメモリ不足になるため、小さなチャンクに分割して処理します。
def process_large_file_in_chunks(file_path, chunk_size=10000):
"""大きなファイルをチャンクで処理"""
total_processed = 0
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 各チャンクを処理
chunk_cleaned = clean_data(chunk)
chunk_transformed = transform_data(chunk_cleaned)
# データベースに保存
chunk_transformed.to_sql('processed_data',
con=engine,
if_exists='append',
index=False)
total_processed += len(chunk)
print(f"Processed {total_processed} rows")
return total_processed
増分処理パターン
前回処理した位置を記録し、新しいデータのみを処理します。
def incremental_batch_process():
"""増分バッチ処理"""
# 最後に処理した時刻を取得
with engine.connect() as conn:
result = conn.execute(text(
"SELECT MAX(processed_at) FROM batch_log"
))
last_processed = result.scalar()
if last_processed is None:
# 初回実行の場合
last_processed = '2020-01-01 00:00:00'
# 前回以降のデータのみ取得
query = f"""
SELECT * FROM transactions
WHERE created_at > '{last_processed}'
ORDER BY created_at
"""
df = pd.read_sql(query, con=engine)
if len(df) == 0:
print("No new data to process")
return
# データ処理
df_processed = process_data(df)
# 結果を保存
df_processed.to_sql('processed_transactions',
con=engine,
if_exists='append',
index=False)
# 処理履歴を記録
log_batch_execution(len(df), df['created_at'].max())
並列処理パターン
独立した複数のタスクを並列で実行し、処理時間を短縮します。
from concurrent.futures import ThreadPoolExecutor, as_completed
def process_category(category):
"""カテゴリ別の処理"""
query = f"""
SELECT * FROM products
WHERE category = '{category}'
"""
df = pd.read_sql(query, con=engine)
# 処理実行
result = analyze_category(df)
return category, result
def parallel_batch_process():
"""並列バッチ処理"""
categories = ['Electronics', 'Books', 'Clothing', 'Food']
results = {}
with ThreadPoolExecutor(max_workers=4) as executor:
# 全カテゴリを並列で処理
futures = {
executor.submit(process_category, cat): cat
for cat in categories
}
for future in as_completed(futures):
category, result = future.result()
results[category] = result
print(f"Completed: {category}")
return results
バッチスケジューリング
バッチ処理を定期的に実行するためのスケジューリング方法を見ていきます。
cronを使った基本的なスケジューリング
# crontab の設定例
# 毎日午前2時に実行
0 2 * * * /usr/bin/python3 /path/to/daily_batch.py
# 毎週月曜日の午前3時に実行
0 3 * * 1 /usr/bin/python3 /path/to/weekly_batch.py
# 毎月1日の午前4時に実行
0 4 1 * * /usr/bin/python3 /path/to/monthly_batch.py
Pythonでのスケジューリング(schedule ライブラリ)
import schedule
import time
def daily_job():
"""日次ジョブ"""
print("Running daily job...")
generate_daily_report(datetime.now())
def weekly_job():
"""週次ジョブ"""
print("Running weekly job...")
generate_weekly_summary()
# スケジュール設定
schedule.every().day.at("02:00").do(daily_job)
schedule.every().monday.at("03:00").do(weekly_job)
# 実行ループ
while True:
schedule.run_pending()
time.sleep(60) # 1分ごとにチェック
バッチ処理のモニタリング
バッチ処理が正常に実行されているかを監視することが重要です。
実行ログの記録
def log_batch_execution(batch_name, status, records_processed, error_msg=None):
"""バッチ実行ログの記録"""
log_entry = {
'batch_name': batch_name,
'execution_time': datetime.now(),
'status': status, # 'SUCCESS' or 'FAILED'
'records_processed': records_processed,
'error_message': error_msg
}
df_log = pd.DataFrame([log_entry])
df_log.to_sql('batch_execution_log',
con=engine,
if_exists='append',
index=False)
def run_batch_with_logging(batch_func, batch_name):
"""ログ記録付きバッチ実行"""
try:
records = batch_func()
log_batch_execution(batch_name, 'SUCCESS', records)
return records
except Exception as e:
log_batch_execution(batch_name, 'FAILED', 0, str(e))
raise
実行時間の監視
import time
def monitor_batch_performance(func):
"""バッチ処理のパフォーマンス監視デコレータ"""
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
elapsed_time = time.time() - start_time
print(f"Batch completed in {elapsed_time:.2f} seconds")
# 実行時間が閾値を超えたら警告
if elapsed_time > 3600: # 1時間
print(f"WARNING: Batch took longer than expected")
return result
except Exception as e:
elapsed_time = time.time() - start_time
print(f"Batch failed after {elapsed_time:.2f} seconds: {e}")
raise
return wrapper
@monitor_batch_performance
def daily_sales_batch():
"""売上集計バッチ(監視付き)"""
return generate_daily_report(datetime.now())
バッチ処理のベストプラクティス
効果的なバッチ処理を実装するためのポイントをまとめます。
1. べき等性を保つ
同じバッチを複数回実行しても、結果が変わらないようにします。
2. リトライ可能な設計
失敗した場合に、途中から再開できる仕組みを用意します。
3. チェックポイントの保存
処理の進捗を定期的に保存し、障害時に最初からやり直さなくて済むようにします。
4. タイムアウトの設定
処理が異常に長時間実行されないよう、タイムアウトを設定します。
5. データ検証
処理前後でデータ件数や主要メトリクスを検証し、異常がないか確認します。
まとめと次回予告
バッチ処理は、一定量のデータをまとめて処理する方式で、定期実行、大量データ処理、非リアルタイム性という特徴があります。チャンク処理、増分処理、並列処理などのパターンを活用することで、効率的なバッチジョブを構築できます。適切なスケジューリングとモニタリングにより、信頼性の高いバッチシステムを実現できます。
次回は、「ストリーム処理の基本」について学びます。リアルタイムでデータを処理するストリームパイプラインの仕組みと、バッチ処理との違いを詳しく解説します。
感想
ベストベストプラクティスについてはどれも取り入れたいポイントですね。
Calude君にコード例を出してもらって見てみたのですが、無意識に入れているものとそうでないものがあったので、チェックリストにして取り入れるようにしたいと思いました。