1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

KubernetesAdvent Calendar 2024

Day 10

RunnableK8sをbatch()とstream()対応したけど使い道がわからない

Posted at

これは、Kubernetes Advent Calendar 2024の10日目の記事で、以前投稿した「LangChainのRunnableをKubernetesで実行するRunnableを書いてみた」という記事の続き。

RunnableK8sとは

RunnableK8sは、LangChainのRunnalbeをラップすると、その部分をKubernetesで実行できるRunnable。
以下のような感じで、ラップしたRunnableとそのインプットをシリアライズして、Podの標準入力に流し込んで渡して、Pod内で実行し、結果をPodの標準出力経由で返す、という風に動く。

image.png

前回の記事の時点ではRunnableの最も基本的なメソッドであるinvoke()だけを実装していた。
今回の記事では、batch()stream()を実装して動かしてみる。

batch()とstream()とは

invoke()が、ひとつのインプットを受け取り、Runnableを同期的に一回一括で実行して一つのアウトプットを生成するのに対して、batch()stream()は以下のような感じ。

  • batch(): 複数のインプットを受け取り、(並列で)Runnableを同期的にインプットの数だけまとめて実行して、複数のアウトプットを生成する。
  • stream(): 一つのインプットを受け取り、Runnableを同期的に1回実行しつつ、アウトプットをストリームとして生成する。もう少し正確に言うと、stream()を実行すると、アウトプットの断片(chunk)を順次生成するイテレータを返す。

batch()の実装

batch()は自前のコードを書く必要がなかった。
Runnableクラスに実装されているbatch()デフォルト実装が、インプットの数だけスレッドを立ち上げてinvoke()を呼ぶというもので、それで十分なので継承したものをそのまま採用。

このbatch()を実行するサンプルコードは以下。

sample_batch.py
import sys

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from runnables.runnable_k8s import RunnableK8s


prompt = ChatPromptTemplate.from_messages([
    (
        "system",
        "あなたはポケモンのエキスパートです。ユーザの質問に答えてください。",
    ),
    (
        "human",
        "質問: {input}",
    )
])
llm = ChatOpenAI(model="gpt-4o-mini")

chain = prompt | RunnableK8s(bound=llm) | StrOutputParser()

outputs = chain.batch([
  {'input': sys.argv[1]},
  {'input': sys.argv[2]},
  {'input': sys.argv[3]},
])

for o in outputs:
    print(o + '\n')

このコードは、コマンドラインから受け取った3つのポケモンに関する質問について、プロンプトを生成し、OpenAIに投げて、回答をテキストに変換して標準出力に吐く、というのをbatch()でまとめて実行する。
OpenAIに投げる部分をRunnableK8sでラップしているので、そこだけKubernetesで実行される。

これを実行した様子は以下。

batch.gif

RunnableK8sが3つのpodを同時に作って、質問を並列で処理できている。
回答は全部間違ってるけど。

stream()の実装

stream()は、Runnableクラスのデフォルト実装が単にinvoke()を呼び出すというもので、ストリーム処理になってないので、自前で実装した。

実装の中身は、冒頭の図のような処理をするinvoke()と大きく変わらないが、podのなかでRunnableのstream()を実行し、そのchunkを順次標準出力に吐き、RunnableK8s側のイテレータで順次返すようにした。

このstream()を実行するサンプルコードは以下。

sample_stream.py
import sys

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from runnables.runnable_k8s import RunnableK8s


prompt = ChatPromptTemplate.from_messages([
    (
        "system",
        "あなたはポケモンのエキスパートです。ユーザの質問に答えてください。",
    ),
    (
        "human",
        "質問: {input}",
    )
])
llm = ChatOpenAI(model="gpt-4o-mini")

chain = prompt | RunnableK8s(bound=llm) | StrOutputParser()

for output in chain.stream({'input': sys.argv[1]}):
    print(f'{output}/', end='', flush=True)
print()

このコードは、コマンドラインから受け取った1つのポケモンに関する質問について、プロンプトを生成し、OpenAIに投げて、回答をテキストに変換して標準出力に吐く、というのをstream()でストリーム実行する。

RunnableK8sでラップしたOpenAIのLLMからは回答がchunkで分割されて順次くるので、それを標準出力に吐くときに、chunkの間に/を挟んで切れ目をわかりやすくしている。

これを実行した様子は以下。

stream.gif

RunnableK8sでストリーム処理できた。
回答もちょっとくどいけど合ってる。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?