はじめに
LangChainとStreamlitを使って、複数のYouTube動画を同時に検索&要約するアプリを作ってみました。最近、DeepResearch系のクローンを見ていると、非同期処理でタスクを効率化してる例が多くて、「自分ももっと非同期処理を使いこなしたい!」と思ったのがきっかけです。そんなわけで、今回はPythonの非同期処理を試してみたくて、実験的に作ったアプリです。試験的な実装なので、その点はご容赦ください!
作ったもの紹介:YouTube検索アプリ
このアプリでは、ユーザーが入力したキーワードに基づいてYouTube動画を検索し、見つかった複数の動画のトランスクリプトを取得して要約を行います。動画だとわかりにくいですが、各動画の要約は非同期処理を活用しています。要約が完了したものから順番にアプリ上へ表示されています。
コード全文は本記事の最下部に記載しています。
処理の流れ
- 最初に、ユーザー入力からYoutube動画のURLリストを取得します
- YoutubeのURL検索にはLangChainのツール
YouTubeSearchTool
を使用します - ユーザー入力から検索クエリの生成には、LangChainの
llm.bind_tools
メソッドを使用してツール使用の可否や、検索クエリの生成を行っています
- YoutubeのURL検索にはLangChainのツール
- 次に、動画ごとに取得&要約タスクを生成し、非同期で実行します
- asyncioを使用して、非同期でタスクを処理します
- llmの非同期実行は、LangChainの
llm.ainvoke
メソッドを使用します - また、完了したタイミングで画面に結果を反映するようにしています
バージョン
- Python 3.11.9
langchain-community==0.3.20
langchain-google-genai==2.1.1
python-dotenv==1.0.1
streamlit==1.43.2
youtube-search==2.1.2
youtube-transcript-api==1.0.2
実装説明
本アプリの実装を以下の4点に分割して説明します。
- YouTubeURL検索機能
- 動画要約機能
- 並列処理
- Streamlitアプリ
1. YouTubeURL検索機能
動画URLの取得は、LangChainのYouTubeSearchTool
を使用します。ツールの仕様判断や検索クエリの生成には、LangChainのbind_tools
メソッドを使用し、LLMにYouTubeSearchToolを紐付けています。
def search_youtube_urls(prompt:str)->Tuple[str, List[str]]:
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
tool = YouTubeSearchTool()
llm_with_tools = llm.bind_tools([tool])
response = llm_with_tools.invoke(prompt)
if response.tool_calls==[]:
ai_message = response.content
return ai_message, []
else:
tool_query = response.tool_calls[0]['args']
response_tool = tool.invoke(tool_query)
response_tool = eval(response_tool)
ai_message = f'{len(response_tool)}件の動画を検索します。'
return ai_message, response_tool
コメント
- bind_toolを使用するため、自分でプロンプトや処理を書かなくて良いため、実装が楽です!
- ただし、ユーザーの入力から検索クエリを生成する工程の精度は悪く、全く期待通りに動いてくません
- 理由はおそらくYouTubeSearchToolの説明になぜか人物に限定されたツールと記載されているためです。
search for youtube videos associated with a person.
- こういうことは多いため、実務の場合はLangChainに設定されているプロンプトは過信せず、自分でプロンプトを書いた方が良いと考えます
2. 動画要約機能(非同期実装)
YouTube動画のトランスクリプトを取得し、LLMを使用して要約する非同期関数です。
async def summarize_youtube_videos(url:str, task_id:int) -> str:
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
ytt_api = YouTubeTranscriptApi()
video_id = url.split("v=")[1].split('&')[0]
try:
ytt_api_res = ytt_api.fetch(video_id, languages=['ja', 'en'])
except Exception as e:
message = f"動画のトランスクリプトを取得できませんでした。\n\n{url}"
return message, url
transcript = '\n'.join([s.text for s in ytt_api_res.snippets])
llm_response = await llm.ainvoke(f'以下のトランスクリプトを要約して。可能な限り短く\n\n{transcript}')
message = llm_response.content + f'\n\n{url}'
return message, url
コメント
-
ytt_api.fetch
の行で、日本語または英語のトランスクリプトを取得しています -
await llm.ainvoke
の行でコルーチンの実行を一時停止します
3. 並列処理の実装
複数の動画を並列に処理するための非同期関数です。
async def parallel_summarize(
urls :List[str],
) -> List[str]:
messages = [
asyncio.create_task(summarize_youtube_videos(url, task_id))
for task_id, url in enumerate(urls)
]
completed_count = 0
for future in asyncio.as_completed(messages):
message, url = await future
st.session_state.messages.append({"role": "agent", "content": message})
with st.chat_message("1"):
st.write(f"{message}")
st.video(url)
completed_count += 1
progress = int((completed_count / len(urls)) * 100)
progress_bar.progress(progress, text=f"現在{completed_count}件のタスクが完了しました ({progress}%)")
コメント
- トランスクリプト取得&要約タスクの生成は
asyncio.create_task
を使用しいます - タスク完了後の処理は
asyncio.as_completed
を使用しています。完了するたびに完了したタスクから順に結果を処理します - 進捗確認するため、処理の進捗状況をStreamlitのプログレスバーで表示し、非同期タスクの進捗を確認できるようにしています
4. Streamlitアプリ
StreamlitによるWebアプリの実装も紹介します。
ユーザーからの入力をもとに
st.title("Parallel YouTube Search")
if "messages" not in st.session_state:
st.session_state.messages = []
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
prompt = st.chat_input("Youtubeをパラレルに検索します。何を検索しますか?(例,東京都たこやき3件)")
if prompt :
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.write(prompt)
ai_message, youtube_urls = search_youtube_urls(prompt)
st.session_state.messages.append({"role": "assistant", "content": ai_message})
with st.chat_message("assistant"):
st.write(ai_message)
if youtube_urls != []:
progress_bar = st.progress(0, text="タスクを開始待ち...")
asyncio.run(parallel_summarize(youtube_urls))
progress_bar.empty()
コメント
- チャットインターフェースでユーザーの入力を受け付けます
まとめ
今回は、LangChainとStreamlitを組み合わせて、非同期処理を活用したYouTube検索・要約アプリケーションを開発しました。
かなり簡単に実装できましたが、複雑なエージェントシステムを構築する場合はLLMの処理部分とアプリ部分の適切な分離設計が重要だと感じました。今後調査したいと思います。
コード全文
ソースコード
import asyncio
import streamlit as st
from typing import List, Tuple
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.tools import YouTubeSearchTool
from youtube_transcript_api import YouTubeTranscriptApi
def search_youtube_urls(prompt:str)->Tuple[str, List[str]]:
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
tool = YouTubeSearchTool()
llm_with_tools = llm.bind_tools([tool])
response = llm_with_tools.invoke(prompt)
if response.tool_calls==[]:
ai_message = response.content
return ai_message, []
else:
tool_query = response.tool_calls[0]['args']
response_tool = tool.invoke(tool_query)
response_tool = eval(response_tool)
ai_message = f'{len(response_tool)}件の動画を検索します。'
return ai_message, response_tool
async def summarize_youtube_videos(
url :str,
task_id : int,
) -> str:
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
ytt_api = YouTubeTranscriptApi()
video_id = url.split("v=")[1].split('&')[0]
try:
ytt_api_res = ytt_api.fetch(video_id, languages=['ja', 'en'])
except Exception as e:
message = f"動画のトランスクリプトを取得できませんでした。\n\n{url}"
return message, url
transcript = '\n'.join([s.text for s in ytt_api_res.snippets])
llm_response = await llm.ainvoke(f'以下のトランスクリプトを要約して。可能な限り短く\n\n{transcript}')
message = llm_response.content + f'\n\n{url}'
return message, url
async def parallel_summarize(
urls :List[str],
) -> List[str]:
messages = [
asyncio.create_task(summarize_youtube_videos(url, task_id))
for task_id, url in enumerate(urls)
]
completed_count = 0
for future in asyncio.as_completed(messages):
message, url = await future
st.session_state.messages.append({"role": "agent", "content": message})
with st.chat_message("1"):
st.write(f"{message}")
st.video(url)
completed_count += 1
progress = int((completed_count / len(urls)) * 100)
progress_bar.progress(progress, text=f"現在{completed_count}件のタスクが完了しました ({progress}%)")
st.title("Parallel YouTube Search")
if "messages" not in st.session_state:
st.session_state.messages = []
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
prompt = st.chat_input("Youtubeをパラレルに検索します。何を検索しますか?(例,東京都たこやき3件)")
if prompt :
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.write(prompt)
ai_message, youtube_urls = search_youtube_urls(prompt)
st.session_state.messages.append({"role": "assistant", "content": ai_message})
with st.chat_message("assistant"):
st.write(ai_message)
if youtube_urls != []:
progress_bar = st.progress(0, text="タスクを開始待ち...")
asyncio.run(parallel_summarize(youtube_urls))
progress_bar.empty()