今回はべき等性について学びます。
以下からAIが文章を作成しています。
べき等性とは何か
べき等性(Idempotency)とは、同じ操作を何度実行しても、結果が変わらない性質のことです。1回実行した結果と、2回、3回実行した結果が同じになることを意味します。
データパイプラインでは、ネットワーク障害、部分的な失敗、手動での再実行など、様々な理由で同じ処理が複数回実行されることがあります。べき等性があれば、これらの場合でもデータの整合性が保たれます。
非べき等な操作の例
# 悪い例: べき等でない
def process_order_non_idempotent(order_id):
order = get_order(order_id)
# 在庫を減らす(何度実行しても減り続ける)
current_stock = get_stock(order['product_id'])
new_stock = current_stock - order['quantity']
update_stock(order['product_id'], new_stock)
# 売上を記録(重複して記録される)
record_sale({'order_id': order_id, 'amount': order['amount']})
# 1回目: 在庫100 → 95, 売上記録1件
# 2回目: 在庫95 → 90, 売上記録2件(重複)
パターン1: 一意キーによる重複チェック
def process_order_idempotent(order_id):
# すでに処理済みかチェック
if is_already_processed(order_id):
return get_processing_result(order_id)
order = get_order(order_id)
# 処理実行
current_stock = get_stock(order['product_id'])
new_stock = current_stock - order['quantity']
update_stock(order['product_id'], new_stock)
result = record_sale({'order_id': order_id, 'amount': order['amount']})
# 処理完了を記録
mark_as_processed(order_id, result)
return result
パターン2: UPSERTによる置き換え
def update_customer_summary_idempotent(customer_id):
# 顧客の全注文を集計(決定的な計算)
query = f"""
SELECT
customer_id,
COUNT(*) as total_orders,
SUM(amount) as total_amount
FROM orders
WHERE customer_id = '{customer_id}'
GROUP BY customer_id
"""
summary = pd.read_sql(query, con=engine)
# 既存レコードを置き換え(何度実行しても結果は同じ)
with engine.begin() as conn:
conn.execute(text("""
INSERT INTO customer_summary
SELECT * FROM customer_summary_temp
ON CONFLICT (customer_id)
DO UPDATE SET
total_orders = EXCLUDED.total_orders,
total_amount = EXCLUDED.total_amount
"""))
パターン3: 決定的な計算
def calculate_monthly_report_idempotent(year, month):
# 対象期間を明確に定義
start_date = f"{year}-{month:02d}-01"
last_day = calendar.monthrange(year, month)[1]
end_date = f"{year}-{month:02d}-{last_day}"
# 期間内のデータのみを使用
query = f"""
SELECT category, COUNT(*) as order_count, SUM(amount) as total_sales
FROM orders
WHERE order_date >= '{start_date}' AND order_date <= '{end_date}'
GROUP BY category
"""
report = pd.read_sql(query, con=engine)
# 同じレポートIDなら上書き
report_id = f"{year}{month:02d}"
report.to_sql(f'monthly_report_{report_id}', con=engine, if_exists='replace', index=False)
return report
ファイル処理でのべき等性
class IdempotentFileProcessor:
def __init__(self):
self.processed_files = self.load_processed_files()
def process_file(self, file_path):
# ファイルのハッシュを計算
file_hash = self.calculate_file_hash(file_path)
file_id = f"{os.path.basename(file_path)}_{file_hash}"
# すでに処理済みかチェック
if file_id in self.processed_files:
print(f"File already processed: {file_path}")
return None
# ファイル処理
data = pd.read_csv(file_path)
result = self.transform_data(data)
# 結果を保存
output_path = f"processed/{os.path.basename(file_path)}"
result.to_csv(output_path, index=False)
# 処理済みとして記録
self.processed_files.add(file_id)
self.save_processed_files()
return result
ベストプラクティス
- 一意キーを必ず含める - すべてのイベント・リクエストに一意のIDを付与
- 処理済みログを保持 - どの処理が完了したかを記録
- 決定的な処理にする - 現在時刻やランダム値を避ける
- UPSERTを活用 - 既存レコードを置き換える形で更新
- トランザクションを使う - 複数の操作を原子的に実行
まとめ
べき等性は信頼性の高いパイプライン構築に不可欠です。一意キーによる重複チェック、UPSERT、決定的な計算などのパターンで実装できます。
感想
スクリプトを書く際にnotebookなど対話型のものをよく使うので、べき等性は常に意識していきたいですね。最悪は”最初から実行し直せばいいや”の考えが使えるものの、結果として作業時間が増えたり、ストレージへの書き込みでミスが起こったりするのでここは気を引き締めねばと思いました。