1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Gemini Proを非同期処理で呼び出す

Posted at

Geminiについて

Geminiとは、Google提供するLLMの一つです。OpenAIのChatGPT APIのように、APIが用意されています。今回は、そのAPIへのリクエストの非同期実装を行います。
https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/gemini?hl=ja

なぜ非同期で呼び出したいのか?

動機は、Geminiへの多くのリクエストを短時間で処理したいからです。最近、学習データの特徴量生成にGemini APIを使っているのですが、めちゃくちゃ実行時間がかかります。例えば、学習データが5000行のcsvファイルである場合、Gemini APIへ5000回のリクエストを送らないと行けません。そうすると、5時間くらいかかってしまいます。さすがに、待てないです。そういうことで、非同期処理でGemini Proを呼び出すコードを実装します。技術的には、Pythonの標準パッケージであるasyncioを使います。

asyncioを使う理由

Pythonだと、非同期実行を可能にしてくれるパッケージとして、multiprocessingとasyncioの2つがあります。今回は、asyncioを選びました。API呼び出しタスクのようなI/Oタスクでの非同期処理に特化しているからです。ファイル読み込みやAPI呼び出し中に、スレッドが待ちに入ります。asyncioは、そのスレッドを使って、別の処理を実行することを可能にします。こうすることで、全体の処理の高速化が実現できます。

コード紹介

Gemini API 呼び出し編

まずは、パッケージのインポートを行います。

import os

import aiohttp
import asyncio

以下のコードは、非同期でGemini APIを呼び出すコードです。GCP経由の呼び出しなので、os.getenv('GCP_PROJECT_ID')を使っています。aiohttpを用いて、非同期で処理しています。
parse_streaming_response関数は、レスポンスからGeminiによって生成されたテキストを取り出す処理を行っています。


async def async_call_gemini_pro(prompt: str):

    url = f"https://us-central1-aiplatform.googleapis.com/v1/projects/{os.getenv('GCP_PROJECT_ID')}/locations/us-central1/publishers/google/models/gemini-1.0-pro:streamGenerateContent"
    header = {
        "Authorization": f"Bearer {os.getenv('GEMINI_API_TOKEN')}",
        "Content-Type": "application/json; charset=utf-8"
    }
    json_body = {
        "contents": {
            "role": "user",
            "parts": {
                "text": prompt
            }
        },
        "safety_settings": {
            "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "threshold": "BLOCK_LOW_AND_ABOVE"
        },
        "generation_config": {
            "temperature": 0.9,
            "topP": 1.0,
            "maxOutputTokens": 200
        }
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            headers=header,
            json=json_body
        ) as response:
            streaming_response = await response.json()
            final_response = parse_streaming_response(streaming_response)
            return final_response

以下がメインの非同期処理を呼び出すコードです。

async def main():
    tasks = []
    for i in range(10):
        tasks.append(async_call_gemini_pro(
            "今の気分を詩にしてください")
        )
    results = await asyncio.gather(*tasks)
    print("Results")
    print(results)

で、結局速くなったのか?

asyncioありとなしで、10回のリクエストを処理する実行時間を比較してみました。

非同期 or Not 実行時間(秒)
非同期 2.9
Not 非同期 21.5
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?