6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Upstash Workflow × AI Elementsでワークフローをオシャレに可視化!

Posted at

前書き

最近、AIワークフローを構築できるサービスが大量に増えましたね。
GoogleのOpalが日本向けにサービス開始やOpenAIのAgent Builderの一般提供、老舗のN8NやDifyが競い合う中で、どんなシナジーが生まれるのかが楽しみですね。

そんな中、自分のアプリケーションにもオシャレなAIワークフローを追加したいと思ったことはありませんか?:relaxed::point_up_tone1:
実は、簡単に実装できる方法があります。それがUpstash Workflowです。

upstash_owler_20220317_221112_original.png

Upstash とは

Upstashは、NoSQLデータベース、ベクターデータベース、ワークフロー、検索サービスなどを提供するServerless Data Platformです。

サービス自体を使ったことがなくても、Context7 MCP Serverを利用したことがある方は多いかもしれません。実はそれもUpstashの製品です:point_up_tone1:

Upstash Workflowの特徴は以下の通りです:

  • 耐久性、信頼性、パフォーマンスに優れたサーバーレス関数
  • 並行実行や長時間待機にも対応
  • クレジットカード登録なしで使える無料枠あり

無料プランでも一日1000ステップまで利用できます。

ステップとは、たとえばcontext.runメソッドを使って情報収集文章生成ユーザー通知という一連の操作をワークフローで実行する場合、3ステップを消費します。

外部API呼び出しなど重い処理向けのcontext.callを使う場合は、1回の呼び出しで2ステップを消費します。

検証段階であれば、無料プランで十分に試すことができます。

基礎的なワークフロー

まずは簡単なワークフローを作ってみます。

Upstash WorkflowのSDKはTypeScript版Python版がありますが、今回はTypeScriptを使います。
フレームワークはNext.jsを使います。Cloudflare WorkersHonoでも使えますが、今回作るワークフローは画面も欲しいのでNext.jsにしました。

ドキュメントは下記の通り。

プロジェクトの初期化

App RouterのNext.jsプロジェクトを新規作成します。今回はNext.js 16を使用します。

npx create-next-app@latest

次に、Upstashのワークフローライブラリをインストールします。

npm install @upstash/workflow

app/apiディレクトリ配下にworkflowフォルダを作成し、ワークフローAPIを実装します。

app/api/workflow/route.ts
import { serve } from "@upstash/workflow/nextjs"

export const { POST } = serve(
  async (context) => {
    await context.run("initial-step", () => {
      console.log("initial step ran")
    })

    await context.run("second-step", () => {
      console.log("second step ran")
    })
  }
)

次に、ワークフローを起動するトリガーを作成します。

src/app/actions.ts
'use server'

import { Client } from "@upstash/workflow";

export async function triggerWorkflow() {
  const token = process.env.QSTASH_TOKEN;

  if (!token) {
    throw new Error("QSTASH_TOKEN environment variable is not set");
  }

  const client = new Client({ token })

  const { workflowRunId } = await client.trigger({
    url: `http://localhost:3000/api/workflow`,
    retries: 3, //リトライの回数
  });
  return { workflowRunId };
}

src/app/page.tsxにワークフローを実行するボタンを追加します。

src/app/page.tsx
"use client";

import { useActionState } from "react";
import { triggerWorkflow, type WorkflowState } from "./actions";

const initialState: WorkflowState = {};

export default function Home() {
  const [state, formAction, isPending] = useActionState(
    triggerWorkflow,
    initialState
  );

  return (
    <div className="flex min-h-screen items-center justify-center bg-zinc-50 font-sans dark:bg-black">
      <main className="flex flex-col items-center gap-8 p-8">
        <h1 className="text-4xl font-bold text-black dark:text-zinc-50">
          Upstash Workflow Demo
        </h1>
        <p className="text-lg text-zinc-600 dark:text-zinc-400">
          Click the button below to trigger a workflow using Upstash.
        </p>
        <form action={formAction}>
          <button
            type="submit"
            disabled={isPending}
            className="px-8 py-3 rounded-lg bg-black text-white hover:bg-zinc-800 dark:bg-white dark:text-black dark:hover:bg-zinc-200 disabled:opacity-50 disabled:cursor-not-allowed transition-colors"
          >
            {isPending ? "Loading..." : "Trigger Workflow"}
          </button>
        </form>
        {state.success && state.workflowRunId && (
          <p className="text-sm text-green-600 dark:text-green-400 max-w-md text-center">
            Workflow triggered! Run ID: {state.workflowRunId}
          </p>
        )}
        {state.error && (
          <p className="text-sm text-red-600 dark:text-red-400 max-w-md text-center">
            Error: {state.error}
          </p>
        )}
      </main>
    </div>
  );
}

Upstashの開発サーバーを起動する

本番環境では、UpstashワークフローはUpstashのQStashサーバー上で動作します。
完全マネージドサービスのため、開発者は内部の仕組みを意識する必要がありません。

ローカルでテストする際は、下記のコマンドを実行することで、本番環境と同じ動作をするサーバーを起動できます。

npx @upstash/qstash-cli dev

サーバーを起動すると、下記のように複数の環境変数が出力されます。

% npx @upstash/qstash-cli dev
Upstash QStash development server is runnning at http://127.0.0.1:8080

A default user has been created for you to authorize your requests.
QSTASH_TOKEN=you_token
QSTASH_CURRENT_SIGNING_KEY=you_token
QSTASH_NEXT_SIGNING_KEY=you_token

Sample cURL request:
curl -X POST http://127.0.0.1:8080/v2/publish/https://example.com -H "Authorization: Bearer you_token"

これらをNext.jsの環境変数ファイルに設定します。

.env
QSTASH_TOKEN=you_token
QSTASH_CURRENT_SIGNING_KEY=you_token
QSTASH_NEXT_SIGNING_KEY=you_token
QSTASH_URL=http://127.0.0.1:8080

次に、Upstashワークフローのダッシュボードを開き、ローカルモードを有効にします。

有効にしておくことで、ローカル環境のワークフロー実行時のログやステータスをダッシュボードから確認できるようになります。

F33B3B7B-D22E-481E-B2E9-5EBF8A02277B.jpeg

ワークフローのテスト

プロジェクトに戻り、別のターミナルを開いてNext.jsプロジェクトを起動します。

npm run dev

Trigger Workflowボタンをクリックすると、Run IDが出力されることを確認できます。

823BF7B9-A698-4718-B6F8-6FF85241B7C4.jpeg

Upstashのダッシュボードから、先ほどの実行ログも確認できます。

CADF89F3-1A78-4E49-A265-037530B80269.jpeg

Upstash Workflowの詳細

Upstashクライアント

前のステップで実行できましたが、実際にUpstashワークフローがどのような流れで実行されているかを説明します。

actions.tsには下記の記述があります。

actions.ts
    const { workflowRunId } = await client.trigger({
      url: `http://localhost:3000/api/workflow`,
      retries: 3, //リトライの回数
    });

http://localhost:3000は現在のNext.js API Route HandlerのURLですが、本番環境ではデプロイ先のURLになります。

そのため、Next.jsから切り離して、Honoやほかのフレームワークで実装したり、デプロイ先を分けても問題ありません。

したがって、公式ドキュメントには下記のような書き方も存在します。

const BASE_URL = process.env.VERCEL_URL
  ? `https://${process.env.VERCEL_URL}`
  : `http://localhost:3000`

const { workflowRunId } = await client.trigger({
  url: `${BASE_URL}/api/workflow`,
  retries: 3,
  keepTriggerConfig: true,
});

また、Upstashワークフローは実装しているサーバー内で動作しているわけではなく、UpstashのQStashとデータのやり取りを行うだけです。

そのため、実行時間に制限があるサービス、VercelのFunction、cloudflare workersでも、Upstashワークフローを実行できます:point_up_tone1:
workflow-concept.avif

client.triggerはワークフローを起動するメソッドです、実行後、workflowRunIdを取得できます。

このIDはワークフローの監視や制御において非常に重要です。

例えば、workflowRunIdが123のワークフローが現在実行中だとします。
どこまで実行されたかを確認したい場合、下記の方法で確認できます。

curl "https://qstash.upstash.io/v2/workflows/logs?workflowRunId=123" \
  -H "Authorization: Bearer <token>"

Next.jsの中で実行する場合は、clientのlogsメソッドが使えます。

import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });
const { runs } = await client.logs({ workflowRunId: "123" });

ローカルで実行したワークフローを確認したい場合、ローカルのURLを使ってください。
例えば先ほどローカルで実行したワークフローを確認したい場合、下記にコマンドで実行できます。

curl http://localhost:8080/v2/workflows/logs \
  -H "Authorization: Bearer you_token"

詳細は下記のクライアントドキュメントを確認してください。

ワークフロー制御

下記のコードは、最初に作成したワークフローです。

await context.runが2つ並んでいるので、2つのステップが順番に実行されるシンプルなワークフローです。

app/api/workflow/route.ts
import { serve } from "@upstash/workflow/nextjs"

export const { POST } = serve(
  async (context) => {
    await context.run("initial-step", () => {
      console.log("initial step ran")
    })

    await context.run("second-step", () => {
      console.log("second step ran")
    })
  }
)

contextには、ワークフロー実行時のすべてのState情報が含まれています。

例えば、APIサーバーからuserIDなどが渡された場合、const { userId } = context.requestPayload;のようにしてuserIdを取得できます。

渡すときは、下記のようにbodyにオブジェクトとして渡します。

const { workflowRunId } = await client.trigger({
  keepTriggerConfig: true,
  url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  body: {
  "userId": xxxx
  }

また、並行実行したい場合は、下記のように定義することで実行できます。

export const { POST } = serve<string>(
  async (context) => {
    const input = context.requestPayload;

    const promise1 = context.run("step-1", async () => {
      return someWork(input);
    });

    const promise2 = context.run("step-2", async () => {
      return someOtherWork(input);
    });

    await Promise.all([promise1, promise2]);
  },
);

詳細は下記のドキュメントを確認してください。

画像生成ワークフローの実装

次に、より実践的なワークフローを作成していきます。

ユーザーが指定したシーン枚数に基づいて、並行的に画像を生成するワークフローを実装してみます。

実行中
7C3EC58F-40F7-4C6A-B87A-1C5506FF86CB.jpeg

実行完了
45A0EB58-3F73-4F0C-803A-41396DC2CB2D.jpeg

AI Elements

AI ElementsはVercel AI SDKのエコシステムの一つで、shadcn/uiのリポジトリにも含まれています。

これをうまく活用すれば、AI SDKと組み合わせることで、チャットシステムのUIを爆速で構築できます。

今回はAI ElementsのWorkflowコンポーネントを使用します。その内部実装にはReact Flowが使われており、OpenAIのAgent BuilderもReact Flowを採用しています。

3CBF661A-2C27-4711-84C0-EA6E318D9048.jpeg

プロジェクトに戻って、下記のコマンドでWorkflowをインストールします。

npx ai-elements@latest || npx shadcn@latest add @ai-elements/all
npm i @xyflow/react

AI Elementsを使ってワークフローのライブラリーを導入すると、多くのコンポーネントが一緒にインストールされてしまいます。

React Flow単体でもこのワークフローの実装は可能なので、最後にサンプルリポジトリを参考にしてください。

ワークフローの実装

今回はAI SDKとBedrockのプロバイダーを使用します。
下記のライブラリーをインストールしてください。プロバイダーはお好きなものを使用していただいて構いません。

npm install ai @ai-sdk/react zod @ai-sdk/amazon-bedrock

ワークフローの実装は下記の通りです。

app/api/generateImage/route.ts
import { serve } from "@upstash/workflow/nextjs";
import { generateObject, experimental_generateImage as generateImage } from "ai";
import { createAmazonBedrock } from "@ai-sdk/amazon-bedrock";
import { z } from "zod";
import type {
  ImageWorkflowPayload,
  ImageGenerationResult,
} from "@/types/workflow";

const bedrock = createAmazonBedrock({
  region: "us-east-1",
  accessKeyId: process.env.AWS_ACCESS_KEY_ID,
  secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  sessionToken: process.env.AWS_SESSION_TOKEN,
});

export const { POST } = serve<ImageWorkflowPayload>(
  async (context) => {
    const { scene, numberOfImages = 3 } = context.requestPayload;

    console.log(`Starting image workflow for scene: "${scene}"`);

    // Step 1: Generate prompt variations using Claude with structured output
    const prompts = await context.run("generate-prompts", async () => {
      try {
        const { object } = await generateObject({
          model: bedrock("us.anthropic.claude-sonnet-4-5-20250929-v1:0"),
          output: "array",
          schema: z.object({
            prompt: z
              .string()
              .describe(
                "A detailed 1-2 sentence image generation prompt describing the scene vividly",
              ),
            aspect: z
              .string()
              .describe(
                "Brief description of what unique aspect or interpretation of the scene this prompt captures",
              ),
          }),
          prompt: `You are a creative AI that generates diverse image prompts. Given a scene description, create ${numberOfImages} distinct and creative variations of image prompts that capture different aspects, perspectives, or interpretations of the scene.

Scene: "${scene}"

Generate ${numberOfImages} unique prompts, each focusing on a different:
- Time of day or lighting
- Emotional tone or atmosphere
- Visual style or artistic interpretation
- Specific detail or element of the scene
- Camera angle or composition

Each prompt should be vivid and descriptive (1-2 sentences) to generate high-quality images.`,
        });

        console.log(
          "Generated prompts:",
          object.map((p) => `${p.aspect}: ${p.prompt}`),
        );

        // Extract just the prompts from the structured output
        return object.map((item) => item.prompt);
      } catch (error) {
        console.error("Failed to generate prompts:", error);
        throw error;
      }
    });

    console.log(`Generated ${prompts.length} prompts, starting image generation`);

    // Step 2-N: Generate images in parallel for each prompt
    const images = await Promise.all(
      prompts.map(async (prompt, index) => {
        return await context.run(`generate-image-${index + 1}`, async () => {
          try {
            console.log(
              `Generating image ${index + 1}/${prompts.length}: ${prompt.substring(0, 50)}...`,
            );

            const { image } = await generateImage({
              model: bedrock.image("amazon.nova-canvas-v1:0"),
              prompt: prompt,
              size: "512x512",
              // Bedrock seed must be <= 2147483646 (32-bit int max)
              // Use modulo to ensure it stays within range while maintaining uniqueness
              seed: (Date.now() % 1000000) * 1000 + index,
              providerOptions: {
                bedrock: {
                  quality: "standard",
                  cfgScale: 7.5,
                },
              },
            });

            // Convert image to base64
            const base64 = image.base64;

            console.log(
              `Successfully generated image ${index + 1}/${prompts.length}`,
            );

            return {
              prompt,
              imageBase64: base64,
              index: index + 1,
            };
          } catch (error) {
            console.error(`Failed to generate image ${index + 1}:`, error);
            throw error;
          }
        });
      }),
    );

    // Final result
    const result: ImageGenerationResult = {
      scene,
      images,
      generatedAt: Date.now(),
    };

    console.log(
      `Workflow completed successfully! Generated ${images.length} images for scene: "${scene}"`,
    );

    return result;
  },
  {
    retries: 2,
    verbose: true,
  },
);

処理の流れを解説します。

ユーザーからプロンプトと枚数を受け取り、最初のワークフローステップで枚数分のプロンプトを生成します。そして、生成されたプロンプトごとに並行ステップを作成し、それらを並行実行します。

サーバーアクションの実装では、2つの関数を実装します。

1.ワークフローを起動する関数: ワークフローを起動し、workflowRunIdをクライアントに返却する

app/actions.ts
// Trigger workflow response
export interface TriggerWorkflowResponse {
  workflowRunId: string;
  success: boolean;
  error?: string;
}

/**
 * Triggers the image generation workflow
 */
export async function triggerImageWorkflow(
    scene: string,
    numberOfImages: number = 3
): Promise<TriggerWorkflowResponse> {
    const token = process.env.QSTASH_TOKEN;

    if (!token) {
        return {
            workflowRunId: "",
            success: false,
            error: "QSTASH_TOKEN environment variable is not set",
        };
    }

    try {
        const client = new Client({ token });

        const { workflowRunId } = await client.trigger({
                 // BASE_URLの実装は前の解説を参照してください。
            url: `${BASE_URL}/api/generateImage`,
            body: {
                scene,
                numberOfImages,
            },
        });

        return {
            workflowRunId,
            success: true,
        };
    } catch (error) {
        return {
            workflowRunId: "",
            success: false,
            error: error instanceof Error ? error.message : "Unknown error",
        };
    }
}

2. ワークフローの状態を監視する関数: ワークフロー全体と各ステップの実行状態を返却します。

app/actions.ts
// Workflow step states
export type StepState =
  | "STEP_PENDING"
  | "STEP_PROGRESS"
  | "STEP_SUCCESS"
  | "STEP_RETRY"
  | "STEP_FAILED";

// Workflow run states
export type WorkflowRunState =
  | "RUN_STARTED"
  | "RUN_SUCCESS"
  | "RUN_FAILED"
  | "RUN_CANCELED";

// Individual step information
export interface WorkflowStep {
  stepName: string;
  state: StepState;
  out?: string; // Output from the step
  messageId?: string;
}

// Workflow status response
export interface WorkflowStatus {
  workflowRunId: string;
  workflowUrl: string;
  workflowState: WorkflowRunState;
  workflowRunCreatedAt: number;
  workflowRunCompletedAt?: number;
  steps: WorkflowStep[];
  progress: {
    completed: number;
    total: number;
    percentage: number;
  };
  result?: ImageGenerationResult;
  partialImages: GeneratedImage[]; // Images completed so far
  isComplete: boolean; // Whether workflow has finished (success/failed/canceled)
}

// Generated image result
export interface GeneratedImage {
  prompt: string;
  imageBase64: string;
  index: number;
}

// Final workflow result
export interface ImageGenerationResult {
  scene: string;
  images: GeneratedImage[];
  generatedAt: number;
}

// Trigger workflow response
export interface TriggerWorkflowResponse {
  workflowRunId: string;
  success: boolean;
  error?: string;
}

/**
 * Gets the status of a workflow run, including partial results
 */
export async function getWorkflowStatus(
    workflowRunId: string
): Promise<WorkflowStatus | null> {
    const token = process.env.QSTASH_TOKEN;

    if (!token) {
        console.error("QSTASH_TOKEN not set");
        return null;
    }

    try {
        const client = new Client({ token });
        const { runs } = await client.logs({
            workflowRunId,
            count: 1,
        });

        if (!runs || runs.length === 0) {
            return null;
        }

        const run = runs[0];

        // Parse steps from the workflow run
        const steps: WorkflowStep[] = [];
        const partialImages: GeneratedImage[] = [];
        let completedSteps = 0;

        // Process step groups (both sequential and parallel)
        if (run.steps) {
            for (const stepGroup of run.steps) {
                if (stepGroup.type === "sequential" && stepGroup.steps) {
                    for (const step of stepGroup.steps) {
                        const stepInfo: WorkflowStep = {
                            stepName: step.stepName || "unknown",
                            state: (step.state as StepState) || "STEP_PENDING",
                            out: step.out as string | undefined,
                            messageId: step.messageId as string | undefined,
                        };
                        steps.push(stepInfo);

                        if (step.state === "STEP_SUCCESS") {
                            completedSteps++;

                            // Extract image data from completed image generation steps
                            if (
                                step.stepName?.startsWith("generate-image-") &&
                                step.out
                            ) {
                                try {
                                    const imageData = JSON.parse(
                                        step.out as string
                                    ) as GeneratedImage;
                                    partialImages.push(imageData);
                                } catch (e) {
                                    console.error("Failed to parse image data:", e);
                                }
                            }
                        }
                    }
                } else if (stepGroup.type === "parallel" && stepGroup.steps) {
                    for (const step of stepGroup.steps) {
                        const stepInfo: WorkflowStep = {
                            stepName: step.stepName || "unknown",
                            state: (step.state as StepState) || "STEP_PENDING",
                            out: step.out as string | undefined,
                            messageId: step.messageId as string | undefined,
                        };
                        steps.push(stepInfo);

                        if (step.state === "STEP_SUCCESS") {
                            completedSteps++;

                            // Extract image data from completed image generation steps
                            if (
                                step.stepName?.startsWith("generate-image-") &&
                                step.out
                            ) {
                                try {
                                    const imageData = JSON.parse(
                                        step.out as string
                                    ) as GeneratedImage;
                                    partialImages.push(imageData);
                                } catch (e) {
                                    console.error("Failed to parse image data:", e);
                                }
                            }
                        }
                    }
                }
            }
        }

        // Sort partial images by index
        partialImages.sort((a, b) => a.index - b.index);

        // Calculate progress
        const totalSteps = steps.length || 1;
        const percentage = Math.round((completedSteps / totalSteps) * 100);

        // Check if workflow is complete
        const isComplete =
            run.workflowState === "RUN_SUCCESS" ||
            run.workflowState === "RUN_FAILED" ||
            run.workflowState === "RUN_CANCELED";

        return {
            workflowRunId: run.workflowRunId,
            workflowUrl: run.workflowUrl,
            workflowState: run.workflowState,
            workflowRunCreatedAt: run.workflowRunCreatedAt,
            workflowRunCompletedAt: run.workflowRunCompletedAt,
            steps,
            progress: {
                completed: completedSteps,
                total: totalSteps,
                percentage,
            },
            result: run.workflowRunResponse as ImageGenerationResult | undefined,
            partialImages,
            isComplete,
        };
    } catch (error) {
        console.error("Failed to fetch workflow status:", error);
        return null;
    }
}

クライアントの実装

今回はワークフローのステータスを監視するために、2秒ごとにポーリング処理を実行しています。

ただし、実際に複数のワークフローを同時実行する場合は、リロードボタンを追加して、クリック時に最新の状態を取得する実装も有効です。

app/generate-image/page.tsx
app/generate-image/page.tsx
"use client";

import { useState, useEffect } from "react";
import { triggerImageWorkflow, getWorkflowStatus } from "@/app/actions";
import type { WorkflowStatus, GeneratedImage } from "@/types/workflow";
import { WorkflowVisualizer } from "@/components/workflow-visualizer";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
import { Progress } from "@/components/ui/progress";
import { Badge } from "@/components/ui/badge";
import { Loader2, Sparkles } from "lucide-react";

export default function GenerateImagePage() {
  // Form state
  const [scene, setScene] = useState("");
  const [numberOfImages, setNumberOfImages] = useState(3);
  const [loading, setLoading] = useState(false);

  // Workflow state
  const [workflowRunId, setWorkflowRunId] = useState<string | null>(null);
  const [status, setStatus] = useState<WorkflowStatus | null>(null);
  const [generatedImages, setGeneratedImages] = useState<GeneratedImage[]>([]);
  const [error, setError] = useState<string | null>(null);

  // Poll workflow status
  useEffect(() => {
    if (!workflowRunId) return;

    let intervalId: NodeJS.Timeout;

    const pollStatus = async () => {
      const newStatus = await getWorkflowStatus(workflowRunId);
      if (newStatus) {
        setStatus(newStatus);

        // Update generated images with partial results
        if (newStatus.partialImages.length > 0) {
          setGeneratedImages(newStatus.partialImages);
        }

        // Stop polling if workflow is complete
        if (newStatus.isComplete) {
          setLoading(false);
          if (newStatus.result?.images) {
            setGeneratedImages(newStatus.result.images);
          }

          // Stop polling immediately
          clearInterval(intervalId);
          return;
        }
      }
    };

    // Poll every 2 seconds
    intervalId = setInterval(pollStatus, 2000);

    // Initial poll
    pollStatus();

    return () => clearInterval(intervalId);
  }, [workflowRunId]);

  // Handle form submission
  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();

    if (!scene.trim()) {
      setError("Please enter a scene description");
      return;
    }

    setLoading(true);
    setError(null);
    setWorkflowRunId(null);
    setStatus(null);
    setGeneratedImages([]);

    const response = await triggerImageWorkflow(scene, numberOfImages);

    if (response.success && response.workflowRunId) {
      setWorkflowRunId(response.workflowRunId);
    } else {
      setError(response.error || "Failed to start workflow");
      setLoading(false);
    }
  };

  return (
    <div className="container mx-auto max-w-7xl p-4 md:p-8">
      <div className="mb-8">
        <h1 className="flex items-center gap-2 text-3xl font-bold">
          <Sparkles className="h-8 w-8 text-primary" />
          AI Image Generation Workflow
        </h1>
        <p className="mt-2 text-muted-foreground">
          Generate multiple AI images from a scene description using Upstash Workflow, Claude, and Amazon Nova
        </p>
      </div>

      {/* Form */}
      <Card className="mb-8">
        <CardHeader>
          <CardTitle>Generate Images</CardTitle>
          <CardDescription>
            Describe a scene and choose how many images to generate
          </CardDescription>
        </CardHeader>
        <CardContent>
          <form onSubmit={handleSubmit} className="space-y-4">
            <div className="space-y-2">
              <label htmlFor="scene" className="text-sm font-medium">
                Scene Description
              </label>
              <Input
                id="scene"
                placeholder="e.g., A serene Japanese garden with cherry blossoms"
                value={scene}
                onChange={(e) => setScene(e.target.value)}
                disabled={loading}
                className="w-full"
              />
            </div>

            <div className="space-y-2">
              <label htmlFor="numberOfImages" className="text-sm font-medium">
                Number of Images: {numberOfImages}
              </label>
              <input
                id="numberOfImages"
                type="range"
                min="1"
                max="5"
                value={numberOfImages}
                onChange={(e) => setNumberOfImages(parseInt(e.target.value))}
                disabled={loading}
                className="w-full"
              />
              <p className="text-xs text-muted-foreground">
                Generate between 1-5 images (more images = longer processing time)
              </p>
            </div>

            <Button type="submit" disabled={loading} className="w-full">
              {loading ? (
                <>
                  <Loader2 className="mr-2 h-4 w-4 animate-spin" />
                  Generating...
                </>
              ) : (
                <>
                  <Sparkles className="mr-2 h-4 w-4" />
                  Generate Images
                </>
              )}
            </Button>
          </form>

          {error && (
            <div className="mt-4 rounded-md bg-red-50 p-3 text-sm text-red-600">
              {error}
            </div>
          )}
        </CardContent>
      </Card>

      {/* Progress and Results */}
      {status && (
        <div className="space-y-6">
          {/* Progress Section */}
          <Card>
            <CardHeader>
              <CardTitle className="flex items-center justify-between">
                <span>Workflow Progress</span>
                <Badge
                  variant={
                    status.workflowState === "RUN_SUCCESS"
                      ? "default"
                      : status.workflowState === "RUN_FAILED"
                      ? "destructive"
                      : "secondary"
                  }
                >
                  {status.workflowState.replace("RUN_", "")}
                </Badge>
              </CardTitle>
              <CardDescription>
                Run ID: {status.workflowRunId}
              </CardDescription>
            </CardHeader>
            <CardContent className="space-y-4">
              <div>
                <div className="mb-2 flex items-center justify-between text-sm">
                  <span className="font-medium">Overall Progress</span>
                  <span className="text-muted-foreground">
                    {status.progress.percentage}% ({status.progress.completed}/{status.progress.total} steps)
                  </span>
                </div>
                <Progress value={status.progress.percentage} className="h-2" />
              </div>
            </CardContent>
          </Card>

          {/* Workflow Visualization */}
          <Card>
            <CardHeader>
              <CardTitle>Workflow Visualization</CardTitle>
              <CardDescription>
                Real-time visualization of the workflow execution
              </CardDescription>
            </CardHeader>
            <CardContent>
              <WorkflowVisualizer
                status={status}
                scene={scene}
                numberOfImages={numberOfImages}
              />
            </CardContent>
          </Card>

          {/* Generated Images Section */}
          <Card>
            <CardHeader>
              <CardTitle>Generated Images</CardTitle>
              <CardDescription>
                {generatedImages.length > 0
                  ? `${generatedImages.length} of ${numberOfImages} images completed`
                  : "Images will appear here as they are generated"}
              </CardDescription>
            </CardHeader>
            <CardContent>
              <div className="grid grid-cols-1 gap-4 md:grid-cols-2 lg:grid-cols-3">
                {/* Show placeholder loading cards */}
                {Array.from({ length: numberOfImages }).map((_, index) => {
                  const image = generatedImages[index];

                  if (image) {
                    return (
                      <Card key={index} className="overflow-hidden">
                        <img
                          src={`data:image/png;base64,${image.imageBase64}`}
                          alt={image.prompt}
                          className="h-64 w-full object-cover"
                        />
                        <CardContent className="p-3">
                          <p className="line-clamp-2 text-xs text-muted-foreground">
                            {image.prompt}
                          </p>
                        </CardContent>
                      </Card>
                    );
                  }

                  return (
                    <Card
                      key={index}
                      className="flex h-64 items-center justify-center border-dashed"
                    >
                      <div className="text-center">
                        <Loader2 className="mx-auto mb-2 h-8 w-8 animate-spin text-muted-foreground" />
                        <p className="text-sm text-muted-foreground">
                          Generating image {index + 1}...
                        </p>
                      </div>
                    </Card>
                  );
                })}
              </div>
            </CardContent>
          </Card>
        </div>
      )}

      {/* Empty State */}
      {!status && !loading && (
        <Card className="border-dashed">
          <CardContent className="flex h-64 items-center justify-center">
            <div className="text-center">
              <Sparkles className="mx-auto mb-4 h-12 w-12 text-muted-foreground" />
              <p className="text-lg font-medium">Ready to generate images</p>
              <p className="text-sm text-muted-foreground">
                Enter a scene description above to get started
              </p>
            </div>
          </CardContent>
        </Card>
      )}
    </div>
  );
}

ワークフローのUIコンポーネント実装

実装内容が多くなるため、別コンポーネントとして切り出して実装します。

components/workflow-visualizer.tsx
components/workflow-visualizer.tsx
'use client';

import { useMemo } from 'react';
import {
  type Node as FlowNode,
  type Edge as FlowEdge,
  MarkerType,
} from '@xyflow/react';
import '@xyflow/react/dist/style.css';
import type { WorkflowStatus, StepState } from '@/types/workflow';
import { Canvas } from '@/components/ai-elements/canvas';
import { Node, NodeHeader, NodeTitle, NodeContent, NodeFooter } from '@/components/ai-elements/node';
import { Edge } from '@/components/ai-elements/edge';
import { Badge } from '@/components/ui/badge';
import { cn } from '@/lib/utils';

// Custom Node Component using AI Elements
function WorkflowNode({ data }: { data: any }) {
  const stateColors = {
    STEP_PENDING: 'bg-gray-50 border-gray-300',
    STEP_PROGRESS: 'bg-blue-50 border-blue-400 shadow-lg',
    STEP_SUCCESS: 'bg-green-50 border-green-400',
    STEP_RETRY: 'bg-yellow-50 border-yellow-400',
    STEP_FAILED: 'bg-red-50 border-red-400',
  };

  const stateIcons = {
    STEP_PENDING: '',
    STEP_PROGRESS: '🔄',
    STEP_SUCCESS: '',
    STEP_RETRY: '🔁',
    STEP_FAILED: '',
  };

  const stateBadgeVariants: Record<StepState, 'default' | 'secondary' | 'destructive' | 'outline'> = {
    STEP_PENDING: 'secondary',
    STEP_PROGRESS: 'default',
    STEP_SUCCESS: 'default',
    STEP_RETRY: 'secondary',
    STEP_FAILED: 'destructive',
  };

  const colorClass = stateColors[data.state as StepState] || stateColors.STEP_PENDING;
  const icon = stateIcons[data.state as StepState] || '';

  return (
    <Node
      handles={{ target: data.hasTarget, source: data.hasSource }}
      className={cn(
        'min-w-[250px] border-2 transition-all duration-300',
        colorClass,
        data.state === 'STEP_PROGRESS' && 'animate-pulse'
      )}
    >
      <NodeHeader>
        <div className="flex items-center justify-between gap-2">
          <div className="flex items-center gap-2">
            <span className="text-xl">{icon}</span>
            <NodeTitle className="text-sm font-semibold">{data.label}</NodeTitle>
          </div>
          <Badge variant={stateBadgeVariants[data.state as StepState] || 'outline'} className="text-xs">
            {data.state.replace('STEP_', '')}
          </Badge>
        </div>
        {data.description && (
          <p className="text-xs text-muted-foreground mt-1">{data.description}</p>
        )}
      </NodeHeader>

      {data.content && (
        <NodeContent>
          {data.imageBase64 ? (
            <img
              src={`data:image/png;base64,${data.imageBase64}`}
              alt="Generated"
              className="w-full h-auto rounded"
            />
          ) : (
            <p className="text-xs text-gray-700 whitespace-pre-line">{data.content}</p>
          )}
        </NodeContent>
      )}

      {data.footer && (
        <NodeFooter>
          <p className="text-xs text-muted-foreground">{data.footer}</p>
        </NodeFooter>
      )}
    </Node>
  );
}

const nodeTypes = {
  workflow: WorkflowNode,
};

const edgeTypes = {
  animated: Edge.Animated,
};

interface WorkflowVisualizerProps {
  status: WorkflowStatus | null;
  scene: string;
  numberOfImages: number;
}

export function WorkflowVisualizer({ status, scene, numberOfImages }: WorkflowVisualizerProps) {
  // Generate nodes from workflow status
  const nodes = useMemo<FlowNode[]>(() => {
    if (!status) {
      return [];
    }

    const generatedNodes: FlowNode[] = [];

    // Start Node
    generatedNodes.push({
      id: 'start',
      type: 'workflow',
      position: { x: 0, y: 200 },
      data: {
        label: 'Workflow Start',
        description: 'User input received',
        state: 'STEP_SUCCESS',
        content: `Scene: "${scene}"\nImages to generate: ${numberOfImages}`,
        footer: `Started at ${new Date(status.workflowRunCreatedAt).toLocaleTimeString()}`,
        hasTarget: false,
        hasSource: true,
      },
    });

    // Prompts Node
    const promptStep = status.steps.find(s => s.stepName === 'generate-prompts');
    generatedNodes.push({
      id: 'prompts',
      type: 'workflow',
      position: { x: 400, y: 200 },
      data: {
        label: 'Generate Prompts',
        description: 'Claude AI prompt generation',
        state: promptStep?.state || 'STEP_PENDING',
        content: promptStep?.state === 'STEP_SUCCESS'
          ? `Generated ${numberOfImages} unique prompts`
          : 'Generating prompt variations...',
        footer: 'Model: Claude Sonnet 4.5',
        hasTarget: true,
        hasSource: true,
      },
    });

    // Image Generation Nodes (parallel)
    for (let i = 1; i <= numberOfImages; i++) {
      const imageStep = status.steps.find(s => s.stepName === `generate-image-${i}`);
      const partialImage = status.partialImages.find(img => img.index === i);

      // Calculate vertical position for parallel nodes
      const yOffset = (i - 1 - (numberOfImages - 1) / 2) * 220;

      generatedNodes.push({
        id: `image-${i}`,
        type: 'workflow',
        position: { x: 900, y: 200 + yOffset },
        data: {
          label: `Image ${i}`,
          description: `Nova Canvas generation`,
          state: imageStep?.state || 'STEP_PENDING',
          content: partialImage?.prompt || 'Waiting for prompt...',
          imageBase64: partialImage?.imageBase64,
          footer: imageStep?.state === 'STEP_SUCCESS'
            ? '✓ Generated'
            : imageStep?.state === 'STEP_PROGRESS'
            ? 'Generating...'
            : 'Pending',
          hasTarget: true,
          hasSource: true,
        },
      });
    }

    // Complete Node
    generatedNodes.push({
      id: 'complete',
      type: 'workflow',
      position: { x: 1400, y: 200 },
      data: {
        label: 'Workflow Complete',
        description: 'Final result aggregation',
        state: status.isComplete
          ? (status.workflowState === 'RUN_SUCCESS' ? 'STEP_SUCCESS' : 'STEP_FAILED')
          : 'STEP_PENDING',
        content: status.isComplete
          ? `${status.partialImages.length} images generated successfully`
          : `Progress: ${status.progress.percentage}%`,
        footer: status.workflowRunCompletedAt
          ? `Completed at ${new Date(status.workflowRunCompletedAt).toLocaleTimeString()}`
          : 'In progress...',
        hasTarget: true,
        hasSource: false,
      },
    });

    return generatedNodes;
  }, [status, scene, numberOfImages]);

  // Generate edges from workflow status
  const edges = useMemo<FlowEdge[]>(() => {
    if (!status) {
      return [];
    }

    const generatedEdges: FlowEdge[] = [];

    // Start → Prompts
    const promptStep = status.steps.find(s => s.stepName === 'generate-prompts');
    const isPromptActive = promptStep?.state === 'STEP_PROGRESS' || promptStep?.state === 'STEP_SUCCESS';
    generatedEdges.push({
      id: 'edge-start-prompts',
      source: 'start',
      target: 'prompts',
      type: 'animated',
      animated: isPromptActive,
      style: {
        stroke: promptStep?.state === 'STEP_SUCCESS' ? '#10b981' : '#3b82f6',
        strokeWidth: 2.5,
      },
      markerEnd: {
        type: MarkerType.ArrowClosed,
        color: promptStep?.state === 'STEP_SUCCESS' ? '#10b981' : '#3b82f6',
      },
    });

    // Prompts → Images (parallel)
    for (let i = 1; i <= numberOfImages; i++) {
      const imageStep = status.steps.find(s => s.stepName === `generate-image-${i}`);
      const isActive = imageStep?.state === 'STEP_PROGRESS' || imageStep?.state === 'STEP_SUCCESS';
      const color = imageStep?.state === 'STEP_SUCCESS' ? '#10b981' :
                    imageStep?.state === 'STEP_FAILED' ? '#ef4444' : '#3b82f6';

      generatedEdges.push({
        id: `edge-prompts-image-${i}`,
        source: 'prompts',
        target: `image-${i}`,
        type: 'animated',
        animated: isActive,
        style: {
          stroke: color,
          strokeWidth: 2.5,
          strokeDasharray: imageStep?.state === 'STEP_SUCCESS' ? undefined : '5,5',
        },
        markerEnd: {
          type: MarkerType.ArrowClosed,
          color,
        },
      });
    }

    // Images → Complete
    for (let i = 1; i <= numberOfImages; i++) {
      const imageStep = status.steps.find(s => s.stepName === `generate-image-${i}`);
      const isComplete = imageStep?.state === 'STEP_SUCCESS';
      const color = isComplete ? '#10b981' : '#9ca3af';

      generatedEdges.push({
        id: `edge-image-${i}-complete`,
        source: `image-${i}`,
        target: 'complete',
        type: 'animated',
        animated: isComplete && status.isComplete,
        style: {
          stroke: color,
          strokeWidth: 2,
          strokeDasharray: isComplete ? undefined : '5,5',
        },
        markerEnd: {
          type: MarkerType.ArrowClosed,
          color,
        },
      });
    }

    return generatedEdges;
  }, [status, numberOfImages]);

  if (!status) {
    return (
      <div className="w-full h-[600px] flex items-center justify-center bg-gray-50 rounded-lg border-2 border-dashed border-gray-300">
        <p className="text-gray-500">No workflow data available</p>
      </div>
    );
  }

  return (
    <div className="w-full h-[600px] border rounded-lg overflow-hidden">
      <Canvas
        nodes={nodes}
        edges={edges}
        nodeTypes={nodeTypes}
        edgeTypes={edgeTypes}
      />
    </div>
  );
}

最後に

Upstash Workflowは本当によくできており、制御しやすく、非同期のワークフローも簡単に作成できます。
また、最近Vercel社もワークフロー機能をリリースしたので、そのうち触ってみようと思います。

今回実際に使用したリポジトリは下記に置いておきます。

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?