🧭 はじめに
自分の配信を見返して
「どこが一番盛り上がってたんだろう?」
と思ったことはありませんか?
YouTubeライブのチャットは、視聴者の“熱量”がそのまま現れる場所。
「かわいい」「草」などの反応を集計してグラフにすれば、配信の盛り上がりタイミングが一目でわかります。
今回は、
👉 公式のYouTube APIだけを使って
👉 自分の配信チャットをリアルタイムに分析し
👉 SQLiteに15秒ごとの件数を保存する
という安全&シンプルなツールを作ります。
💡 出来上がるもの
自分の配信(ライブ中)を自動検出
チャットから「かわいい」「草」などを抽出
15秒ごとの出現回数をSQLiteに保存
再実行しても重複しない(INSERT OR REPLACE)
⚙️ 準備
1. YouTube APIの設定(Google Cloud側)
-
Google Cloud Consoleにアクセス
-
新しいプロジェクトを作成
-
「APIとサービス」→「ライブラリ」で YouTube Data API v3 を有効化
-
「認証情報」→「認証情報を作成」→ OAuthクライアントID → デスクトップアプリ
-
ダウンロードしたファイルを client_secret.json にリネームしてスクリプトと同じフォルダに置く
2. 必要なPythonライブラリを入れる
pip install google-api-python-client google-auth-oauthlib google-auth-httplib2
3. 初回実行時に認可する
最初の実行時にブラウザが開くので、
YouTubeを運営している 自分のGoogleアカウントで認可します。
これで token.json が作成され、次回から自動で再利用されます。
🐍 スクリプト本体(chat_peaks_rt.py)
以下をファイルとして保存してください。
そのままコピペでOKです(300行未満、コメント付き)。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
YouTube Live の自チャンネルのチャットをリアルタイム取得し、
指定ワードを15秒などのバケットで集計してSQLiteに保存する簡単ツール。
- 公式APIのみ使用(規約準拠)
- 自分の配信だけを対象(mine=True)
- 集計値のみを保存(本文やユーザー名は保存しない)
"""
import argparse
import datetime as dt
import os
import sqlite3
import sys
import time
from collections import defaultdict
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
from google.oauth2.credentials import Credentials
SCOPES = ["https://www.googleapis.com/auth/youtube.readonly"]
CREDENTIALS_FILE = "client_secret.json"
TOKEN_FILE = "token.json"
# ========================== SQLite 関連 ==========================
def ensure_db(db_path: str):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS word_count (
video_id TEXT NOT NULL,
word TEXT NOT NULL,
second INTEGER NOT NULL,
count INTEGER NOT NULL,
PRIMARY KEY (video_id, word, second)
)
""")
conn.commit()
return conn
def upsert_counts(conn, video_id: str, word: str, bucket_counts: dict):
if not bucket_counts:
return
data = [(video_id, word, sec, cnt) for sec, cnt in bucket_counts.items()]
cur = conn.cursor()
cur.executemany("""
INSERT INTO word_count (video_id, word, second, count)
VALUES (?, ?, ?, ?)
ON CONFLICT(video_id, word, second) DO UPDATE SET count=excluded.count
""", data)
conn.commit()
# ========================== YouTube API 関連 ==========================
def get_youtube():
creds = None
if os.path.exists(TOKEN_FILE):
creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
if not creds or not creds.valid:
if not os.path.exists(CREDENTIALS_FILE):
print(f"[ERROR] {CREDENTIALS_FILE} がありません。Google Cloudから取得してください。")
sys.exit(1)
flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
creds = flow.run_local_server(port=0)
with open(TOKEN_FILE, "w") as f:
f.write(creds.to_json())
return build("youtube", "v3", credentials=creds)
def find_active_broadcast(youtube):
resp = youtube.liveBroadcasts().list(
part="id,snippet,contentDetails,liveStreamingDetails",
broadcastStatus="active",
broadcastType="all",
mine=True
).execute()
items = resp.get("items", [])
if not items:
return None
return items[0]
def get_live_chat_messages(youtube, live_chat_id, page_token=None, max_results=200):
return youtube.liveChatMessages().list(
liveChatId=live_chat_id,
part="id,snippet",
pageToken=page_token,
maxResults=max_results
).execute()
# ========================== 集計ロジック ==========================
def iso_to_dt(iso_str: str) -> dt.datetime:
return dt.datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
def floor_div_sec(delta: dt.timedelta, bucket: int) -> int:
return int(delta.total_seconds()) // bucket
def aggregate_loop(youtube, conn, words, bucket, flush_interval=10):
import re
live = find_active_broadcast(youtube)
if not live:
print("現在アクティブな配信が見つかりません。配信を開始してから再実行してください。")
return
video_id = live["id"]
snippet = live["snippet"]
details = live.get("liveStreamingDetails", {})
live_chat_id = snippet.get("liveChatId") or details.get("activeLiveChatId")
if not live_chat_id:
print("チャットが有効な配信を確認してください。")
return
start_iso = details.get("actualStartTime") or snippet.get("publishedAt")
start_dt = iso_to_dt(start_iso) if start_iso else dt.datetime.now(dt.timezone.utc)
print(f"[開始] video_id={video_id}")
print(f"[集計ワード] {words}")
print(f"[バケット秒] {bucket}")
compiled = [re.compile(w) for w in words]
page_token = None
in_memory_counts = [defaultdict(int) for _ in words]
last_flush = time.time()
try:
while True:
try:
resp = get_live_chat_messages(youtube, live_chat_id, page_token=page_token)
except HttpError as e:
print(f"[WARN] APIエラー: {e}. 5秒後に再試行します。")
time.sleep(5)
continue
items = resp.get("items", [])
if items:
for it in items:
snip = it["snippet"]
text = snip.get("displayMessage", "")
pub = iso_to_dt(snip["publishedAt"])
bucket_idx = floor_div_sec(pub - start_dt, bucket)
for j, pat in enumerate(compiled):
if pat.search(text):
in_memory_counts[j][bucket_idx] += 1
page_token = resp.get("nextPageToken")
wait_ms = resp.get("pollingIntervalMillis", 2000)
if time.time() - last_flush >= flush_interval:
for j, word in enumerate(words):
if in_memory_counts[j]:
upsert_counts(conn, video_id, word, in_memory_counts[j])
in_memory_counts[j].clear()
last_flush = time.time()
time.sleep(wait_ms / 1000.0)
except KeyboardInterrupt:
print("\n[INFO] Ctrl+Cで終了、最終フラッシュ中...")
for j, word in enumerate(words):
if in_memory_counts[j]:
upsert_counts(conn, video_id, word, in_memory_counts[j])
print("[完了] 集計を終了しました。")
# ========================== CLIエントリ ==========================
def main():
parser = argparse.ArgumentParser(description="YouTube Liveチャットのリアルタイム集計(自チャンネルのみ)")
parser.add_argument("--words", action="append", help="数えたい語。複数指定可。例: --words 'かわいい|可愛い' --words '草'")
parser.add_argument("--bucket", type=int, default=15, help="バケット秒。デフォルト15")
parser.add_argument("--db", type=str, default=None, help="出力SQLiteファイル名(省略時: YYYY-MM-DD.sqlite3)")
parser.add_argument("--flush", type=int, default=10, help="何秒ごとにDBへ保存するか(既定10秒)")
args = parser.parse_args()
words = args.words or ["かわいい|可愛い", "草"]
today = dt.datetime.now().date()
db_path = args.db or f"{today}.sqlite3"
conn = ensure_db(db_path)
yt = get_youtube()
try:
aggregate_loop(yt, conn, words, args.bucket, flush_interval=args.flush)
finally:
conn.close()
if __name__ == "__main__":
main()
▶️ 実行方法
配信を開始した状態で、以下を実行:
python chat_peaks_rt.py --words "かわいい|可愛い" --words "草" --bucket 15 --db out.sqlite3
終了するには Ctrl + C
。
実行中は定期的にチャットを取得し、out.sqlite3
に保存します。
📊 結果の確認(SQLiteで見る)
-- 特定ワードのピーク
SELECT second, count
FROM word_count
WHERE word='草'
ORDER BY count DESC
LIMIT 5;
🧠 仕組みの概要
-
liveBroadcasts.list(mine=True) で自分のライブを検出
-
liveChatMessages.list(liveChatId=...) でメッセージを定期取得
-
各メッセージの本文を正規表現でマッチ
-
配信開始からの経過秒数を15秒で割って「バケット番号」を作成
-
そのバケットごとの出現回数をSQLiteに蓄積
⚠️ 注意とポリシー(大切!)
-
自分のチャンネルでのみ利用してください
-
公式APIのみ使用しており、規約に準拠しています
-
保存されるのは集計値のみで、チャット本文やユーザー情報は記録しません
-
規約やポリシーは更新されるため、運用前に公式ドキュメントをご確認ください
🌱 応用アイデア
-
「拍手(8888)」「神」「やばい」などの拡張
-
配信後に .sqlite3 をグラフ化して「ハイライト秒」を抽出
🎁 まとめ
機能 | 説明 |
---|---|
🎬 対象 | 自分のYouTubeライブ(配信中) |
🧩 技術 | YouTube Data API v3 / Python / SQLite |
⏱️ 粒度 | 15秒ごとに「かわいい」「草」を集計 |
🔒 安全性 | 公式API+自分のチャンネルのみ |
📊 出力 | word_count(video_id, word, second, count) |
手軽に使える「自分専用アナリティクス」です。
数字で見える“配信の盛り上がり”を楽しんでみましょう!
✍️ 筆者メモ:
初心者でも「自分のデータで学ぶAPI活用」の練習として最適です。
コードを眺めるだけでも、YouTube APIの構造が理解できると思います。