22
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Quarkusで作る小売向け統計分析API

Last updated at Posted at 2025-12-21

はじめに

TRIAL&RetailAI Advent Calendar 2025 の 22日目の記事です。
昨日は@akaitigoさんの『Quarkus+Kotlin開発向けAgentSkillsを作ってみた。プロジェクト生成+エラー分析スキル
という記事でした。

私のPCのClaudeを確認したところ、MCPやAgentはありましたが、skillsはゼロ。
記事を見て、勉強させて頂きます。

自己紹介

TRIALの基盤システム部に所属しています。
基幹システムのバックエンドエンジニアです。

入社して4年が経ち、MacBookの入れ替えを行ったのですが、入社してからの4年間TouchIDをOFFで使用していました。

ただただ設定していなかったのです。
右上の小さいキーは電源の機能しか有していないものと思ってました。

データを上手に扱うには?

日々刻々と機能追加していく中で、テーブル構造も変わっていき、多くのシステムとのデータ連携も必要になってきます。貯めれば貯めるほどえげつない容量のテーブルになって、メンテやレスポンスなど問題も出てきます。

今の業務において、構造化データ・非構造化データ、様々なものを扱っていますが、
「非構造化データをストレージに貯めて参照する」方法ってどんなものがあるのか、AIと壁打ちしながら調べてみました。

  1. JSON / JSONB の参照
  2. Key-Value ストアによる参照
  3. ドキュメント指向DB
  4. 全文検索エンジン(インデックス参照)
  5. オブジェクトストレージ+メタデータ参照
  6. データレイク(Delta Lake / Parquet)
  7. ベクトル化(Embedding)による意味検索

1. JSON / JSONB

RDB(PostgreSQL など)に JSON / JSONB カラムを持たせ、
属性を非構造化のまま保持しつつ、必要なキーだけを参照する。

【特徴】
構造化と非構造化の中間
GIN / expression index で高速化可能

2. Key-Valueストア

Redis、Memcachedなど「キー、値」という最小構造で参照する。

【特徴】
超高速
検索は弱い(基本はキー指定)

3.ドキュメント指向DB

MongoDB、DynamoDB、FirestoreなどJSONドキュメント単位で保存・参照する。

【特徴】
完全スキーマレス
ネスト構造の参照が容易
JOIN は弱い

4.全文検索エンジン

ElasticsearchやOpenSearchなど、非構造化テキストを 検索用インデックスに変換して参照する。

【特徴】
高速全文検索
スコアリング・あいまい検索
正規化データの一次保存には不向き

5.オブジェクトストレージ+メタデータ参照

ファイル(非構造化)として保存し、参照用メタデータだけを別途管理する。

GCS / S3 + RDB
GCS + BigQuery External Table

構成イメージ

GCS:
  gs://bucket/data/2025/06/file.json

RDB:
  id | path | created_at | type

【特徴】
超大容量向き
安価
直接検索は不可(別レイヤーが必要)

6.データレイク

非構造化・半構造化データを列指向ファイル+メタ情報をして参照する。

参照方法

SELECT *
FROM delta.`/data/sales`
WHERE store_code = 10;

【特徴】
スキーマ進化(Schema Evolution)
時系列参照(Time Travel)
バッチ・分析向け

7.ベクトル化(Embedding)による意味検索

非構造化データを 数値ベクトルに変換し、「意味の近さ」で参照。pgvectorなど。

【特徴】
意味検索・類似検索
LLM / RAG と相性が良い

Delta Lake

小売にあるパターンだと「今日のxxx店の商品yyyはzzz円」と店ごとに金額も違えば、ある日を境に金額の増減は普通にあります。店ごと、日次集計、店舗別、期間別など色んなフォーマットがあるので、その為にRDBであれば都度必要なテーブルをJOINしなければならなかったりするわけです。

データレイク=「オブジェクトストレージ上のファイル」を中心にした基盤(RDBのようにテーブル固定ではない)です。

GCS/S3 などのオブジェクトストレージに、CSV/JSON/Parquet といったファイルを大量に貯め、後から分析・参照する考え方ですが、「フォルダにファイルを置くだけ」だと、更新・整合性・履歴・同時書き込みが弱く、実運用で破綻しがちです。

壁打ちしている中で、Delta Lakeの存在を知ります。

Delta Lake は、 Parquetデータファイルをファイルベースのトランザクションログで拡張し、ACID トランザクション とスケーラブルなメタデータ処理を実現するオープンソースのソフトウェアです。Delta Lake は Apache Spark APIと完全に互換性があり、構造化ストリーミングとの緊密な統合のために開発されたため、バッチ操作とストリーミング操作の両方でデータの単一コピーを簡単に使用でき、大規模な増分処理を提供できます。

なるほど。

課題 Delta Lake
同時更新 ACID トランザクション
更新・削除 MERGE / UPDATE / DELETE
履歴管理 Time Travel
品質保証 Schema Enforcement / Evolution
開発元 Databricks

オープンソース(Linux Foundation 管轄)
Delta Lake = Spark + Parquet + Transaction Log

データの内部構造

delta-data/
└── education/                              # テーブル名
    ├── _delta_log/                         # トランザクションログ
    │   └── 00000000000000000000.json       # コミット #0
    ├── year=2011/                          # パーティション
    │   └── part-00000-xxx.snappy.parquet   # データファイル
    ├── year=2012/
    │   └── part-00000-xxx.snappy.parquet
    └── ...

jsonファイルの中身

commitInfo

{
  "commitInfo": {
    "timestamp": 1766133254038,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "[\"year\"]"
    },
    "isolationLevel": "Serializable",
    "isBlindAppend": false,
    "operationMetrics": {
      "numFiles": "12",
      "numOutputRows": "564",
      "numOutputBytes": "94958"
    },
    "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.1.0",
    "txnId": "b617d685-886a-4266-a78d-4a890e965667"
  }
}
フィールド 説明
timestamp コミット時刻(Unix時間) 1766133254038
operation 操作種別 WRITE, MERGE, DELETE, UPDATE
operationParameters 操作パラメータ mode=Overwrite, partitionBy
isolationLevel トランザクション分離レベル Serializable
operationMetrics 実行メトリクス ファイル数、行数、バイト数
engineInfo 実行エンジン情報 Spark/Delta バージョン
txnId トランザクションID UUID

metaData

{
  "metaData": {
    "id": "efe25800-9225-4fa9-8e38-45bd1b4feeae",
    "format": {
      "provider": "parquet",
      "options": {}
    },
    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"year\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"prefecture_code\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"kindergartens\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},...]}",
    "partitionColumns": ["year"],
    "configuration": {},
    "createdTime": 1766133253050
  }
}
フィールド 説明
id テーブルのユニークID
format.provider データ形式(常にparquet)
schemaString Sparkスキーマ(JSON形式)
partitionColumns パーティションカラム
createdTime テーブル作成時刻

protocol

{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}
フィールド 説明
minReaderVersion 読み取りに必要な最小プロトコルバージョン
minWriterVersion 書き込みに必要な最小プロトコルバージョン

add

{
  "add": {
    "path": "year=2011/part-00000-dd408092-92c5-4583-b97e-ade7072bd2b2.c000.snappy.parquet",
    "partitionValues": {"year": "2011"},
    "size": 7909,
    "modificationTime": 1766133253339,
    "dataChange": true,
    "stats": "{\"numRecords\":47,\"minValues\":{\"prefecture_code\":\"R01000\",\"kindergartens\":39,...},\"maxValues\":{\"prefecture_code\":\"R47000\",\"kindergartens\":1051,...},\"nullCount\":{\"prefecture_code\":0,...}}"
  }
}
フィールド 説明
path Parquetファイルのパス year=2011/part-xxx.parquet
partitionValues パーティション値 {"year": "2011"}
size ファイルサイズ(バイト) 7909
modificationTime 更新時刻 1766133253339
dataChange データ変更を含むか true
stats 統計情報(クエリ最適化用) 後述

stats

{
  "numRecords": 47,
  "minValues": {
    "prefecture_code": "R01000",
    "kindergartens": 39,
    "kindergarten_teachers": 452
  },
  "maxValues": {
    "prefecture_code": "R47000",
    "kindergartens": 1051,
    "kindergarten_teachers": 11122
  },
  "nullCount": {
    "prefecture_code": 0,
    "kindergartens": 0
  }
}

Snappy Parquetファイル

ファイル名の構造
part-00000-dd408092-92c5-4583-b97e-ade7072bd2b2.c000.snappy.parquet
 │     │                  │                      │     │       │
 │     │                  │                      │     │       └─ Parquet形式
 │     │                  │                      │     └─ Snappy圧縮
 │     │                  │                      └─ タスクID(c000 = 最初のタスク)
 │     │                  └─ ファイルUUID
 │     └─ パーティション番号(0から開始)
 └─ Sparkのデータファイルプレフィックス

Parquetの特徴

特徴 説明
カラム指向 列単位でデータを格納。特定列のみ読み取り可能
スキーマ埋め込み データとスキーマが一体化
効率的な圧縮 同じ型のデータが連続するため高圧縮率
述語プッシュダウン フィルタ条件をファイルレベルで評価

Snappy圧縮

項目 説明
圧縮アルゴリズム Google開発の高速圧縮
特徴 圧縮率より速度を重視
圧縮率 約2〜4倍(gzipより低いが高速)
用途 リアルタイム分析向け

データ構造まとめ

コンポーネント 役割
_delta_log/*.json トランザクションログ(ACIDの要)
commitInfo 操作履歴とメトリクス
metaData スキーマとパーティション定義
protocol 互換性バージョン管理
add/remove ファイル操作の記録
stats Data Skipping用の統計情報
*.snappy.parquet 実データ(カラム指向+圧縮)

この仕組みにより、Delta Lakeは以下を実現しています。

  • ✅ ACID トランザクション - トランザクションログによる原子性保証
  • ✅ Time Travel - 過去バージョンへのアクセス
  • ✅ Data Skipping - 統計情報による高速クエリ
  • ✅ Schema Evolution - スキーマ変更の追跡

使ってみますか・・・

データ準備

まずはストレージにデータを作成する必要があります。
かといって今回はアドベントカレンダー。
会社のデータ使用できないので、何か使えるデータはないものか・・・

先日の勉強会で存在を知り、良い機会なのでこのデータを使わさせて頂きます。

実際に使ったデータはSSDSE-Bになります。

47都道府県×12年次×多分野109項目  
人口、経済、教育、労働、医療、福祉など、様々な分野の統計データを、12年分の時系列で収録しています

カラムコード体系(SSDSE-B)

プレフィックス カテゴリ 含まれる指標
A 人口・世帯 総人口、年齢別人口、出生・死亡、転入・転出
B 自然環境 気温、降水量、日照時間
C 経済基盤 着工建築、宿泊施設、地価
E 教育 幼稚園、小学校、中学校、高校、大学
F 労働 求人・求職、失業率
G 観光 観光客数、宿泊者数
H 居住 新築住宅、持家・賃貸
I 環境 ごみ排出量、リサイクル率
J 医療・福祉 病院数、診療所数、保育所
L 家計 消費支出(食料、住居、被服など)

となっているので、

delta-data/prefecture_stats/
├── _delta_log/           # トランザクションログ
├── year=2011/            # 2011年データ(47都道府県)
├── year=2012/
├── ...
└── year=2022/

のようにカテゴリ別 + 年度パーティションにデータを分割させることにしました。

Quarkusで実装

使用したライブラリは以下になります。

ライブラリ バージョン 用途
Delta Lake (delta-spark_2.12) 3.1.0 ACIDトランザクション対応のレイクハウスストレージ
Apache Spark (spark-sql_2.12, spark-core_2.12) 3.5.1 分散データ処理エンジン
Hadoop (hadoop-common, hadoop-client) 3.3.6 分散ファイルシステムサポート
Quarkus REST Platform版 RESTful APIエンドポイント
Quarkus ARC Platform版 CDI(依存性注入)コンテナ

コードは
https://github.com/satoshihiraishi/quarkus-deltalake-sample
においています。

CSVデータを以下のように分割して格納しています。

テーブル レコード数 パーティション
prefectures 47 -
population 564 year (2011-2022)
vital_statistics 564 year (2011-2022)
weather 564 year (2011-2022)
education 564 year (2011-2022)
healthcare 564 year (2011-2022)
household_spending 564 year (2011-2022)

合計: 3,431レコード

スクリーンショット 2025-12-21 17.43.18.png
スクリーンショット 2025-12-21 17.44.13.png

簡単に動かしてみましたが、データ件数が少ないのでローカルのファイルを読み取るAPIでは200ms前後でAPIのレスポンスは返ってきました。
今の状態ではRDBと遜色もない状態なので、データ量を増やしてクラウドストレージだとどうなるのか、など確認したいと思います。

ファイル容量も、元のCSVが360KBで、データ分割したフォルダ自体も1.1MBだから、分割の仕方次第ではデータ量もそんなに増えることはないかな?

Time Travel、使う時間ありませんでした。

まとめ

Delta Lakeを使ってフォーマットを合わせれば、データレイクのデータをRDBのように扱うことができます。

次回予告

@azuma-takuya さんがGaaSについて書いてくれるようです。

RetailAIとTRIALではエンジニアを募集しています。

興味がある方はご連絡ください!

22
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
22
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?