45
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Apache Pinotで始めるリアルタイム分析

Last updated at Posted at 2023-12-08

はじめに

これはRetailAI Adventurers Advent Calendar 2023の9日目の記事です.
昨日は@atsukishさんの『画像生成AIで生成した画像はCNNで判別できる?』でした。
この記事ではApache Pinotでサンプルの購買データを使って集計し、ダッシュボードに表示してみます。

Apache Pinotとは?

まず、Apache Pinotについて簡単に説明します。
Apache Pinotとは低レイテンシーでOLAPクエリを実行するように設計されたリアルアイム分散OLAPデータストアです。
例えば、スーパーで顧客が買い物をするたびに発生する購買データをApache Pinotを使って素早くリアルタイムに集計できます。
スーパーでの購買データは多岐にわたりますが、Apache Pinotはその膨大なデータを瞬時に処理し、ダッシュボード上で直感的で分かりやすい形で表示できます。これにより、経営者やマーケターはリアルタイムで売上や顧客の嗜好などの重要な情報を把握できるようになります。
有名どころだとUberで使われているようです。

今回作るもの

それでは実際にPinotを使ってデータを集計し、ダッシュボードへ集計結果を表示してみたいと思います。
今回はサンプルの購買データを使って、店舗及び商品の部門別ごとにグルーピングした売上金額の合計を表示してみたいと思います。

Go用のクライアントライブラリが提供されているのでPinotからGoクライアント経由で集計データを抽出し、抽出したデータをREST API経由でクライアント側に返してダッシュボードに表示します。
また、ダッシュボードはRetailAI Adventurers Advent Calendar 2023の4日目の記事@yoshitake_tatsuhiroさんが紹介してくださったstreamlitを使います。

やってみた

詳しい実装内容はこちらにあげています。

コンテナの用意

docker-composeで起動させようと思います。
必要なPinotのコンポーネント及びGoのクライアントを用意します。

docker-compose.yml
version: "3.7"
services:
  pinot-zookeeper:
    image: zookeeper:latest
    container_name: pinot-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:latest
    command: "StartController -zkAddress pinot-zookeeper:2181"
    container_name: pinot-controller
    restart: unless-stopped
    ports:
      - "9000:9000"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms1G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-controller.log"
    depends_on:
      - pinot-zookeeper
    volumes:
      - ./config:/config
      - ./data:/data
  pinot-broker:
    image: apachepinot/pinot:latest
    command: "StartBroker -zkAddress pinot-zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-broker"
    ports:
      - "8099:8099"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-broker.log"
    depends_on:
      - pinot-controller
    volumes:
      - ./config:/config
      - ./data:/data
  pinot-server:
    image: apachepinot/pinot:latest
    command: "StartServer -zkAddress pinot-zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-server"
    ports:
      - "8098:8098"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-server.log"
    depends_on:
      - pinot-broker
    volumes:
      - ./config:/config
      - ./data:/data
  app:
    image: pinot-client-go
    container_name: pinot_app
    build:
      context: .
      dockerfile: ./Dockerfile
      target: debug
    volumes:
      - .:/app
    ports:
      - 8000:8000
    restart: always
    depends_on:
      - pinot-server

データとテーブルの用意

データを投入する方法としてはApache Kafkaのようなストリーミングデータを取り扱えるonlineとGoogle Cloud Storageなどストレージサービスからバッジ処理として取り込むofflineの2つがありますが、今回はofflineでデータを入れています。
サンプルデータ用のcsvファイルとスキーマの定義を行います。

schema.json
{
  "schemaName": "sales",
  "dimensionFieldSpecs": [
    {
      "name": "date",
      "dataType": "STRING"
    },
    {
      "name": "storeCode",
      "dataType": "INT"
    },
    {
      "name": "store",
      "dataType": "STRING"
    },
    {
      "name": "divisionCode",
      "dataType": "INT"
    },
    {
      "name": "division",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "totalPrice",
      "dataType": "INT"
    }
  ]
}

日付、店舗コード、店舗名、部門コード、部門、売上数のデータをサンプルで入れます。
今回は10万レコード程度用意しました。

sales.csv
date,storeCode,store,divisionCode,division,totalPrice
20230814,2,Kumamoto,2,味噌,610
20230925,6,Kagoshima,4,油,436
20231027,6,Kagoshima,4,油,938
20230907,1,Saga,0,米,961
20231018,0,Fukuoka,2,味噌,997
...

table.jsonではレプリカ数やインデックスなどテーブルに関する設定を行います。

table.json
{
  "tableName": "sales",
  "segmentsConfig": {
    "replication": "1",
    "schemaName": "sales"
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "loadMode": "MMAP"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableType": "OFFLINE",
  "metadata": {}
}

job-spec.ymlはセグメントを生成・実行・プッシュするときに必要になります。

job-spec.yml
executionFrameworkSpec:
  name: "standalone"
  segmentGenerationJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner"
  segmentTarPushJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner"
  segmentUriPushJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner"
jobType: SegmentCreationAndTarPush
inputDirURI: "/data/"
includeFileNamePattern: "glob:**/*.csv"
outputDirURI: "/opt/pinot/data/sales/segments/"
overwriteOutput: true
pushJobSpec:
  pushFileNamePattern: "glob:**/*.tar.gz"
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: "csv"
  className: "org.apache.pinot.plugin.inputformat.csv.CSVRecordReader"
  configClassName: "org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig"
tableSpec:
  tableName: "sales"
  schemaURI: "http://localhost:9000/tables/sales/schema"
  tableConfigURI: "http://localhost:9000/tables/sales"
pinotClusterSpecs:
  - controllerURI: "http://localhost:9000"

起動

コンテナを立ち上げた後、localhost:9000にアクセスすると以下のような画面が開きます。
スクリーンショット 2023-12-08 13.21.46.png

Pinotはいくつかのコンポーネントから成り立っており、その一つにコントローラーがあります。コントローラーは主に各コンポーネントの管理を行いますが、ダッシュボードのUIも提供しています。
ダッシュボードにアクセスして、実際にクエリを投げてみることも可能です。

それでは最後にダッシュボードにデータの表示を行いたいと思います。
詳しくは上記で挙げたリポジトリにコードがありますが、以下のようなレスポンスが返ってくるエンドポイントを用意しました。

curl -X GET http://localhost:8000/sales/summaries
{"data":[{"store_code":1,"store":"Saga","division_code":1,"division":"卵","total_price":1483670},{"store_code":1,"store":"Saga","division_code":0,"division":"米","total_price":1536817},{"store_code":6,"store":"Kagoshima","division_code":3,"division":"肉","total_price":1558020},{"store_code":5,"store":"Nagasaki","division_code":4,"division":"油","total_price":1535054},{"store_code":3,"store":"Oita","division_code":5,"division":"野菜","total_price":1586730},{"store_code":4,"store":"Miyazaki","division_code":2,"division":"味噌","total_price":1537278},{"store_code":5,"store":"Nagasaki","division_code":3,"division":"肉","total_price":1563557},{"store_code":1,"store":"Saga","division_code":2,"division":"味噌","total_price":1552472},{"store_code":1,"store":"Saga","division_code":5,"division":"野菜","total_price":1534440},{"store_code":2,"store":"Kumamoto","division_code":2,"division":"味噌","total_price":1561910},{"store_code":3,"store":"Oita","division_code":4,"division":"油","total_price":1529267},{"store_code":6,"store":"Kagoshima","division_code":5,"division":"野菜","total_price":1534618},{"store_code":5,"store":"Nagasaki","division_code":5,"division":"野菜","total_price":1588164},{"store_code":2,"store":"Kumamoto","division_code":0,"division":"米","total_price":1501309},{"store_code":3,"store":"Oita","division_code":0,"division":"米","total_price":1597682},{"store_code":0,"store":"Fukuoka","division_code":3,"division":"肉","total_price":1567797},{"store_code":6,"store":"Kagoshima","division_code":4,"division":"油","total_price":1575007},{"store_code":3,"store":"Oita","division_code":1,"division":"卵","total_price":1551815},{"store_code":1,"store":"Saga","division_code":4,"division":"油","total_price":1560918},{"store_code":2,"store":"Kumamoto","division_code":1,"division":"卵","total_price":1552825},{"store_code":6,"store":"Kagoshima","division_code":0,"division":"米","total_price":1491855},{"store_code":4,"store":"Miyazaki","division_code":3,"division":"肉","total_price":1568506},{"store_code":6,"store":"Kagoshima","division_code":1,"division":"卵","total_price":1574272},{"store_code":2,"store":"Kumamoto","division_code":4,"division":"油","total_price":1569607},{"store_code":-2147483648,"store":"null","division_code":-2147483648,"division":"null","total_price":0},{"store_code":0,"store":"Fukuoka","division_code":4,"division":"油","total_price":1529407},{"store_code":4,"store":"Miyazaki","division_code":5,"division":"野菜","total_price":1542876},{"store_code":1,"store":"Saga","division_code":3,"division":"肉","total_price":1558422},{"store_code":5,"store":"Nagasaki","division_code":2,"division":"味噌","total_price":1505121},{"store_code":6,"store":"Kagoshima","division_code":2,"division":"味噌","total_price":1513347},{"store_code":0,"store":"Fukuoka","division_code":2,"division":"味噌","total_price":1513401},{"store_code":4,"store":"Miyazaki","division_code":4,"division":"油","total_price":1535423},{"store_code":0,"store":"Fukuoka","division_code":5,"division":"野菜","total_price":1593589},{"store_code":2,"store":"Kumamoto","division_code":5,"division":"野菜","total_price":1519526},{"store_code":5,"store":"Nagasaki","division_code":0,"division":"米","total_price":1476175},{"store_code":4,"store":"Miyazaki","division_code":1,"division":"卵","total_price":1537356},{"store_code":3,"store":"Oita","division_code":2,"division":"味噌","total_price":1590498},{"store_code":2,"store":"Kumamoto","division_code":3,"division":"肉","total_price":1578098},{"store_code":4,"store":"Miyazaki","division_code":0,"division":"米","total_price":1553047},{"store_code":3,"store":"Oita","division_code":3,"division":"肉","total_price":1562534},{"store_code":0,"store":"Fukuoka","division_code":0,"division":"米","total_price":1539771},{"store_code":0,"store":"Fukuoka","division_code":1,"division":"卵","total_price":1555064},{"store_code":5,"store":"Nagasaki","division_code":1,"division":"卵","total_price":1546240}]}

上記のAPIを呼び、stremlitでダッシュボードを作成したらこんな感じで集計結果を表示できました!
newplot.png

ハマったこと

Pinotを使う上でハマったこととして2つありました。

①デフォルトではクエリのlimitが10件まで

例えば以下のようなクエリを実行したとき、該当するレコードが100件あってもPinotの場合デフォルトで10件しか結果が返されません。

select * from users (= select * from users limit 10 と同義)

そのため、10件よりも多いレコードを返して欲しい場合は明示的にlimitで定義する必要があります。

何故10件だけなのかと疑問に思った方が私以外にもいたようで、コミュニティに質問していらっしゃいました。
https://apache-pinot.slack.com/archives/CDRCA57FC/p1622050555163500
Pinotが大規模なデータを取り扱うことが前提だからこそ、limitをかけておかないとうっかり大量のデータが取れてしまい、サーバーに負荷が掛かるのを避けたいという理由だそうです。

②集計するカラムはmetricFieldSpecsで定義する

テーブルのスキーマを定義する際にフィールドの一つとしてmetricFieldSpecsがあります。これはsumやcountといった集計を行うカラムに対して指定するべきフィールドで、metricFieldSpecsで定義しないと正しく集計が行われません。
今回の例では部門及び店舗ごとにグルーピングされた購入金額の合計を取得したかったのでtotalPriceをmetricFieldSpecsで定義しています。

まとめ

Apache Pinotを使ってデータの集計からダッシュボードへの反映まで簡単にできました!
Retail AIでは顧客の購買データや商品の発注データなど、日々大量のデータを収集・蓄積しています。
それらのデータを上手く分析・活用することによって弊社の掲げているミッションである「流通のムダ・ムラ・ムリ」を減らすことに繋げたいですね!

それでは皆さま良いお年を〜:dragon:

参考

Apache Pinot公式

45
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
45
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?