前書き
最近、AIワークフローを構築できるサービスが大量に増えましたね。
GoogleのOpalが日本向けにサービス開始やOpenAIのAgent Builderの一般提供、老舗のN8NやDifyが競い合う中で、どんなシナジーが生まれるのかが楽しみですね。
そんな中、自分のアプリケーションにもオシャレなAIワークフローを追加したいと思ったことはありませんか?![]()
![]()
実は、簡単に実装できる方法があります。それがUpstash Workflowです。
Upstash とは
Upstashは、NoSQLデータベース、ベクターデータベース、ワークフロー、検索サービスなどを提供するServerless Data Platformです。
サービス自体を使ったことがなくても、Context7 MCP Serverを利用したことがある方は多いかもしれません。実はそれもUpstashの製品です![]()
Upstash Workflowの特徴は以下の通りです:
- 耐久性、信頼性、パフォーマンスに優れたサーバーレス関数
- 並行実行や長時間待機にも対応
- クレジットカード登録なしで使える無料枠あり
無料プランでも一日1000ステップまで利用できます。
ステップとは、たとえばcontext.runメソッドを使って情報収集 → 文章生成 → ユーザー通知という一連の操作をワークフローで実行する場合、3ステップを消費します。
外部API呼び出しなど重い処理向けのcontext.callを使う場合は、1回の呼び出しで2ステップを消費します。
検証段階であれば、無料プランで十分に試すことができます。
基礎的なワークフロー
まずは簡単なワークフローを作ってみます。
Upstash WorkflowのSDKはTypeScript版とPython版がありますが、今回はTypeScriptを使います。
フレームワークはNext.jsを使います。Cloudflare WorkersやHonoでも使えますが、今回作るワークフローは画面も欲しいのでNext.jsにしました。
ドキュメントは下記の通り。
プロジェクトの初期化
App RouterのNext.jsプロジェクトを新規作成します。今回はNext.js 16を使用します。
npx create-next-app@latest
次に、Upstashのワークフローライブラリをインストールします。
npm install @upstash/workflow
app/apiディレクトリ配下にworkflowフォルダを作成し、ワークフローAPIを実装します。
import { serve } from "@upstash/workflow/nextjs"
export const { POST } = serve(
async (context) => {
await context.run("initial-step", () => {
console.log("initial step ran")
})
await context.run("second-step", () => {
console.log("second step ran")
})
}
)
次に、ワークフローを起動するトリガーを作成します。
'use server'
import { Client } from "@upstash/workflow";
export async function triggerWorkflow() {
const token = process.env.QSTASH_TOKEN;
if (!token) {
throw new Error("QSTASH_TOKEN environment variable is not set");
}
const client = new Client({ token })
const { workflowRunId } = await client.trigger({
url: `http://localhost:3000/api/workflow`,
retries: 3, //リトライの回数
});
return { workflowRunId };
}
src/app/page.tsxにワークフローを実行するボタンを追加します。
"use client";
import { useActionState } from "react";
import { triggerWorkflow, type WorkflowState } from "./actions";
const initialState: WorkflowState = {};
export default function Home() {
const [state, formAction, isPending] = useActionState(
triggerWorkflow,
initialState
);
return (
<div className="flex min-h-screen items-center justify-center bg-zinc-50 font-sans dark:bg-black">
<main className="flex flex-col items-center gap-8 p-8">
<h1 className="text-4xl font-bold text-black dark:text-zinc-50">
Upstash Workflow Demo
</h1>
<p className="text-lg text-zinc-600 dark:text-zinc-400">
Click the button below to trigger a workflow using Upstash.
</p>
<form action={formAction}>
<button
type="submit"
disabled={isPending}
className="px-8 py-3 rounded-lg bg-black text-white hover:bg-zinc-800 dark:bg-white dark:text-black dark:hover:bg-zinc-200 disabled:opacity-50 disabled:cursor-not-allowed transition-colors"
>
{isPending ? "Loading..." : "Trigger Workflow"}
</button>
</form>
{state.success && state.workflowRunId && (
<p className="text-sm text-green-600 dark:text-green-400 max-w-md text-center">
Workflow triggered! Run ID: {state.workflowRunId}
</p>
)}
{state.error && (
<p className="text-sm text-red-600 dark:text-red-400 max-w-md text-center">
Error: {state.error}
</p>
)}
</main>
</div>
);
}
Upstashの開発サーバーを起動する
本番環境では、UpstashワークフローはUpstashのQStashサーバー上で動作します。
完全マネージドサービスのため、開発者は内部の仕組みを意識する必要がありません。
ローカルでテストする際は、下記のコマンドを実行することで、本番環境と同じ動作をするサーバーを起動できます。
npx @upstash/qstash-cli dev
サーバーを起動すると、下記のように複数の環境変数が出力されます。
% npx @upstash/qstash-cli dev
Upstash QStash development server is runnning at http://127.0.0.1:8080
A default user has been created for you to authorize your requests.
QSTASH_TOKEN=you_token
QSTASH_CURRENT_SIGNING_KEY=you_token
QSTASH_NEXT_SIGNING_KEY=you_token
Sample cURL request:
curl -X POST http://127.0.0.1:8080/v2/publish/https://example.com -H "Authorization: Bearer you_token"
これらをNext.jsの環境変数ファイルに設定します。
QSTASH_TOKEN=you_token
QSTASH_CURRENT_SIGNING_KEY=you_token
QSTASH_NEXT_SIGNING_KEY=you_token
QSTASH_URL=http://127.0.0.1:8080
次に、Upstashワークフローのダッシュボードを開き、ローカルモードを有効にします。
有効にしておくことで、ローカル環境のワークフロー実行時のログやステータスをダッシュボードから確認できるようになります。
ワークフローのテスト
プロジェクトに戻り、別のターミナルを開いてNext.jsプロジェクトを起動します。
npm run dev
Trigger Workflowボタンをクリックすると、Run IDが出力されることを確認できます。
Upstashのダッシュボードから、先ほどの実行ログも確認できます。
Upstash Workflowの詳細
Upstashクライアント
前のステップで実行できましたが、実際にUpstashワークフローがどのような流れで実行されているかを説明します。
actions.tsには下記の記述があります。
const { workflowRunId } = await client.trigger({
url: `http://localhost:3000/api/workflow`,
retries: 3, //リトライの回数
});
http://localhost:3000は現在のNext.js API Route HandlerのURLですが、本番環境ではデプロイ先のURLになります。
そのため、Next.jsから切り離して、Honoやほかのフレームワークで実装したり、デプロイ先を分けても問題ありません。
したがって、公式ドキュメントには下記のような書き方も存在します。
const BASE_URL = process.env.VERCEL_URL
? `https://${process.env.VERCEL_URL}`
: `http://localhost:3000`
const { workflowRunId } = await client.trigger({
url: `${BASE_URL}/api/workflow`,
retries: 3,
keepTriggerConfig: true,
});
また、Upstashワークフローは実装しているサーバー内で動作しているわけではなく、UpstashのQStashとデータのやり取りを行うだけです。
そのため、実行時間に制限があるサービス、VercelのFunction、cloudflare workersでも、Upstashワークフローを実行できます![]()

client.triggerはワークフローを起動するメソッドです、実行後、workflowRunIdを取得できます。
このIDはワークフローの監視や制御において非常に重要です。
例えば、workflowRunIdが123のワークフローが現在実行中だとします。
どこまで実行されたかを確認したい場合、下記の方法で確認できます。
curl "https://qstash.upstash.io/v2/workflows/logs?workflowRunId=123" \
-H "Authorization: Bearer <token>"
Next.jsの中で実行する場合は、clientのlogsメソッドが使えます。
import { Client } from "@upstash/workflow";
const client = new Client({ token: "<QSTASH_TOKEN>" });
const { runs } = await client.logs({ workflowRunId: "123" });
ローカルで実行したワークフローを確認したい場合、ローカルのURLを使ってください。
例えば先ほどローカルで実行したワークフローを確認したい場合、下記にコマンドで実行できます。
curl http://localhost:8080/v2/workflows/logs \
-H "Authorization: Bearer you_token"
詳細は下記のクライアントドキュメントを確認してください。
ワークフロー制御
下記のコードは、最初に作成したワークフローです。
await context.runが2つ並んでいるので、2つのステップが順番に実行されるシンプルなワークフローです。
import { serve } from "@upstash/workflow/nextjs"
export const { POST } = serve(
async (context) => {
await context.run("initial-step", () => {
console.log("initial step ran")
})
await context.run("second-step", () => {
console.log("second step ran")
})
}
)
contextには、ワークフロー実行時のすべてのState情報が含まれています。
例えば、APIサーバーからuserIDなどが渡された場合、const { userId } = context.requestPayload;のようにしてuserIdを取得できます。
渡すときは、下記のようにbodyにオブジェクトとして渡します。
const { workflowRunId } = await client.trigger({
keepTriggerConfig: true,
url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
body: {
"userId": xxxx
}
また、並行実行したい場合は、下記のように定義することで実行できます。
export const { POST } = serve<string>(
async (context) => {
const input = context.requestPayload;
const promise1 = context.run("step-1", async () => {
return someWork(input);
});
const promise2 = context.run("step-2", async () => {
return someOtherWork(input);
});
await Promise.all([promise1, promise2]);
},
);
詳細は下記のドキュメントを確認してください。
画像生成ワークフローの実装
次に、より実践的なワークフローを作成していきます。
ユーザーが指定したシーンと枚数に基づいて、並行的に画像を生成するワークフローを実装してみます。
AI Elements
AI ElementsはVercel AI SDKのエコシステムの一つで、shadcn/uiのリポジトリにも含まれています。
これをうまく活用すれば、AI SDKと組み合わせることで、チャットシステムのUIを爆速で構築できます。
今回はAI ElementsのWorkflowコンポーネントを使用します。その内部実装にはReact Flowが使われており、OpenAIのAgent BuilderもReact Flowを採用しています。
プロジェクトに戻って、下記のコマンドでWorkflowをインストールします。
npx ai-elements@latest || npx shadcn@latest add @ai-elements/all
npm i @xyflow/react
AI Elementsを使ってワークフローのライブラリーを導入すると、多くのコンポーネントが一緒にインストールされてしまいます。
React Flow単体でもこのワークフローの実装は可能なので、最後にサンプルリポジトリを参考にしてください。
ワークフローの実装
今回はAI SDKとBedrockのプロバイダーを使用します。
下記のライブラリーをインストールしてください。プロバイダーはお好きなものを使用していただいて構いません。
npm install ai @ai-sdk/react zod @ai-sdk/amazon-bedrock
ワークフローの実装は下記の通りです。
import { serve } from "@upstash/workflow/nextjs";
import { generateObject, experimental_generateImage as generateImage } from "ai";
import { createAmazonBedrock } from "@ai-sdk/amazon-bedrock";
import { z } from "zod";
import type {
ImageWorkflowPayload,
ImageGenerationResult,
} from "@/types/workflow";
const bedrock = createAmazonBedrock({
region: "us-east-1",
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
sessionToken: process.env.AWS_SESSION_TOKEN,
});
export const { POST } = serve<ImageWorkflowPayload>(
async (context) => {
const { scene, numberOfImages = 3 } = context.requestPayload;
console.log(`Starting image workflow for scene: "${scene}"`);
// Step 1: Generate prompt variations using Claude with structured output
const prompts = await context.run("generate-prompts", async () => {
try {
const { object } = await generateObject({
model: bedrock("us.anthropic.claude-sonnet-4-5-20250929-v1:0"),
output: "array",
schema: z.object({
prompt: z
.string()
.describe(
"A detailed 1-2 sentence image generation prompt describing the scene vividly",
),
aspect: z
.string()
.describe(
"Brief description of what unique aspect or interpretation of the scene this prompt captures",
),
}),
prompt: `You are a creative AI that generates diverse image prompts. Given a scene description, create ${numberOfImages} distinct and creative variations of image prompts that capture different aspects, perspectives, or interpretations of the scene.
Scene: "${scene}"
Generate ${numberOfImages} unique prompts, each focusing on a different:
- Time of day or lighting
- Emotional tone or atmosphere
- Visual style or artistic interpretation
- Specific detail or element of the scene
- Camera angle or composition
Each prompt should be vivid and descriptive (1-2 sentences) to generate high-quality images.`,
});
console.log(
"Generated prompts:",
object.map((p) => `${p.aspect}: ${p.prompt}`),
);
// Extract just the prompts from the structured output
return object.map((item) => item.prompt);
} catch (error) {
console.error("Failed to generate prompts:", error);
throw error;
}
});
console.log(`Generated ${prompts.length} prompts, starting image generation`);
// Step 2-N: Generate images in parallel for each prompt
const images = await Promise.all(
prompts.map(async (prompt, index) => {
return await context.run(`generate-image-${index + 1}`, async () => {
try {
console.log(
`Generating image ${index + 1}/${prompts.length}: ${prompt.substring(0, 50)}...`,
);
const { image } = await generateImage({
model: bedrock.image("amazon.nova-canvas-v1:0"),
prompt: prompt,
size: "512x512",
// Bedrock seed must be <= 2147483646 (32-bit int max)
// Use modulo to ensure it stays within range while maintaining uniqueness
seed: (Date.now() % 1000000) * 1000 + index,
providerOptions: {
bedrock: {
quality: "standard",
cfgScale: 7.5,
},
},
});
// Convert image to base64
const base64 = image.base64;
console.log(
`Successfully generated image ${index + 1}/${prompts.length}`,
);
return {
prompt,
imageBase64: base64,
index: index + 1,
};
} catch (error) {
console.error(`Failed to generate image ${index + 1}:`, error);
throw error;
}
});
}),
);
// Final result
const result: ImageGenerationResult = {
scene,
images,
generatedAt: Date.now(),
};
console.log(
`Workflow completed successfully! Generated ${images.length} images for scene: "${scene}"`,
);
return result;
},
{
retries: 2,
verbose: true,
},
);
処理の流れを解説します。
ユーザーからプロンプトと枚数を受け取り、最初のワークフローステップで枚数分のプロンプトを生成します。そして、生成されたプロンプトごとに並行ステップを作成し、それらを並行実行します。
サーバーアクションの実装では、2つの関数を実装します。
1.ワークフローを起動する関数: ワークフローを起動し、workflowRunIdをクライアントに返却する
// Trigger workflow response
export interface TriggerWorkflowResponse {
workflowRunId: string;
success: boolean;
error?: string;
}
/**
* Triggers the image generation workflow
*/
export async function triggerImageWorkflow(
scene: string,
numberOfImages: number = 3
): Promise<TriggerWorkflowResponse> {
const token = process.env.QSTASH_TOKEN;
if (!token) {
return {
workflowRunId: "",
success: false,
error: "QSTASH_TOKEN environment variable is not set",
};
}
try {
const client = new Client({ token });
const { workflowRunId } = await client.trigger({
// BASE_URLの実装は前の解説を参照してください。
url: `${BASE_URL}/api/generateImage`,
body: {
scene,
numberOfImages,
},
});
return {
workflowRunId,
success: true,
};
} catch (error) {
return {
workflowRunId: "",
success: false,
error: error instanceof Error ? error.message : "Unknown error",
};
}
}
2. ワークフローの状態を監視する関数: ワークフロー全体と各ステップの実行状態を返却します。
// Workflow step states
export type StepState =
| "STEP_PENDING"
| "STEP_PROGRESS"
| "STEP_SUCCESS"
| "STEP_RETRY"
| "STEP_FAILED";
// Workflow run states
export type WorkflowRunState =
| "RUN_STARTED"
| "RUN_SUCCESS"
| "RUN_FAILED"
| "RUN_CANCELED";
// Individual step information
export interface WorkflowStep {
stepName: string;
state: StepState;
out?: string; // Output from the step
messageId?: string;
}
// Workflow status response
export interface WorkflowStatus {
workflowRunId: string;
workflowUrl: string;
workflowState: WorkflowRunState;
workflowRunCreatedAt: number;
workflowRunCompletedAt?: number;
steps: WorkflowStep[];
progress: {
completed: number;
total: number;
percentage: number;
};
result?: ImageGenerationResult;
partialImages: GeneratedImage[]; // Images completed so far
isComplete: boolean; // Whether workflow has finished (success/failed/canceled)
}
// Generated image result
export interface GeneratedImage {
prompt: string;
imageBase64: string;
index: number;
}
// Final workflow result
export interface ImageGenerationResult {
scene: string;
images: GeneratedImage[];
generatedAt: number;
}
// Trigger workflow response
export interface TriggerWorkflowResponse {
workflowRunId: string;
success: boolean;
error?: string;
}
/**
* Gets the status of a workflow run, including partial results
*/
export async function getWorkflowStatus(
workflowRunId: string
): Promise<WorkflowStatus | null> {
const token = process.env.QSTASH_TOKEN;
if (!token) {
console.error("QSTASH_TOKEN not set");
return null;
}
try {
const client = new Client({ token });
const { runs } = await client.logs({
workflowRunId,
count: 1,
});
if (!runs || runs.length === 0) {
return null;
}
const run = runs[0];
// Parse steps from the workflow run
const steps: WorkflowStep[] = [];
const partialImages: GeneratedImage[] = [];
let completedSteps = 0;
// Process step groups (both sequential and parallel)
if (run.steps) {
for (const stepGroup of run.steps) {
if (stepGroup.type === "sequential" && stepGroup.steps) {
for (const step of stepGroup.steps) {
const stepInfo: WorkflowStep = {
stepName: step.stepName || "unknown",
state: (step.state as StepState) || "STEP_PENDING",
out: step.out as string | undefined,
messageId: step.messageId as string | undefined,
};
steps.push(stepInfo);
if (step.state === "STEP_SUCCESS") {
completedSteps++;
// Extract image data from completed image generation steps
if (
step.stepName?.startsWith("generate-image-") &&
step.out
) {
try {
const imageData = JSON.parse(
step.out as string
) as GeneratedImage;
partialImages.push(imageData);
} catch (e) {
console.error("Failed to parse image data:", e);
}
}
}
}
} else if (stepGroup.type === "parallel" && stepGroup.steps) {
for (const step of stepGroup.steps) {
const stepInfo: WorkflowStep = {
stepName: step.stepName || "unknown",
state: (step.state as StepState) || "STEP_PENDING",
out: step.out as string | undefined,
messageId: step.messageId as string | undefined,
};
steps.push(stepInfo);
if (step.state === "STEP_SUCCESS") {
completedSteps++;
// Extract image data from completed image generation steps
if (
step.stepName?.startsWith("generate-image-") &&
step.out
) {
try {
const imageData = JSON.parse(
step.out as string
) as GeneratedImage;
partialImages.push(imageData);
} catch (e) {
console.error("Failed to parse image data:", e);
}
}
}
}
}
}
}
// Sort partial images by index
partialImages.sort((a, b) => a.index - b.index);
// Calculate progress
const totalSteps = steps.length || 1;
const percentage = Math.round((completedSteps / totalSteps) * 100);
// Check if workflow is complete
const isComplete =
run.workflowState === "RUN_SUCCESS" ||
run.workflowState === "RUN_FAILED" ||
run.workflowState === "RUN_CANCELED";
return {
workflowRunId: run.workflowRunId,
workflowUrl: run.workflowUrl,
workflowState: run.workflowState,
workflowRunCreatedAt: run.workflowRunCreatedAt,
workflowRunCompletedAt: run.workflowRunCompletedAt,
steps,
progress: {
completed: completedSteps,
total: totalSteps,
percentage,
},
result: run.workflowRunResponse as ImageGenerationResult | undefined,
partialImages,
isComplete,
};
} catch (error) {
console.error("Failed to fetch workflow status:", error);
return null;
}
}
クライアントの実装
今回はワークフローのステータスを監視するために、2秒ごとにポーリング処理を実行しています。
ただし、実際に複数のワークフローを同時実行する場合は、リロードボタンを追加して、クリック時に最新の状態を取得する実装も有効です。
app/generate-image/page.tsx
"use client";
import { useState, useEffect } from "react";
import { triggerImageWorkflow, getWorkflowStatus } from "@/app/actions";
import type { WorkflowStatus, GeneratedImage } from "@/types/workflow";
import { WorkflowVisualizer } from "@/components/workflow-visualizer";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
import { Progress } from "@/components/ui/progress";
import { Badge } from "@/components/ui/badge";
import { Loader2, Sparkles } from "lucide-react";
export default function GenerateImagePage() {
// Form state
const [scene, setScene] = useState("");
const [numberOfImages, setNumberOfImages] = useState(3);
const [loading, setLoading] = useState(false);
// Workflow state
const [workflowRunId, setWorkflowRunId] = useState<string | null>(null);
const [status, setStatus] = useState<WorkflowStatus | null>(null);
const [generatedImages, setGeneratedImages] = useState<GeneratedImage[]>([]);
const [error, setError] = useState<string | null>(null);
// Poll workflow status
useEffect(() => {
if (!workflowRunId) return;
let intervalId: NodeJS.Timeout;
const pollStatus = async () => {
const newStatus = await getWorkflowStatus(workflowRunId);
if (newStatus) {
setStatus(newStatus);
// Update generated images with partial results
if (newStatus.partialImages.length > 0) {
setGeneratedImages(newStatus.partialImages);
}
// Stop polling if workflow is complete
if (newStatus.isComplete) {
setLoading(false);
if (newStatus.result?.images) {
setGeneratedImages(newStatus.result.images);
}
// Stop polling immediately
clearInterval(intervalId);
return;
}
}
};
// Poll every 2 seconds
intervalId = setInterval(pollStatus, 2000);
// Initial poll
pollStatus();
return () => clearInterval(intervalId);
}, [workflowRunId]);
// Handle form submission
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!scene.trim()) {
setError("Please enter a scene description");
return;
}
setLoading(true);
setError(null);
setWorkflowRunId(null);
setStatus(null);
setGeneratedImages([]);
const response = await triggerImageWorkflow(scene, numberOfImages);
if (response.success && response.workflowRunId) {
setWorkflowRunId(response.workflowRunId);
} else {
setError(response.error || "Failed to start workflow");
setLoading(false);
}
};
return (
<div className="container mx-auto max-w-7xl p-4 md:p-8">
<div className="mb-8">
<h1 className="flex items-center gap-2 text-3xl font-bold">
<Sparkles className="h-8 w-8 text-primary" />
AI Image Generation Workflow
</h1>
<p className="mt-2 text-muted-foreground">
Generate multiple AI images from a scene description using Upstash Workflow, Claude, and Amazon Nova
</p>
</div>
{/* Form */}
<Card className="mb-8">
<CardHeader>
<CardTitle>Generate Images</CardTitle>
<CardDescription>
Describe a scene and choose how many images to generate
</CardDescription>
</CardHeader>
<CardContent>
<form onSubmit={handleSubmit} className="space-y-4">
<div className="space-y-2">
<label htmlFor="scene" className="text-sm font-medium">
Scene Description
</label>
<Input
id="scene"
placeholder="e.g., A serene Japanese garden with cherry blossoms"
value={scene}
onChange={(e) => setScene(e.target.value)}
disabled={loading}
className="w-full"
/>
</div>
<div className="space-y-2">
<label htmlFor="numberOfImages" className="text-sm font-medium">
Number of Images: {numberOfImages}
</label>
<input
id="numberOfImages"
type="range"
min="1"
max="5"
value={numberOfImages}
onChange={(e) => setNumberOfImages(parseInt(e.target.value))}
disabled={loading}
className="w-full"
/>
<p className="text-xs text-muted-foreground">
Generate between 1-5 images (more images = longer processing time)
</p>
</div>
<Button type="submit" disabled={loading} className="w-full">
{loading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Generating...
</>
) : (
<>
<Sparkles className="mr-2 h-4 w-4" />
Generate Images
</>
)}
</Button>
</form>
{error && (
<div className="mt-4 rounded-md bg-red-50 p-3 text-sm text-red-600">
{error}
</div>
)}
</CardContent>
</Card>
{/* Progress and Results */}
{status && (
<div className="space-y-6">
{/* Progress Section */}
<Card>
<CardHeader>
<CardTitle className="flex items-center justify-between">
<span>Workflow Progress</span>
<Badge
variant={
status.workflowState === "RUN_SUCCESS"
? "default"
: status.workflowState === "RUN_FAILED"
? "destructive"
: "secondary"
}
>
{status.workflowState.replace("RUN_", "")}
</Badge>
</CardTitle>
<CardDescription>
Run ID: {status.workflowRunId}
</CardDescription>
</CardHeader>
<CardContent className="space-y-4">
<div>
<div className="mb-2 flex items-center justify-between text-sm">
<span className="font-medium">Overall Progress</span>
<span className="text-muted-foreground">
{status.progress.percentage}% ({status.progress.completed}/{status.progress.total} steps)
</span>
</div>
<Progress value={status.progress.percentage} className="h-2" />
</div>
</CardContent>
</Card>
{/* Workflow Visualization */}
<Card>
<CardHeader>
<CardTitle>Workflow Visualization</CardTitle>
<CardDescription>
Real-time visualization of the workflow execution
</CardDescription>
</CardHeader>
<CardContent>
<WorkflowVisualizer
status={status}
scene={scene}
numberOfImages={numberOfImages}
/>
</CardContent>
</Card>
{/* Generated Images Section */}
<Card>
<CardHeader>
<CardTitle>Generated Images</CardTitle>
<CardDescription>
{generatedImages.length > 0
? `${generatedImages.length} of ${numberOfImages} images completed`
: "Images will appear here as they are generated"}
</CardDescription>
</CardHeader>
<CardContent>
<div className="grid grid-cols-1 gap-4 md:grid-cols-2 lg:grid-cols-3">
{/* Show placeholder loading cards */}
{Array.from({ length: numberOfImages }).map((_, index) => {
const image = generatedImages[index];
if (image) {
return (
<Card key={index} className="overflow-hidden">
<img
src={`data:image/png;base64,${image.imageBase64}`}
alt={image.prompt}
className="h-64 w-full object-cover"
/>
<CardContent className="p-3">
<p className="line-clamp-2 text-xs text-muted-foreground">
{image.prompt}
</p>
</CardContent>
</Card>
);
}
return (
<Card
key={index}
className="flex h-64 items-center justify-center border-dashed"
>
<div className="text-center">
<Loader2 className="mx-auto mb-2 h-8 w-8 animate-spin text-muted-foreground" />
<p className="text-sm text-muted-foreground">
Generating image {index + 1}...
</p>
</div>
</Card>
);
})}
</div>
</CardContent>
</Card>
</div>
)}
{/* Empty State */}
{!status && !loading && (
<Card className="border-dashed">
<CardContent className="flex h-64 items-center justify-center">
<div className="text-center">
<Sparkles className="mx-auto mb-4 h-12 w-12 text-muted-foreground" />
<p className="text-lg font-medium">Ready to generate images</p>
<p className="text-sm text-muted-foreground">
Enter a scene description above to get started
</p>
</div>
</CardContent>
</Card>
)}
</div>
);
}
ワークフローのUIコンポーネント実装
実装内容が多くなるため、別コンポーネントとして切り出して実装します。
components/workflow-visualizer.tsx
'use client';
import { useMemo } from 'react';
import {
type Node as FlowNode,
type Edge as FlowEdge,
MarkerType,
} from '@xyflow/react';
import '@xyflow/react/dist/style.css';
import type { WorkflowStatus, StepState } from '@/types/workflow';
import { Canvas } from '@/components/ai-elements/canvas';
import { Node, NodeHeader, NodeTitle, NodeContent, NodeFooter } from '@/components/ai-elements/node';
import { Edge } from '@/components/ai-elements/edge';
import { Badge } from '@/components/ui/badge';
import { cn } from '@/lib/utils';
// Custom Node Component using AI Elements
function WorkflowNode({ data }: { data: any }) {
const stateColors = {
STEP_PENDING: 'bg-gray-50 border-gray-300',
STEP_PROGRESS: 'bg-blue-50 border-blue-400 shadow-lg',
STEP_SUCCESS: 'bg-green-50 border-green-400',
STEP_RETRY: 'bg-yellow-50 border-yellow-400',
STEP_FAILED: 'bg-red-50 border-red-400',
};
const stateIcons = {
STEP_PENDING: '⏳',
STEP_PROGRESS: '🔄',
STEP_SUCCESS: '✅',
STEP_RETRY: '🔁',
STEP_FAILED: '❌',
};
const stateBadgeVariants: Record<StepState, 'default' | 'secondary' | 'destructive' | 'outline'> = {
STEP_PENDING: 'secondary',
STEP_PROGRESS: 'default',
STEP_SUCCESS: 'default',
STEP_RETRY: 'secondary',
STEP_FAILED: 'destructive',
};
const colorClass = stateColors[data.state as StepState] || stateColors.STEP_PENDING;
const icon = stateIcons[data.state as StepState] || '⏳';
return (
<Node
handles={{ target: data.hasTarget, source: data.hasSource }}
className={cn(
'min-w-[250px] border-2 transition-all duration-300',
colorClass,
data.state === 'STEP_PROGRESS' && 'animate-pulse'
)}
>
<NodeHeader>
<div className="flex items-center justify-between gap-2">
<div className="flex items-center gap-2">
<span className="text-xl">{icon}</span>
<NodeTitle className="text-sm font-semibold">{data.label}</NodeTitle>
</div>
<Badge variant={stateBadgeVariants[data.state as StepState] || 'outline'} className="text-xs">
{data.state.replace('STEP_', '')}
</Badge>
</div>
{data.description && (
<p className="text-xs text-muted-foreground mt-1">{data.description}</p>
)}
</NodeHeader>
{data.content && (
<NodeContent>
{data.imageBase64 ? (
<img
src={`data:image/png;base64,${data.imageBase64}`}
alt="Generated"
className="w-full h-auto rounded"
/>
) : (
<p className="text-xs text-gray-700 whitespace-pre-line">{data.content}</p>
)}
</NodeContent>
)}
{data.footer && (
<NodeFooter>
<p className="text-xs text-muted-foreground">{data.footer}</p>
</NodeFooter>
)}
</Node>
);
}
const nodeTypes = {
workflow: WorkflowNode,
};
const edgeTypes = {
animated: Edge.Animated,
};
interface WorkflowVisualizerProps {
status: WorkflowStatus | null;
scene: string;
numberOfImages: number;
}
export function WorkflowVisualizer({ status, scene, numberOfImages }: WorkflowVisualizerProps) {
// Generate nodes from workflow status
const nodes = useMemo<FlowNode[]>(() => {
if (!status) {
return [];
}
const generatedNodes: FlowNode[] = [];
// Start Node
generatedNodes.push({
id: 'start',
type: 'workflow',
position: { x: 0, y: 200 },
data: {
label: 'Workflow Start',
description: 'User input received',
state: 'STEP_SUCCESS',
content: `Scene: "${scene}"\nImages to generate: ${numberOfImages}`,
footer: `Started at ${new Date(status.workflowRunCreatedAt).toLocaleTimeString()}`,
hasTarget: false,
hasSource: true,
},
});
// Prompts Node
const promptStep = status.steps.find(s => s.stepName === 'generate-prompts');
generatedNodes.push({
id: 'prompts',
type: 'workflow',
position: { x: 400, y: 200 },
data: {
label: 'Generate Prompts',
description: 'Claude AI prompt generation',
state: promptStep?.state || 'STEP_PENDING',
content: promptStep?.state === 'STEP_SUCCESS'
? `Generated ${numberOfImages} unique prompts`
: 'Generating prompt variations...',
footer: 'Model: Claude Sonnet 4.5',
hasTarget: true,
hasSource: true,
},
});
// Image Generation Nodes (parallel)
for (let i = 1; i <= numberOfImages; i++) {
const imageStep = status.steps.find(s => s.stepName === `generate-image-${i}`);
const partialImage = status.partialImages.find(img => img.index === i);
// Calculate vertical position for parallel nodes
const yOffset = (i - 1 - (numberOfImages - 1) / 2) * 220;
generatedNodes.push({
id: `image-${i}`,
type: 'workflow',
position: { x: 900, y: 200 + yOffset },
data: {
label: `Image ${i}`,
description: `Nova Canvas generation`,
state: imageStep?.state || 'STEP_PENDING',
content: partialImage?.prompt || 'Waiting for prompt...',
imageBase64: partialImage?.imageBase64,
footer: imageStep?.state === 'STEP_SUCCESS'
? '✓ Generated'
: imageStep?.state === 'STEP_PROGRESS'
? 'Generating...'
: 'Pending',
hasTarget: true,
hasSource: true,
},
});
}
// Complete Node
generatedNodes.push({
id: 'complete',
type: 'workflow',
position: { x: 1400, y: 200 },
data: {
label: 'Workflow Complete',
description: 'Final result aggregation',
state: status.isComplete
? (status.workflowState === 'RUN_SUCCESS' ? 'STEP_SUCCESS' : 'STEP_FAILED')
: 'STEP_PENDING',
content: status.isComplete
? `${status.partialImages.length} images generated successfully`
: `Progress: ${status.progress.percentage}%`,
footer: status.workflowRunCompletedAt
? `Completed at ${new Date(status.workflowRunCompletedAt).toLocaleTimeString()}`
: 'In progress...',
hasTarget: true,
hasSource: false,
},
});
return generatedNodes;
}, [status, scene, numberOfImages]);
// Generate edges from workflow status
const edges = useMemo<FlowEdge[]>(() => {
if (!status) {
return [];
}
const generatedEdges: FlowEdge[] = [];
// Start → Prompts
const promptStep = status.steps.find(s => s.stepName === 'generate-prompts');
const isPromptActive = promptStep?.state === 'STEP_PROGRESS' || promptStep?.state === 'STEP_SUCCESS';
generatedEdges.push({
id: 'edge-start-prompts',
source: 'start',
target: 'prompts',
type: 'animated',
animated: isPromptActive,
style: {
stroke: promptStep?.state === 'STEP_SUCCESS' ? '#10b981' : '#3b82f6',
strokeWidth: 2.5,
},
markerEnd: {
type: MarkerType.ArrowClosed,
color: promptStep?.state === 'STEP_SUCCESS' ? '#10b981' : '#3b82f6',
},
});
// Prompts → Images (parallel)
for (let i = 1; i <= numberOfImages; i++) {
const imageStep = status.steps.find(s => s.stepName === `generate-image-${i}`);
const isActive = imageStep?.state === 'STEP_PROGRESS' || imageStep?.state === 'STEP_SUCCESS';
const color = imageStep?.state === 'STEP_SUCCESS' ? '#10b981' :
imageStep?.state === 'STEP_FAILED' ? '#ef4444' : '#3b82f6';
generatedEdges.push({
id: `edge-prompts-image-${i}`,
source: 'prompts',
target: `image-${i}`,
type: 'animated',
animated: isActive,
style: {
stroke: color,
strokeWidth: 2.5,
strokeDasharray: imageStep?.state === 'STEP_SUCCESS' ? undefined : '5,5',
},
markerEnd: {
type: MarkerType.ArrowClosed,
color,
},
});
}
// Images → Complete
for (let i = 1; i <= numberOfImages; i++) {
const imageStep = status.steps.find(s => s.stepName === `generate-image-${i}`);
const isComplete = imageStep?.state === 'STEP_SUCCESS';
const color = isComplete ? '#10b981' : '#9ca3af';
generatedEdges.push({
id: `edge-image-${i}-complete`,
source: `image-${i}`,
target: 'complete',
type: 'animated',
animated: isComplete && status.isComplete,
style: {
stroke: color,
strokeWidth: 2,
strokeDasharray: isComplete ? undefined : '5,5',
},
markerEnd: {
type: MarkerType.ArrowClosed,
color,
},
});
}
return generatedEdges;
}, [status, numberOfImages]);
if (!status) {
return (
<div className="w-full h-[600px] flex items-center justify-center bg-gray-50 rounded-lg border-2 border-dashed border-gray-300">
<p className="text-gray-500">No workflow data available</p>
</div>
);
}
return (
<div className="w-full h-[600px] border rounded-lg overflow-hidden">
<Canvas
nodes={nodes}
edges={edges}
nodeTypes={nodeTypes}
edgeTypes={edgeTypes}
/>
</div>
);
}
最後に
Upstash Workflowは本当によくできており、制御しやすく、非同期のワークフローも簡単に作成できます。
また、最近Vercel社もワークフロー機能をリリースしたので、そのうち触ってみようと思います。
今回実際に使用したリポジトリは下記に置いておきます。






