生配信の切り抜きAIを作ってみたい。(現在進行系)
こんにちは、Webサービスを開発中のrkですX: @e2eXszEiBC32827。
切り抜きAIについて
最近、Youtube上でshort動画の需要が増え、YouTubeやTwitchの生配信をしている方は、自分の配信を切り抜いて編集し、shortの方にも上げたいですよね。そこで、生配信のURLを入力するだけで「切り抜きポイント」を自動で探し出し、切り抜き動画を作成するツールを開発しようとしています。名前はシンプルに「切り抜きAI」です。
とりあえずの目標
・切り抜き素材をしっかり加工する(無音部分でカットし、文字起こしやコメントがしっかりアノテーションされた状態)。
⇓
・Azure MLにデータを渡して編集され、一つの動画に。
⇓
・二人以上の喋っている動画でも話者識別をして、実装する。
なぜ作ろうとしたのか
生配信中にコメントが急増する瞬間、それは何か面白いことが起きているサイン。そういった瞬間を自動で見つけられたら、見逃した生配信のハイライトを簡単にチェックできます。また、動画編集の負担も軽減できると考えました。
どうやって作る予定?
まずは、生配信のコメントを収集して、コメントの盛り上がり具合からそのシーンの時間、配信者の喋った内容の文字起こし、コメントの内容を保存します(過程1)。その後、Azure Machine Learningを使って、それらのポイントを基に切り抜き動画を自動生成するモデルを開発(過程2)。最終的には、テロップやサムネイル、適切なタイトルをAIが提案してくれるようにする予定です(過程3)。
データの用意(過程1)
コメントの盛り上がり具合を測定するコード
今回、自動でコメントの盛り上がり具合を測定するgoogle colab上で動くプログラムを作りました。なにか不具合があったり、使い方がわからない場合はコメント頂けるとありがたいです。
from chat_downloader import ChatDownloader
import matplotlib.pyplot as plt
def chat_count(lists, interval):
"""区間ごとにリスト内の要素数をカウントする"""
max_time = max(lists) + 1
counts_length = int(max_time // interval) + 1
counts = [0] * counts_length
for time in lists:
index = int(time // interval)
counts[index] += 1
return counts
def calculate_increase_rate(counts):
"""各区間におけるコメントの増加率を計算する"""
increase_rates = [0] # 最初の区間の増加率は計算できないため0を設定
for i in range(1, len(counts)):
if counts[i-1] == 0: # 前の区間のコメント数が0の場合は増加率を計算しない
increase_rate = 0
else:
increase_rate = (counts[i] - counts[i-1]) / counts[i-1]
increase_rates.append(increase_rate)
return increase_rates
def main():
url = "https://www.youtube.com/watch?v=YyfxRRTr7zo" # 分析するYouTube動画のURL
start = '00:00:00' # 読み込みを開始する秒数(HH:MM:SS形式)
end = '00:40:00' # 読み込みを終了する秒数(HH:MM:SS形式)
interval = 20 # 秒数での区間
chat = ChatDownloader().get_chat(url, start_time=start, end_time=end) # チャットデータをダウンロード
chat_time = [] # 各コメントの投稿時間(秒)を格納するリスト
for message in chat:
time_in_seconds = message.get('time_in_seconds')
if time_in_seconds is not None:
chat_time.append(time_in_seconds)
message_count = chat_count(chat_time, interval)
increase_rates = calculate_increase_rate(message_count)
plt.figure(figsize=(12, 6))
time_stamps = [i * interval for i in range(len(increase_rates))]
plt.plot(time_stamps, increase_rates, marker='o', linestyle='-', color='red', label='Increase Rate')
plt.title('Comment Increase Rate Over Time')
plt.xlabel('Time (seconds)')
plt.ylabel('Increase Rate')
plt.grid(True)
plt.legend()
plt.show()
# 増加率の閾値を設定(例として0.5)
threshold = 0.5
threshold_intervals = []
for i, rate in enumerate(increase_rates):
if rate > threshold:
start_time = i * interval
end_time = start_time + interval
threshold_intervals.append((start_time, end_time))
print(f"Increase rate exceeds {threshold} between {start_time} and {end_time} seconds.")
threshold_intervals = merge_intervals(threshold_intervals)
# マージされた区間を出力
for start_time, end_time in threshold_intervals:
print(f"Merged segment from {start_time} to {end_time} seconds.")
def merge_intervals(intervals):
"""隣接するまたは重複する区間をマージする"""
if not intervals:
return []
merged = [intervals[0]]
for current_start, current_end in intervals[1:]:
last_end = merged[-1][1]
if current_start <= last_end + 20: # 20秒以内に隣接する区間はマージ
merged[-1] = (merged[-1][0], max(last_end, current_end)) # 終了時間を更新
else:
merged.append((current_start, current_end))
return merged
if __name__ == '__main__':
main()
備考
今回は、動画を20秒の区間に区切り、ある区間のコメント量が前の区間のコメントよりどれだけ多いのかによって切りぬくポイントを決めています。また、20秒ごとに切り抜いた場面が隣接していたり、近そうだったらそれらをマージして一つの場面として扱うようにしています。
区切りたい区間の秒数、元動画の切り抜く場面の時間指定、切り抜く場面の閾値を指定可能です。
元動画のダウンロード
他のQuitaの記事だと、ffmpegとyt-dlpをつかって秒数指定で切り抜いていますが、自分の環境だとなぜかダウンロードが非常に遅くなってしまうので、一度元動画をダウンロードしてから切り抜いていこうと思います。
元動画のダウンロードコード
from pytube import YouTube
import sys
def show_progress_bar(stream, chunk, bytes_remaining):
total_size = stream.filesize
bytes_downloaded = total_size - bytes_remaining
percentage_of_completion = bytes_downloaded / total_size * 100
bar_length = 40 # 進行状況バーの長さ
filled_length = int(bar_length * percentage_of_completion // 100)
bar = '█' * filled_length + '-' * (bar_length - filled_length)
sys.stdout.write(f'\r|{bar}| {percentage_of_completion:.1f}% 完了')
sys.stdout.flush()
url = 'https://www.youtube.com/watch?v=YyfxRRTr7zo'
yt = YouTube(url)
# on_progress_callbackを設定
yt.register_on_progress_callback(show_progress_bar)
# 音声を含むストリームを取得(通常は最高解像度)
stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
# 動画のダウンロード(現在のディレクトリに保存)
print("ダウンロード開始...")
stream.download()
print("\nダウンロード完了")
切り抜きコード
def cut_merged_intervals(video_path, merged_intervals, output_dir):
"""
マージされた区間に基づいて動画をカットし、指定されたディレクトリに保存する。
:param video_path: カットする元の動画ファイルへのパス。
:param merged_intervals: マージされた区間のリスト。各要素は(start_sec, end_sec)の形式。
:param output_dir: カットされた動画を保存するディレクトリ。
"""
# 出力ディレクトリを確認し、存在しない場合は作成
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for i, (start, end) in enumerate(merged_intervals):
output_file = os.path.join(output_dir, f"cut_{i+1}.mp4")
cmd = [
'ffmpeg',
'-y', # 既存の出力ファイルを上書き
'-ss', str(start),
'-i', video_path,
'-t', str(end - start),
'-c', 'copy', # エンコードを変更しない
output_file
]
subprocess.run(cmd)
print(f"Created {output_file}")
動画に字幕を追加する
上で得られた動画を無音部分でカットしてwhisperなどで文字起こしをし、assファイルを作成します。
それとffmpegを使い、元動画に合成します。
このあたりの処理はかなりごちゃごちゃしているので、要望があればコードの公開したいと思います。
試しに作った動画をXの方で上げているので、よければ見ていってください!
sample動画
問題点
動画を見ていただいた方はお気づきだと思うのですが、いくつかの問題点があるのでここに挙げさせていただきます。
1.動画の喋ってる声が途中で途切れてしまう部分がある
⇛恐らくはじめに20秒ごとに盛り上がるポイントを処理したため、
動画の一番最後の部分が切られてしまってる。
解決策:動画の前後に3秒程度の動画を付け足してから処理する。
2.音声翻訳があっていない部分がある。
解決策:whisperをfune-tunigして精度上げる。
3.字幕が画面から飛び出ている
解決策:字幕のass fileを作るときに、改行や字幕の省略メソッドを追加する。
これらの修正点は今後直していきます。
AIモデルの開発(過程2)
これまでデータ作成と初期の動画制作を進めてきましたが、まだ品質向上の余地があります。そこで、コメント内容をデータセットに加え、動画の質を高めるとともに、タイトルとサムネイルの制作にも取り掛かります!
AIモデル調査
今まで全くモデル開発をしたことがないので、これから調査していきます
備考
AI開発をする予定だったのですが、すでにできている動画の質が思ったよりも良かったためこれからは動画投稿の方に力を入れていきます!
動画投稿のほうが順調でしたら、またAI開発も考えていきます。
詳しくはこちらのQuitaの記事にて書いていきます!
生配信の盛り上がりポイントを自動で探し出す「切り抜きAI」、なにかご意見等あれば、Xまでお知らせいただけると嬉しいです。今後もこの記事の方更新していくのでよろしくお願いします!