5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

NoSQLでJOINする日が来た:Firestore Pipelines API(コレクション間JOIN)を実機で試してみた【Next '26】

5
Posted at

こんにちは!
KDDIアイレットの取り組みとして6月22日〜7月3日の期間で開催中の「Google Cloud Next '26 / Google I/Oやってみた系ブログリレー」、本日は9日目の投稿です。
今回は「Firestore」を対象に、実際に検証してみた様子をお届けします!

前回の記事はこちらです。

はじめに

Firestore(NoSQLドキュメントDB)を本気で使ったことがある人なら、一度はこう思ったはずです。

JOINができず、サーバー側の集計も貧弱。だから結局アプリ側で突合するか、BigQueryに流して分析するしかない

Firestoreは読み書きのスケールとリアルタイム同期に強い一方で、構造的にコレクションをまたいだJOINができず、サーバー側集計もcount / sum / average程度に限られていました。その結果、現場はだいたい次のどれかを強いられます。

  • 過剰な非正規化
  • アプリ側JOIN
  • BigQueryへエクスポートして分析

Google Cloud Next '26でGAになったFirestore Pipelines APIは、ここに正面から手を入れてきました。collection → where → aggregate → sort → limitをパイプラインで繋ぎ、さらに相関サブクエリでコレクション間JOINまでサーバー側で実行できます。

本記事では「顧客マスタ×注文」という、誰のシステムにもある題材で、JOIN・集計・グルーピングをFirestoreだけで完結させてみます。最後に「どこまで実用になるか/どこが落とし穴か」を考察します。

この記事でやること

ゴール:Enterprise editionのFirestoreにcustomers / ordersコレクションを用意 → Pipelines APIで「JOINエンリッチ」「顧客別売上ランキング(集計+JOIN)」「担当者別集計(JOINしたフィールドでグルーピング)」を実行するところまでをPythonで通します。

[Firestore Enterprise: nxt26-pipelines-demo]
   ├─ customers コレクション  … customer_id / name / segment / account_manager
   └─ orders   コレクション  … order_id / customer_id / amount / status / order_date
          │
          ▼
   db.pipeline()
     .collection("orders")
     .define(...)          ← 親ドキュメントの値を変数に束縛
     .add_fields(          ← 相関サブクエリ(= JOIN)
        db.pipeline().collection("customers").where(... == Variable(...))
          .to_scalar_expression()
     )
     .aggregate(...) / .sort(...) / .limit(...)
          │
          ▼
   JOIN・集計済みの結果(ETL不要・ライブデータに直接)

STEP 0. 準備:Enterprise editionのDBを作る

0-1. データベース作成

前提として、Firestore Pipelines APIはFirestoreがEnterprise editionである必要があります。
したがってJOINを使うにもEnterprise editionが要ります。
セットアップはgcloudで一発です(コンソールからは「エディション」でEnterpriseを選択)。

gcloud firestore databases create \
  --database=nxt26-pipelines-demo \
  --edition=enterprise \
  --location=asia-northeast1 \
  --enable-firestore-data-access

実行するとdatabaseEdition: ENTERPRISE / firestoreDataAccessMode: DATA_ACCESS_MODE_ENABLEDで作成されます。

0-2. Python環境

uv init && uv add google-cloud-firestore

0-3. ダミーデータ投入

from google.cloud import firestore
db = firestore.Client(database="nxt26-pipelines-demo")

customers = [
    {"customer_id": "C001", "name": "Acme商事",  "segment": "enterprise", "account_manager": "田中"},
    {"customer_id": "C002", "name": "Beta工業",  "segment": "smb",        "account_manager": "佐藤"},
    {"customer_id": "C003", "name": "Gamma物流", "segment": "enterprise", "account_manager": "田中"},
    {"customer_id": "C004", "name": "Delta食品", "segment": "smb",        "account_manager": "鈴木"},
]
orders = [
    {"order_id": "O1001", "customer_id": "C001", "amount": 120000, "status": "paid",    "order_date": "2026-06-01"},
    {"order_id": "O1002", "customer_id": "C001", "amount":  80000, "status": "paid",    "order_date": "2026-06-10"},
    {"order_id": "O1003", "customer_id": "C002", "amount":  30000, "status": "pending", "order_date": "2026-06-12"},
    {"order_id": "O1004", "customer_id": "C003", "amount": 250000, "status": "paid",    "order_date": "2026-06-15"},
    {"order_id": "O1005", "customer_id": "C003", "amount":  50000, "status": "paid",    "order_date": "2026-06-18"},
    {"order_id": "O1006", "customer_id": "C003", "amount":  90000, "status": "pending", "order_date": "2026-06-20"},
    {"order_id": "O1007", "customer_id": "C004", "amount":  15000, "status": "paid",    "order_date": "2026-06-21"},
    {"order_id": "O1008", "customer_id": "C001", "amount": 200000, "status": "pending", "order_date": "2026-06-22"},
]
for c in customers:
    db.collection("customers").document(c["customer_id"]).set(c)
for o in orders:
    db.collection("orders").document(o["order_id"]).set(o)

STEP 1. まずは集計だけ:パイプラインの基本形

JOINの前に、Pipelinesの基礎を確認します。「全注文の合計金額と件数」をサーバー側で集計します。

from google.cloud.firestore_v1.pipeline_expressions import Field

snap = (
    db.pipeline()
    .collection("orders")
    .aggregate(
        Field.of("amount").sum().as_("total"),
        Field.of("order_id").count().as_("cnt"),
    )
    .execute()
)
for r in snap:
    print(r.data())
{'cnt': 8, 'total': 835000}

Field.of("col")でフィールドを参照し、.sum() / .count()などの集計関数を.as_("別名")で命名する、というSQLに近い書き方です。

STEP 2. 本題:コレクション間JOIN(相関サブクエリ)

ordersの各ドキュメントに、customersから引いた顧客名を付けます。

  1. define()で、注文側のcustomer_idを変数oid_custに束縛する
  2. add_fields()の中にcustomersを引くサブパイプラインを書き、where(... == Variable("oid_cust"))で相関させ、to_scalar_expression()で1値に畳んで列にする
from google.cloud.firestore_v1.pipeline_expressions import Field, Variable

q1 = (
    db.pipeline()
    .collection("orders")
    .define(Field.of("customer_id").as_("oid_cust"))      # ① 親の値を変数へ
    .add_fields(
        db.pipeline()                                     # ② 相関サブクエリ = JOIN
        .collection("customers")
        .where(Field.of("customer_id").equal(Variable("oid_cust")))
        .select(Field.of("name").as_("cust_name"))
        .to_scalar_expression()
        .as_("customer_name")
    )
    .select("order_id", "customer_id", "amount", "status", "customer_name")
    .sort(Field.of("amount").descending())
)
for r in q1.execute():
    print(r.data())
{'order_id': 'O1004', 'customer_id': 'C003', 'amount': 250000, 'status': 'paid',    'customer_name': 'Gamma物流'}
{'order_id': 'O1008', 'customer_id': 'C001', 'amount': 200000, 'status': 'pending', 'customer_name': 'Acme商事'}
{'order_id': 'O1001', 'customer_id': 'C001', 'amount': 120000, 'status': 'paid',    'customer_name': 'Acme商事'}
{'order_id': 'O1006', 'customer_id': 'C003', 'amount':  90000, 'status': 'pending', 'customer_name': 'Gamma物流'}
{'order_id': 'O1002', 'customer_id': 'C001', 'amount':  80000, 'status': 'paid',    'customer_name': 'Acme商事'}
{'order_id': 'O1005', 'customer_id': 'C003', 'amount':  50000, 'status': 'paid',    'customer_name': 'Gamma物流'}
{'order_id': 'O1003', 'customer_id': 'C002', 'amount':  30000, 'status': 'pending', 'customer_name': 'Beta工業'}
{'order_id': 'O1007', 'customer_id': 'C004', 'amount':  15000, 'status': 'paid',    'customer_name': 'Delta食品'}

アプリ側でN+1ループを書かずに、サーバー側で突合された結果がそのまま返ってきました。
金額が降順で、customer_nameがJOINで付与されていることを確認してください。
これが従来Firestoreでできなかったことです。

STEP 3. ビジネスの本丸:顧客別売上ランキング(集計+JOIN)

「顧客マスタに、その顧客の注文合計・件数をぶら下げてランキング」
従来ならBigQueryにエクスポートしてGROUP BYしていた集計です。

customersを起点に、各顧客についてordersを相関サブクエリで集計(sumcount)し、合計の降順に並べます。

q2 = (
    db.pipeline()
    .collection("customers")
    .define(Field.of("customer_id").as_("cid"))
    .add_fields(
        db.pipeline().collection("orders")
          .where(Field.of("customer_id").equal(Variable("cid")))
          .aggregate(Field.of("amount").sum().as_("s"))
          .to_scalar_expression().as_("total_amount"),
        db.pipeline().collection("orders")
          .where(Field.of("customer_id").equal(Variable("cid")))
          .aggregate(Field.of("order_id").count().as_("c"))
          .to_scalar_expression().as_("order_count"),
    )
    .sort(Field.of("total_amount").descending())
    .select("name", "segment", "account_manager", "order_count", "total_amount")
)
for r in q2.execute():
    print(r.data())
{'name': 'Acme商事',  'segment': 'enterprise', 'account_manager': '田中', 'order_count': 3, 'total_amount': 400000}
{'name': 'Gamma物流', 'segment': 'enterprise', 'account_manager': '田中', 'order_count': 3, 'total_amount': 390000}
{'name': 'Beta工業',  'segment': 'smb',        'account_manager': '佐藤', 'order_count': 1, 'total_amount':  30000}
{'name': 'Delta食品', 'segment': 'smb',        'account_manager': '鈴木', 'order_count': 1, 'total_amount':  15000}

マスタ項目(segment・担当者)と集計値が1つの結果セットに同居しています。これを管理画面のダッシュボードにそのまま流せるというのが効きどころです。

STEP 4. JOINしたフィールドでグルーピングする

続いて、担当者(account_manager)別の入金済み売上を出します。ordersには担当者情報が無いので、JOINで担当者を付けてからgroups=[...]でグルーピング集計します。status == "paid"のフィルタも効かせます。

q3 = (
    db.pipeline()
    .collection("orders")
    .where(Field.of("status").equal("paid"))                 # 入金済みだけ
    .define(Field.of("customer_id").as_("oid_cust"))
    .add_fields(
        db.pipeline().collection("customers")
          .where(Field.of("customer_id").equal(Variable("oid_cust")))
          .select(Field.of("account_manager").as_("mgr"))
          .to_scalar_expression().as_("manager")
    )
    .aggregate(
        Field.of("amount").sum().as_("paid_total"),
        Field.of("order_id").count().as_("paid_orders"),
        groups=["manager"],                                   # JOIN したフィールドで GROUP BY
    )
    .sort(Field.of("paid_total").descending())
)
for r in q3.execute():
    print(r.data())
{'manager': '田中', 'paid_orders': 4, 'paid_total': 500000}
{'manager': '鈴木', 'paid_orders': 1, 'paid_total':  15000}

検算すると:田中(C001/C003担当)のpaidは120,000+80,000+250,000+50,000=500,000(4件)、鈴木(C004)は15,000(1件)。佐藤(C002)の唯一の注文はpendingなので正しく除外されています。フィルタ → JOIN → グルーピング集計が、Firestoreのクエリ1本で意図通り動きました。

ここまでやって見えた「ビジネス的な意味」

意外性(NoSQLでJOIN!)だけの話ではありません。効くポイントは3つです。

  • 「Firestore+BigQuery+ETL」の3点セットを畳める場面がある
    • 「顧客別売上トップ10」「ステータス別の滞留件数」程度の運用クエリのために分析基盤を別建てしていたなら、ETLパイプラインの構築・保守コストがまるごと消える可能性があります。
  • ライブデータに直接効く
    • BigQueryへのエクスポートはバッチ遅延を伴いますが、Pipelinesはアプリの本番データへその場で集計をかけられます。運用者が「今この瞬間」の数字を見られる。
  • MongoDBからの移行の受け皿になる
    • FirestoreはMongoDB互換も推進中で、PipelinesはMongoDBのaggregationパイプライン($lookup等)の移行先として現実味があります。

限界と思われる部分

  • BigQueryの完全代替にはなりえない
    • 全社規模の大量スキャン・本格DWH分析は引き続きBigQueryの仕事。Pipelinesの価値はアプリのライブデータに対する軽量〜中量の運用クエリだと思われます。
  • 相関サブクエリはスキャン量に直結する
    • 「顧客ごとにordersを引く」構造は、件数が増えると読み取りユニット(=課金)が膨らみます。結合キー(今回ならcustomer_id)にインデックスを張る、対象をwhereで絞る、といった設計が前提です。配列サブクエリ(to_array_expression)を含め、マテリアライズされるデータにはクエリ全体で128 MiBの上限がある点にも注意です。
  • 全文検索・地理空間クエリはまだPublic Preview
    • JOIN・サブクエリはGA、DMLもupdate/deleteはGAですが(insert/transactionは順次提供)、検索系は提供状況を要確認です。

まとめ

Firestore Pipelines API(GA)は、長年NoSQLの弱点だったJOIN・サーバー側集計・グルーピングをFirestore単体で実現します。
効きどころは「アプリのライブデータに対する運用クエリ」。Firestore + BigQuery + ETLの3点セットを、ユースケース次第でFirestore1つにできます。
ただしEnterprise editionが必須、コストはスキャン量次第、検索系はPreview。「ETL前のその場クエリでどこまで戦えるか」の線引きはしっかりと行なってから採用するのが誠実でしょう。

NoSQLにJOINが来た、というインパクトは大きいですが、万能ではありません。まずは小さなコレクションで「JOIN+集計が本当に意図通り動くか」を体感し、スキャン量と相談しながら適用範囲を見極めるのがおすすめです。


参考(執筆時に確認したページ)

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?