1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Cursorで実装しながら、Discordの活動ログをDatabricksに流してDashboardを作った話

1
Posted at

1. はじめに

本題に入る前に、僕が何者かだけ先にお話しします。

慶應義塾大学の数理科学科で、データや統計を学んでいる大学生です。

並行して arcbricks 株式会社でもデータまわりの仕事をしています。業務と個人の学習の両方で Databricks を触ることが多く、Data Analyst / Machine Learning / Generative AI Engineer の Associate 資格に加え、Data Engineer Professional も取得しています。

もう一つの顔は、大学eスポーツサークルの Discord コミュニティの運営です。昨年度、僕はサークル代表として、サークルの再建を主導し、新入生の定着率を250%伸ばしたこともありますが、コミュニティが大きくなるほど「運営っぽいこと」の難易度はぐっと上がりました。

この記事で書くことは次のとおりです。

  • Discord API(Bot)でログ取得 → Databricks に Bronze → Silver → Gold で蓄積 → Dashboard で可視化、という Phase 1 の開発記録
  • 実装の全体像を一通りカバーした、手を動かしながらの記録

実装の要点は、Bot が PAT(Personal Access Token:Databricks への認証トークン)で Raw テーブルに直接書き込みし、その先を DLT(Delta Live Tables:Databricks のパイプライン機能)や Workflows で Silver・Gold までつなぐ形です。

2. 背景と目的

大学サークルの運営は、思ったより意思決定の連続です。

  • いつイベントを打つか
  • 何を告知するか
  • どのタイミングで投稿するか
  • 振り返りで何を改善するか

でも、その根拠はだいたい経験則。実際の運営でも、「人が集まりやすそうな日程」を勘で選ぶことが多かったです。

もちろん経験は大切です。ただ Discord にはログが残る。なら、そこから

  • 日ごとの活動量
  • 曜日別の傾向
  • 時間帯ごとの山
  • チャンネルごとの活発さ

くらいは、数として見せたい——と思いました。

やりたかったのは、勘を捨てることではなく、勘にデータを足すことです。
勘だけだと「たぶんこう」。グラフが付くと「少なくとも直近はこう」。差は大きいです(会議で「感覚的には」と言い切る回数が減るのは、精神衛生にも効きます)。

3. 全体像と技術スタック

流れはシンプルです。

  1. Discord Bot で活動データ(メッセージ・ボイス・チャンネル)を取得する
  2. Databricks に渡す(Bot が PAT で SQL Warehouse 経由で Raw テーブルに直接 INSERT)
  3. Bronze / Silver / Gold で整理する(DLT も使う)
  4. Lakehouse Dashboard で可視化する

全体像を図にすると、だいたい次の流れです。

Discord 側は、Developer Portal で Bot を作成し、Message Content Intent などを有効にして、Bot Token でイベントを受け取る想定です。Databricks 側は Unity Catalog が有効なワークスペースを使い、Bot からは PAT(Personal Access Token)と SQL Warehouse の HTTP パスで接続して、discord_messages_raw などの Raw テーブルに直接書き込みます。Discord の Bot Token はコードに埋めず、.env や Databricks Secrets で管理する前提です。

開発の体感としては Cursor をかなり使いました。API 連携、Databricks への書き込み、スキーマ設計、DLT、ダッシュボード用のクエリなど、細かい実装の往復が多いプロジェクトです。Cursor のプロジェクトルールも使いつつ、AI に任せすぎず自分で読んで直す前提で進めました。

実際の進め方は、だいたい次のループでした。

  • 要件を頭で考える
  • Cursor に叩き台を書かせる
  • 自分で読む(Discord API や Databricks の仕様に合わせて直す)
  • また Cursor に補助してもらう

実装の壁打ち相手としてかなり優秀だった、という印象です。

4. Discord Bot とデータ取得基盤

可視化の前にデータが必要です。最初の仕事は Discord から活動データを取ることでした。

ここは地味にハードルがあります。

  • Discord Developer Portal で Bot を作る
  • Message Content Intent などを有効にする
  • Bot Token を安全に管理する(.env や Secrets)
  • Bot をどこで常駐させ、どう Databricks に渡すかを決める

このプロジェクトでは、Bot を常時動かす場所として EC2(AWS の仮想サーバー)を用意し、そこで Bot を動かす形にしました。Bot はメッセージ受信・ボイス状態更新・チャンネル一覧を取得し、Databricks の SQL Warehouse に PAT で接続して、Raw テーブル(discord_messages_raw / discord_voice_activity_raw / discord_channels_raw)に直接 INSERT します。ローカルに JSON を落としてからアップロードするのではなく、Bot がそのまま Databricks に書く構成にしたので、「実行場所と受け渡し」の設計がそのまま「Bot + PAT + SQL Warehouse」になりました。

通常、この手のログは一度 JSONL などでローカルや S3 に吐き出してから DLT で読み込む構成にすることが多いですが、今回はインフラを極力シンプルにするため、Bot の Python から databricks-sql-connector を使って Raw テーブルに直接 INSERT しています。

実際のコードはおおむね次のとおりです。コネクションプーリングやトークン管理も別途意識しつつ、かなり泥臭く書いています。

bot.py
from typing import Any, Dict

from databricks import sql

def _db_insert_message_sync(data: Dict[str, Any]) -> None:
    """1件のメッセージを Databricks の Bronze(Raw)テーブルに直接 INSERT"""
    if not _DATABRICKS_ENABLED:
        return

    # PAT と Warehouse 情報を環境変数から取得して接続(セッション切れ再接続等は別関数でラップ)
    conn = _db_connect_sync()
    catalog, schema = _DB_CATALOG, _DB_SCHEMA

    sql_query = f"""
    INSERT INTO {catalog}.{schema}.discord_messages_raw (
        message_id, channel_id, channel_name, category_id, guild_id, guild_name,
        user_id, user_name, content, timestamp, edited_timestamp,
        attachment_count, reaction_count, is_pinned
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    params = _build_message_insert_params(data)  # 14 列分を data から組み立て

    with conn.cursor() as cur:
        cur.execute(sql_query, params)
    conn.commit()

最初期は「まず Bot が動いて、Databricks に 1 行でも書けるか」を確認するところから始めました。この段階は地味ですがかなり大事で、ここが通らないと後ろの Bronze / Silver / Gold が全部宙に浮きます。華やかな可視化に行く前に、地味で泥くさい API と認証の山を越える。開発あるあるです(しかも誰も代わりに越えてはくれない)。

5. Databricks への認証(PAT)

Databricks を触るために PAT(Personal Access Token)を取得し、次の2系統で使いました。

  1. 自分(ローカル開発): DATABRICKS_HOSTDATABRICKS_TOKEN を設定し、CLI やノートブックからカタログ・スキーマ・テーブルを触る
  2. Bot(EC2 上): DATABRICKS_ACCESS_TOKEN と SQL Warehouse の SERVER_HOSTNAME / HTTP_PATH.env に置き、Bot が Raw テーブルに INSERT する

PAT が通ると、「少なくとも Databricks には触れる」「Unity Catalog も見える」「あとは流し込むだけ」みたいな気分になります。もちろん実際は“あとは流し込むだけ”では全然ないんですが、PAT が通ると人は強気になるのは事実です。この手の記事で「認証まわり」がふわっと終わることが多いので、どう認証したか・トークンはどう管理したかは、ちゃんと書いておく価値があります。

6. Bronze:Raw データの格納

Discord Bot から渡す先は、Databricks 上の Bronze 相当の Raw テーブルです。メダリオンアーキテクチャでは、

  • Bronze(Raw): API / Bot から取得した生データ
  • Silver: クレンジング・検証・正規化したデータ
  • Gold: 集計・可視化向けデータ

の三層にしています。Bot は PAT で Databricks に接続し、discord_messages_raw などに直接 INSERT するので、「取得 → どこかに保存 → 別ジョブで読みにいく」ではなく Bot がその場で Bronze 層に書き込む形です。Bronze は append と mergeSchema を前提にした生データ保持、Silver は DLT でクレンジング・検証、Gold は集計用で overwriteSchema を使う設計にしました。

この分け方にした理由は、次のとおりです。

  1. 生データを残したかった … 後で変換ロジックを直したくなるので、まずは生のまま残す
  2. 整形と可視化を分けたかった … Dashboard 用にその場で重い変換をかけるのはしんどいので、Silver で整えて Gold でまとめる
  3. 再実行しやすくしたかった … 再実行や冪等性を意識していたので、「1本のノートで全部やる」より層を分けたほうが先が楽そうだった

未来の自分は、だいたい今の自分より短気なので。

7. Silver:モデリングと DLT

Bronze に入れたら終わり、ではありません。Bronze はあくまで原材料です。

Silver では、分析や集計に使いやすいようにデータを整えました。データモデリングには 3NF(第3正規形)を採用し、「1事実1箇所」「冗長を減らして更新不整合を防ぐ」を意識してテーブルを切りました。そのうえで、ディメンション(次元)とファクト(事実)に分ける形にしています。

イメージとしては次のような構成です。

  • ディメンション
    • guild_dim … ギルド(サーバ)ごと1行
    • user_dim … ユーザごと1行
    • category_dim … チャンネルカテゴリごと1行
    • channel_dim … チャンネルごと・スナップショット日ごと1行(いつどのチャンネルが存在したか)
  • ファクト
    • message_fact … メッセージ1件1行(channel_id などでディメンションと結合)
    • voice_chat_fact … ボイスセッション1回1行(参加・退出時刻など)

「どのメッセージがどのチャンネル・どのユーザに属するか」をキーでつなげる形にしたので、Gold で曜日別・時間帯別・チャンネル別に集計するときに JOIN がしやすくなっています。実装は DLT(Delta Live Tables)で Silver を組み、クレンジング済みスキーマと検疫テーブル(不正データを分けて理由を残す)まで要件に入れています。

Silver 層の構築では DLT をフル活用しました。単に型変換するだけでなく、@dlt.expect でデータ品質を明示的に担保しています。ここで不正な行を検知(またはドロップ)しておくことで、Gold での集計エラーや「なぜか数字がおかしい」を防ぎます。

01_silver_cleansing_dlt_integrated.py
import dlt
from pyspark.sql import functions as F


@dlt.table(
    name="voice_chat_fact",
    comment="Silver fact: one row per unique voice session.",
    partition_cols=["session_date"],
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
    },
)
@dlt.expect("session_date_matches_joined_at", "DATE(joined_at) = session_date")
@dlt.expect("valid_session_duration", "left_at IS NULL OR left_at >= joined_at")
def voice_chat_fact():
    return _transform_voice_chat_fact()

ここで大事なのは、「取れたデータ」と「使えるデータ」は違うということです。API から取れた時点では嬉しいですが、欠損・型のブレ・想定外の値・JOIN しにくい構造を放置すると、可視化の段階で地獄を見ます。3NF で一度「使える形」に整えておくと、あとで Gold が気持ちよく作れます。

8. Gold:集計とボイス按分

Gold では、曜日・時間帯・日次・ユーザ別・チャンネル別で活動傾向を集計しています。実装では 4 本の Gold テーブルを軸にしています。

  • activity_by_weekday_hour … 曜日×時間帯のメッセージ数・ボイス時間(按分あり)
  • activity_daily … 日次のメッセージ数・ボイス時間(日をまたぐセッションは日ごとに按分)
  • user_activity … ユーザ別の活動量
  • channel_activity … チャンネル・カテゴリ別の活動量

ここで地味にハマったのがボイスセッションの按分です。たとえば 23:30 に参加して 01:15 に退出したとき、合計時間だけ出しても「どの時間帯のヒートマップを濃くすべきか」が分かりません(「深夜までボイスしてたの、どの曜日に入れる?」で頭を抱えました)。ボイスは数時間・日をまたぐことが多いので、「参加した 1 時間に全部乗せる」と他の時間帯が過小評価になります。

そこで PySpark の sequenceexplode で、1 回のセッションを 1 時間ごとのブロックに分割し、各ブロックに実在する秒数だけを按分するロジックにしました。日をまたぐ場合も同様に、その日に属する秒数だけを各 activity_date に配分しています。

01_gold_aggregation_dlt.py
from pyspark.sql import functions as F

SECONDS_PER_HOUR = 3600


def _transform_voice_by_weekday_hour_prorated():
    # ... 前処理で voice に joined_at / left_at 等を用意 ...

    # 1. 開始と終了を 1 時間刻みのシーケンスにし、explode で行に展開
    voice = voice.withColumn(
        "_hour_sec",
        F.explode(
            F.expr(f"sequence(_start_hour_sec, _end_hour_sec, {SECONDS_PER_HOUR})")
        ),
    )

    # 2. 各「時間枠(hour_bucket)」内に実在した区間をクリッピング
    voice = voice.withColumn(
        "hour_bucket", F.to_timestamp(F.col("_hour_sec").cast("long"))
    ).withColumn(
        "segment_start", F.greatest(F.col("joined_at"), F.col("hour_bucket"))
    ).withColumn(
        "segment_end",
        F.least(F.col("left_at"), F.col("hour_bucket") + F.expr("INTERVAL 1 HOUR")),
    ).withColumn(
        "duration_seconds",
        (
            F.unix_timestamp("segment_end") - F.unix_timestamp("segment_start")
        ).cast("double"),
    ).filter(F.col("duration_seconds") > 0)

    # ... この後 (weekday, hour_slot) で GROUP BY して集計 ...

この按分ロジックのおかげで、長時間の通話でもヒートマップに自然に乗るようになりました。

4 本そろうと、「最近盛り上がってるか」を日次で見る(activity_daily)、「どの曜日のどの時間が活発か」をヒートマップで見る(activity_by_weekday_hour)、「どのチャンネルがよく使われているか」(channel_activity)が一気に話せるようになります。ここまで来ると、Discord のログが運営の会話に使えるデータになってきます。Gold は mode("overwrite")overwriteSchema でフルリフレッシュしやすい設計にしてあり、試行錯誤がしやすいようにしています。

9. Workflows によるパイプライン運用

単発でノートブックを回して終わりではなく、取得(Bot)→ Bronze(Bot が直接書く)→ Silver → Gold の流れのうち、Silver 以降を Databricks Workflows に乗せる前提で組みました。日次バッチでの取得、データ鮮度、実行時間、冪等性まで、指標として意識しています。

ここまで来ると、単なる分析ノートではなく、運用寄りのデータ基盤に近づきます(どう実行するか・どう再実行するか・どこまで成功とみなすかが見えてくる、という意味で)。本番級の監視や可用性まではまだ求めていませんが、「とりあえず Workflows で回る」が見えているだけでも、だいぶ気持ちが楽になります。

10. Lakehouse Dashboard

ここが一番テンションが上がったところです。

データを取って、Databricks に入れて、整えて、集計して、最後に Lakehouse ダッシュボードに載せる。Time series・Bar chart・Heatmap のようなウィジェットを置いて、曜日×時間帯の活動量や日次のトレンドを見られるようにしました。

Dashboard に載ると、一気に「それっぽく」なります。それまでは「JSON がある」「テーブルがある」「SQL がある」という状態で、正直まだ地味です。でも Dashboard に並んだ瞬間、日ごとの活動量・曜日ごとの偏り・曜日×時間帯のヒートマップ・チャンネルごとの活性度が、一気に会議で見ながら話せる材料になります。可視化の効果は大きい。個人的にヒートマップが好きで、コミュニティがいつ活発かが一目で分かるのがうれしいです。

実際に組んだ Lakehouse Dashboard の画面例です。

Lakehouse Dashboard の可視化例(1)

Lakehouse Dashboard の可視化例(2)

ダッシュボードでは、Gold の 4 本(activity_by_weekday_hour, activity_daily, user_activity, channel_activity)を参照する形にしています。

11. Cursor を活用した開発

今回の開発で地味に大きかったのは、Cursor を使いながら進められたことです。

この手のプロジェクトは実装タスクが細かいです。Discord Bot の取得コード、Databricks 接続まわり、Delta への書き込み、DDL や SQL、Bronze / Silver / Gold の整理、ダッシュボード用クエリの叩き台……。全部をゼロから手で書くとかなり重いです。

Cursor がよかったのは、実装の初速を上げてくれることでした。とくに「この責務なら関数をこう分けたほうがよさそう」「この SQL の叩き台を出したい」「Databricks 向けにこの処理を整理したい」みたいな場面ではかなり助かりました。もちろんそのまま信用すると危ないです。Discord API も Databricks も、実行環境・権限・書き込み仕様がすべてなので、最後は自分で読む必要があります。それでも最初の一歩と修正の往復を速くしてくれるという意味で、かなりよかったです。

12. 進捗と今後の課題

このプロジェクトは二人(@cheng_wang)で開発していて、僕は arcbricks 株式会社でインターンをしながら、実装に加えてプロジェクトマネージャーとしてタスクや進捗を管理しています。

ここまで来ると、最初の「なんとなく運営している」状態からは、ちゃんと前に進んだ感じがあります。勘は引き続き大事です。でも、そこに直近の日次推移・曜日別傾向・時間帯ヒートマップ・チャンネル別活動量が加わると、会話の質が変わります。

今の段階を一言で言うなら、運営の感覚に、ようやくグラフが追いついてきた、という感じです。

13. おわりに

今回は、大学の Discord コミュニティ運営の中で感じていた課題から出発して、Discord Bot × PAT × Databricks(Bronze→Silver→Gold)× Dashboard で活動ログを可視化し始めた話を書きました。

まだやりたいことはあります。Bot の安定化(Docker 化や EC2 の整理)、Workflows の運用の詰め、Dashboard の見やすさ、LLM実装やダッシュボード改良など。それでも、まずは Dashboard でいろいろ見えるところまで来たことが、大きな進捗でした。

サークル運営のように一見ふわっとしたテーマでも、Discord API と Databricks を使えば、きちんとデータ基盤の題材になる。それが今回かなり面白かったところです。同じように「コミュニティの活動を可視化したい」と思っている人がいれば、この記事が少しでも参考になれば嬉しいです。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?