こんにちは!
KDDIアイレットの取り組みとして6月22日〜7月3日の期間で開催中の「Google Cloud Next '26 / Google I/Oやってみた系ブログリレー」、本日は9日目の投稿です。
今回は「Firestore」を対象に、実際に検証してみた様子をお届けします!
前回の記事はこちらです。
はじめに
Firestore(NoSQLドキュメントDB)を本気で使ったことがある人なら、一度はこう思ったはずです。
JOINができず、サーバー側の集計も貧弱。だから結局アプリ側で突合するか、BigQueryに流して分析するしかない
Firestoreは読み書きのスケールとリアルタイム同期に強い一方で、構造的にコレクションをまたいだJOINができず、サーバー側集計もcount / sum / average程度に限られていました。その結果、現場はだいたい次のどれかを強いられます。
- 過剰な非正規化
- アプリ側JOIN
- BigQueryへエクスポートして分析
Google Cloud Next '26でGAになったFirestore Pipelines APIは、ここに正面から手を入れてきました。collection → where → aggregate → sort → limitをパイプラインで繋ぎ、さらに相関サブクエリでコレクション間JOINまでサーバー側で実行できます。
本記事では「顧客マスタ×注文」という、誰のシステムにもある題材で、JOIN・集計・グルーピングをFirestoreだけで完結させてみます。最後に「どこまで実用になるか/どこが落とし穴か」を考察します。
この記事でやること
ゴール:Enterprise editionのFirestoreにcustomers / ordersコレクションを用意 → Pipelines APIで「JOINエンリッチ」「顧客別売上ランキング(集計+JOIN)」「担当者別集計(JOINしたフィールドでグルーピング)」を実行するところまでをPythonで通します。
[Firestore Enterprise: nxt26-pipelines-demo]
├─ customers コレクション … customer_id / name / segment / account_manager
└─ orders コレクション … order_id / customer_id / amount / status / order_date
│
▼
db.pipeline()
.collection("orders")
.define(...) ← 親ドキュメントの値を変数に束縛
.add_fields( ← 相関サブクエリ(= JOIN)
db.pipeline().collection("customers").where(... == Variable(...))
.to_scalar_expression()
)
.aggregate(...) / .sort(...) / .limit(...)
│
▼
JOIN・集計済みの結果(ETL不要・ライブデータに直接)
STEP 0. 準備:Enterprise editionのDBを作る
0-1. データベース作成
前提として、Firestore Pipelines APIはFirestoreがEnterprise editionである必要があります。
したがってJOINを使うにもEnterprise editionが要ります。
セットアップはgcloudで一発です(コンソールからは「エディション」でEnterpriseを選択)。
gcloud firestore databases create \
--database=nxt26-pipelines-demo \
--edition=enterprise \
--location=asia-northeast1 \
--enable-firestore-data-access
実行するとdatabaseEdition: ENTERPRISE / firestoreDataAccessMode: DATA_ACCESS_MODE_ENABLEDで作成されます。
0-2. Python環境
uv init && uv add google-cloud-firestore
0-3. ダミーデータ投入
from google.cloud import firestore
db = firestore.Client(database="nxt26-pipelines-demo")
customers = [
{"customer_id": "C001", "name": "Acme商事", "segment": "enterprise", "account_manager": "田中"},
{"customer_id": "C002", "name": "Beta工業", "segment": "smb", "account_manager": "佐藤"},
{"customer_id": "C003", "name": "Gamma物流", "segment": "enterprise", "account_manager": "田中"},
{"customer_id": "C004", "name": "Delta食品", "segment": "smb", "account_manager": "鈴木"},
]
orders = [
{"order_id": "O1001", "customer_id": "C001", "amount": 120000, "status": "paid", "order_date": "2026-06-01"},
{"order_id": "O1002", "customer_id": "C001", "amount": 80000, "status": "paid", "order_date": "2026-06-10"},
{"order_id": "O1003", "customer_id": "C002", "amount": 30000, "status": "pending", "order_date": "2026-06-12"},
{"order_id": "O1004", "customer_id": "C003", "amount": 250000, "status": "paid", "order_date": "2026-06-15"},
{"order_id": "O1005", "customer_id": "C003", "amount": 50000, "status": "paid", "order_date": "2026-06-18"},
{"order_id": "O1006", "customer_id": "C003", "amount": 90000, "status": "pending", "order_date": "2026-06-20"},
{"order_id": "O1007", "customer_id": "C004", "amount": 15000, "status": "paid", "order_date": "2026-06-21"},
{"order_id": "O1008", "customer_id": "C001", "amount": 200000, "status": "pending", "order_date": "2026-06-22"},
]
for c in customers:
db.collection("customers").document(c["customer_id"]).set(c)
for o in orders:
db.collection("orders").document(o["order_id"]).set(o)
STEP 1. まずは集計だけ:パイプラインの基本形
JOINの前に、Pipelinesの基礎を確認します。「全注文の合計金額と件数」をサーバー側で集計します。
from google.cloud.firestore_v1.pipeline_expressions import Field
snap = (
db.pipeline()
.collection("orders")
.aggregate(
Field.of("amount").sum().as_("total"),
Field.of("order_id").count().as_("cnt"),
)
.execute()
)
for r in snap:
print(r.data())
{'cnt': 8, 'total': 835000}
Field.of("col")でフィールドを参照し、.sum() / .count()などの集計関数を.as_("別名")で命名する、というSQLに近い書き方です。
STEP 2. 本題:コレクション間JOIN(相関サブクエリ)
ordersの各ドキュメントに、customersから引いた顧客名を付けます。
-
define()で、注文側のcustomer_idを変数oid_custに束縛する -
add_fields()の中にcustomersを引くサブパイプラインを書き、where(... == Variable("oid_cust"))で相関させ、to_scalar_expression()で1値に畳んで列にする
from google.cloud.firestore_v1.pipeline_expressions import Field, Variable
q1 = (
db.pipeline()
.collection("orders")
.define(Field.of("customer_id").as_("oid_cust")) # ① 親の値を変数へ
.add_fields(
db.pipeline() # ② 相関サブクエリ = JOIN
.collection("customers")
.where(Field.of("customer_id").equal(Variable("oid_cust")))
.select(Field.of("name").as_("cust_name"))
.to_scalar_expression()
.as_("customer_name")
)
.select("order_id", "customer_id", "amount", "status", "customer_name")
.sort(Field.of("amount").descending())
)
for r in q1.execute():
print(r.data())
{'order_id': 'O1004', 'customer_id': 'C003', 'amount': 250000, 'status': 'paid', 'customer_name': 'Gamma物流'}
{'order_id': 'O1008', 'customer_id': 'C001', 'amount': 200000, 'status': 'pending', 'customer_name': 'Acme商事'}
{'order_id': 'O1001', 'customer_id': 'C001', 'amount': 120000, 'status': 'paid', 'customer_name': 'Acme商事'}
{'order_id': 'O1006', 'customer_id': 'C003', 'amount': 90000, 'status': 'pending', 'customer_name': 'Gamma物流'}
{'order_id': 'O1002', 'customer_id': 'C001', 'amount': 80000, 'status': 'paid', 'customer_name': 'Acme商事'}
{'order_id': 'O1005', 'customer_id': 'C003', 'amount': 50000, 'status': 'paid', 'customer_name': 'Gamma物流'}
{'order_id': 'O1003', 'customer_id': 'C002', 'amount': 30000, 'status': 'pending', 'customer_name': 'Beta工業'}
{'order_id': 'O1007', 'customer_id': 'C004', 'amount': 15000, 'status': 'paid', 'customer_name': 'Delta食品'}
アプリ側でN+1ループを書かずに、サーバー側で突合された結果がそのまま返ってきました。
金額が降順で、customer_nameがJOINで付与されていることを確認してください。
これが従来Firestoreでできなかったことです。
STEP 3. ビジネスの本丸:顧客別売上ランキング(集計+JOIN)
「顧客マスタに、その顧客の注文合計・件数をぶら下げてランキング」
従来ならBigQueryにエクスポートしてGROUP BYしていた集計です。
customersを起点に、各顧客についてordersを相関サブクエリで集計(sumとcount)し、合計の降順に並べます。
q2 = (
db.pipeline()
.collection("customers")
.define(Field.of("customer_id").as_("cid"))
.add_fields(
db.pipeline().collection("orders")
.where(Field.of("customer_id").equal(Variable("cid")))
.aggregate(Field.of("amount").sum().as_("s"))
.to_scalar_expression().as_("total_amount"),
db.pipeline().collection("orders")
.where(Field.of("customer_id").equal(Variable("cid")))
.aggregate(Field.of("order_id").count().as_("c"))
.to_scalar_expression().as_("order_count"),
)
.sort(Field.of("total_amount").descending())
.select("name", "segment", "account_manager", "order_count", "total_amount")
)
for r in q2.execute():
print(r.data())
{'name': 'Acme商事', 'segment': 'enterprise', 'account_manager': '田中', 'order_count': 3, 'total_amount': 400000}
{'name': 'Gamma物流', 'segment': 'enterprise', 'account_manager': '田中', 'order_count': 3, 'total_amount': 390000}
{'name': 'Beta工業', 'segment': 'smb', 'account_manager': '佐藤', 'order_count': 1, 'total_amount': 30000}
{'name': 'Delta食品', 'segment': 'smb', 'account_manager': '鈴木', 'order_count': 1, 'total_amount': 15000}
マスタ項目(segment・担当者)と集計値が1つの結果セットに同居しています。これを管理画面のダッシュボードにそのまま流せるというのが効きどころです。
STEP 4. JOINしたフィールドでグルーピングする
続いて、担当者(account_manager)別の入金済み売上を出します。ordersには担当者情報が無いので、JOINで担当者を付けてからgroups=[...]でグルーピング集計します。status == "paid"のフィルタも効かせます。
q3 = (
db.pipeline()
.collection("orders")
.where(Field.of("status").equal("paid")) # 入金済みだけ
.define(Field.of("customer_id").as_("oid_cust"))
.add_fields(
db.pipeline().collection("customers")
.where(Field.of("customer_id").equal(Variable("oid_cust")))
.select(Field.of("account_manager").as_("mgr"))
.to_scalar_expression().as_("manager")
)
.aggregate(
Field.of("amount").sum().as_("paid_total"),
Field.of("order_id").count().as_("paid_orders"),
groups=["manager"], # JOIN したフィールドで GROUP BY
)
.sort(Field.of("paid_total").descending())
)
for r in q3.execute():
print(r.data())
{'manager': '田中', 'paid_orders': 4, 'paid_total': 500000}
{'manager': '鈴木', 'paid_orders': 1, 'paid_total': 15000}
検算すると:田中(C001/C003担当)のpaidは120,000+80,000+250,000+50,000=500,000(4件)、鈴木(C004)は15,000(1件)。佐藤(C002)の唯一の注文はpendingなので正しく除外されています。フィルタ → JOIN → グルーピング集計が、Firestoreのクエリ1本で意図通り動きました。
ここまでやって見えた「ビジネス的な意味」
意外性(NoSQLでJOIN!)だけの話ではありません。効くポイントは3つです。
-
「Firestore+BigQuery+ETL」の3点セットを畳める場面がある
- 「顧客別売上トップ10」「ステータス別の滞留件数」程度の運用クエリのために分析基盤を別建てしていたなら、ETLパイプラインの構築・保守コストがまるごと消える可能性があります。
-
ライブデータに直接効く
- BigQueryへのエクスポートはバッチ遅延を伴いますが、Pipelinesはアプリの本番データへその場で集計をかけられます。運用者が「今この瞬間」の数字を見られる。
-
MongoDBからの移行の受け皿になる
- FirestoreはMongoDB互換も推進中で、PipelinesはMongoDBのaggregationパイプライン(
$lookup等)の移行先として現実味があります。
- FirestoreはMongoDB互換も推進中で、PipelinesはMongoDBのaggregationパイプライン(
限界と思われる部分
-
BigQueryの完全代替にはなりえない
- 全社規模の大量スキャン・本格DWH分析は引き続きBigQueryの仕事。Pipelinesの価値はアプリのライブデータに対する軽量〜中量の運用クエリだと思われます。
-
相関サブクエリはスキャン量に直結する
- 「顧客ごとにordersを引く」構造は、件数が増えると読み取りユニット(=課金)が膨らみます。結合キー(今回なら
customer_id)にインデックスを張る、対象をwhereで絞る、といった設計が前提です。配列サブクエリ(to_array_expression)を含め、マテリアライズされるデータにはクエリ全体で128 MiBの上限がある点にも注意です。
- 「顧客ごとにordersを引く」構造は、件数が増えると読み取りユニット(=課金)が膨らみます。結合キー(今回なら
-
全文検索・地理空間クエリはまだPublic Preview
- JOIN・サブクエリはGA、DMLもupdate/deleteはGAですが(insert/transactionは順次提供)、検索系は提供状況を要確認です。
まとめ
Firestore Pipelines API(GA)は、長年NoSQLの弱点だったJOIN・サーバー側集計・グルーピングをFirestore単体で実現します。
効きどころは「アプリのライブデータに対する運用クエリ」。Firestore + BigQuery + ETLの3点セットを、ユースケース次第でFirestore1つにできます。
ただしEnterprise editionが必須、コストはスキャン量次第、検索系はPreview。「ETL前のその場クエリでどこまで戦えるか」の線引きはしっかりと行なってから採用するのが誠実でしょう。
NoSQLにJOINが来た、というインパクトは大きいですが、万能ではありません。まずは小さなコレクションで「JOIN+集計が本当に意図通り動くか」を体感し、スキャン量と相談しながら適用範囲を見極めるのがおすすめです。