1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【React+FastAPI】YouTube Live Chat をリアルタイム表示するWebアプリを作った🎬💬

Posted at

はじめに

趣味作業中は配信を視聴しながらが多いのですが、ふとモニターを見たときに Youtube のライブチャット欄が見づらいなと思っていました。

ライブチャット関連のアプリはいろいろと存在するようですが、Python と TypeScript の学習もしたかったので、GitHub Copilot のエージェントと一緒に YouTube のライブチャットだけをリアルタイムに表示する Web アプリを作ってみました。

バックエンドが Python + FastAPI、フロントエンドが TypeScript + React という今風?な構成にしています。

つくったもの

WS000002.JPG

Youtube の配信URLを指定することで、ライブチャットをリアルタイム表示します。

バックエンド LiveChat API
https://github.com/kerobot/livechatapi

フロントエンド LiveChat Viewer
https://github.com/kerobot/livechatviewer

主な機能

  • YouTube動画IDを入力してライブチャット取得
  • リアルタイムでメッセージを表示
  • ユーザーアバター・バッジ表示
  • 自動ポーリング(APIが推奨する間隔で更新)
  • エラーハンドリング

バックエンド(livechatapi)の技術・実装

採用技術

  • Python 3
  • FastAPI
  • 非同期処理(async/await)
  • Pydantic モデル
  • YouTube Data API

実装の特徴

  • FastAPI で API サーバーを立てて、YouTube Data API からライブチャットのメッセージを定期的に取得してます
  • 非同期でリクエストを投げることで、複数ライブや大量コメントもサクサク捌けます
  • チャット取得は一定間隔でポーリングしてます。WebSocket はできれば今後対応したいです
  • レスポンスは JSON で、フロント側が扱いやすいように整形してます
  • シンプル設計なので、他サービスのチャット API とかにも応用しやすいと思います

レート制限対応HTTPクライアントの実装

livechatapi\app\utils\http_client.py

import time
import random
import requests
from typing import Optional, Dict, Any
from app.config import ENVIRONMENT
import logging

logger = logging.getLogger(__name__)


class RateLimitedHTTPClient:
    """レート制限対応のHTTPクライアント"""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.session = requests.Session()

        # 環境別User-Agent設定
        user_agent = f"LiveChatAPI/1.0 ({ENVIRONMENT})"
        if ENVIRONMENT == "production":
            user_agent += " (Production)"

        self.session.headers.update({"User-Agent": user_agent})

    def get_with_retry(self, url: str, params: Optional[Dict] = None) -> Dict[Any, Any]:
        """指数バックオフとジッターを使ったリトライ機能付きGET"""
        for attempt in range(self.max_retries + 1):
            try:
                # 環境別タイムアウト設定
                timeout = 30 if ENVIRONMENT == "production" else 10
                response = self.session.get(url, params=params, timeout=timeout)

                # レート制限チェック
                if response.status_code == 429:
                    if attempt == self.max_retries:
                        raise Exception("Rate limit exceeded after all retries")

                    retry_after = response.headers.get("Retry-After")
                    if retry_after:
                        delay = int(retry_after)
                    else:
                        delay = self.base_delay * (2**attempt) + random.uniform(0, 1)

                    logger.warning(
                        f"Rate limited. Retrying in {delay:.2f} seconds... (attempt {attempt + 1})"
                    )
                    time.sleep(delay)
                    continue

                response.raise_for_status()
                data = response.json()

                # YouTube APIエラーチェック
                if "error" in data:
                    error = data["error"]
                    if (
                        error.get("code") == 403
                        and "quota" in error.get("message", "").lower()
                    ):
                        raise Exception("YouTube API quota exceeded")
                    raise Exception(f"YouTube API Error: {error.get('message')}")

                logger.debug(f"Request successful: {url}")
                return data

            except requests.RequestException as e:
                if attempt == self.max_retries:
                    raise Exception(
                        f"Request failed after {self.max_retries} retries: {e}"
                    )

                delay = self.base_delay * (2**attempt) + random.uniform(0, 1)
                logger.warning(
                    f"Request failed (attempt {attempt + 1}). Retrying in {delay:.2f} seconds..."
                )
                time.sleep(delay)

        raise Exception("Unexpected error in retry logic")

    def close(self):
        """セッションを閉じる"""
        self.session.close()

YouTube Data APIのような外部APIを利用する場合、レート制限を考慮する必要がありますので、今回の実装では、指数バックオフとジッターを組み合わせた HTTP クライアントを作成してみました。

環境に応じた初期化や設定切り替え

user_agent = f"LiveChatAPI/1.0 ({ENVIRONMENT})"
if ENVIRONMENT == "production":
    user_agent += " (Production)"

クライアント初期化時に環境変数に応じて User-Agent を動的に設定しています。API 提供側でトラフィックの分析や問題を切り分けやすくするためです。

本番環境では明示的に "Production" を付与することで、開発・テスト環境と区別できるようにしています。

また、環境に応じてタイムアウト時間を切り替え、開発効率と本番安定性のバランスをとっています。

timeout = 30 if ENVIRONMENT == "production" else 10

HTTP 429 に対するリトライ制御

HTTP 429(Too Many Requests)エラーに対する処理は特に重要なので、サーバーが Retry-After ヘッダーを提供する場合はその値に従い、そうでない場合は指数バックオフ base_delay * (2**attempt) にジッター random.uniform(0, 1) を加算しています。

if retry_after:
    delay = int(retry_after)
else:
    delay = self.base_delay * (2**attempt) + random.uniform(0, 1)

ジッターを加算することで、複数のクライアントが同時にリトライする際の「サンダリング・ハード問題」を回避します。

参考:サンダリング・ハード問題

YouTube API固有のエラー制御

YouTube API は HTTP 200 ステータスでもレスポンスボディにエラー情報を含む場合があります。クォータ制限(403エラー)があるため、これを検出できるようにしています。

if "error" in data:
    error = data["error"]
    if error.get("code") == 403 and "quota" in error.get("message", "").lower():
        raise Exception("YouTube API quota exceeded")

このような多層的なエラーチェックにより、API固有の挙動に対応できるようになります。

YouTubeService クラスの実装詳細

livechatapi\app\services\youtube.py

import time
from typing import Optional
from app.models.youtube import LiveChatMessageListResponse
from app.config import (
    YOUTUBE_API_KEY,
    RATE_LIMIT_MAX_RETRIES,
    RATE_LIMIT_BASE_DELAY,
    RATE_LIMIT_REQUESTS_PER_SECOND,
)
from app.utils.http_client import RateLimitedHTTPClient
import logging

logger = logging.getLogger(__name__)


class YouTubeService:
    def __init__(self):
        self.client = RateLimitedHTTPClient(
            max_retries=RATE_LIMIT_MAX_RETRIES,
            base_delay=RATE_LIMIT_BASE_DELAY,
        )
        self.last_request_time = 0
        self.min_interval = 1.0 / RATE_LIMIT_REQUESTS_PER_SECOND
        logger.info("🎬 YouTubeService initialized")

    def _wait_for_rate_limit(self):
        """リクエスト間隔を制御"""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            wait_time = self.min_interval - elapsed
            logger.debug(f"⏱️ Rate limiting: waiting {wait_time:.2f} seconds")
            time.sleep(wait_time)
        self.last_request_time = time.time()

    def get_live_chat_id(self, video_id: str) -> Optional[str]:
        """ライブチャットIDを取得"""
        logger.debug(f"🔍 Fetching live chat ID for video: {video_id}")
        self._wait_for_rate_limit()

        url = "https://www.googleapis.com/youtube/v3/videos"
        params = {
            "part": "liveStreamingDetails",
            "id": video_id,
            "key": YOUTUBE_API_KEY,
        }

        try:
            data = self.client.get_with_retry(url, params)
            items = data.get("items", [])

            if not items:
                logger.warning(f"🚫 No video found for ID: {video_id}")
                return None

            live_details = items[0].get("liveStreamingDetails", {})
            chat_id = live_details.get("activeLiveChatId")

            if chat_id:
                logger.info(f"✅ Live chat ID retrieved: {chat_id} (video: {video_id})")
            else:
                logger.warning(f"❌ No active live chat for video: {video_id}")

            return chat_id

        except Exception as e:
            logger.error(f"💥 Failed to get live chat ID for video {video_id}: {e}")
            raise

    def get_chat_messages(
        self, live_chat_id: str, page_token: Optional[str] = None
    ) -> LiveChatMessageListResponse:
        """ライブチャットメッセージを取得"""
        logger.debug(
            f"💬 Fetching messages for chat: {live_chat_id}, page_token: {page_token}"
        )
        self._wait_for_rate_limit()

        url = "https://www.googleapis.com/youtube/v3/liveChat/messages"
        params = {
            "liveChatId": live_chat_id,
            "part": "snippet,authorDetails",
            "key": YOUTUBE_API_KEY,
        }

        if page_token:
            params["pageToken"] = page_token

        try:
            data = self.client.get_with_retry(url, params)
            response = LiveChatMessageListResponse.model_validate(data)

            message_count = len(response.items) if hasattr(response, "items") else 0
            logger.info(
                f"📨 Retrieved {message_count} chat messages (chat: {live_chat_id})"
            )

            return response

        except Exception as e:
            logger.error(f"💥 Failed to get chat messages for {live_chat_id}: {e}")
            raise


# サービスのシングルトンインスタンス
youtube_service = YouTubeService()

このサービスは2段階の API 呼び出しによってライブチャットデータを取得し、レート制限処理も行っています。

初期化とレート制限制御

def __init__(self):
    self.client = RateLimitedHTTPClient(
        max_retries=RATE_LIMIT_MAX_RETRIES,
        base_delay=RATE_LIMIT_BASE_DELAY,
    )
    self.last_request_time = 0
    self.min_interval = 1.0 / RATE_LIMIT_REQUESTS_PER_SECOND

コンストラクタでは、先ほどの RateLimitedHTTPClient を組み込み、独自のレート制限機能を実装しています。
min_interval は設定値から動的に計算し、1秒あたりの最大リクエスト数を制御して、二重のレート制限保護を実現しています。

リクエスト間隔の制御

def _wait_for_rate_limit(self):
    elapsed = time.time() - self.last_request_time
    if elapsed < self.min_interval:
        wait_time = self.min_interval - elapsed
        logger.debug(f"⏱️ Rate limiting: waiting {wait_time:.2f} seconds")
        time.sleep(wait_time)
    self.last_request_time = time.time()

_wait_for_rate_limit メソッドは、前回のリクエストからの経過時間を計算し、最小間隔に達していない場合は適切な時間だけ待機します。

2段階のライブチャット取得処理

YouTube APIでライブチャットを取得するには、まず動画IDからライブチャットIDを取得し、その後実際のメッセージを取得する必要があります。

def get_live_chat_id(self, video_id: str) -> Optional[str]:
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "part": "liveStreamingDetails",
        "id": video_id,
        "key": YOUTUBE_API_KEY,
    }

get_live_chat_id メソッドでは、動画の liveStreamingDetails から activeLiveChatId を抽出します。この値が存在しない場合は、その動画がライブ配信ではないか、チャット機能が無効化されていることを意味します。

Pydanticによる型安全なレスポンス処理

data = self.client.get_with_retry(url, params)
response = LiveChatMessageListResponse.model_validate(data)

実際のチャットメッセージ取得では、APIレスポンスを Pydantic モデルで検証・変換しています。これにより、実行時の型安全性を確保し、予期しないデータ構造によるエラーを防止できます。

シングルトンパターンによるリソース管理

youtube_service = YouTubeService()

サービスクラスはシングルトンパターンで実装し、アプリケーション全体で単一のインスタンスを共有しています。これにより、レート制限状態の一貫性を保ち、不要なHTTPセッションの作成を避けます。

YouTube APIの特性を理解し、実用的なレート制限処理と堅牢なエラーハンドリングを組み合わせた、良い感じの実装としています。

フロントエンド(livechatviewer)の技術・実装

採用技術

  • TypeScript
  • React
  • Vite
  • axios
  • CSSモジュール

実装の特徴

  • Reactでコンポーネント化して、チャット表示部分をいい感じに分離しています
  • TypeScriptで型安全に実装してるため、APIレスポンスの変更にも強いです
  • axiosでAPIから最新チャットをポーリング取得して、画面側に即反映できます
  • コメントの表示はCSSでカスタマイズ性を高めました
  • モバイル対応も意識してるため、スマホからでも快適に閲覧できます

Reactライブチャットコンポーネントの実装詳細

livechatviewer\src\components\LiveChat.tsx

import React, { useEffect, useRef, useState } from 'react';
import type { ChatMessage, LiveChatMessageItem } from '../types/youtube';
import { extractVideoId, fetchLiveChat, formatTimestamp, validateVideoId } from '../utils/youtube';
import './LiveChat.css';

const LiveChat: React.FC = () => {
  const [videoUrl, setVideoUrl] = useState<string>('');
  const [videoId, setVideoId] = useState<string | null>(null);
  const [messages, setMessages] = useState<ChatMessage[]>([]);
  const [isConnected, setIsConnected] = useState<boolean>(false);
  const [error, setError] = useState<string | null>(null);
  const [pollingInterval, setPollingInterval] = useState<number>(5000);
  const [nextPageToken, setNextPageToken] = useState<string | undefined>(undefined);
  const [messageCount, setMessageCount] = useState<number>(0);

  const intervalRef = useRef<NodeJS.Timeout | null>(null);
  const messagesEndRef = useRef<HTMLDivElement>(null);

  // メッセージリストの最下部にスクロール
  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  };

  useEffect(() => {
    scrollToBottom();
  }, [messages]);

  // 接続処理
  const handleConnect = () => {
    const extractedVideoId = extractVideoId(videoUrl);
    
    if (!extractedVideoId) {
      setError('無効な YouTube URL です。正しい URL を入力してください。');
      return;
    }

    if (!validateVideoId(extractedVideoId)) {
      setError('動画 ID の形式が正しくありません。');
      return;
    }
    
    setVideoId(extractedVideoId);
    setError(null);
    setMessages([]);
    setMessageCount(0);
    setNextPageToken(undefined);
    setIsConnected(true);
    
    // 初回取得
    fetchMessages(extractedVideoId);
    
    // 定期取得を開始
    intervalRef.current = setInterval(() => {
      fetchMessages(extractedVideoId, nextPageToken);
    }, pollingInterval);
  };

  // チャットメッセージ取得
  const fetchMessages = async (id: string, pageToken?: string) => {
    try {
      const response = await fetchLiveChat(id, pageToken);
      
      // レスポンスデータを変換
      const newChatMessages: ChatMessage[] = response.items.map((item: LiveChatMessageItem) => ({
        id: item.id,
        authorName: item.authorDetails.displayName,
        authorPhotoUrl: item.authorDetails.profileImageUrl,
        message: item.snippet.displayMessage,
        timestamp: item.snippet.publishedAt,
        authorChannelId: item.snippet.authorChannelId,
      }));

      // 新しいメッセージのみを追加(重複排除)
      setMessages(prevMessages => {
        const existingIds = new Set(prevMessages.map(msg => msg.id));
        const uniqueNewMessages = newChatMessages.filter(msg => !existingIds.has(msg.id));
        const updatedMessages = [...prevMessages, ...uniqueNewMessages];
        
        // メッセージ数を更新
        setMessageCount(updatedMessages.length);
        
        return updatedMessages;
      });
      
      // ページトークンとポーリング間隔を更新
      setNextPageToken(response.nextPageToken);
      if (response.pollingIntervalMillis && response.pollingIntervalMillis !== pollingInterval) {
        setPollingInterval(response.pollingIntervalMillis);
        
        // インターバルを再設定
        if (intervalRef.current) {
          clearInterval(intervalRef.current);
          intervalRef.current = setInterval(() => {
            fetchMessages(id, response.nextPageToken);
          }, response.pollingIntervalMillis);
        }
      }
      
    } catch (err) {
      console.error('チャット取得エラー:', err);
      const errorMessage = err instanceof Error ? err.message : 'チャットの取得に失敗しました。';
      setError(errorMessage);
    }
  };

  // 切断処理
  const handleDisconnect = () => {
    if (intervalRef.current) {
      clearInterval(intervalRef.current);
      intervalRef.current = null;
    }
    setIsConnected(false);
    setVideoId(null);
    setMessages([]);
    setMessageCount(0);
    setNextPageToken(undefined);
    setError(null);
  };

  // コンポーネントのクリーンアップ
  useEffect(() => {
    return () => {
      if (intervalRef.current) {
        clearInterval(intervalRef.current);
      }
    };
  }, []);

  return (
    <div className="live-chat">
      <div className="live-chat-header">
        <h1>🎥 Youtube Live Chat Viewer</h1>
        
        <div className="connection-controls">
          <input
            type="text"
            value={videoUrl}
            onChange={(e) => setVideoUrl(e.target.value)}
            placeholder="YouTube URL を入力してください。"
            className="url-input"
            disabled={isConnected}
          />
          
          {!isConnected ? (
            <button 
              onClick={handleConnect} 
              className="connect-btn"
              disabled={!videoUrl.trim()}
            >
              接続する
            </button>
          ) : (
            <button onClick={handleDisconnect} className="disconnect-btn">
              切断する
            </button>
          )}
        </div>
        
        {error && <div className="error-message">{error}</div>}
        
        {isConnected && videoId && (
          <div className="status">
            <span className="connected-indicator">🟢</span>
            <span className="status-text">動画ID: {videoId} に接続中...</span>
            <span className="message-count">メッセージ数: {messageCount}</span>
            <span className="polling-info">更新間隔: {pollingInterval / 1000}</span>
          </div>
        )}
      </div>

      <div className="chat-container">
        {messages.length === 0 && isConnected && !error && (
          <div className="no-messages">
            <div className="loading-spinner"></div>
            <p>チャットの受信を待機中...</p>
          </div>
        )}
        
        {messages.map((message) => (
          <div key={message.id} className="chat-message">
            <img 
              src={message.authorPhotoUrl} 
              alt={message.authorName}
              className="author-avatar"
              onError={(e) => {
                // 画像読み込みエラーの場合はデフォルト画像を表示
                e.currentTarget.src = '';
              }}
            />
            <div className="message-content">
              <div className="message-header">
                <span className="author-name">{message.authorName}</span>
                <span className="timestamp">{formatTimestamp(message.timestamp)}</span>
              </div>
              <div className="message-text">{message.message}</div>
            </div>
          </div>
        ))}
        
        <div ref={messagesEndRef} />
      </div>
    </div>
  );
};

export default LiveChat;

このコンポーネントは、URL入力からメッセージ取得と表示まで、ライブチャット表示のひととおりの機能を実装しています。コンポーネント分割など見直したいですね。

状態管理と useRef の活用

const [videoUrl, setVideoUrl] = useState<string>('');
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [isConnected, setIsConnected] = useState<boolean>(false);
const [pollingInterval, setPollingInterval] = useState<number>(5000);

const intervalRef = useRef<NodeJS.Timeout | null>(null);
const messagesEndRef = useRef<HTMLDivElement>(null);

状態管理では、接続状態、メッセージ配列、ポーリング間隔など、ライブチャット機能に必要な要素を分離しています。特に useRef を活用して、インターバル処理とDOM要素への参照を管理しているところがポイントです。

自動スクロール機能

const scrollToBottom = () => {
  messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};

useEffect(() => {
  scrollToBottom();
}, [messages]);

新しいメッセージが追加されるたびに自動的に最下部へスクロールする機能を実装しています。scrollIntoViewsmooth オプションにより、できるだけ自然なスクロールを行ってみました。

接続処理とバリデーション

const handleConnect = () => {
  const extractedVideoId = extractVideoId(videoUrl);
  
  if (!extractedVideoId) {
    setError('無効な YouTube URL です。正しい URL を入力してください。');
    return;
  }

  if (!validateVideoId(extractedVideoId)) {
    setError('動画 ID の形式が正しくありません。');
    return;
  }

YouTube URL から動画 ID を抽出し、複数段階のバリデーションを実行しています。

チャットメッセージの重複排除

setMessages(prevMessages => {
  const existingIds = new Set(prevMessages.map(msg => msg.id));
  const uniqueNewMessages = newChatMessages.filter(msg => !existingIds.has(msg.id));
  const updatedMessages = [...prevMessages, ...uniqueNewMessages];
  
  setMessageCount(updatedMessages.length);
  return updatedMessages;
});

Set を使用してメッセージの重複を排除しています。この方法により、O(n)の計算量で既存メッセージとの重複チェックを行い、パフォーマンスを確保しています。

動的ポーリングの間隔調整

if (response.pollingIntervalMillis && response.pollingIntervalMillis !== pollingInterval) {
  setPollingInterval(response.pollingIntervalMillis);
  
  if (intervalRef.current) {
    clearInterval(intervalRef.current);
    intervalRef.current = setInterval(() => {
      fetchMessages(id, response.nextPageToken);
    }, response.pollingIntervalMillis);
  }
}

YouTube APIから返されるポーリング間隔に動的に対応する仕組みとし、間隔が変更された場合は既存のインターバルをクリアして、新しい間隔で再設定しています。

メモリリーク対策とクリーンアップ

useEffect(() => {
  return () => {
    if (intervalRef.current) {
      clearInterval(intervalRef.current);
    }
  };
}, []);

コンポーネントのアンマウント時に適切にインターバルをクリアすることで、メモリリークを防止しています。

おわりに

久しぶりに Python や TypeScript を利用したので環境を作り直したり、Viteなどの設定にハマったりしましたが、GitHub Copilot (Claude Sonnet 4) とのプログラミングは、実際のペアプログラミングのようにお互いの意見や考えをぶつけ合いながら機能の作りこみや品質の向上を進められて楽しかったです。

次回も Youtube Data API 関連をベースにチャット内容の解析など試してみようかなと思います。ローカルのアバターと連動させたりしたら面白いかもしれません。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?