数か月前、初めてデータレイクを作りました。最近は、それを拡張しようと作業を進めています。その間、自分がやらかしたと気づいた間違いをまとめていきます。
この記事に出るコードには、DuckDB、pandas、Polars(Python API)、Amazon S3の例が含まれていますが、これらはあくまで概念を説明するためのものです。この記事の主題はデータレイクの整理方法であり、特定のクエリツールやクラウドストレージではありません。そもそも、データレイクを利用する考え方の一つは、ストレージとクエリツールを分離することにあります。
目次
データレイクとは何か?
コンセプトはシンプルです。Amazon S3などのオブジェクトストレージに、データをそのままで、まとめておいたものです。ストレージは安価です。もし、そのデータを構造化、あるいは少なくとも半構造化しておけば、好きなクエリエンジン(例えば、Amazon AthenaやPrestoなどのラウドベースのサービス、または、ローカルでも実行可能なDuckDBやpolarsなど)を使って便利にクエリを実行できます。
クエリを実行するために必要な要素は次の3つです。
- ストレージ:データは必ずどこかに存在
- クエリエンジン:データに対してクエリを実行できるプログラムが必要
- 実行環境:そのプログラムを実行する環境が必要
従来のRDBMS(PostgreSQLやMySQLなどといったOLTPでも、Big queryやRedshiftなどといったOLAPでも)はこれら3つすべてを提供しますが、特に頻繁にクエリをする必要がない場合、ストレージが高価になります。データレイクはこのコストの問題を解決します。また、ストレージとクエリエンジンが分離されているため、クエリエンジンを変更してもデータの移行は不要です。
さて、私のやらかした間違いについて話しましょう。
以下の話を具体的にするために、時系列で生成されるトランザクションデータがあると仮定します。非常に単純な例として、次のような売上データがあるとします。
| date | store_id | item_id | unit_price | number_sold |
|---|---|---|---|---|
| 2025-01-01 | 1 | 2 | 3 | 4 |
データスワンプ
私の間違いの一つ目は、ファイル名に日付情報を含めて、すべてのファイルを一つのフラットなフォルダに置くことです。次のような構造です。
sales
├─ data_2025-01-01.parquet
├─ data_2025-01-02.parquet
├─ data_2025-01-03.parquet
└─ data_2025-01-04.parquet
このアンチパターンはデータスワンプ(data swamp、データの沼)と呼ばれます。(沼も、一応湖ではありますが、きれいではない湖です。)
「データスワンプ」をインターネットで検索すれば、なぜそれが悪いのか、多くの理由が見つかります。私がこうやってよくないと気づいたのは、ファイルの範囲を指定するグロブパターンを書きにくいからです。
データをクエリする際、クエリエンジンにすべてのファイルを読み込ませたくはありません。上記の売上の例のだと、次のような、グロブパターンを使っているクエリが書けます。
SELECT item_id, SUM(unit_price * number_sold) AS sales
FROM 's3://my_bucket/sales/data_2025-01-*.parquet'
WHERE "date" BETWEEN '2025-01-10' AND '2025-01-20'
しかし、次の複数の年の四半期の条件に合わせたファイル名のグロブパターンが非常に書きにくいです。
WHERE date_part('year', "date") BETWEEN 2020 AND 2025
AND date_part('month', "date") BETWEEN 1 AND 3
書きにくいかどうかの以前に、そもそも、絞り込みの条件をすべてWHERE句の方に入れるべきであり、FROM句とWHERE句にバラバラに記述するのは、場当たり的な回避策に見えるコードです。
正しい方法の例を見ってみましょう。ディレクトリ構造自体が、クエリ可能な列として扱われます。例えば、次のように構成できます。
sales
├─ year=2024
| ├─ month=1
| | ├─ day=1
| | | └─ data.parquet
| | └─ day=2
| | └─ data.parquet
| └─ month=2
| └─ day=1
| └─ data.parquet
└─ year=2025
概念を説明するためだけ、この例ではフォルダは日ごとまですが、実際にはParquetファイルが小さすぎると、クエリのオーバーヘッドが増えます。
各フォルダに複数のファイルを含めることができます。月ごとな、適度な数の大きなParquetファイルにデータを統合するのが一般的です。
日付情報はすでにファイルパスに含まれており、クエリエンジンはファイルパスは列として扱えるため、data.parquet ファイル自体は次の4つの列だけで問題ありません。
| store_id | item_id | unit_price | number_sold |
|---|---|---|---|
| 1 | 2 | 3 | 4 |
物理的にはそれぞれのdata.parquetファイルは以上の4列しか持っていませんが、論理的には、ファイルパスの情報と合わせて、以下のようなリレーションとして扱われます。
| store_id | item_id | unit_price | number_sold | year | month | day |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 2025 | 1 | 1 |
このパターンはHiveパーティショニングと呼ばれます。これは元々、Hadoop上に構築されたデータウェアハウスソフトウェアであるApache Hiveに由来するものであり、オブジェクトストレージでデータを整理するための事実上の標準となりました。主要なクエリエンジンやデータ処理フレームワークは、このディレクトリ構造をネイティブにサポートしており、column=value形式のパスを自動的に列として認識します。
上記の、2020~2025年、それぞれ1~3月という条件のクエリコードの例を挙げます。
DuckDBの例は次のとおりです。
SELECT item_id, SUM(unit_price * number_sold) AS sales
FROM 's3://my_bucket/sales/*/*/*/*.parquet'
WHERE year BETWEEN 2020 AND 2025
AND month BETWEEN 1 AND 3
Polarsの例は次のようになります。
import polars as pl
df = (
pl.scan_parquet("s3://my_bucket/sales/")
.filter(
(pl.col("year").is_between(2020, 2025)) &
(pl.col("month").is_between(1, 3))
)
.group_by("item_id")
.agg((pl.col("unit_price") * pl.col("number_sold")).sum().alias("sales"))
.collect()
)
手作りのパーティション
私がやらかした2つ目の間違いは、書き込み時にHiveパーティショニングを手作りで実装したことです。
最初は次のようなものを書きました。
from pathlib import Path
import boto3
import duckdb
con = duckdb.connect("local.duckdb")
try:
con.execute("COPY ... TO 'local_data.parquet' (FORMAT PARQUET);")
s3_client = boto3.client("s3")
s3_client.upload_file(
"local_data.parquet",
"my_bucket",
"sales/year=2025/month=1/day=1/data.parquet"
)
finally:
con.close()
file_path = Path("local_data.parquet")
if file_path.exists():
file_path.unlink()
今にして思えば、少なくともローカルファイルを書き出して、S3クライアントでアップロードする部分を省くことはできました。SQLは次のようになります。
COPY ...
TO 's3://my_bucket/sales/year=2025/month=1/day=1/data.parquet'
(FORMAT PARQUET);
しかし、正しい方法は、ライブラリやツールにパーティショニングを自動で処理させることです。以下はDuckDBのSQLの正しい例です。
COPY (
SELECT
date_part('year', "date") AS year,
date_part('month', "date") AS month,
date_part('day', "date") AS day,
...
)
TO 's3://my_bucket/sales'
(FORMAT PARQUET, PARTITION_BY (year, month, day));
もう一つの例として、pandasでは次のようにできます。
import pandas as pd
# DataFrameであるdfを定義
df["year"] = df["date"].dt.year
df["month"] = df["date"].dt.month
df["day"] = df["date"].dt.day
df.to_parquet(
"s3://my_bucket/sales/",
partition_cols=["year", "month", "day"],
engine="pyarrow"
)
パーティション列はコードで指定した順序でディレクトリ構造になります。実際のParquetファイル自体にはyear, month, dayの列は含まれません。
既存のパーティションに書き込む際は注意が必要です。
上書き、追記、部分的な上書き、あるいはエラーをスローするといった動作が可能です。以上のDuckDBとpandasのコードにオプション引数を追加することで細かく制御できます。
ドキュメント:
- DuckDB: https://duckdb.org/docs/data/partitioning/partitioned_writes
- pandas : https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_parquet.html
- PyArrow (
pandas.DataFrame.to_parquetのデフォルトなバックエンド):https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
まとめ
- ファイルをフラットなオブジェクトストレージディレクトリにダンプするのは、データスワンプというアンチパターンの一例です
- Hiveパーティショニングを使用することで、ファイルパスのパターンを通じて論理的な列を追加でき、これは主要なクエリエンジンによってサポートされています
- 書き込む際、ライブラリやツールは、パーティションAPIを提供しており、手作りでパーティションパスを書く必要はありません
補足
厳密に言えば、ここまではパーティションを手作りで実装する必要はないと述べただけです。
では、なぜ手作りで実装することが悪いのでしょうか?
この文脈での「手作り」とは、英語の技術文書によく出てくる「hand-roll」を翻訳した表現で、既存のライブラリやツールを使用する代わりに、機能を独自に実装することを指します。(語源は手巻きタバコのことかも、しらんけど。)
私は「手作り」に反対する理由は以下の通りです。ただし、これはアプリケーション開発の話に限ります。ライブラリ開発などは別の話です。
まずはエッジケースの問題です。アプリケーション開発者が、評判の良いライブラリのエッジケース対応に匹敵する実装をすることはほぼ不可能です。
そして、可読性の問題です。これは個人的な意見かもしれませんが、「手作り」での実装は手続き的(procedural)になりがちです。一方で、ライブラリのAPIを使用することはより宣言的(declarative)なコードになります。手続き的なコードは「どう動くか」を記述するのに対し、宣言的なコードは「何をしたいか」という意図を直接的に伝えます。コード自体に書き手の意図が明確に表れていることは、良いコードの重要な要素の一つだと考えます。