こんにちは、今回はデータを扱うプロセスETLのうちExtract(抽出)について学んでいきます。
一言サマリー
増分抽出はフル抽出よりも効率的だが、増分が正しく取れるか注意が必要
以下、生成AIが作成しています。
【第2回】ETLの基礎 - Extract(抽出)
Extractとは何か
Extract(抽出)は、ETLプロセスの最初のステップで、様々なデータソースからデータを取り出す処理です。「データを読み込む」という単純な作業に聞こえますが、実際には多くの考慮事項があります。
データは様々な場所に、様々な形式で存在しています。リレーショナルデータベース、CSVファイル、Web API、ログファイル、外部SaaSアプリケーションなど、ソースの種類によって抽出方法が異なります。
抽出の基本パターン
データの抽出方法は、主に2つのパターンに分類されます。
フル抽出(Full Extract)は、毎回データソース全体を読み込む方法です。最もシンプルで確実な方法ですが、データ量が多いと時間とリソースを大量に消費します。
例えば、100万件の顧客マスタテーブルがあるとします。フル抽出では、変更の有無に関わらず、毎回100万件すべてを読み込みます。データソースが小さい場合や、データ全体を定期的に検証したい場合に適しています。
増分抽出(Incremental Extract)は、前回の抽出以降に変更・追加されたデータのみを取得する方法です。効率的ですが、実装がやや複雑になります。
先ほどの顧客マスタで、昨日から今日の間に1,000件の更新があったとします。増分抽出では、この1,000件だけを取得すればよいため、処理が圧倒的に速くなります。
増分抽出の実装方法
増分抽出を実現する代表的な方法を3つ紹介します。
タイムスタンプベースは、最も一般的な方法です。各レコードに「最終更新日時」のようなカラムがあることを前提とします。
# 前回の抽出時刻: 2025-10-26 09:00:00
last_extract_time = "2025-10-26 09:00:00"
# 前回以降に更新されたレコードのみ取得
query = f"""
SELECT * FROM customers
WHERE updated_at > '{last_extract_time}'
"""
この方法のメリットは実装がシンプルなことです。デメリットは、削除されたレコードを検知できないこと、タイムスタンプカラムがないテーブルでは使えないことです。
シーケンス番号ベースは、自動採番されるIDを利用する方法です。
# 前回取得した最大ID: 1000000
last_id = 1000000
# それより大きいIDのレコードを取得
query = f"""
SELECT * FROM orders
WHERE order_id > {last_id}
"""
注文履歴のように追記専用のテーブルに適しています。更新や削除がないデータに向いており、処理も高速です。
Change Data Capture (CDC)は、データベースの変更ログ自体を監視する高度な方法です。データベースが内部的に保持する変更履歴(トランザクションログ)を読み取ることで、挿入・更新・削除のすべてを正確に捉えられます。
データソース別の抽出方法
実際のデータソースごとに、抽出方法を見ていきましょう。
リレーショナルデータベースからの抽出は、SQLクエリを使って行います。
import psycopg2
# データベース接続
conn = psycopg2.connect(
host="localhost",
database="sales_db",
user="analyst",
password="secret"
)
# データ抽出
cursor = conn.cursor()
cursor.execute("SELECT * FROM products WHERE category = 'Electronics'")
rows = cursor.fetchall()
大量データを扱う場合は、一度にすべてをメモリに読み込むのではなく、バッチ処理やカーソルを使った分割読み込みが必要です。
CSVファイルからの抽出は、標準的なライブラリで簡単に実行できます。
import pandas as pd
# CSVファイルの読み込み
df = pd.read_csv('sales_data.csv', parse_dates=['order_date'])
# 基本的なフィルタリング
recent_data = df[df['order_date'] > '2025-10-01']
ファイルサイズが大きい場合は、chunksize パラメータを使って分割読み込みを行います。
Web APIからの抽出は、HTTPリクエストを通じて行います。
import requests
# API エンドポイントからデータ取得
response = requests.get(
'https://api.example.com/v1/users',
headers={'Authorization': 'Bearer YOUR_TOKEN'},
params={'limit': 100, 'offset': 0}
)
data = response.json()
APIには通常、レート制限があるため、リトライロジックや適切な待機時間の実装が必要です。
抽出時の注意点
データ抽出では、いくつかの重要な考慮事項があります。
データソースへの負荷を最小限に抑える必要があります。本番データベースから大量データを抽出すると、通常業務に影響を与える可能性があります。読み取り専用レプリカを使う、業務時間外に実行する、クエリを最適化するなどの工夫が求められます。
ネットワーク障害への対応も重要です。データ転送中にネットワークが切断されることは珍しくありません。部分的に取得したデータをどう扱うか、リトライ時に重複を避ける仕組みが必要です。
データの一貫性にも注意が必要です。複数のテーブルから抽出する場合、抽出のタイミングがずれると整合性が取れなくなる可能性があります。トランザクションやスナップショットを活用して、ある時点でのデータを確実に取得します。
抽出結果の保存
抽出したデータは、次の処理(Transform)に渡す前に、一時的に保存することが一般的です。
ファイルストレージに保存する方法は、最もシンプルです。Parquet、JSON、CSVなどの形式で保存します。Parquet形式は、列指向で圧縮効率が高く、大規模データに適しています。
メッセージキューを使う方法は、リアルタイム性が必要な場合に有効です。Kafka、RabbitMQなどを使い、抽出したデータをストリームとして次の処理に渡します。
まとめと次回予告
Extractは、データソースからデータを取り出すプロセスです。フル抽出と増分抽出の2つのパターンがあり、データソースの特性や要件に応じて選択します。タイムスタンプ、シーケンス番号、CDCなどの手法で増分抽出を実現できます。
次回は、ETLの中核となる「Transform(変換)」について学びます。抽出したデータをどのように加工し、分析可能な形に整えるのか、具体的な変換パターンと実装方法を解説します。
感想
抽出方法にはフル抽出と増分抽出の2つがありました。データが増えるほどに時間とお金が膨大になるので、増分抽出が採用されていることが多いのではないかと思いましたがどうなんでしょう。
増分抽出については、代表的な方法3つ(タイムスタンプベース、シーケンス番号ベース、ChangeDataCapture)学びました。削除や更新については論理フラグを使えば解消できる場面もありそうです。どちらかというとリアルタイム性がポイントで、そのような要件ではCDCの検討が必要になってくるようです。
ではまたお会いしましょう。