はじめに
趣味作業中は配信を視聴しながらが多いのですが、ふとモニターを見たときに Youtube のライブチャット欄が見づらいなと思っていました。
ライブチャット関連のアプリはいろいろと存在するようですが、Python と TypeScript の学習もしたかったので、GitHub Copilot のエージェントと一緒に YouTube のライブチャットだけをリアルタイムに表示する Web アプリを作ってみました。
バックエンドが Python + FastAPI、フロントエンドが TypeScript + React という今風?な構成にしています。
つくったもの
Youtube の配信URLを指定することで、ライブチャットをリアルタイム表示します。
バックエンド LiveChat API
https://github.com/kerobot/livechatapi
フロントエンド LiveChat Viewer
https://github.com/kerobot/livechatviewer
主な機能
- YouTube動画IDを入力してライブチャット取得
- リアルタイムでメッセージを表示
- ユーザーアバター・バッジ表示
- 自動ポーリング(APIが推奨する間隔で更新)
- エラーハンドリング
バックエンド(livechatapi)の技術・実装
採用技術
- Python 3
- FastAPI
- 非同期処理(async/await)
- Pydantic モデル
- YouTube Data API
実装の特徴
- FastAPI で API サーバーを立てて、YouTube Data API からライブチャットのメッセージを定期的に取得してます
- 非同期でリクエストを投げることで、複数ライブや大量コメントもサクサク捌けます
- チャット取得は一定間隔でポーリングしてます。WebSocket はできれば今後対応したいです
- レスポンスは JSON で、フロント側が扱いやすいように整形してます
- シンプル設計なので、他サービスのチャット API とかにも応用しやすいと思います
レート制限対応HTTPクライアントの実装
livechatapi\app\utils\http_client.py
import time
import random
import requests
from typing import Optional, Dict, Any
from app.config import ENVIRONMENT
import logging
logger = logging.getLogger(__name__)
class RateLimitedHTTPClient:
"""レート制限対応のHTTPクライアント"""
def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
self.session = requests.Session()
# 環境別User-Agent設定
user_agent = f"LiveChatAPI/1.0 ({ENVIRONMENT})"
if ENVIRONMENT == "production":
user_agent += " (Production)"
self.session.headers.update({"User-Agent": user_agent})
def get_with_retry(self, url: str, params: Optional[Dict] = None) -> Dict[Any, Any]:
"""指数バックオフとジッターを使ったリトライ機能付きGET"""
for attempt in range(self.max_retries + 1):
try:
# 環境別タイムアウト設定
timeout = 30 if ENVIRONMENT == "production" else 10
response = self.session.get(url, params=params, timeout=timeout)
# レート制限チェック
if response.status_code == 429:
if attempt == self.max_retries:
raise Exception("Rate limit exceeded after all retries")
retry_after = response.headers.get("Retry-After")
if retry_after:
delay = int(retry_after)
else:
delay = self.base_delay * (2**attempt) + random.uniform(0, 1)
logger.warning(
f"Rate limited. Retrying in {delay:.2f} seconds... (attempt {attempt + 1})"
)
time.sleep(delay)
continue
response.raise_for_status()
data = response.json()
# YouTube APIエラーチェック
if "error" in data:
error = data["error"]
if (
error.get("code") == 403
and "quota" in error.get("message", "").lower()
):
raise Exception("YouTube API quota exceeded")
raise Exception(f"YouTube API Error: {error.get('message')}")
logger.debug(f"Request successful: {url}")
return data
except requests.RequestException as e:
if attempt == self.max_retries:
raise Exception(
f"Request failed after {self.max_retries} retries: {e}"
)
delay = self.base_delay * (2**attempt) + random.uniform(0, 1)
logger.warning(
f"Request failed (attempt {attempt + 1}). Retrying in {delay:.2f} seconds..."
)
time.sleep(delay)
raise Exception("Unexpected error in retry logic")
def close(self):
"""セッションを閉じる"""
self.session.close()
YouTube Data APIのような外部APIを利用する場合、レート制限を考慮する必要がありますので、今回の実装では、指数バックオフとジッターを組み合わせた HTTP クライアントを作成してみました。
環境に応じた初期化や設定切り替え
user_agent = f"LiveChatAPI/1.0 ({ENVIRONMENT})"
if ENVIRONMENT == "production":
user_agent += " (Production)"
クライアント初期化時に環境変数に応じて User-Agent を動的に設定しています。API 提供側でトラフィックの分析や問題を切り分けやすくするためです。
本番環境では明示的に "Production" を付与することで、開発・テスト環境と区別できるようにしています。
また、環境に応じてタイムアウト時間を切り替え、開発効率と本番安定性のバランスをとっています。
timeout = 30 if ENVIRONMENT == "production" else 10
HTTP 429 に対するリトライ制御
HTTP 429(Too Many Requests)エラーに対する処理は特に重要なので、サーバーが Retry-After
ヘッダーを提供する場合はその値に従い、そうでない場合は指数バックオフ base_delay * (2**attempt)
にジッター random.uniform(0, 1)
を加算しています。
if retry_after:
delay = int(retry_after)
else:
delay = self.base_delay * (2**attempt) + random.uniform(0, 1)
ジッターを加算することで、複数のクライアントが同時にリトライする際の「サンダリング・ハード問題」を回避します。
参考:サンダリング・ハード問題
YouTube API固有のエラー制御
YouTube API は HTTP 200 ステータスでもレスポンスボディにエラー情報を含む場合があります。クォータ制限(403エラー)があるため、これを検出できるようにしています。
if "error" in data:
error = data["error"]
if error.get("code") == 403 and "quota" in error.get("message", "").lower():
raise Exception("YouTube API quota exceeded")
このような多層的なエラーチェックにより、API固有の挙動に対応できるようになります。
YouTubeService クラスの実装詳細
livechatapi\app\services\youtube.py
import time
from typing import Optional
from app.models.youtube import LiveChatMessageListResponse
from app.config import (
YOUTUBE_API_KEY,
RATE_LIMIT_MAX_RETRIES,
RATE_LIMIT_BASE_DELAY,
RATE_LIMIT_REQUESTS_PER_SECOND,
)
from app.utils.http_client import RateLimitedHTTPClient
import logging
logger = logging.getLogger(__name__)
class YouTubeService:
def __init__(self):
self.client = RateLimitedHTTPClient(
max_retries=RATE_LIMIT_MAX_RETRIES,
base_delay=RATE_LIMIT_BASE_DELAY,
)
self.last_request_time = 0
self.min_interval = 1.0 / RATE_LIMIT_REQUESTS_PER_SECOND
logger.info("🎬 YouTubeService initialized")
def _wait_for_rate_limit(self):
"""リクエスト間隔を制御"""
elapsed = time.time() - self.last_request_time
if elapsed < self.min_interval:
wait_time = self.min_interval - elapsed
logger.debug(f"⏱️ Rate limiting: waiting {wait_time:.2f} seconds")
time.sleep(wait_time)
self.last_request_time = time.time()
def get_live_chat_id(self, video_id: str) -> Optional[str]:
"""ライブチャットIDを取得"""
logger.debug(f"🔍 Fetching live chat ID for video: {video_id}")
self._wait_for_rate_limit()
url = "https://www.googleapis.com/youtube/v3/videos"
params = {
"part": "liveStreamingDetails",
"id": video_id,
"key": YOUTUBE_API_KEY,
}
try:
data = self.client.get_with_retry(url, params)
items = data.get("items", [])
if not items:
logger.warning(f"🚫 No video found for ID: {video_id}")
return None
live_details = items[0].get("liveStreamingDetails", {})
chat_id = live_details.get("activeLiveChatId")
if chat_id:
logger.info(f"✅ Live chat ID retrieved: {chat_id} (video: {video_id})")
else:
logger.warning(f"❌ No active live chat for video: {video_id}")
return chat_id
except Exception as e:
logger.error(f"💥 Failed to get live chat ID for video {video_id}: {e}")
raise
def get_chat_messages(
self, live_chat_id: str, page_token: Optional[str] = None
) -> LiveChatMessageListResponse:
"""ライブチャットメッセージを取得"""
logger.debug(
f"💬 Fetching messages for chat: {live_chat_id}, page_token: {page_token}"
)
self._wait_for_rate_limit()
url = "https://www.googleapis.com/youtube/v3/liveChat/messages"
params = {
"liveChatId": live_chat_id,
"part": "snippet,authorDetails",
"key": YOUTUBE_API_KEY,
}
if page_token:
params["pageToken"] = page_token
try:
data = self.client.get_with_retry(url, params)
response = LiveChatMessageListResponse.model_validate(data)
message_count = len(response.items) if hasattr(response, "items") else 0
logger.info(
f"📨 Retrieved {message_count} chat messages (chat: {live_chat_id})"
)
return response
except Exception as e:
logger.error(f"💥 Failed to get chat messages for {live_chat_id}: {e}")
raise
# サービスのシングルトンインスタンス
youtube_service = YouTubeService()
このサービスは2段階の API 呼び出しによってライブチャットデータを取得し、レート制限処理も行っています。
初期化とレート制限制御
def __init__(self):
self.client = RateLimitedHTTPClient(
max_retries=RATE_LIMIT_MAX_RETRIES,
base_delay=RATE_LIMIT_BASE_DELAY,
)
self.last_request_time = 0
self.min_interval = 1.0 / RATE_LIMIT_REQUESTS_PER_SECOND
コンストラクタでは、先ほどの RateLimitedHTTPClient を組み込み、独自のレート制限機能を実装しています。
min_interval
は設定値から動的に計算し、1秒あたりの最大リクエスト数を制御して、二重のレート制限保護を実現しています。
リクエスト間隔の制御
def _wait_for_rate_limit(self):
elapsed = time.time() - self.last_request_time
if elapsed < self.min_interval:
wait_time = self.min_interval - elapsed
logger.debug(f"⏱️ Rate limiting: waiting {wait_time:.2f} seconds")
time.sleep(wait_time)
self.last_request_time = time.time()
_wait_for_rate_limit
メソッドは、前回のリクエストからの経過時間を計算し、最小間隔に達していない場合は適切な時間だけ待機します。
2段階のライブチャット取得処理
YouTube APIでライブチャットを取得するには、まず動画IDからライブチャットIDを取得し、その後実際のメッセージを取得する必要があります。
def get_live_chat_id(self, video_id: str) -> Optional[str]:
url = "https://www.googleapis.com/youtube/v3/videos"
params = {
"part": "liveStreamingDetails",
"id": video_id,
"key": YOUTUBE_API_KEY,
}
get_live_chat_id
メソッドでは、動画の liveStreamingDetails
から activeLiveChatId
を抽出します。この値が存在しない場合は、その動画がライブ配信ではないか、チャット機能が無効化されていることを意味します。
Pydanticによる型安全なレスポンス処理
data = self.client.get_with_retry(url, params)
response = LiveChatMessageListResponse.model_validate(data)
実際のチャットメッセージ取得では、APIレスポンスを Pydantic モデルで検証・変換しています。これにより、実行時の型安全性を確保し、予期しないデータ構造によるエラーを防止できます。
シングルトンパターンによるリソース管理
youtube_service = YouTubeService()
サービスクラスはシングルトンパターンで実装し、アプリケーション全体で単一のインスタンスを共有しています。これにより、レート制限状態の一貫性を保ち、不要なHTTPセッションの作成を避けます。
YouTube APIの特性を理解し、実用的なレート制限処理と堅牢なエラーハンドリングを組み合わせた、良い感じの実装としています。
フロントエンド(livechatviewer)の技術・実装
採用技術
- TypeScript
- React
- Vite
- axios
- CSSモジュール
実装の特徴
- Reactでコンポーネント化して、チャット表示部分をいい感じに分離しています
- TypeScriptで型安全に実装してるため、APIレスポンスの変更にも強いです
- axiosでAPIから最新チャットをポーリング取得して、画面側に即反映できます
- コメントの表示はCSSでカスタマイズ性を高めました
- モバイル対応も意識してるため、スマホからでも快適に閲覧できます
Reactライブチャットコンポーネントの実装詳細
livechatviewer\src\components\LiveChat.tsx
import React, { useEffect, useRef, useState } from 'react';
import type { ChatMessage, LiveChatMessageItem } from '../types/youtube';
import { extractVideoId, fetchLiveChat, formatTimestamp, validateVideoId } from '../utils/youtube';
import './LiveChat.css';
const LiveChat: React.FC = () => {
const [videoUrl, setVideoUrl] = useState<string>('');
const [videoId, setVideoId] = useState<string | null>(null);
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [isConnected, setIsConnected] = useState<boolean>(false);
const [error, setError] = useState<string | null>(null);
const [pollingInterval, setPollingInterval] = useState<number>(5000);
const [nextPageToken, setNextPageToken] = useState<string | undefined>(undefined);
const [messageCount, setMessageCount] = useState<number>(0);
const intervalRef = useRef<NodeJS.Timeout | null>(null);
const messagesEndRef = useRef<HTMLDivElement>(null);
// メッセージリストの最下部にスクロール
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
useEffect(() => {
scrollToBottom();
}, [messages]);
// 接続処理
const handleConnect = () => {
const extractedVideoId = extractVideoId(videoUrl);
if (!extractedVideoId) {
setError('無効な YouTube URL です。正しい URL を入力してください。');
return;
}
if (!validateVideoId(extractedVideoId)) {
setError('動画 ID の形式が正しくありません。');
return;
}
setVideoId(extractedVideoId);
setError(null);
setMessages([]);
setMessageCount(0);
setNextPageToken(undefined);
setIsConnected(true);
// 初回取得
fetchMessages(extractedVideoId);
// 定期取得を開始
intervalRef.current = setInterval(() => {
fetchMessages(extractedVideoId, nextPageToken);
}, pollingInterval);
};
// チャットメッセージ取得
const fetchMessages = async (id: string, pageToken?: string) => {
try {
const response = await fetchLiveChat(id, pageToken);
// レスポンスデータを変換
const newChatMessages: ChatMessage[] = response.items.map((item: LiveChatMessageItem) => ({
id: item.id,
authorName: item.authorDetails.displayName,
authorPhotoUrl: item.authorDetails.profileImageUrl,
message: item.snippet.displayMessage,
timestamp: item.snippet.publishedAt,
authorChannelId: item.snippet.authorChannelId,
}));
// 新しいメッセージのみを追加(重複排除)
setMessages(prevMessages => {
const existingIds = new Set(prevMessages.map(msg => msg.id));
const uniqueNewMessages = newChatMessages.filter(msg => !existingIds.has(msg.id));
const updatedMessages = [...prevMessages, ...uniqueNewMessages];
// メッセージ数を更新
setMessageCount(updatedMessages.length);
return updatedMessages;
});
// ページトークンとポーリング間隔を更新
setNextPageToken(response.nextPageToken);
if (response.pollingIntervalMillis && response.pollingIntervalMillis !== pollingInterval) {
setPollingInterval(response.pollingIntervalMillis);
// インターバルを再設定
if (intervalRef.current) {
clearInterval(intervalRef.current);
intervalRef.current = setInterval(() => {
fetchMessages(id, response.nextPageToken);
}, response.pollingIntervalMillis);
}
}
} catch (err) {
console.error('チャット取得エラー:', err);
const errorMessage = err instanceof Error ? err.message : 'チャットの取得に失敗しました。';
setError(errorMessage);
}
};
// 切断処理
const handleDisconnect = () => {
if (intervalRef.current) {
clearInterval(intervalRef.current);
intervalRef.current = null;
}
setIsConnected(false);
setVideoId(null);
setMessages([]);
setMessageCount(0);
setNextPageToken(undefined);
setError(null);
};
// コンポーネントのクリーンアップ
useEffect(() => {
return () => {
if (intervalRef.current) {
clearInterval(intervalRef.current);
}
};
}, []);
return (
<div className="live-chat">
<div className="live-chat-header">
<h1>🎥 Youtube Live Chat Viewer</h1>
<div className="connection-controls">
<input
type="text"
value={videoUrl}
onChange={(e) => setVideoUrl(e.target.value)}
placeholder="YouTube URL を入力してください。"
className="url-input"
disabled={isConnected}
/>
{!isConnected ? (
<button
onClick={handleConnect}
className="connect-btn"
disabled={!videoUrl.trim()}
>
接続する
</button>
) : (
<button onClick={handleDisconnect} className="disconnect-btn">
切断する
</button>
)}
</div>
{error && <div className="error-message">{error}</div>}
{isConnected && videoId && (
<div className="status">
<span className="connected-indicator">🟢</span>
<span className="status-text">動画ID: {videoId} に接続中...</span>
<span className="message-count">メッセージ数: {messageCount}</span>
<span className="polling-info">更新間隔: {pollingInterval / 1000}秒</span>
</div>
)}
</div>
<div className="chat-container">
{messages.length === 0 && isConnected && !error && (
<div className="no-messages">
<div className="loading-spinner">⏳</div>
<p>チャットの受信を待機中...</p>
</div>
)}
{messages.map((message) => (
<div key={message.id} className="chat-message">
<img
src={message.authorPhotoUrl}
alt={message.authorName}
className="author-avatar"
onError={(e) => {
// 画像読み込みエラーの場合はデフォルト画像を表示
e.currentTarget.src = '';
}}
/>
<div className="message-content">
<div className="message-header">
<span className="author-name">{message.authorName}</span>
<span className="timestamp">{formatTimestamp(message.timestamp)}</span>
</div>
<div className="message-text">{message.message}</div>
</div>
</div>
))}
<div ref={messagesEndRef} />
</div>
</div>
);
};
export default LiveChat;
このコンポーネントは、URL入力からメッセージ取得と表示まで、ライブチャット表示のひととおりの機能を実装しています。コンポーネント分割など見直したいですね。
状態管理と useRef の活用
const [videoUrl, setVideoUrl] = useState<string>('');
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [isConnected, setIsConnected] = useState<boolean>(false);
const [pollingInterval, setPollingInterval] = useState<number>(5000);
const intervalRef = useRef<NodeJS.Timeout | null>(null);
const messagesEndRef = useRef<HTMLDivElement>(null);
状態管理では、接続状態、メッセージ配列、ポーリング間隔など、ライブチャット機能に必要な要素を分離しています。特に useRef
を活用して、インターバル処理とDOM要素への参照を管理しているところがポイントです。
自動スクロール機能
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
useEffect(() => {
scrollToBottom();
}, [messages]);
新しいメッセージが追加されるたびに自動的に最下部へスクロールする機能を実装しています。scrollIntoView
の smooth
オプションにより、できるだけ自然なスクロールを行ってみました。
接続処理とバリデーション
const handleConnect = () => {
const extractedVideoId = extractVideoId(videoUrl);
if (!extractedVideoId) {
setError('無効な YouTube URL です。正しい URL を入力してください。');
return;
}
if (!validateVideoId(extractedVideoId)) {
setError('動画 ID の形式が正しくありません。');
return;
}
YouTube URL から動画 ID を抽出し、複数段階のバリデーションを実行しています。
チャットメッセージの重複排除
setMessages(prevMessages => {
const existingIds = new Set(prevMessages.map(msg => msg.id));
const uniqueNewMessages = newChatMessages.filter(msg => !existingIds.has(msg.id));
const updatedMessages = [...prevMessages, ...uniqueNewMessages];
setMessageCount(updatedMessages.length);
return updatedMessages;
});
Set
を使用してメッセージの重複を排除しています。この方法により、O(n)の計算量で既存メッセージとの重複チェックを行い、パフォーマンスを確保しています。
動的ポーリングの間隔調整
if (response.pollingIntervalMillis && response.pollingIntervalMillis !== pollingInterval) {
setPollingInterval(response.pollingIntervalMillis);
if (intervalRef.current) {
clearInterval(intervalRef.current);
intervalRef.current = setInterval(() => {
fetchMessages(id, response.nextPageToken);
}, response.pollingIntervalMillis);
}
}
YouTube APIから返されるポーリング間隔に動的に対応する仕組みとし、間隔が変更された場合は既存のインターバルをクリアして、新しい間隔で再設定しています。
メモリリーク対策とクリーンアップ
useEffect(() => {
return () => {
if (intervalRef.current) {
clearInterval(intervalRef.current);
}
};
}, []);
コンポーネントのアンマウント時に適切にインターバルをクリアすることで、メモリリークを防止しています。
おわりに
久しぶりに Python や TypeScript を利用したので環境を作り直したり、Viteなどの設定にハマったりしましたが、GitHub Copilot (Claude Sonnet 4) とのプログラミングは、実際のペアプログラミングのようにお互いの意見や考えをぶつけ合いながら機能の作りこみや品質の向上を進められて楽しかったです。
次回も Youtube Data API 関連をベースにチャット内容の解析など試してみようかなと思います。ローカルのアバターと連動させたりしたら面白いかもしれません。