概要
APIを組み合わせて何かしらアプリを作る練習として、Podcastエピソードの内容を要約するアプリを作ってみました。
OpenAIのWhisperとChatGPTのAPIを使っています。
全体コードはこちら。
アプリの機能紹介
- PodcastのRSSフィードを利用して、メタ情報を取得
- ユーザのボタン押下をトリガーに、当該エピソードのダウンロード、文字起こし、要約、翻訳を実行(エピソードの長さによりますが割と時間がかかるので、gifでは過程を省略しています)
処理プロセスと実装内容
上記機能を構成する各処理内容は以下のようになっています。
1. Podcastチャンネルのエピソード情報の取得
- PodcastチャンネルのRSSフィードのリンクをユーザに入力してもらい、そこから当該チャンネルの各エピソードの情報を得ます。
- ※チャンネルのRSSフィード自体は、下記のようなサイトから知ることができます。
- 全てのチャンネルで機能するのかは検証していないですが、自分が聴いている5つほどのチャンネルでは下記のような共通タグで情報を取得できました(該当コード全体はこちら)。
# コードイメージ(抜粋)
〜略〜
self.url = url # RSSフィードのurl
self.list_tag = [
"title", # そのエピソードのタイトル
"enclosure", # そのエピソードのmp3のリンク
"pubDate", # エピソード公開日時
"duration", # エピソードの長さ
"description", # エピソードの説明
"creator", # 当該チャンネルの作者
]
def _find(self, item: Tag, tag: str) -> None | str:
res = item.find(tag) # 目的のtagを与えて内容を抜き出す
if tag == "enclosure":
return res["url"]
else:
return res.text
def _get_items(self) -> list[Tag]:
r = requests.get(self.url) # RSSの情報を取得
soup = BeautifulSoup(r.content, "xml")
items = soup.find_all("item") # スクレイピング
return items
- アプリ画面では、下記のようにユーザからRSSリンクを受け取り、そこから内部的に情報を取得し、全エピソードの内容をドロップダウンで選択できるようにしています(該当コード全体はこちら)。
# コードイメージ
url = st.text_input(
"Enter a podcast RSS feed URL",
value="https://podcasts.files.bbci.co.uk/p02nrsjn.rss", # デフォルト表示(例)
)
df = get_metadata(url) # この中で、ポッドキャストの情報をスクレイピングしている
list_episode_label = [
f"No. {episode['id']}: {episode['title']} - {episode['pubDate_str']}"
for episode in df.to_dict("records")
]
# 取得した情報を元にエピソードのリストをユーザに提示し、選ばせる
episode = st.selectbox("Select an episode from the podcast channel", list_episode_label)
- UIは下記のような感じ
2. 音源のダウンロード
- ユーザがエピソードを選択し、"Generate a summary!"ボタンを押下すると、まず音源をローカルにダウンロードしてきます。
- また、OpenAIのWhisperで一度に入力として入れられるサイズには25MBという制限があるので、mp3ファイルをダウンロードした後に、chunkに分けます(該当コード全体はこちら)
-
pydub
のAudioSegment
を使うと分割することができます - なお、分割するサイズの単位はmsなので、
chunk_size = 10 * 60 * 1000
のように指定する必要があります
-
# コードイメージ
from pydub import AudioSegment
def _download_audio(url: str, title: str, output_dir: Path) -> Path:
"""Downloads the audio from the podcast"""
r = requests.get(url)
output_path = output_dir / f"{title}.mp3"
with open(output_path, "wb") as f:
f.write(r.content)
return output_path
def _chunk_audio(file_path_to_chunk: Path, chunk_size: int) -> None:
"""Chunks the audio file into chunks"""
title_audio = file_path_to_chunk.name.split(".")[0]
output_chunk_dir = file_path_to_chunk.parent / title_audio
os.makedirs(output_chunk_dir, exist_ok=True)
audio_segment = AudioSegment.from_mp3(file_path_to_chunk)
for i, chunk in enumerate(audio_segment[::chunk_size]): # chunk_sizeごとに分割して保存し直す
idx = i if i >= 10 else f"0{i}"
chunk.export(f"{output_chunk_dir}/audio_{idx}.mp3", format="mp3")
# delete the original file
os.remove(file_path_to_chunk)
3. 文字起こし
- 音源を分割して保存できたら、OpenAIのWhisper APIを使ってchunkごとに文字起こしします(該当コード全体はこちら)。
- 最終産物を単なる要約にしているので、話者分離については横着してやっていません
# コードイメージ
def transcribe(audio_file_path: Path, model_name: str = "whisper-1") -> str:
"""Transcribes the audio file using the OpenAI API"""
with audio_file_path.open("rb") as audio_file:
transcript = openai.Audio.transcribe(model_name, audio_file, verbose=True)
return transcript["text"]
余談: 複数言語が話される音声
- 現時点では、複数言語が混じった音声ファイルについて、話された言語そのままに文字起こしは上手くできないようでした(ベストプラクティスを知っている方いればお願いします)。
- APIでは、languageを指定することもできますし、しない場合は最初の30秒で言語判定がされますが、いずれにせよ、モデルの認識した言語と異なる音声が話された場合、出力が変になったり、なぜか勝手にその言語に翻訳した文字起こしをしたりするようです。自分も英語と日本語が話されるpodcastにおいて、出力が微妙な感じになるのを確認しました。
4. 要約
- 文字起こしの結果が得られたら、あとはお馴染みのChatGPTのAPIに与えて、要約させます(該当コード全体はこちら)。
- 1時間近いようなpodcastの文字起こしの分量は膨大なので、text splitterで文章を分割します。
# コードイメージ
from langchain.text_splitter import RecursiveCharacterTextSplitter
def _split_text(chunk_size: int, chunk_overlap: int) -> list[str]:
"""Split the text into multiple documents"""
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
model_name=self.model_name,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
texts = text_splitter.split_text(self.text)
return texts
- それぞれの分割された文章を要約させ、あとから、その要約を再度要約するmapreduceにならった方法をとりました。
- langchainを使えばmapreduce要約は関数一つでできるわけですが、最終要約前の各要約も欲しかったのと、それぞれの要約の塩梅の調整は直接APIを叩いた方が楽な気がしています。
# コードイメージ
# 分割した各文章を要約させる
def _summarize_transcript(self, title: str, text: str, max_tokens: int) -> str:
"""Generate summary from the transcript"""
user_message = f"""
Your task is to expertly summarize the content of a podcast.
The podcast title is {title}.
As you read through the transcript, please adhere to the following requirements in your summary:
- Match the Tone: The tone of your summary should align with the atmosphere of the content being discussed. If the subject matter is serious, maintain a formal tone; conversely, if the content is light-hearted, reflect that in a more casual style.
- Sectional Breakdown: Divide your summary into sections based on different topics discussed in the podcast.
- Language Consistency: Ensure that the summary is written in the same language as the transcript.
- Caution: The transcript for summarization is a segment of a larger podcast. When you summarize, focus exclusively on the segment provided. It's important to remember not to add any concluding remarks or extrapolations beyond what is presented in this specific portion. Your task is to create a concise and accurate summary of this given segment alone, adhering strictly to the content it contains.
- Format: The output should be in markdown format. Each section should start with a header '###' and the header should be the topic of the section. Do not add title header of the summary, just the sections.
The transcript of the episode is as follows:
{text}
"""
res = openai.ChatCompletion.create(
model=self.model_name,
messages=[{"role": "user", "content": user_message}],
max_tokens=max_tokens,
)
return res["choices"][0]["message"]["content"]
# 分割部分の要約のリストを得る
def get_list_summary(self, max_tokens: int) -> list[str]:
"""Generate summaries from transcripts"""
list_article = []
for text in tqdm(self.list_split_text):
article = self._summarize_transcript(
text=text, title=self.title, max_tokens=max_tokens
)
list_article.append(f"{article} \n\n")
return list_article
# 要約のリストを受け取り、全体の要約を作る
def summarize_summaries(self, texts: list[str], max_tokens: int) -> str:
"""Summarize the summaries"""
summaries = "".join(texts)
user_message = f"""
You are a professional summarizer.
You will be provided with a text that is a combination of summaries from different segments of a podcast.
Your task is to create a further condensed summary of this combined text. While doing so, please ensure to:
- Preserve the Tone: Maintain the atmosphere and style of the original summaries. Whether the content is serious, humorous, or of any other tone, your summary should reflect that.
- Language Consistency: The summary should be in the same language as the provided text.
- Topic-Based Organization: Structure your summary by dividing it into sections based on the different topics covered in the summaries.
- Format: The output should be in markdown format. Each section should start with a header '###' and the header should be the topic of the section. Summary should start with title header '##'.
Here are the combination of summaries you need to summarize:
{summaries}
"""
res = openai.ChatCompletion.create(
model=self.model_name,
messages=[{"role": "user", "content": user_message}],
max_tokens=max_tokens,
)
return res["choices"][0]["message"]["content"]
5. 翻訳
- 英語のPodcastの内容を日本語で見たかったりするので、ユーザからの言語指定を受け取って、翻訳する機能も持たせました(スニペットは略。該当コード全体はこちら)
その他余談
デプロイについて
- OpenAIのAPI(有料)を使用することになるので、今回はStreamlit Cloudなどへのパブリックなデプロイはしませんでした(Streamlitアプリのデプロイについて以前書いた記事)。
-
.env
ファイルにAPIキーなどの必要情報を記載して、自前でアプリを立ち上げる形にしています(README参照)。
Streamlit自体の実装(状態管理)について
- (そもそもStreamlitって何という場合はこちら)
- Streamlitは、何らかのアクションで画面の状態が更新されるとコードの上から再度実行され直されてしまうので、今回のような多段階の処理を実装する場合、画面の状態を上手く管理する必要があり、割とこれが大変です
- 今回でいえば下記のように、画面の部分部分で、表示を固定したり更新したりといったコントロールをしなければなりません
- 生成した要約を保持する
- オリジナルの要約を保持しつつも、新たにその翻訳を生成し、表示する
- 新たなエピソードが選ばれた場合は要約等をリセットして新規に更新する
- その際には下記のように、session_stateを使うことになりますが、自分的には、この辺りが一番Streamlitでコードを書く際に苦労するので、もし参考になれば幸いです(該当コードの全体はこちら)。
# コードイメージ(色々と抜粋)
import streamlit as st
# 状態が変わっても保持したいsession stateの定義
if "current_episode_id" not in st.session_state:
st.session_state["current_episode_id"] = None
if "generated" not in st.session_state:
st.session_state["generated"] = False
if "summary" not in st.session_state:
st.session_state["summary"] = None
# ~ 略 ~
# ユーザの操作によってはsession stateを初期化
if st.session_state["current_episode_id"] != df_episode["id"]:
st.session_state["current_episode_id"] = df_episode["id"]
st.session_state["generated"] = False
st.session_state["summary"] = None
st.session_state["summary_translated"] = None
# 何らかのトリガー(ユーザ操作)とsession stateの状態の組み合わせが条件を満たすときに処理を実行
if is_generate and not st.session_state["generated"]:
# ~略~
# 処理によってsession stateを更新していく
with st.spinner("Summarizing summaries... Please wait."):
summary = article_generator.summarize_summaries(
texts=list_summary_detail, max_tokens=4096
)
st.session_state["summary"] = summary
st.session_state["generated"] = True
# session stateの値によって、表示する内容をコントロールする
if st.session_state["generated"]:
st.subheader("Generated contents")
with st.expander("Summary"):
st.markdown(st.session_state["summary"])
# ~略~
if (
st.session_state["summary_translated"] is not None
and st.session_state["list_summary_detail_translated"] is not None
):
st.subheader("Translated contents")
with st.expander("Show summary translated"):
st.markdown(st.session_state["summary_translated"])
# ~略~