3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Semantic Kernel Process Framework へ入門

Last updated at Posted at 2025-03-20

Semantic Kernel Process Framework とは?

Semantic Kernel Process Framework (以降、Process Framework) は、AI をビジネスプロセスに統合するために設計されたフレームワークです。

AI エージェントや RAG(Retrieval-Augmented Generation)などの AI コンポーネントを「ステップ」として「プロセス」に組み込み、その流れを柔軟に設計・制御できるのが大きな特徴です。

さらに、各ステップには Python などのコードによる決定論的なロジックを組み込むことも可能で、AI と従来のプログラムロジックをシームレスに融合できます。

本記事では、ワークフロー設計で頻出するシーケンス、条件分岐、ループ処理などの基本パターンについて、サンプルコードを交えながらわかりやすく解説します。

これらのパターンを組み合わせることで、複雑かつ高度な AI ワークフローも構築できるようになり、ビジネスプロセスの自動化・最適化がさらに加速します

image.png

注意
2025 年 3 月現在、Process Framework は 実験段階であり、プレビュー提供および一般提供(GA)に移行するまでに内容が変更される可能性があります。

コアコンセプト

Process Framework の中心となるのが以下の 3 つの要素です。

  • Process(プロセス)
  • Step(ステップ)
  • Patterns(パターン)

image.png

Process
Process は、ワークフロー全体を管理・制御する「箱」のようなものです。
複数のステップ(Step)を組み合わせて、「どの順番で」「どのデータを」「どう処理するか」を定義します。

Step
Step は、プロセス内で「1つの作業単位」となる部分です。
Kernel Function(AIやAPI、コード処理)を実行する実体がこの Step になります。

Step は、作業が終わるとイベントを発行し、次のステップに結果を渡します。

Patterns
Patterns は、よく使われる処理フローを標準化したものです。これにより、再利用性が高く、保守性の良いプロセス設計が可能となります。

パターン集(サンプルコード)

概要

ワークフローでよく使われるパターンとして、以下の構成を実装します。今回は Process Framework の基本を押さえる目的のため、コードは最低限の実装になっています。

  • シーケンス
  • ファンアウト / ファンイン
  • 条件分岐
  • 繰り返し(ループ)
  • 動的並列関数実行

依存関係のインストール

以降、Jypyter Notebook で実装することを想定します。
まずは依存関係のインストールを行います。

!pip install semantic-kernel==1.23.1 \
    pydantic==2.10.6 \
    python-dotenv==1.0.1 \
    mermaid-python==0.1 \
    azure-ai-projects==1.0.0b7 \
    azure-identity==1.21.0 \
    openai==1.66.3 \
    DateTime==5.5

シーケンス

はじめは最もシンプルなシーケンスを実装します。

image.png

ソースコード
import asyncio
from enum import Enum
from semantic_kernel.functions import kernel_function
from semantic_kernel.kernel import Kernel
from semantic_kernel.processes import ProcessBuilder
from semantic_kernel.processes.local_runtime.local_kernel_process import start
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent
from semantic_kernel.processes.kernel_process import KernelProcessStep, KernelProcessStepContext

class Events(Enum):
    Start = "Start"
    StartA = "StartA"
    StartB = "StartB"
    StepBCompleted = "StepBCompleted"

class StartStep(KernelProcessStep):
    @kernel_function()
    async def start(self, context: KernelProcessStepContext, user_input: str):
        print("--- START ---")
        await asyncio.sleep(1)
        print(f"User Input: {user_input}")
        await context.emit_event(process_event=Events.StartA, data=user_input)

class AStep(KernelProcessStep):
    @kernel_function()
    async def run_a(self, context: KernelProcessStepContext, data: str):
        print("--- STEP A start ---")
        print(f"Input: {data}")
        await asyncio.sleep(2)
        modified_data_a = f"{data} -> A"
        print(f"Output: {modified_data_a}")
        await context.emit_event(process_event=Events.StartB, data=modified_data_a)

class BStep(KernelProcessStep):
    @kernel_function()
    async def run_b(self, context: KernelProcessStepContext, data: str):
        print("--- STEP B start ---")
        print(f"Input: {data}")
        await asyncio.sleep(3)
        processed_data = f"{data} -> B"
        print(f"Output-B: {processed_data}")
        print("--- END ---")
        await context.emit_event(process_event=Events.StepBCompleted)

async def main(user_input: str):
    # Initialize the process builder
    builder = ProcessBuilder(name="SimpleProcess")

    # Add the steps to the process builder
    start_step = builder.add_step(step_type=StartStep)
    step_a = builder.add_step(step_type=AStep)
    step_b = builder.add_step(step_type=BStep)

    # Define the events and where to send them
    builder.on_input_event(Events.Start).send_event_to(target=start_step, function_name="start", parameter_name="user_input")
    start_step.on_event(Events.StartA).send_event_to(target=step_a, function_name="run_a", parameter_name="data")
    step_a.on_event(Events.StartB).send_event_to(target=step_b, function_name="run_b", parameter_name="data")
    step_b.on_event(Events.StepBCompleted).stop_process()

    # Build the process
    kernel_process = builder.build()

    # Start the process
    await start(
        process=kernel_process,
        kernel=Kernel(),
        initial_event=KernelProcessEvent(id=Events.Start, data=user_input),
    )


await main(user_input="Hello!")
出力
Output
--- START ---
User Input: Hello!
--- STEP A start ---
Input: Hello!
Output: Hello! -> A
--- STEP B start ---
Input: Hello! -> A
Output-B: Hello! -> A -> B
--- END ---

Process Framework の基礎を解説

プロセスビルドの流れを解説
シーケンスの処理を例に、Process Framework のビルドの流れを説明します。

  1. ProcessBuilder を初期化
  2. ステップを追加
  3. ステップごとのイベントを定義
  4. ビルドして KernelProcess を構築

生成した KernelProcesslocal_kernel_processstart メソッドに渡すことで、ワークフローを実行できます。

sk_pw.gif

ステップとカーネル関数について
各ステップでは、1 つ以上のカーネル関数(kernel_function)を実装する必要があります。
今回の例では、各ステップをクラスとして定義し、その中で @kernel_function デコレーターを付けた関数が自動で実行されます。

ステップの中で、emit_event メソッドを使って次に発火させたいイベントを明示的に指定します。
この設計により、ワークフローの流れを関数内で制御できるのが Process Framework の特徴です。

### 抜粋 ###
class AStep(KernelProcessStep):
    @kernel_function()
    async def run_a(self, context: KernelProcessStepContext, data: str):
        ### 省略 ###
        await context.emit_event(process_event=Events.StartB, data=modified_data_a)

ファンアウト / ファンイン

ステップを並列処理させるファンアウトと、各ステップの出力を集約するファンインについて紹介します。

image.png

ソースコード
import asyncio
from enum import Enum
from semantic_kernel.functions import kernel_function
from semantic_kernel.kernel import Kernel
from semantic_kernel.processes import ProcessBuilder
from semantic_kernel.processes.local_runtime.local_kernel_process import start
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent
from semantic_kernel.processes.kernel_process import KernelProcessStep, KernelProcessStepContext

class Events(Enum):
    Start = "Start"
    ToStepA = "ToStepA"
    ToStepB = "ToStepB"
    ToStepC = "ToStepC"
    Completed = "Completed"

class StartStep(KernelProcessStep):
    @kernel_function()
    async def start(self, context: KernelProcessStepContext, input: str):
        print("--- START ---")
        await asyncio.sleep(1)
        print(f"Input: {input}")
        await context.emit_event(Events.ToStepA, data=input)
        await context.emit_event(Events.ToStepB, data=input)

class AStep(KernelProcessStep):
    @kernel_function()
    async def run_a(self, context: KernelProcessStepContext, data: str):
        print("--- STEP A start ---")
        await asyncio.sleep(2)
        output_a = f"{data} + A"
        print(f"A Output: {output_a}")
        # await context.emit_event(Events.ToStepC, data=output_a)
        await context.emit_event(Events.ToStepC, data={"source": "run_a", "content": output_a})


class BStep(KernelProcessStep):
    @kernel_function()
    async def run_b(self, context: KernelProcessStepContext, data: str):
        print("--- STEP B start ---")
        await asyncio.sleep(3)
        output_b = f"{data} + B"
        print(f"B Output: {output_b}")
        # await context.emit_event(Events.ToStepC, data=output_b)
        await context.emit_event(Events.ToStepC, data={"source": "run_b", "content": output_b})


class CStep(KernelProcessStep):
    result_a: str = ""
    result_b: str = ""

    @kernel_function()
    async def run_c(self, context: KernelProcessStepContext, data: dict):
        source = data["source"]
        content = data["content"]
        
        if source == "run_a":
            self.result_a = content
        elif source == "run_b":
            self.result_b = content
        
        if self.result_a and self.result_b:
            print("--- STEP C start ---")
            await asyncio.sleep(3)
            final_output = f"C Output: [{self.result_a}] + [{self.result_b}]"
            print(final_output)
            print("--- END ---")
            await context.emit_event(Events.Completed)


async def main(input: str):
    # Initialize the process builder
    builder = ProcessBuilder("BranchingProcess")

    # Define steps
    start_step = builder.add_step(step_type=StartStep)
    step_a = builder.add_step(step_type=AStep)
    step_b = builder.add_step(step_type=BStep)
    step_c = builder.add_step(step_type=CStep)

    # Define step events
    builder.on_input_event(Events.Start).send_event_to(start_step, parameter_name="input")
    start_step.on_event(Events.ToStepA).send_event_to(step_a, parameter_name="data")
    start_step.on_event(Events.ToStepB).send_event_to(step_b, parameter_name="data")
    step_a.on_event(Events.ToStepC).send_event_to(step_c, parameter_name="data")
    step_b.on_event(Events.ToStepC).send_event_to(step_c, parameter_name="data")
    step_c.on_event(Events.Completed).stop_process()

    # Build the process
    kernel_process = builder.build()

    # Start the process
    await start(
        process=kernel_process,
        kernel=Kernel(),
        initial_event=KernelProcessEvent(id=Events.Start, data=input),
    )

await main(input="Hello!")
出力
Output
--- START ---
Input: Hello!
--- STEP A start ---
--- STEP B start ---
A Output: Hello! + A
B Output: Hello! + B
--- STEP C start ---
C Output: [Hello! + A] + [Hello! + B]
--- END ---

ファンアウト / ファンアウトの解説

この例では、StartStep から StepAStepB にイベントを送り、2つの処理を並列実行することで ファンアウト を実現しています。

その後、それぞれの処理が完了すると CStep が呼び出され、StepAStepB の結果が CStep.run_c() に順次渡されます。
ここで重要なのが、CStep 内では self.result_aself.result_b というインスタンス変数を使って、StepAStepB の結果を ステップ(インスタンス)内に保持している点です。

この設計により、CStep は呼び出されるたびに片方の結果を受け取りつつ、両方の出力が揃ったタイミングで次の処理(イベント)を発火させることができます。

結果として、「すべての並列処理が完了した後に集約処理を行う」 というファンインの基本パターンが実現しています。

class CStep(KernelProcessStep):
    result_a: str = ""
    result_b: str = ""

    @kernel_function()
    async def run_c(self, context: KernelProcessStepContext, data: dict):
        source = data["source"]
        content = data["content"]
        
        if source == "run_a":
            self.result_a = content
        elif source == "run_b":
            self.result_b = content
        
        if self.result_a and self.result_b:
            print("--- STEP C start ---")
            await asyncio.sleep(3)
            final_output = f"C Output: [{self.result_a}] + [{self.result_b}]"
            print(final_output)
            print("--- END ---")
            await context.emit_event(Events.Completed)

条件分岐

条件分岐では、入力を判断して後続の処理フローを分岐させる手法を紹介します。

image.png

ソースコード
import asyncio
from enum import Enum
from semantic_kernel.functions import kernel_function
from semantic_kernel.kernel import Kernel
from semantic_kernel.processes import ProcessBuilder
from semantic_kernel.processes.local_runtime.local_kernel_process import start
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent
from semantic_kernel.processes.kernel_process import KernelProcessStep, KernelProcessStepContext
from semantic_kernel.kernel_pydantic import KernelBaseModel
from pydantic import Field

class Events(Enum):
    Start = "Start"
    ToStepA = "ToStepA"
    ToStepB = "ToStepB"
    ToStepC = "ToStepC"
    Completed = "Completed"

class StartStep(KernelProcessStep):
    @kernel_function()
    async def start(self, context: KernelProcessStepContext, input_choice: str):
        print("--- START ---")
        print(f"User Input: {input_choice}")
        branch_map = {
            "A": Events.ToStepA,
            "B": Events.ToStepB
        }
        await context.emit_event(branch_map.get(input_choice, Events.Completed))

class AStep(KernelProcessStep):
    @kernel_function()
    async def run_a(self, context: KernelProcessStepContext):
        print(f"--- STEP A ---")
        await asyncio.sleep(2)
        print("STEP A completed")
        await context.emit_event(Events.ToStepC)

class BStep(KernelProcessStep):
    @kernel_function()
    async def run_b(self, context: KernelProcessStepContext):
        print(f"--- STEP B ---")
        await asyncio.sleep(2)
        print("STEP B completed. Process terminating.")
        await context.emit_event(Events.Completed)

class CStep(KernelProcessStep):
    @kernel_function()
    async def run_c(self, context: KernelProcessStepContext):
        print(f"--- STEP C ---")
        await asyncio.sleep(2)
        print("STEP C completed. Process terminating.")
        await context.emit_event(Events.Completed)

async def main(input_choice: str):
    # Initialize the process builder
    builder = ProcessBuilder("ConditionalBranchingProcess")

    # Define steps
    start_step = builder.add_step(step_type=StartStep)
    step_a = builder.add_step(step_type=AStep)
    step_b = builder.add_step(step_type=BStep)
    step_c = builder.add_step(step_type=CStep)

    # Define step events
    builder.on_input_event(Events.Start).send_event_to(start_step, parameter_name="input_choice")
    start_step.on_event(Events.ToStepA).send_event_to(step_a)
    start_step.on_event(Events.ToStepB).send_event_to(step_b)
    step_a.on_event(Events.ToStepC).send_event_to(step_c)
    step_b.on_event(Events.Completed).stop_process()
    step_c.on_event(Events.Completed).stop_process()

    # Build the process
    kernel_process = builder.build()

    # Start the process
    await start(
        process=kernel_process,
        kernel=Kernel(),
        initial_event=KernelProcessEvent(id=Events.Start, data=input_choice),
    )

await main(input_choice="A")

出力 入力が `A` の場合。 ```bash:Output --- START --- User Input: A --- STEP A --- STEP A completed --- STEP C --- STEP C completed. Process terminating. ```

入力が B の場合。

Output
--- START ---
User Input: B
--- STEP B ---
STEP B completed. Process terminating.

条件分岐の解説

この例では、分岐のロジックを StartStep 内で以下のように実装しています。

### 抜粋 ###
branch_map = {
    "A": Events.ToStepA,
    "B": Events.ToStepB
}
await context.emit_event(branch_map.get(input_choice, Events.Completed))

ここで、ユーザー入力 (input_choice) が "A" または "B" の場合、対応するイベントを発火させます。
どちらにも当てはまらない場合(想定外の入力)は Events.Completed を発火し、そのままプロセス終了させます。

繰り返し

繰り返し処理では、指定した回数だけ処理を行うような場合に利用できます。
image.png

ソースコード
import asyncio
from enum import Enum
from semantic_kernel.functions import kernel_function
from semantic_kernel.kernel import Kernel
from semantic_kernel.processes import ProcessBuilder
from semantic_kernel.processes.local_runtime.local_kernel_process import start
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent
from semantic_kernel.processes.kernel_process import KernelProcessStep, KernelProcessStepContext
from semantic_kernel.kernel_pydantic import KernelBaseModel
from pydantic import Field

class CounterState(KernelBaseModel):
    count: int = 1
    max_count: int = 3 

class Events(Enum):
    Start = "Start"
    ToStepA = "ToStepA"
    ToStepB = "ToStepB"
    LoopToStepA = "LoopToStepA"
    Completed = "Completed"

class StartStep(KernelProcessStep[CounterState]):
    state: CounterState = Field(default_factory=CounterState)

    @kernel_function()
    async def start(self, context: KernelProcessStepContext):
        print("--- START ---")
        print(f"Initial Count: {self.state.count} / Max Count: {self.state.max_count}")
        await context.emit_event(Events.ToStepA, data=self.state.model_dump())

class AStep(KernelProcessStep[CounterState]):
    @kernel_function()
    async def run_a(self, context: KernelProcessStepContext, data: dict):
        print(f"--- STEP A (Count: {data['count']}) ---")
        await asyncio.sleep(1)
        await context.emit_event(Events.ToStepB, data=data)

class BStep(KernelProcessStep[CounterState]):
    @kernel_function()
    async def run_b(self, context: KernelProcessStepContext, data: dict):
        count = data["count"]
        max_count = data["max_count"]
        print(f"--- STEP B (Count: {count} / Max: {max_count}) ---")
        await asyncio.sleep(1)

        if count >= max_count:
            print(f"Reached Max Count {max_count}. Ending Process.")
            print("--- End ---")
            await context.emit_event(Events.Completed)
        else:
            # count-up and loop
            next_data = {**data, "count": count + 1}
            print(f"Looping back to STEP A with count {next_data['count']}")
            await context.emit_event(Events.LoopToStepA, data=next_data)

async def main():
    # Initialize the process builder
    builder = ProcessBuilder("EnhancedLoopProcess")

    # Define steps
    start_step = builder.add_step(step_type=StartStep)
    step_a = builder.add_step(step_type=AStep)
    step_b = builder.add_step(step_type=BStep)

    # Define step events
    builder.on_input_event(Events.Start).send_event_to(start_step)
    start_step.on_event(Events.ToStepA).send_event_to(step_a, parameter_name="data")
    step_a.on_event(Events.ToStepB).send_event_to(step_b, parameter_name="data")
    step_b.on_event(Events.LoopToStepA).send_event_to(step_a, parameter_name="data")
    step_b.on_event(Events.Completed).stop_process()

    # Build the process
    kernel_process = builder.build()

    # Start the process
    await start(
        process=kernel_process,
        kernel=Kernel(),
        initial_event=KernelProcessEvent(id=Events.Start),
    )

await main()
出力
Output
--- START ---
Initial Count: 1 / Max Count: 3
--- STEP A (Count: 1) ---
--- STEP B (Count: 1 / Max: 3) ---
Looping back to STEP A with count 2
--- STEP A (Count: 2) ---
--- STEP B (Count: 2 / Max: 3) ---
Looping back to STEP A with count 3
--- STEP A (Count: 3) ---
--- STEP B (Count: 3 / Max: 3) ---
Reached Max Count 3. Ending Process.

繰り返し処理の解説

今回の例では、BStep でループ判定を行っています。

### 抜粋 ###
if count >= max_count:
    print(f"Reached Max Count {max_count}. Ending Process.")
    await context.emit_event(Events.Completed)
else:
    next_data = {**data, "count": count + 1}
    await context.emit_event(Events.LoopToStepA, data=next_data)
  • count >= max_count → 終了イベント Events.Completed を発火
  • そうでなければ count を +1 して LoopToStepA を発火(ループ継続)

この部分がループ制御の中核です。

動的並列関数実行

動的並列関数実行とは、シーケンス内の「動的関数実行」ステップで、前ステップの出力から得た数値に応じて関数を並列実行する仕組みです。
たとえば、LLM の出力数が可変の場合など、後続の処理を動的に調整したい場面で有効です。

image.png

以下のような疑問がでてくると思います。

Fan-out / Fan-in を動的に切り替えられるのか?

この点については、ビルド時に KernelProcess として構成が固定されるため、実行中に動的なステップの分岐数変更を伴う Fan-out / Fan-in をプロセスレベルで実現するのは困難となります。

そこで今回は、動的な並列実行を 1 ステップ内で非同期タスクとして管理することで、柔軟なスケールアウトを実現する手法を紹介します。

ソースコード
import asyncio
import random
from enum import Enum
from typing import List, Union
from semantic_kernel.functions import kernel_function
from semantic_kernel.kernel import Kernel
from semantic_kernel.processes import ProcessBuilder
from semantic_kernel.processes.local_runtime.local_kernel_process import start
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent
from semantic_kernel.processes.kernel_process import (
    KernelProcessStep,
    KernelProcessStepContext,
)

MIN_RANDOM_NUM  = 2
MAX_RANDOM_NUM  = 10

class Events(Enum):
    Start = "Start"
    ToDynamicStep = "ToDynamicStep"
    ToFinalStep = "ToFinalStep"
    Completed = "Completed"

class StartStep(KernelProcessStep):
    @kernel_function()
    async def start(self, context: KernelProcessStepContext):
        print("--- START STEP ---")
        await asyncio.sleep(1)
        branches = random.randint(MIN_RANDOM_NUM , MAX_RANDOM_NUM )
        print(f"Generated random branch count: {branches}")
        await context.emit_event(Events.ToDynamicStep, data={"branches": branches})


# Step to dynamically execute parallel tasks
class DynamicStep(KernelProcessStep):
    @kernel_function()
    async def dynamic_runner(self, context: KernelProcessStepContext, data: dict):
        branches = data["branches"]
        print(f"--- DYNAMIC STEP with {branches} branches ---")

        # Run parallel tasks with exception handling
        tasks = [self.run_single_branch(branch_id) for branch_id in range(1, branches + 1)]
        outputs = await asyncio.gather(*tasks, return_exceptions=True)

        print("Dynamic Step completed all branches")
        await context.emit_event(Events.ToFinalStep, data={"outputs": outputs})

    async def run_single_branch(self, branch_id: int) -> str:
        try:
            sleep_time = random.randint(3, 5)
            print(f"Branch {branch_id} started, will sleep for {sleep_time} seconds")
            await asyncio.sleep(sleep_time)
            print(f"Branch {branch_id} completed")
            return f"Branch {branch_id} completed (slept {sleep_time}s)"
        except Exception as e:
            print(f"Branch {branch_id} failed: {str(e)}")
            return f"Branch {branch_id} failed: {str(e)}"


# Final step to aggregate and output results
class FinalStep(KernelProcessStep):
    @kernel_function()
    async def finalize(self, context: KernelProcessStepContext, data: dict):
        outputs = data.get("outputs", [])
        print("--- FINAL STEP ---")
        await asyncio.sleep(1)

        # Process results and separate successes and failures
        success_results = [o for o in outputs if not isinstance(o, Exception)]
        failed_results = [o for o in outputs if isinstance(o, Exception)]

        print(f"Total branches executed: {len(outputs)}")
        print("Success Results:")
        for result in success_results:
            print(f"- {result}")

        if failed_results:
            print("Failed Results:")
            for fail in failed_results:
                print(f"- {fail}")

        print("--- PROCESS COMPLETED ---")
        await context.emit_event(Events.Completed)


async def main():
    kernel = Kernel()

    # Initialize process builder
    builder = ProcessBuilder("DynamicParallelExecutionProcess")

    # Add steps
    start_step = builder.add_step(step_type=StartStep)
    dynamic_step = builder.add_step(step_type=DynamicStep)
    final_step = builder.add_step(step_type=FinalStep)

    # Define step events
    builder.on_input_event(Events.Start).send_event_to(start_step)
    start_step.on_event(Events.ToDynamicStep).send_event_to(dynamic_step, parameter_name="data")
    dynamic_step.on_event(Events.ToFinalStep).send_event_to(final_step, parameter_name="data")
    final_step.on_event(Events.Completed).stop_process()

    # Build and start the process
    kernel_process = builder.build()
    await start(
        process=kernel_process,
        kernel=kernel,
        initial_event=KernelProcessEvent(id=Events.Start),
    )

await main()

出力
Output
Branch 3 started, will sleep for 5 seconds
Branch 4 started, will sleep for 5 seconds
Branch 5 started, will sleep for 4 seconds
Branch 6 started, will sleep for 5 seconds
Branch 7 started, will sleep for 5 seconds
Branch 8 started, will sleep for 4 seconds
Branch 9 started, will sleep for 4 seconds
Branch 2 completed
Branch 1 completed
Branch 5 completed
Branch 8 completed
Branch 9 completed
Branch 3 completed
Branch 4 completed
Branch 6 completed
Branch 7 completed
Dynamic Step completed all branches
--- FINAL STEP ---
Total branches executed: 9
Success Results:
- Branch 1 completed (slept 4s)
- Branch 2 completed (slept 3s)
- Branch 3 completed (slept 5s)
- Branch 4 completed (slept 5s)
- Branch 5 completed (slept 4s)
- Branch 6 completed (slept 5s)
- Branch 7 completed (slept 5s)
- Branch 8 completed (slept 4s)
- Branch 9 completed (slept 4s)
--- PROCESS COMPLETED ---

動的並列関数実行の解説

この例では、DynamicStep 内で非同期並列実行を行っています。

### 抜粋 ###
tasks = [self.run_single_branch(branch_id) for branch_id in range(1, branches + 1)]
outputs = await asyncio.gather(*tasks, return_exceptions=True)
  • 決まった数だけタスク(run_single_branch())を生成
  • asyncio.gather() で並列実行

この設計により、プロセスの構成は固定したまま、内部で柔軟なスケールアウトが可能になる Tips として紹介します。


続編予告(Deep Research on Azure サンプルコード)

次回のブログでは、Process Framework を活用して「Deep Research on Azure」をサンプル実装をご紹介します。実際のユースケースを通して、活用イメージをさらに深めていただける内容です。

image.png


イベント告知|Azure OpenAI Developer Seminar 2025 開催!

日時: 3月31日(月)13:00 - 15:00
イベントタイトル:Azure OpenAI Developer Seminar 2025

Azure OpenAI Service や Azure AI Foundry など、最新の AI 技術・アップデート情報を一挙にご紹介。
さらに、最新の AI エージェント開発手法についても解説予定です。興味のある方はぜひご参加ください!

詳細・参加登録はこちら

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?