LoginSignup
37
34

実行例で理解する `Runnable` in `langchain`

Last updated at Posted at 2024-02-23

概要

本記事は、langchain フレームワーク内の Runnable クラスに焦点を当て、その理解と活用を目的としています。langchain は、OpenAIやGeminiなどの一般的に知られている言語モデル(LLM)を使ったアプリケーション開発を支援するためのフレームワークであり、本記事では Runnable クラスの役割とそのメソッドの使い方に触れます。特に Runnableクラスが提供する標準的なインターフェース以外のメソッドについて実際のコード共に説明します。

本記事で触れている Runnable クラスのメソッド一覧

メソッド 概要 ページ内リンク
Runnable.assign 出力(dict型)に新規フィールドを付与する新たな Runnable オブジェクトを作成 assign
Runnable.bind invokebatch などのキーワード引数をバインド bind
Runnable.config_schema invoke などで受けとる config のスキーマを pydantic モデルとして返す config_schema
Runnable.get_graph Runnable オブジェクトのグラフとして表現 get_graph
Runnable.get_input_schema invoke などの入力を検証するために使用できる Pydantic モデルを取得 get_input_schema
Runnable.get_name Runnable オブジェクトに付けられた名前を取得 get_name
Runnable.get_output_schema invoke などの出力を検証するために使用できる Pydantic モデルを取得 get_output_schema
Runnable.get_prompts Runnable オブジェクトに含まれるプロンプトを取得 get_prompts
Runnable.map 入力のリストを出力のリストに変換する Runnable オブジェクトを作成 map
Runnable.pick 出力(dict型)から指定されたキーの値を取得するための Runnable オブジェクトを作成 pick
Runnable.pipe 他の複数の Runnable オブジェクトを連結しチェーンを作成 pipe
Runnable.with_config Runnable オブジェクトにおいて config をバインドした新たな Runnable オブジェクトを作成 with_config
Runnable.with_fallbacks Runnable オブジェクトがエラーを投げた場合のフォールバックをアタッチした新たな Runnable オブジェクトを作成 with_fallbacks
Runnable.with_listeners Runnable オブジェクトにおいて、実行開始前・実行完了後・実行エラー時に呼ばれる callback をアタッチした新たな Runnable オブジェクトを作成 with_listeners
Runnable.with_retry Runnable オブジェクトの実行失敗時にリトライする Runnable オブジェクトを作成 with_retry
Runnable.with_types Runnable オブジェクトの入力型と出力型をバインドした Runnable オブジェクトを作成 with_types

本記事で提供されているコードは、特定の環境や条件下での動作を示すものであり、全ての環境やケースで同様に機能するとは限りません。また、時間の経過とともにソフトウェアの更新や互換性の問題が生じる可能性があるため、掲載されているコードが最新の状態であるとは限りません。本コードの使用は、読者の責任において行ってください。実行する前に、コードをよく理解し、必要に応じて環境に適合させることを推奨します。また、可能な限りバックアップを取ることを忘れないでください。本記事の内容やコードに関する質問や改善提案があれば、コメント欄やソーシャルメディアを通じてお知らせください。

2024/03/03 追記
Runnable クラスを継承したクラス群に対する説明および実行例に関する記事を執筆いたしました。併せて下記のリンク先もご参照ください。

はじめに

LLM が流行って一年程度。技術の進歩の早さに焦りを感じながらもインプットしたものを気が向くままにアウトプットしていきます。

さて本記事は langchainRunnable クラスについて詳細を見ていきます。
langchain1 は言語モデルを活用したアプリケーション開発のためのフレームワークの1つです。langchain を使えば OpenAI や Gemini など一般的によく知られている LLM などを使って RAG やエージェント作成を簡単に使うことができます。

そして、langchainRunnable クラスは langchain の LangChain Expression Language (LCEL) で中心的な役割を果たす抽象基底クラスです。

特に本書ではこの Runnable クラスについて理解を深めるべく、2のドキュメントをベースとして、Runnable クラスで定義されているメソッド深堀りし、LCEL のコーディング能力を向上させることを目的とします。

なお、LangChain Expression Language (LCEL) については下記の記事なども併せてご参考にしていただけるとより理解が深まると思います:

本書の対象読者

  • langchain を触ったことあるけど、まだよくわからない
    • Runnable クラスという言葉を聴いて、どのようなメソッドが定義されてるか思いつくかどうかが一つの判断基準です
  • langchain を触ったことなく、これから触りたい

本書の実行環境

本書の Python コードの実行例などは下記の環境に基づいています:

  • Windows 11
  • Python 3.11.13
    • langchain==0.1.8
    • langchain-openai==0.0.6
    • python-dotenv==1.0.1
    • gandarf==0.8

実行する際には OpenAI の API キーを取得の上、.envファイルを下記の内容で作成してください:

OPENAI_API_KEY={YOUR_API_KEY}

OpenAI API の利用には費用が発生します。詳細は https://openai.com/pricing をご確認ください。

Runnable クラス

役割

Runnable クラスは、langchain のカスタムチェーンの作成を簡単にするための抽象基底クラスです。

Runnable クラスでは langchain の標準的なインターフェースとして

  • stream: レスポンスのチャンクをストリームバックする
  • invoke: 入力に対してチェーンを呼び出す
  • batch: 入力リストに対してチェーンを呼び出す
  • astream: 非同期にレスポンスのチャンクをストリームバックする
  • ainvoke: 非同期に入力に対してチェーンを呼び出す
  • abatch: 非同期で入力リストに対してチェーンを呼び出す
  • astream_log: 最終的なレスポンスに加えて、中間ステップをストリームバックする

が提供されています3

Runnable クラスを継承した(つまり、上記のインタフェースを実装した)コンポーネントを下記のようにつなぎ合わせることで langchain のチェーンを作成することができます。

>>> from dotenv import load_dotenv
>>> from langchain_openai import ChatOpenAI
>>> from langchain_core.output_parsers import StrOutputParser
>>> from langchain_core.prompts import ChatPromptTemplate
>>> load_dotenv()
True
>>> prompt = ChatPromptTemplate.from_template("Explain {topic}")
>>> llm = ChatOpenAI(model='gpt-3.5-turbo')
>>> output_parser = StrOutputParser()
>>> chain = prompt | llm | output_parser

これらの詳細は下記の記事4 で説明してくださっていたので割愛させていただきます(大変参考にさせていただきました。ありがとうございます)。

そして、Runnable クラスでは上記のインターフェースだけではなく、その他にも有用なメソッドが実装されており、次節ではその一覧に触れていきます。

Runnable の属性・メソッド

属性

Attribute Description
InputType The type of input this runnable accepts specified as a type annotation.
OutputType The type of output this runnable produces specified as a type annotation.
config_specs List configurable fields for this runnable.
input_schema The type of input this runnable accepts specified as a pydantic model.
name The name of the runnable.
output_schema The type of output this runnable produces specified as a pydantic model.
[^3]より(2024/02/23)

メソッド

Method Description
init()
abatch(inputs[, config, return_exceptions]) Default implementation runs ainvoke in parallel using asyncio.gather.
ainvoke(input[, config]) Default implementation of ainvoke, calls invoke from a thread.
assign(**kwargs) Assigns new fields to the dict output of this runnable.
astream(input[, config]) Default implementation of astream, which calls ainvoke.
astream_events(input[, config, ...]) [Beta] Generate a stream of events.
astream_log() Stream all output from a runnable, as reported to the callback system.
atransform(input[, config]) Default implementation of atransform, which buffers input and calls astream.
batch(inputs[, config, return_exceptions]) Default implementation runs invoke in parallel using a thread pool executor.
bind(**kwargs) Bind arguments to a Runnable, returning a new Runnable.
config_schema(*[, include]) The type of config this runnable accepts specified as a pydantic model.
get_graph([config]) Return a graph representation of this runnable.
get_input_schema([config]) Get a pydantic model that can be used to validate input to the runnable.
get_name([suffix, name]) Get the name of the runnable.
get_output_schema([config]) Get a pydantic model that can be used to validate output to the runnable.
get_prompts([config])
invoke(input[, config]) Transform a single input into an output.
map() Return a new Runnable that maps a list of inputs to a list of outputs, by calling invoke() with each input.
pick(keys) Pick keys from the dict output of this runnable.
pipe(*others[, name]) Compose this runnable with another object to create a RunnableSequence.
stream(input[, config]) Default implementation of stream, which calls invoke.
transform(input[, config]) Default implementation of transform, which buffers input and then calls stream.
with_config([config]) Bind config to a Runnable, returning a new Runnable.
with_fallbacks(fallbacks, *[, ...]) Add fallbacks to a runnable, returning a new Runnable.
with_listeners(*[, on_start, on_end, on_error]) Bind lifecycle listeners to a Runnable, returning a new Runnable.
with_retry(*[, retry_if_exception_type, ...]) Create a new Runnable that retries the original runnable on exceptions.
with_types(*[, input_type, output_type]) Bind input and output types to a Runnable, returning a new Runnable.
[^3]より(2024/02/23)

※ 上記の表3では、Special Methods は省略されています。

以下ではインターフェースとして提供されている invoke などを除いて各メソッドを見ていきましょう

assign

assign は自身の出力(dict型)に新規フィールドを付与する新たな Runnable オブジェクトを作成します。

>>> from langchain_core.runnables import RunnableLambda 
>>> func = RunnableLambda(lambda x: {'x': x})
>>> func.invoke(10) 
{'x': 10}
>>> func.assign(assigned=RunnableLambda(lambda dic: dic['x']*2)).invoke('100')
{'x': '100', 'assigned': '100100'}

なお、これは

>>> from langchain_core.runnables.passthrough import RunnableAssign  
>>> from langchain_core.runnables import RunnableParallel
>>> (func | RunnableAssign(RunnableParallel(assigned=RunnableLambda(lambda dic: dic['x']*2)))).invoke(100)  
{'x': 100, 'assigned': 200}

と同じです。

bind

bindinvokebatch などのキーワード引数をバインドする RunnableBinding オブジェクトを作成します。つまり、bind で生成された RunnableBinding オブジェクトの invoke などでは bind で指定されたキーワード引数が自動的に指定されるようになります。

これは公式のドキュメント5にもあるように OpenAI の Functions や Tools をアタッチするのに有用です。以下、5 のコード例です。

function = {
    "name": "solver",
    "description": "Formulates and solves an equation",
    "parameters": {
        "type": "object",
        "properties": {
            "equation": {
                "type": "string",
                "description": "The algebraic expression of the equation",
            },
            "solution": {
                "type": "string",
                "description": "The solution to the equation",
            },
        },
        "required": ["equation", "solution"],
    },
}
# Need gpt-4 to solve this one correctly
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Write out the following equation using algebraic symbols then solve it.",
        ),
        ("human", "{equation_statement}"),
    ]
)
model = ChatOpenAI(model="gpt-4", temperature=0).bind(
    function_call={"name": "solver"}, functions=[function]
)
runnable = {"equation_statement": RunnablePassthrough()} | prompt | model
runnable.invoke("x raised to the third plus seven equals 12")

この例は

input_to_chat = ( {"equation_statement": RunnablePassthrough()} | prompt).invoke("x raised to the third plus seven equals 12")
chat = ChatOpenAI(model="gpt-4", temperature=0)
chat.invoke(input_to_chat, function_call={"name": "solver"}, functions=[function])

と同じです。

つまり、bind によって invoke でわざわざキーワード引数を指定するのを省略することができます。

config_schema

config_schemainvoke などで受けとる config のスキーマを pydantic モデルとして返してくれます。

>>> chat = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
>>> chat.config_schema()
<class 'pydantic.v1.main.ChatOpenAIConfig'>
>>> chat.config_schema().schema()
{'title': 'ChatOpenAIConfig', 'type': 'object', 'properties': {}}
get_graph

get_graphRunnable オブジェクトのグラフ表現を返します。
get_graph で返された Graph オブジェクトに対して、draw_ascii() を呼び出すと ASCII で可視化してくれます。

>>> chat = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
>>> chat.get_graph()
Graph(nodes={'dff830c537d341dcbda90e3051f4765f': Node(id='dff830c537d341dcbda90e3051f4765f', data=<class 'pydantic.v1.main.ChatOpenAIInput'>), 'f7a0977d5db243139449af92c64da4a4': Node(id='f7a0977d5db243139449af92c64da4a4', data=ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x000002CD44106B50>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x000002CD44111550>, temperature=0.0, openai_api_key=SecretStr('**********'), openai_proxy='')), '06d5b30809774781832d6f00a8628d2f': Node(id='06d5b30809774781832d6f00a8628d2f', data=<class 'pydantic.v1.main.ChatOpenAIOutput'>)}, edges=[Edge(source='dff830c537d341dcbda90e3051f4765f', target='f7a0977d5db243139449af92c64da4a4'), Edge(source='f7a0977d5db243139449af92c64da4a4', target='06d5b30809774781832d6f00a8628d2f')])
>>> print(chat.get_graph().draw_ascii())
+-----------------+
| ChatOpenAIInput |
+-----------------+
          *
          *
          *
   +------------+
   | ChatOpenAI |
   +------------+
          *
          *
          *
+------------------+
| ChatOpenAIOutput |
+------------------+

上記の例ではシンプルなチェーンですが、もっと複雑なチェーンも描画してくれます。

>>> chain = RunnableLambda(lambda x: x+'1') | RunnableParallel(y=RunnablePassthrough(), z=RunnableLambda(lambda x_: x_ + '2') | RunnablePassthrough()) | RunnableLambda(lambda dic: dic['y']+dic['z'])
>>> chain.invoke('hoge')
'hoge1hoge12'
>>> print(chain.get_graph().draw_ascii())
               +-------------+
               | LambdaInput |
               +-------------+
                      *
                      *
                      *
               +-------------+
               | Lambda(...) |
               +-------------+
                      *
                      *
                      *
            +--------------------+
            | Parallel<y,z>Input |
            +--------------------+
              ***            ***
            **                  ***
          **                       **
+-------------+                      **
| Lambda(...) |                       *
+-------------+                       *
        *                             *
        *                             *
        *                             *
+-------------+               +-------------+
| Passthrough |               | Passthrough |
+-------------+               +-------------+
              ***            ***
                 **        **
                   **    **
           +---------------------+
           | Parallel<y,z>Output |
           +---------------------+
                      *
                      *
                      *
               +-------------+
               | Lambda(...) |
               +-------------+
                      *
                      *
                      *
              +--------------+
              | LambdaOutput |
              +--------------+
get_input_schema / get_output_schema

invoke などの入力/出力を検証するために使用できるpydanticモデルを取得する。

>>> from langchain.pydantic_v1 import BaseModel
>>> class InputModel(BaseModel):
...     x: int
...     y: str
...
>>> class OutputModel(BaseModel):
...     z: str
...
>>> func = RunnableLambda(lambda input_: OutputModel(z=f'{input_.x}{input_.y}')).with_types(input_type=InputModel, output_type=OutputModel)
>>> func.invoke(InputModel(x=1, y='2'))
OutputModel(z='12')
>>> func.get_input_schema()
<class '__main__.InputModel'>
>>> func.get_input_schema().schema()
{'title': 'InputModel', 'type': 'object', 'properties': {'x': {'title': 'X', 'type': 'integer'}, 'y': {'title': 'Y', 'type': 'string'}}, 'required': ['x', 'y']}
>>> func.get_output_schema()
<class '__main__.OutputModel'>
>>> func.get_output_schema().schema()
{'title': 'OutputModel', 'type': 'object', 'properties': {'z': {'title': 'Z', 'type': 'string'}}, 'required': ['z']}

ちなみに ChatOpenAI().get_input_schema().schema() により ChatOpenAI() の入力スキーマを得ることができますが、かなり長いスキーマが得られます...

`ChatOpenAI()` の入力スキーマ
{'title': 'ChatOpenAIInput', 'anyOf': [{'type': 'string'}, {'$ref': '#/definitions/StringPromptValue'}, {'$ref': '#/definitions/ChatPromptValueConcrete'}, {'type': 'array', 'items': {'anyOf': [{'$ref': '#/definitions/AIMessage'}, {'$ref': '#/definitions/HumanMessage'}, {'$ref': '#/definitions/ChatMessage'}, {'$ref': '#/definitions/SystemMessage'}, {'$ref': '#/definitions/FunctionMessage'}, {'$ref': '#/definitions/ToolMessage'}]}}], 'definitions': {'StringPromptValue': {'title': 'StringPromptValue', 'description': 'String prompt value.', 'type': 'object', 'properties': {'text': {'title': 'Text', 'type': 'string'}, 'type': {'title': 'Type', 'default': 'StringPromptValue', 'enum': ['StringPromptValue'], 'type': 'string'}}, 'required': ['text']}, 'AIMessage': {'title': 'AIMessage', 'description': 'Message from an AI.', 'type': 'object', 'properties': {'content': {'title': 'Content', 'anyOf': [{'type': 'string'}, {'type': 'array', 'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]}, 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'}, 'type': {'title': 'Type', 'default': 'ai', 'enum': ['ai'], 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'example': {'title': 'Example', 'default': False, 'type': 'boolean'}}, 'required': ['content']}, 'HumanMessage': {'title': 'HumanMessage', 'description': 'Message from a human.', 'type': 'object', 'properties': {'content': {'title': 'Content', 'anyOf': [{'type': 'string'}, {'type': 'array', 'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]}, 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'}, 'type': {'title': 'Type', 'default': 'human', 'enum': ['human'], 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'example': {'title': 'Example', 'default': False, 'type': 'boolean'}}, 'required': ['content']}, 'ChatMessage': {'title': 'ChatMessage', 'description': 'Message that can be assigned an arbitrary speaker (i.e. role).', 'type': 'object', 'properties': {'content': {'title': 'Content', 'anyOf': [{'type': 'string'}, {'type': 'array', 'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]}, 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'}, 'type': {'title': 'Type', 'default': 'chat', 'enum': ['chat'], 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'role': {'title': 'Role', 'type': 'string'}}, 'required': ['content', 'role']}, 'SystemMessage': {'title': 'SystemMessage', 'description': 'Message for priming AI behavior, usually passed in as the first of a sequence\nof input messages.', 'type': 'object', 'properties': {'content': {'title': 'Content', 'anyOf': [{'type': 'string'}, {'type': 'array', 'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]}, 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'}, 'type': {'title': 'Type', 'default': 'system', 'enum': ['system'], 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}}, 'required': ['content']}, 'FunctionMessage': {'title': 'FunctionMessage', 'description': 'Message for passing the result of executing a function back to a model.', 'type': 'object', 'properties': {'content': {'title': 'Content', 'anyOf': [{'type': 'string'}, {'type': 'array', 'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]}, 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'}, 'type': {'title': 'Type', 'default': 'function', 'enum': ['function'], 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}}, 'required': ['content', 'name']}, 'ToolMessage': {'title': 'ToolMessage', 'description': 'Message for passing the result of executing a tool back to a model.', 'type': 'object', 'properties': {'content': {'title': 'Content', 'anyOf': [{'type': 'string'}, {'type': 'array', 'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]}, 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'}, 'type': {'title': 'Type', 'default': 'tool', 'enum': ['tool'], 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'tool_call_id': {'title': 'Tool Call Id', 'type': 'string'}}, 'required': ['content', 'tool_call_id']}, 'ChatPromptValueConcrete': {'title': 'ChatPromptValueConcrete', 'description': 'Chat prompt value which explicitly lists out the message types it accepts.\nFor use in external schemas.', 'type': 'object', 'properties': {'messages': {'title': 'Messages', 'type': 'array', 'items': {'anyOf': [{'$ref': '#/definitions/AIMessage'}, {'$ref': '#/definitions/HumanMessage'}, {'$ref': '#/definitions/ChatMessage'}, {'$ref': '#/definitions/SystemMessage'}, {'$ref': '#/definitions/FunctionMessage'}, {'$ref': '#/definitions/ToolMessage'}]}}, 'type': {'title': 'Type', 'default': 'ChatPromptValueConcrete', 'enum': ['ChatPromptValueConcrete'], 'type': 'string'}}, 'required': ['messages']}}}
get_name

get_name は引数で指定された name, 属性の name, クラス名を左から順に確認し、最初に得られたものを Runnable オブジェクトに付けられた名前として返します。

>>> chat = ChatOpenAI()
>>> 
>>> chat.get_name()
'ChatOpenAI'
>>> chat.get_name(suffix="test")
'ChatOpenAITest'
>>> chat.get_name(suffix="test", name="test")
'test_test'
>>> 
>>> chat_with_name = ChatOpenAI(name="ChatGPT")
>>> chat_with_name.name
'ChatGPT'
>>> chat_with_name.get_name(suffix="Local")
'ChatGPTLocal'
get_prompts

get_promptsRunnable オブジェクトに含まれるプロンプトを返します。具体的には get_graph() で得られるグラフから BasePromptTemplate オブジェクトを取得します。

>>> from langchain.prompts import  ChatPromptTemplate , HumanMessagePromptTemplate
>>> chat_prompts = ChatPromptTemplate.from_messages([HumanMessagePromptTemplate.from_template("{input}")])
>>> (chat_prompts | chat).get_prompts()
[ChatPromptTemplate(input_variables=['input'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['input'], template='{input}'))])]
>>> (chat_prompts | chat | RunnableParallel({'input': RunnablePassthrough()}) | chat_prompts | chat).get_prompts()
[ChatPromptTemplate(input_variables=['input'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['input'], template='{input}'))]), ChatPromptTemplate(input_variables=['input'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['input'], template='{input}'))])]
map

map は入力のリストを出力のリストに変換する Runnable オブジェクトを作成するメソッドです。作成されたRunnable オブジェクトはまさに Python 標準メソッドの map にほぼ同じです。

>>> # function
>>> func = RunnableLambda(lambda x: x**2)
>>> func.map().invoke([1, 2, 3, 4])
[1, 4, 9, 16]
>>> # chat
>>> from langchain.schema import HumanMessage
>>> chat = ChatOpenAI()
>>> chain = RunnableLambda(lambda s: [HumanMessage(content=s)]) | chat
>>> chain.map().invoke(['good morning', 'good afternoon', 'good evening'])
[AIMessage(content='Good morning! How can I assist you today?'), AIMessage(content='Good afternoon! How can I assist you today?'), AIMessage(content='Good evening! How can I assist you today?')]

また、map を使えば prompt を複数使った多数決などもやりやすくなるかもしれません。

>>> prompts = [
...     ChatPromptTemplate.from_messages([('human', 'What is the squre of the {n}-th prime number?')]),
...     ChatPromptTemplate.from_messages([('human', 'Find the {n}-th prime number, then calculate the square of it.')]),
...     ChatPromptTemplate.from_messages([('human', 'echo {n}')]),
... ]
>>> chat = ChatOpenAI(model='gpt-4-turbo-preview')
>>> chain = RunnableParallel(n=RunnablePassthrough()) | dict(enumerate(prompts)) | RunnableLambda(dict.values) | chat.map()
>>> chain.invoke('5')
[AIMessage(content='The first five prime numbers are 2, 3, 5, 7, and 11. The 5th prime number is 11, so the square of the 5th prime number is \\(11^2 = 121\\).'), AIMessage(content='The first five prime numbers are 2, 3, 5, 7, and 11. The 5th prime number is 11. \n\nThe square of 11 is \\(11^2 = 121\\).'), AIMessage(content='```\n5\n```')]
pick

pick は自身の出力(dict型)から指定されたキーの値を取得するための Runnable オブジェクトを作成するメソッドです。

>>> func = RunnableLambda(lambda x: {'x': x, 'y': x*2, 'z': x*3})
>>> func.invoke(10)
{'x': 10, 'y': 20, 'z': 30}
>>> func.pick('x').invoke(10)
10
>>> func.pick(['x', 'z']).invoke(10)
{'x': 10, 'z': 30}

これはまさに

from langchain_core.runnables.passthrough import RunnablePick
(func | RunnablePick(['x', 'z'])).invoke(10)

と同じになります。

pipe

pipe は自身と他の複数の Runnable オブジェクトを連結しチェーンを作成するメソッドです。

>>> func1 = RunnableLambda(lambda x: 1)
>>> func2 = RunnableLambda(lambda x: x+10)
>>> func3 = RunnableLambda(lambda x: x+100)
>>> func4 = RunnableLambda(lambda x: x+1000)
>>> func1.pipe(func2, func3, func4)
RunnableLambda(...)
| RunnableLambda(...)
| RunnableLambda(...)
| RunnableLambda(...)
>>> func1.pipe(func2, func3, func4).invoke(1)
1111

これは

>>> (func1 | func2 | func3 | func4).invoke(1)
1111

と同じであり、| を複数まとめて繋げるできるものとして覚えればよいかと思います。

with_config

with_configRunnable オブジェクトにおいて config をバインドした RunnableBinding オブジェクトを返します。bind メソッドconfig バージョンです。

with_fallbacks

with_fallbacksRunnable オブジェクトがエラーを投げた場合のフォールバックをアタッチした RunnableWithFallbacks オブジェクトを返すメソッドです。

重要な点としては 6 にもあるように、LLM レベルだけではなく、Runnable オブジェクトレベルでフォールバックを追加できる点にあります。

また、exceptions_to_handle 引数を指定することでフォールバックでハンドルする例外も選択できます。

>>> fallback_chain = RunnableLambda(lambda x: f'Error caught for the input: {x}')
>>> invald_runnable = RunnableLambda(lambda x: x/0)
>>>
>>> invald_runnable.invoke(10)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  --- 中略 ---
ZeroDivisionError: division by zero
>>> invald_runnable.with_fallbacks([fallback_chain]).invoke(10)
'Error caught for the input: 10'
>>> invald_runnable.with_fallbacks([fallback_chain], exceptions_to_handle=[ZeroDivisionError]).invoke(10)
'Error caught for the input: 10'
>>> invald_runnable.with_fallbacks([fallback_chain], exceptions_to_handle=[ZeroDivisionError]).invoke(None)
Traceback (most recent call last):
  --- 中略 ---
TypeError: unsupported operand type(s) for /: 'NoneType' and 'int'

なお、fallbacks に指定するフォールバックには失敗した Runnable オブジェクトに与えられた入力が渡されます。

>>> invald_runnable.with_fallbacks([RunnablePassthrough()]).invoke('this is a original input')
'this is a original input'
with_listeners

with_liestenersRunnable オブジェクトにおいて、

  • on_start: Runnable オブジェクトの 実行開始前 に呼ばれる callback
  • on_end: Runnable オブジェクトの 実行完了後 に呼ばれる callback
  • on_error: Runnable オブジェクトの 実行エラー時 に呼ばれる callback

をバインドした RunnableBinding オブジェクトを返します。bind メソッドの listenres バージョンです。

with_retry

with_retryRunnable オブジェクトの実行失敗時にリトライする RunnableRetry オブジェクトのを返すメソッドです。

>>> class FuncSuccessAfter2:
...     def __init__(self):
...         self.count = 0
...     def __call__(self, *args, **kwargs):
...         if self.count < 2:
...             self.count += 1
...             raise Exception("Error")
...         return 'success'
...
>>> func = RunnableLambda(FuncSuccessAfter2())
>>> for i in range(5):
...     print(f'--- {i+1} times ---')
...     try:
...         print(func.invoke('hoge'))
...     except:
...         print('error')
...         pass
...
--- 1 times ---
error
--- 2 times ---
error
--- 3 times ---
success
--- 4 times ---
success
--- 5 times ---
success

この設定で with_retry を活用したのが下記のコードです。1つ目は3回試行して成功したケース、2つ目は2会しか試行していないため失敗しているケースです。なお、試行回数は stop_after_attempt のキーワード引数で指定することができます。

>>> func_with_retry = RunnableLambda(FuncSuccessAfter2()).with_retry(stop_after_attempt=3)
>>> func_with_retry.invoke('hi')
'success'
>>> func_with_retry = RunnableLambda(FuncSuccessAfter2()).with_retry(stop_after_attempt=2)
>>> func_with_retry.invoke('hi')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\retry.py", line 184, in invoke
    return self._call_with_config(self._invoke, input, config, **kwargs)
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\base.py", line 1262, in _call_with_config
    context.run(
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\config.py", line 326, in call_func_with_variable_args
    return func(input, **kwargs)  # type: ignore[call-arg]
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\retry.py", line 170, in _invoke
    for attempt in self._sync_retrying(reraise=True):
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\tenacity\__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\tenacity\__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\tenacity\__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "C:\Users\hmasu\.pyenv\pyenv-win\versions\3.10.11\lib\concurrent\futures\_base.py", line 451, in result
    return self.__get_result()
  File "C:\Users\hmasu\.pyenv\pyenv-win\versions\3.10.11\lib\concurrent\futures\_base.py", line 403, in __get_result
    raise self._exception
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\retry.py", line 172, in _invoke
    result = super().invoke(
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\base.py", line 4069, in invoke
    return self.bound.invoke(
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\base.py", line 3523, in invoke
    return self._call_with_config(
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\base.py", line 1262, in _call_with_config
    context.run(
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\config.py", line 326, in call_func_with_variable_args
    return func(input, **kwargs)  # type: ignore[call-arg]
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\base.py", line 3397, in _invoke
    output = call_func_with_variable_args(
  File "D:\qiita\runnables-in-langchain\venv\lib\site-packages\langchain_core\runnables\config.py", line 326, in call_func_with_variable_args
    return func(input, **kwargs)  # type: ignore[call-arg]
  File "<stdin>", line 7, in __call__
Exception: error
with_types

with_typesRunnable オブジェクトの入力型と出力型をバインドした RunnableBinding オブジェクトを返します。bind メソッドの InputType/OutputType バージョンです。

>>> from langchain.pydantic_v1 import BaseModel
>>>
>>> class DummyInput(BaseModel):
...     x: int
...     y: str
...
>>>
>>> class DummyOutput(BaseModel):
...     z: str
...
>>>
>>> func = RunnableLambda(lambda i: DummyOutput(z=i['y'] * i['x']))
>>> func.invoke(DummyInput(x=2, y="test").dict())
DummyOutput(z='testtest')
>>> func.get_input_schema().schema()
{'title': 'RunnableLambdaInput'}
>>>
>>> func.with_types(input_type=DummyInput, output_type=DummyOutput).invoke(DummyInput(x=2, y="test").dict())
DummyOutput(z='testtest')
>>> func.with_types(input_type=DummyInput, output_type=DummyOutput).get_input_schema().schema()
{'title': 'DummyInput', 'type': 'object', 'properties': {'x': {'title': 'X', 'type': 'integer'}, 'y': {'title': 'Y', 'type': 'string'}}, 'required': ['x', 'y']}

最後に

長くなってしまいましたが、ここまで読んでいただきありがとうございました。個人的な感想ですが assign map pick pipe with_retry with_fallbacks あたりが即活用できそうなメソッドかなという印象です。

ぜひ、本記事が langchain を勉強している方のお役に立てれば幸いです。

また、下記の記事では Runnable クラスを継承したクラスについて実行コードを以て役割を説明しておりますので合わせてご活用ください。

ご質問やご指摘等ありましたらぜひコメントいただけますと幸いです。
もっとわかりやすい例などがありましたら、ぜひご教示ください。

では、このあたりで。

  1. https://python.langchain.com/docs/get_started/introduction

  2. https://api.python.langchain.com/en/stable/runnables/langchain_core.runnables.base.Runnable.html

  3. https://python.langchain.com/docs/expression_language/interface 2

  4. https://qiita.com/hiromitsu_iwasaki/items/7d471ba05b0ecc582489

  5. https://python.langchain.com/docs/expression_language/how_to/binding 2

  6. https://python.langchain.com/docs/expression_language/how_to/fallbacks

37
34
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
37
34