PandasのDataFrameに対して、OpenAI APIを利用した処理を並列で呼び出す方法
PandasのDataFrameに対して、OpenAI APIを利用した処理を並列で呼び出す方法を調べてみました。結論としては、pandarallelを使うのが最も簡単だろうという感じです。
from pandarallel import pandarallel
pandarallel.initialize()
summary_df["summary"] = summary_df["url"].parallel_apply(get_summary)
display(summary_df)
前提条件
- openai==1.16.2
- pandarallel==1.6.5
事前準備
from __future__ import annotations
import openai
import urllib
import pandas as pd
urls = [
"https://www.inoue-kobo.com/llm/openai-reduce-embedding-dim/",
"https://www.inoue-kobo.com/aws/selenium-serverless/",
"https://www.inoue-kobo.com/aws/aws-service-summary/",
"https://www.inoue-kobo.com/ai_ml/duckduckgo-langchain-langsmith/",
"https://www.inoue-kobo.com/ai_ml/llamaindex-pdf-gradio/",
]
summary_df = pd.DataFrame(urls, columns=["url"])
pd.set_option("display.max_colwidth", None)
単純に apply するだけ
def get_summary(url: str) -> str | None:
res_web = urllib.request.urlopen(url) # type: ignore
content = res_web.read().decode("utf-8")
res_openai = openai.chat.completions.create(
model="gpt-3.5-turbo",
temperature=0,
messages=[
{
"role": "system",
"content": "以下はWebサイトの内容です。HTMLタグを削除した上で、150文字以内で要約してください。",
},
{"role": "user", "content": content},
],
)
return res_openai.choices[0].message.content
summary_df["summary"] = summary_df["url"].apply(get_summary)
display(summary_df)
実行時間は12.3sでした。applyしただけでは並列処理は行われないため、この処理時間が基準になります。
asyncを使う
import asyncio
async def aget_summary(url: str) -> str | None:
res_web = urllib.request.urlopen(url) # type: ignore
content = res_web.read().decode("utf-8")
res_openai = openai.chat.completions.create(
model="gpt-3.5-turbo",
temperature=0,
messages=[
{
"role": "system",
"content": "以下はWebサイトの内容です。HTMLタグを削除した上で、150文字以内で要約してください。",
},
{"role": "user", "content": content},
],
)
return res_openai.choices[0].message.content
summary_df["summary"] = await asyncio.gather(*[aget_summary(url) for url in urls])
display(summary_df)
実行時間は12.8sでした。asyncioでは期待したような並列処理(正確にはノンブロッキングIO)が行われていないことが確認できます。
ThreadPoolExecutorを使う
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
summary_df["summary"] = list(executor.map(get_summary, urls))
display(summary_df)
実行時間は4.3sでした。applyを適用しただけの順次処理よりも大幅に処理時間が短くなっています。並列処理が行われていることが確認できます。
pandarallelを使う
from pandarallel import pandarallel
pandarallel.initialize()
summary_df["summary"] = summary_df["url"].parallel_apply(get_summary)
display(summary_df)
実行時間は3.0sでした。こちらも並列処理が行われていることが確認できます。なお、ThreadPoolExecutor
よりも処理時間が早くなっていますが、これはOpenAI APIのレスポンス時間の揺れによるものです。
おまけ
async-openaiというものがあるらしいですが(非公式ライブラリです)、今回は未検証です。