LoginSignup
1
1

Python: APIなしでYouTubeライブの動画から主要チャット内容を拾う(スパチャ対応)

Last updated at Posted at 2023-08-14

注意・免責

注意:著作権上の問題もありますので利用には著作権者への同意をご自身でお願いします。

免責:私個人は取得されたデータによるいかなる損害も保証しません

概要

動作確認:Python3.11
Python3を使用してYouTubeライブの動画から主要チャットの内容の

  • 元々のJSONファイル
  • プレーンテキストファイル(UTF-8)
    を出力します。なお、テキストファイルには「日付、ユーザー名、チャット内容」のみが抽出されているものが出力されます。

どのような動作をするか

URLからJSONファイルを取得し、pythonのソースコードと同じディレクトリ(フォルダ)に出力し、それから「日付、ユーザー名、チャット内容」が抽出されたテキストファイルも同じディレクトリ(フォルダ)に出力します。

つまり
URLを代入 → JSONファイル出力 → テキストファイル出力

前準備

必要なライブラリは以下のとおりです。インストール済みの方はそのままで結構です。

pip install youtube_dl
pip install json

ソースコード

絶対時間(配信日時に合わせたコメント時間)での出力

ソースコード内でURLを指定してソースコードと同じディレクトリに出力

詳細はコメントに記入してありますが、“url”変数にチャットを取得したいYouTubeライブのURLを代入します。

YouTube_Chat_to_Text.py
from yt_dlp import YoutubeDL
import json
import os
from datetime import datetime, timedelta

# YouTube動画のURL
url = '<ここにURLを記入>'

# YouTube動画のURLを入れ忘れた場合
if url == '<ここにURLを記入>':
    print("合致するYouTubeのURLを記入してください")
    exit()

# 動画IDの取得
video_id = url.split('=')[1].split('&')[0]

# チャットJSONファイルの取得
ydl_video_opts = {
    'outtmpl': f'{video_id}_.mp4',
    'format': 'best',
    'writesubtitles': True,
    'skip_download': True
}

with YoutubeDL(ydl_video_opts) as ydl:
    result = ydl.download([url])

# 生成されたJSONファイルから可読できるテキストファイルの生成input_file_name = f'{video_id}_.live_chat.json'
output_file_name = os.path.splitext(input_file_name)[0] + '.txt'

with open(input_file_name, 'r', encoding='utf-8') as infile, \
     open(output_file_name, 'w', encoding='utf-8') as outfile:

    lines = infile.readlines()

    for line in lines:
        data = json.loads(line)
        action = data["replayChatItemAction"]["actions"][0]

        if "addChatItemAction" in action:
            item = action["addChatItemAction"]["item"]

            if "liveChatTextMessageRenderer" in item:
                message_items = item["liveChatTextMessageRenderer"]["message"]["runs"]
                message = ''
                for element in message_items:
                    if "text" in element:
                        if "navigationEndpoint" in element:
                            url_endpoint = element["navigationEndpoint"]["urlEndpoint"]["url"]
                            query_index = url_endpoint.find('q=')
                            if query_index != -1:
                                target_url = urllib.parse.unquote(url_endpoint[query_index + 2:])
                                message += target_url
                            else:
                                message += element["text"]
                        else:
                            message += element["text"]
                    elif "emoji" in element:
                        message += element["emoji"]["image"]["accessibility"]["accessibilityData"]["label"]
                
                author_name = item["liveChatTextMessageRenderer"]["authorName"]["simpleText"]
                timestamp_usec = item["liveChatTextMessageRenderer"]["timestampUsec"]
                timestamp = datetime(1970, 1, 1) + timedelta(microseconds=int(timestamp_usec))
                timestamp_jst = timestamp + timedelta(hours=9)
                formatted_timestamp = timestamp_jst.strftime("%Y-%m-%d %H:%M:%S")
                outfile.write(f"{formatted_timestamp}: {author_name}: {message}\n")

            # 有料メッセージ処理
            elif "liveChatPaidMessageRenderer" in item:
                purchase_amount = item["liveChatPaidMessageRenderer"]["purchaseAmountText"]["simpleText"]
                message = ""

                # messageキーの確認
                if "message" in item["liveChatPaidMessageRenderer"]:
                    message = item["liveChatPaidMessageRenderer"]["message"]["runs"][0]["text"]

                message += f"(purchase:{purchase_amount}"
                author_name = item["liveChatPaidMessageRenderer"]["authorName"]["simpleText"]
                timestamp_usec = item["liveChatPaidMessageRenderer"]["timestampUsec"]
                timestamp = datetime(1970, 1, 1) + timedelta(microseconds=int(timestamp_usec))
                timestamp_jst = timestamp + timedelta(hours=9)
                formatted_timestamp = timestamp_jst.strftime("%Y-%m-%d %H:%M:%S")
                outfile.write(f"{formatted_timestamp}: {author_name}: {message}\n")

クリップボードからURLを取得してソースコードと同じディレクトリに出力

YouTube_Chat_to_Text_Clip-abs.py
from yt_dlp import YoutubeDL
import pyperclip
import json
import re
import os
from datetime import datetime, timedelta
import urllib.parse

# YouTube動画のURLをクリップボードを取得
url = pyperclip.paste()

if not url:
    print("URLを記入してください")
    exit()
    
youtube_pattern = r'(https?://)?(www\.)?(youtube\.com|youtu\.?be)/[\w-]+'
if re.match(youtube_pattern, url):
    print("YouTubeのURLが確認されました")
else:
    print("合致するYouTubeのURLを記入してください")
    exit()
    
# 動画IDを取得
video_id = url.split('=')[1].split('&')[0]

# JSON取り出し
ydl_video_opts = {
    'outtmpl': f'{video_id}_.mp4',
    'format': 'best',
    'writesubtitles': True,
    'skip_download': True
}

with YoutubeDL(ydl_video_opts) as ydl:
    result = ydl.download([url])

# JSON -> txt
input_file_name = f'{video_id}_.live_chat.json'
output_file_name = os.path.splitext(input_file_name)[0] + '.txt'

with open(input_file_name, 'r', encoding='utf-8') as infile, \
     open(output_file_name, 'w', encoding='utf-8') as outfile:

    lines = infile.readlines()

    for line in lines:
        data = json.loads(line)
        action = data["replayChatItemAction"]["actions"][0]

        if "addChatItemAction" in action:
            item = action["addChatItemAction"]["item"]

            if "liveChatTextMessageRenderer" in item:
                message_items = item["liveChatTextMessageRenderer"]["message"]["runs"]
                message = ''
                for element in message_items:
                    if "text" in element:
                        if "navigationEndpoint" in element:
                            url_endpoint = element["navigationEndpoint"]["urlEndpoint"]["url"]
                            query_index = url_endpoint.find('q=')
                            if query_index != -1:
                                target_url = urllib.parse.unquote(url_endpoint[query_index + 2:])
                                message += target_url
                            else:
                                message += element["text"]
                        else:
                            message += element["text"]
                    elif "emoji" in element:
                        message += element["emoji"]["image"]["accessibility"]["accessibilityData"]["label"]
                
                author_name = item["liveChatTextMessageRenderer"]["authorName"]["simpleText"]
                timestamp_usec = item["liveChatTextMessageRenderer"]["timestampUsec"]
                timestamp = datetime(1970, 1, 1) + timedelta(microseconds=int(timestamp_usec))
                timestamp_jst = timestamp + timedelta(hours=9)
                formatted_timestamp = timestamp_jst.strftime("%Y-%m-%d %H:%M:%S")
                outfile.write(f"{formatted_timestamp}: {author_name}: {message}\n")

            # 有料メッセージの処理
            elif "liveChatPaidMessageRenderer" in item:
                purchase_amount = item["liveChatPaidMessageRenderer"]["purchaseAmountText"]["simpleText"]
                message = ""

                # 'message' キーの存在を確認
                if "message" in item["liveChatPaidMessageRenderer"]:
                    message = item["liveChatPaidMessageRenderer"]["message"]["runs"][0]["text"]

                message += f"(purchase:{purchase_amount}"
                author_name = item["liveChatPaidMessageRenderer"]["authorName"]["simpleText"]
                timestamp_usec = item["liveChatPaidMessageRenderer"]["timestampUsec"]
                timestamp = datetime(1970, 1, 1) + timedelta(microseconds=int(timestamp_usec))
                timestamp_jst = timestamp + timedelta(hours=9)
                formatted_timestamp = timestamp_jst.strftime("%Y-%m-%d %H:%M:%S")
                outfile.write(f"{formatted_timestamp}: {author_name}: {message}\n")

相対時間(配信日時を00:00:00と起算)での出力(誤作動あり。考案中)

クリップボードからURLを取得してソースコードと同じディレクトリに出力

YouTube_Chat_to_Text_Clip-rel.py
from yt_dlp import YoutubeDL
import pyperclip
import json
import re
import os
from datetime import datetime, timedelta
import urllib.parse

def process_message(renderer):
    message_items = renderer["message"]["runs"]
    message = ''
    for element in message_items:
        if "text" in element:
            message += element["text"]
        elif "emoji" in element:
            message += element["emoji"]["image"]["accessibility"]["accessibilityData"]["label"]
    return message

def process_timestamp(renderer, video_start_usec):
    timestamp_usec = int(renderer["timestampUsec"])
    timestamp_delta = timedelta(microseconds=(timestamp_usec - video_start_usec))
    hours, remainder = divmod(timestamp_delta.total_seconds(), 3600)
    minutes, seconds = divmod(remainder, 60)
    formatted_timestamp = f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
    return formatted_timestamp

# クリップボードからURLを取得
url = pyperclip.paste()

if not url:
    print("合致するYouTubeのURLを記入してください")
    exit()

youtube_pattern = r'(https?://)?(www\.)?(youtube\.com|youtu\.?be)/[\w-]+'
if re.match(youtube_pattern, url):
    print("A YouTube URL has been confirmed.")
else:
    print("Please provide a valid YouTube URL.")
    exit()

# 動画IDを取得
video_id = url.split('=')[1].split('&')[0]

# JSON取得
ydl_video_opts = {
    'outtmpl': f'{video_id}_.mp4',
    'format': 'best',
    'writesubtitles': True,
    'skip_download': True
}

with YoutubeDL(ydl_video_opts) as ydl:
    result = ydl.download([url])

# JSON -> txt
input_file_name = f'{video_id}_.live_chat.json'
output_file_name = os.path.splitext(input_file_name)[0] + '.txt'

# 時間定義
video_start_usec = None

with open(input_file_name, 'r', encoding='utf-8') as infile, \
     open(output_file_name, 'w', encoding='utf-8') as outfile:

    lines = infile.readlines()

    for line in lines:
        data = json.loads(line)
        action = data["replayChatItemAction"]["actions"][0]

        if "addChatItemAction" in action:
            item = action["addChatItemAction"]["item"]

            if video_start_usec is None and "liveChatTextMessageRenderer" in item:
                video_start_usec = int(item["liveChatTextMessageRenderer"]["timestampUsec"])

            if "liveChatTextMessageRenderer" in item:
                message = process_message(item["liveChatTextMessageRenderer"])
                timestamp = process_timestamp(item["liveChatTextMessageRenderer"], video_start_usec)
                outfile.write(f"{timestamp}: {message}\n")

以上です。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1