導入
LlamaIndexにAIアプリ構築におけるオーケストレーションの仕組であるWorkflowsが導入されました。
私の非常にざっくりとした理解で言えば、LangChainにおけるLangGraph相当の機能だと思います。
ただし、グラフ構造でパイプラインを定義するのではなくイベント駆動型アーキテクチャを採用しています。
公式DocにおけるWorkflowsの冒頭部分を邦訳すると以下の通り。
LlamaIndexのワークフローは、いくつかのイベントを連鎖させるために使用されるイベント駆動の抽象化です。ワークフローはステップで構成されており、各ステップは特定のイベントタイプを処理し、新しいイベントを発行する役割を担います。
LlamaIndexのワークフローは、関数に@step()デコレータを付けることで機能します。これにより、各ワークフローの入力および出力タイプが検証のために推測され、受け入れられたイベントが準備できたときにのみ各ステップが実行されることが保証されます。
ワークフローを作成して、エージェント、RAGフロー、抽出フロー、その他何でも作成できます。
ワークフローは自動的に計測されるため、Arize Pheonixのようなツールを使用して各ステップの可観測性を得ることができます。(注:可観測性は新しい計測システムを利用する統合に対して機能します。使用法は異なる場合があります。)
よくわからない。。。
習うより慣れろ、という思想で以下のCookbookをDatabricks上でウォークスルーしてみました。
※ 一部、Databricksで動作するように変更しています。
また、Workflowsを理解する当たってこちらの記事が非常に参考になりました。
合わせてどうぞ。
検証環境はDatabricks on AWS、ノートブックのクラスタは"サーバレス"です。
各ステップの引用はCookbookの説明内容を邦訳する形で記載しています。
Step1. セットアップ
ノートブックを作成し、関連パッケージをインストール。
!pip install -U llama-index-core llama-index-utils-workflow llama-index-llms-databricks llama-index-embeddings-databricks
dbutils.library.restartPython()
モジュールのインポートと、環境変数にDatabricksモデルサービングへアクセスするためのAPIキーやエンドポイントURLを設定します。
下記記載のURLはダミーです。実際のURLはDatabricksのサービング画面から確認してください。
from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
step,
Context,
)
import random
from llama_index.utils.workflow import draw_most_recent_execution, draw_all_possible_flows
from llama_index.llms.databricks import Databricks
import os
API_TOKEN = (
dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
)
os.environ["DATABRICKS_TOKEN"] = API_TOKEN
os.environ["DATABRICKS_SERVING_ENDPOINT"] = "https://xxx.com/serving-endpoints/"
LlamaIndex上で利用するLLMを準備。
以下の記事で作成したDatabricks Mosaic AI Model Servingエンドポイントを利用します。
llm = Databricks(model="EZO-Common-T2-2B-gemma-2-it-sglang-endpoint")
これで準備が整いました。
Step2. 基本的なワークフロー
これより、Cookbookの内容を実行していきます。まずは基本的なワークフロー。
基本的なワークフローから始めましょう: 単に開始し、1つのことを行い、停止します。タスクがこれ> ほど単純であれば、実際のワークフローを持つ理由はありませんが、ここではその動作を示しています。
class DatabricksGenerator(Workflow):
@step()
async def generate(self, ev: StartEvent) -> StopEvent:
response = await llm.acomplete(ev.query)
return StopEvent(result=str(response))
w = DatabricksGenerator(timeout=800, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)
LlamaIndex is an open-source platform designed to simplify and accelerate the development and deployment of large language model (LLM) applications. It provides a comprehensive suite of tools and services for working with LLMs, particularly focusing on **data management, indexing
@step
で修飾したメソッドが、ワークフローにおける処理単位になります。
(LangGraphでいうノード相当)
入力として標準で用意されているStartEvent
を取り、出力としてこれも標準で用意されているStopEvent
を設定しています。
これらがそれぞれ始端と終端のイベントを表しており、Workflowsではこのようにイベントを入力/出力として定義します。
Workflow全体を実行する(w.run
)ことでStartEventが発行され、そのイベントをハンドルするgenerate
メソッドが実行し、結果としてStopEventが発行されて終端に至る、という流れとなります。
LangGraphと異なり、Workflowsではエッジに相当するものの手動定義が不要になります。
かわりに、入力と出力のイベント情報から動的にパイプラインの繋がりが定義されます。
これらのつながりはビジュアル表現をさせることもできます。
ワークフローの素晴らしい点の一つは、pyvisを使用して視覚化できることです。この非常にシンプルなフローでそれがどのように見えるか見てみましょう。
draw_all_possible_flows(DatabricksGenerator, filename="trivial_workflow.html")
with open("trivial_workflow.html", "r") as file:
html_content = file.read()
displayHTML(html_content)
StartEventから始まって終端まで至る流れが可視化されました。
これだけだとあまりWorkflowの意味はありませんが、基本的な概念としてはこのようなイベント連鎖でパイプラインが実行される流れとなります。
Step3. ループと分岐
Workflowsでループや処理の分岐を実行させてみます。
class FailedEvent(Event):
error: str
class QueryEvent(Event):
query: str
class LoopExampleFlow(Workflow):
@step()
async def answer_query(
self, ev: StartEvent | QueryEvent
) -> FailedEvent | StopEvent:
query = ev.query
# try to answer the query
random_number = random.randint(0, 1)
if random_number == 0:
return FailedEvent(error="Failed to answer the query.")
else:
return StopEvent(result="The answer to your query")
@step()
async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
# improve the query or decide it can't be fixed
random_number = random.randint(0, 1)
if random_number == 0:
return QueryEvent(query="Here's a better query.")
else:
return StopEvent(result="Your query can't be fixed.")
ここでは、ランダムな数値を使用してLLMの動作をシミュレートし、信頼性のある興味深い動作を得るようにしています。
answer_query()は開始イベントを受け取ります。その後、2つのことができます:
- クエリに回答し、結果を返すStopEventを発行する
- クエリが不適切であると判断し、FailedEventを発行する
improve_query()はFailedEventを受け取ります。これも2つのことができます:
- クエリが改善できないと判断し、失敗を返すStopEventを発行する
- より良いクエリを提示し、QueryEventを発行してanswer_query()にループバックする
このより複雑なワークフローを視覚化することもできます:
draw_all_possible_flows(LoopExampleFlow, filename="loop_workflow.html")
with open("loop_workflow.html", "r") as file:
html_content = file.read()
displayHTML(html_content)
では、このワークフローを実行します。
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)
Running step answer_query
Step answer_query produced event StopEvent
The answer to your query
乱数の生成結果によって出力結果はかわりますが、乱数によって異なるイベントが発火されて一定の結果が得られるまでは処理がループする構造となります。
Step4. イベント間の状態を維持
すべてのイベントハンドラーで使用するために任意のデータや関数を保持するグローバル状態があります。
class GlobalExampleFlow(Workflow):
@step(pass_context=True)
async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
# load our data here
ctx.data["some_database"] = ["value1", "value2", "value3"]
return QueryEvent(query=ev.query)
@step(pass_context=True)
async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
# use our data with our query
data = ctx.data["some_database"]
result = f"The answer to your query is {data[1]}"
return StopEvent(result=result)
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)
Running step setup
Step setup produced event QueryEvent
Running step query
Step query produced event StopEvent
The answer to your query is value2
このように、stepへpass_context=True
を指定することで、ワークフロー全体で共通となるコンテキストオブジェクトを取得し、任意データの保持や取得ができます。
この章はまだ続きがあるので、実行していきましょう。
もちろん、このフローは本質的にまだ線形です。より現実的な例として、開始イベントがクエリまたはデータのポピュレーションイベントのいずれかであり、待機する必要がある場合を考えてみましょう。それがどのように見えるかを設定してみましょう:
class WaitExampleFlow(Workflow):
@step(pass_context=True)
async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
if hasattr(ev, "data"):
ctx.data["some_database"] = ev.data
return StopEvent(result=None)
@step(pass_context=True)
async def query(self, ctx: Context, ev: StartEvent) -> StopEvent:
if hasattr(ev, "query"):
# do we have any data?
if "some_database" in ctx.data:
data = ctx.data["some_database"]
return StopEvent(result=f"Got the data {data}")
else:
# there's non data yet
return None
else:
# this isn't a query
return None
コンテキストの中にデータが含まれる場合のみ、結果が得られるワークフローです。
データの設定はsetupメソッド内で行われています。
ただし、setupメソッド・queryメソッド両方ともStartEventを入力として指定しています。
この場合どうなるのでしょうか、実行してみましょう。
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if result is None:
print("No you can't")
print("---")
result = await w.run(data="Yes you can")
print("---")
result = await w.run(query="Can I kick it?")
print(result)
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
No you can't
---
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
---
Running step query
Step query produced event StopEvent
Running step setup
Step setup produced event StopEvent
Got the data Yes you can
分かりづらいですが、1回目の実行では結果を得られませんでした。
これはコンテキストの中にデータが含まれていないためです。
その上で2回目の実行でデータを設定し、3回目に正しく結果をえることができています。
では、最後に流れを可視化してみます。
draw_all_possible_flows(WaitExampleFlow, filename="wait_workflow.html")
with open("wait_workflow.html", "r") as file:
html_content = file.read()
displayHTML(html_content)
Step5. 1つ以上のイベントを待つ
イベントを待つことは非常に一般的なパターンであるため、コンテキストオブジェクトには便利な関数 collect_events()があります。これはイベントをキャプチャして保存し、必要なすべてのイベントが収集されるまでNoneを返します。これらのイベントは指定された順序でcollect_eventsの出力に添付されます。実際に見てみましょう:
class InputEvent(Event):
input: str
class SetupEvent(Event):
error: bool
class QueryEvent(Event):
query: str
class CollectExampleFlow(Workflow):
@step(pass_context=True)
async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
# generically start everything up
if "setup" not in ctx.data:
ctx.data["setup"] = True
print("I got set up")
return SetupEvent(error=False)
@step()
async def collect_input(self, ev: StartEvent) -> InputEvent:
if hasattr(ev, "input"):
# perhaps validate the input
print("I got some input")
return InputEvent(input=ev.input)
@step()
async def parse_query(self, ev: StartEvent) -> QueryEvent:
if hasattr(ev, "query"):
# parse the query in some way
print("I got a query")
return QueryEvent(query=ev.query)
@step(pass_context=True)
async def run_query(
self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
) -> StopEvent:
ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
if ready is None:
print("Not enough events yet")
return StopEvent()
# run the query
print("Now I have all the events")
print(ready)
return StopEvent(
result=f"Ran query {ready[0].query} on input {ready[1].input}"
)
c = CollectExampleFlow()
result = await c.run()
print("---")
result = await c.run(input="Here's some input")
print("---")
result = await c.run(query="Here's my question")
print(result)
I got set up
Not enough events yet
---
I got some input
Not enough events yet
---
I got a query
Now I have all the events
[QueryEvent(query="Here's my question"), InputEvent(input="Here's some input"), SetupEvent(error=False)]
Ran query Here's my question on input Here's some input
ポイントはrun_query
メソッドの中でコンテキストのcollect_eventsを実行しているところですね。
この処理は指定したイベントの収集を行い、必要なイベントが揃わなかったらNoneを返すようです。
これによって、必要なイベントが全て発行されるまで処理の実行を待つような動作となります。
各イベントがトリガーされる様子や、十分なイベントが到着するまでコレクションイベントが繰り返しNoneを返す様子がわかります。これをフローダイアグラムで見てみましょう:
draw_all_possible_flows(CollectExampleFlow, "collect_workflow.html")
with open("collect_workflow.html", "r") as file:
html_content = file.read()
displayHTML(html_content)
同じ種類の複数のイベントを収集することもできます。例えば、障害を監視するためです:
class FailureEvent(Event):
error: str
class FailureExampleFlow(Workflow):
@step()
async def try_to_do_thing(
self, ev: StartEvent
) -> StopEvent | FailureEvent:
random_number = random.randint(0, 10)
if random_number < 5:
print("There was a failure")
return FailureEvent(
error=f"This is the bad place: {random_number}"
)
else:
print("There was success")
return StopEvent(result="Success")
@step(pass_context=True)
async def monitor_failures(
self, ctx: Context, ev: FailureEvent
) -> StopEvent | None:
failed = ctx.collect_events(
ev, [FailureEvent, FailureEvent, FailureEvent]
)
if failed is not None:
print(failed)
print("The simulation has failed!")
return StopEvent(result="Failure")
return StopEvent()
f = FailureExampleFlow(verbose=False)
result = await f.run()
print("====", result)
result = await f.run()
print("====", result)
result = await f.run()
print("====", result)
result = await f.run()
print("====", result)
result = await f.run()
print("====", result)
result = await f.run()
There was a failure
==== None
There was a failure
==== None
There was a failure
[FailureEvent(error='This is the bad place: 4'), FailureEvent(error='This is the bad place: 2'), FailureEvent(error='This is the bad place: 2')]
The simulation has failed!
==== Failure
There was success
==== Success
There was success
==== Success
There was success
これも乱数の結果によりますが、FailureEventが3回起こるとThe simulation has failed!
メッセージが表示されます。
ご覧の通り、3回目の失敗イベントまでシステム障害は発生しませんでした。ここでの可視化も再び非常にシンプルです。
Step6. ワークフローのステップ実行
ワークフローをステップごとに実行することもできます:
というわけで、ワークフロー全体の実行ではなく、1ステップごとの実行も可能です。
# Kick off the workflow
f3 = FailureExampleFlow(verbose=True)
await f3.run_step()
Running step try_to_do_thing
There was success
Step try_to_do_thing produced event StopEvent
最初のステップであるtry_to_do_thing
のみが実行されました。
再度run_step()
を実行してみます。
await f3.run_step()
'Success'
StopEventが発行されて処理が終端に至りました。
フローが実行中であっても、すべての可能なフローではなく、最新の実行のみを表示するために視覚化を使用することもできます。
draw_most_recent_execution(f3, "last_execution.html")
with open("last_execution.html", "r") as file:
html_content = file.read()
displayHTML(html_content)
これで現時点のCookbookに記載されている内容は以上となります。
Cookbookは今後も更新されるでしょうし、内容は将来的に変更になるかもしれません。
また、別の公式Doc上で並列実行についても紹介があります。
まだベータ版だと思うのですが、機能はかなり充実しているようですね。
まとめ
LlamaIndexのWorkflowsについて、Cookbookをウォークスルーしてみました。
LangGraphの競合となる仕組だと理解していますが、イベント駆動型でパイプラインを定義するなど独自性も高いですね。
LangGraphとWorkflows、どっちがいいかというのは判断に迷うところ。。。
Workflowsのほうがパイプライン処理定義の見通しは良くなりそうなのですが、処理が増えていくとイベント定義も増えていくのでそのあたりの管理面がまだイメージしきれておらず。
とはいえ、(生成)AIの複合システムを作る際のオーケストレーション機構として面白い仕組だなと思います。LangGraph含めてAIアプリ作るときは重要な要素だと思うので、ちゃんとキャッチアップしていきたいですね。