こんにちは、今回はETLのTransformです。分析のプロセスと近しい部分もあるので個人的には理解しやすかったです。
一言サマリー
変換処理の設計原則を守るべし
以下、生成AIが作成した記事です。
【第3回】ETLの基礎 - Transform(変換)
Transformとは何か
Transform(変換)は、ETLプロセスの中核となるステップです。抽出した生データを、分析や利用に適した形に加工する処理全般を指します。
生データは、そのままでは使えないことがほとんどです。データの欠損、形式の不統一、不要な情報の混在など、様々な問題を含んでいます。Transformは、これらの問題を解決し、データの品質を高める重要な工程です。
変換の主な種類
データ変換は、目的に応じて大きく5つのカテゴリに分類できます。
クレンジング(データクリーニング)
データの品質を改善する処理です。欠損値の補完、異常値の除去、重複レコードの削除などを行います。
import pandas as pd
# サンプルデータ
df = pd.DataFrame({
'name': ['田中太郎', 'Tanaka Taro', '田中太郎', None],
'age': [25, 25, 25, 30],
'email': ['tanaka@example.com', 'tanaka@example.com',
'tanaka@example.com', 'sato@example.com']
})
# 重複を削除
df_cleaned = df.drop_duplicates(subset=['email'])
# 欠損値を処理
df_cleaned = df_cleaned.fillna({'name': '名前不明'})
正規化(標準化)
データの形式を統一する処理です。日付形式の統一、文字列の大文字小文字変換、単位の統一などを行います。
# 日付形式の統一
df['order_date'] = pd.to_datetime(df['order_date'],
format='mixed',
errors='coerce')
# 文字列の正規化
df['email'] = df['email'].str.lower().str.strip()
# 金額の単位統一(円→千円)
df['amount_k'] = df['amount_yen'] / 1000
エンリッチメント(データの付加)
他のデータソースと結合して、情報を追加する処理です。
# 注文データに顧客情報を結合
orders = pd.DataFrame({
'order_id': [1, 2, 3],
'customer_id': [101, 102, 101],
'amount': [5000, 3000, 7000]
})
customers = pd.DataFrame({
'customer_id': [101, 102],
'name': ['田中太郎', '佐藤花子'],
'segment': ['プレミアム', '一般']
})
enriched = orders.merge(customers, on='customer_id', how='left')
集約(アグリゲーション)
データをグループ化して統計値を算出する処理です。
# 顧客別の購入金額合計
customer_summary = df.groupby('customer_id').agg({
'amount': 'sum',
'order_id': 'count'
}).rename(columns={
'amount': 'total_amount',
'order_id': 'order_count'
})
派生カラムの生成
既存データから新しい情報を計算して追加します。
# 購入日から曜日を追加
df['day_of_week'] = df['order_date'].dt.day_name()
# 購入金額から顧客セグメントを判定
df['customer_tier'] = pd.cut(df['total_amount'],
bins=[0, 10000, 50000, float('inf')],
labels=['ブロンズ', 'シルバー', 'ゴールド'])
実践的な変換パターン
実務でよく使われる変換パターンをいくつか紹介します。
データ型の変換と検証
受け取ったデータが想定通りの型かを確認し、必要に応じて変換します。
# スキーマ定義
expected_schema = {
'order_id': 'int64',
'customer_id': 'int64',
'order_date': 'datetime64[ns]',
'amount': 'float64',
'status': 'object'
}
# 型の検証と変換
for col, dtype in expected_schema.items():
if col in df.columns:
try:
df[col] = df[col].astype(dtype)
except ValueError as e:
print(f"列 {col} の型変換に失敗: {e}")
文字列の正規化とクレンジング
ユーザー入力データは、表記ゆれや不要な空白が含まれることが多いため、統一的な処理が必要です。
# 住所の正規化
df['address'] = df['address'].str.strip() # 前後の空白除去
df['address'] = df['address'].str.replace(' ', ' ') # 全角空白を半角に
df['address'] = df['address'].str.replace(r'\s+', ' ', regex=True) # 連続空白を1つに
# 電話番号の正規化(ハイフン除去)
df['phone'] = df['phone'].str.replace('-', '')
欠損値の処理戦略
欠損値の扱い方は、データの性質によって変わります。
# 数値データ: 平均値で補完
df['age'].fillna(df['age'].mean(), inplace=True)
# カテゴリデータ: 最頻値で補完
df['prefecture'].fillna(df['prefecture'].mode()[0], inplace=True)
# 欠損が多いカラムは削除
threshold = 0.5 # 50%以上欠損
df = df.dropna(thresh=len(df) * threshold, axis=1)
# 欠損フラグを追加(欠損自体が意味を持つ場合)
df['email_missing'] = df['email'].isna().astype(int)
異常値の検出と処理
統計的手法で異常値を検出し、適切に処理します。
# IQR法による異常値検出
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 異常値をフラグ
df['is_outlier'] = ((df['amount'] < lower_bound) |
(df['amount'] > upper_bound))
# 異常値を上下限値でキャップ
df['amount_capped'] = df['amount'].clip(lower_bound, upper_bound)
変換処理の設計原則
効果的な変換処理を設計するための原則があります。
べき等性の確保
同じ入力に対して、何度実行しても同じ結果が得られるようにします。ランダム関数や現在時刻への依存は避けるべきです。
# 悪い例: 実行のたびに結果が変わる
df['processed_at'] = datetime.now()
# 良い例: 入力データから決定的に算出
df['age_group'] = pd.cut(df['age'], bins=[0, 20, 40, 60, 100],
labels=['若年層', '中年層', '壮年層', '高齢層'])
処理の分割と順序
複雑な変換は、小さなステップに分割します。各ステップは独立してテスト可能にします。
def clean_customer_data(df):
"""顧客データのクレンジング"""
df = remove_duplicates(df)
df = normalize_names(df)
df = validate_emails(df)
return df
def enrich_customer_data(df, master_data):
"""顧客データのエンリッチメント"""
df = add_geographic_info(df, master_data['prefectures'])
df = add_customer_segment(df, master_data['segments'])
return df
検証とログ
変換の各段階で、データの品質を確認し、問題があればログに記録します。
# 変換前後のレコード数チェック
original_count = len(df)
df_cleaned = clean_data(df)
cleaned_count = len(df_cleaned)
if cleaned_count < original_count * 0.9: # 10%以上減少
print(f"警告: レコードが大幅に減少しました ({original_count} → {cleaned_count})")
# データ品質メトリクスのログ
print(f"欠損値率: {df.isna().sum().sum() / df.size * 100:.2f}%")
print(f"重複率: {df.duplicated().sum() / len(df) * 100:.2f}%")
パフォーマンスの最適化
大規模データの変換では、パフォーマンスが重要になります。
# 遅い: ループ処理
for idx, row in df.iterrows():
df.at[idx, 'total'] = row['price'] * row['quantity']
# 速い: ベクトル化演算
df['total'] = df['price'] * df['quantity']
# より速い: NumPy配列を直接操作
df['total'] = df['price'].values * df['quantity'].values
まとめと次回予告
Transformは、生データを分析可能な形に加工する重要なステップです。クレンジング、正規化、エンリッチメント、集約、派生カラム生成という5つの基本パターンがあります。べき等性の確保、処理の分割、検証とログという設計原則を守ることで、信頼性の高い変換処理を実現できます。
次回は、ETLの最終ステップである「Load(ロード)」について学びます。変換済みデータをどこに、どのように保存するか、パフォーマンスと信頼性を両立させる方法を解説します。
感想
5つのカテゴリのデータ変換を学びました。普段仕事では変換後のデータを使うことが多いのですが、正しく丁寧に変換してデータを保存してくれているおかげで効率的に仕事ができているので本当に感謝です。一方で、Transformは最低限(正規化やクレンジングのみ)という考え方も個人的にはありだと思っています。というのも長く多くの人が使うからこそ丁寧なTransformが効果を発揮するものなので、今後使うかもわからない段階では最低限の処理でデータを貯めるだけのほうがコストは低く済むのではないかと考えています。
みなさまのご意見お待ちしています!
ではまたお会いしましょう。