Mass-producing videos with FramePack

Posted at 2025-04-18

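Introduction

FramePack is getting a lot of attention right now.

To generate long videos efficiently, FramePack combines several techniques: it compresses the input frames into a fixed-length context, it takes both past and future frame information into account during generation, and it generates frames sequentially while referring to that context instead of producing every frame at once. As a result, it can generate long videos with little VRAM.

The project page provides a GUI tool for generating videos, but I wanted to generate a large number of videos in one go, so I tried running it from the command line.

Working in Pinokio

This article uses a tool called Pinokio to install FramePack. For how to use Pinokio, see my earlier article. After launching the app, search for FramePack on the Discover page, then download and install it.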

When the installation finishes, the GUI appears. Generate one video here to confirm that everything works (see the project page for a sample).

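Preparing for CLI execution

Next, open a Command Prompt as administrator and move to C:\pinokio\api\Frame-Pack.git\app.

First, run the following command to activate the virtual environment (this is required every time you open a new Command Prompt).

env\Scripts\activate

Check that (env) appears at the start of the prompt, which indicates that the virtual environment is active.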
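
If the prompt prefix is not visible, the same check can be made from inside Python. The following is a minimal sketch, assuming the environment was created with the standard venv mechanism (where `sys.prefix` differs from `sys.base_prefix` while the environment is active); the helper name `in_virtualenv` is introduced here for illustration only.

```python
import sys


def in_virtualenv(prefix: str, base_prefix: str) -> bool:
    """Return True when the interpreter prefix differs from the base install."""
    return prefix != base_prefix


if __name__ == "__main__":
    # sys.prefix points at the environment; sys.base_prefix at the base Python.
    active = in_virtualenv(sys.prefix, sys.base_prefix)
    print(f"Interpreter: {sys.executable}")
    print(f"Virtual environment active: {active}")
```

When the environment is active, the interpreter path printed here should point inside the env folder under the app directory.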

Next, install the required Python packages. Run the following commands in order.

First, update pip and install devicetorch (it is not included in the original requirements.txt).

pip install --upgrade pip
pip install devicetorch

Next, install the basic dependencies (gradio, diffusers, transformers, and so on) using the requirements.txt file in the app directory.

pip install -r requirements.txt

(Optional) Additional libraries for speed:

Installing xformers and flash-attn is recommended to improve execution speed.

pip install xformers
pip install https://huggingface.co/lldacing/flash-attention-windows-wheel/resolve/main/flash_attn-2.7.4%2Bcu126torch2.6.0cxx11abiFALSE-cp310-cp310-win_amd64.whl?download=true

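Note: this flash-attn wheel is built for a specific combination of CUDA, PyTorch, and Python versions (CUDA 12.6, PyTorch 2.6.0, Python 3.10). If your environment differs, consult the official documentation and install a matching version, or build from source.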
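
The version constraints are encoded in the wheel's file name, so they can be read off before installing. The sketch below splits a wheel name into its tags and compares the Python tag with the running interpreter. The helper names `parse_wheel_tags` and `python_tag_matches` are hypothetical, and the parsing assumes the standard wheel naming scheme (name, version, optional build tag, then Python, ABI, and platform tags).

```python
import sys
from urllib.parse import unquote


def parse_wheel_tags(wheel_name: str) -> dict:
    """Split a wheel file name into its name, version, and compatibility tags."""
    stem = unquote(wheel_name)  # turns %2B back into "+"
    if stem.endswith(".whl"):
        stem = stem[: -len(".whl")]
    parts = stem.split("-")
    # The last three fields are always the python, ABI, and platform tags.
    return {
        "name": parts[0],
        "version": parts[1],
        "python_tag": parts[-3],
        "abi_tag": parts[-2],
        "platform_tag": parts[-1],
    }


def python_tag_matches(python_tag: str, version_info=sys.version_info) -> bool:
    """Check a CPython tag such as 'cp310' against a (major, minor, ...) version."""
    return python_tag == f"cp{version_info[0]}{version_info[1]}"


if __name__ == "__main__":
    wheel = "flash_attn-2.7.4%2Bcu126torch2.6.0cxx11abiFALSE-cp310-cp310-win_amd64.whl"
    tags = parse_wheel_tags(wheel)
    print(tags)
    print("Matches this interpreter:", python_tag_matches(tags["python_tag"]))
```

The CUDA and PyTorch parts (cu126, torch2.6.0) sit in the local version segment after the "+"; they can be compared by eye against `torch.__version__` and `torch.version.cuda` in the activated environment.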

You can also consider the following libraries.

pip install triton-windows
pip install https://github.com/deepbeepmeep/SageAttention/raw/refs/heads/main/releases/sageattention-2.1.0-cp310-cp310-win_amd64.whl

(Note: SageAttention may overlap in functionality with flash-attn and xformers. Whether triton-windows is needed depends on your environment.)
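
Because these libraries are optional and partly overlapping, it can help to see which of them actually ended up importable in the environment. This is a minimal sketch using only the standard library; the mapping from pip package names to import names is an assumption based on how these packages are usually imported, and `installed_optional_modules` is a name introduced here.

```python
from importlib.util import find_spec

# Assumed mapping: pip package name -> top-level import name.
OPTIONAL_MODULES = {
    "xformers": "xformers",
    "flash-attn": "flash_attn",
    "sageattention": "sageattention",
    "triton-windows": "triton",
}


def installed_optional_modules(modules=OPTIONAL_MODULES) -> dict:
    """Report, per package, whether its module can be located without importing it."""
    return {
        pip_name: find_spec(module_name) is not None
        for pip_name, module_name in modules.items()
    }


if __name__ == "__main__":
    for pip_name, available in installed_optional_modules().items():
        print(f"{pip_name}: {'installed' if available else 'not installed'}")
```

`find_spec` only locates the module, so this does not confirm that a wheel matches the local CUDA or PyTorch build; an actual import is still the real test.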

Next, copy the following script into the app folder.

run.py
import os
os.environ['HF_HOME'] = os.path.abspath(os.path.realpath(os.path.join(os.path.dirname(__file__), './hf_download')))

import torch
import traceback
import einops
import numpy as np
import argparse
import math
from PIL import Image

from diffusers import AutoencoderKLHunyuanVideo
from transformers import LlamaModel, CLIPTextModel, LlamaTokenizerFast, CLIPTokenizer
from diffusers_helper.hunyuan import encode_prompt_conds, vae_decode, vae_encode, vae_decode_fake
from diffusers_helper.utils import save_bcthw_as_mp4, crop_or_pad_yield_mask, soft_append_bcthw, resize_and_center_crop, generate_timestamp
from diffusers_helper.models.hunyuan_video_packed import HunyuanVideoTransformer3DModelPacked
from diffusers_helper.pipelines.k_diffusion_hunyuan import sample_hunyuan
from diffusers_helper.memory import cpu, gpu, get_cuda_free_memory_gb, move_model_to_device_with_memory_preservation, offload_model_from_device_for_memory_preservation, fake_diffusers_current_device, DynamicSwapInstaller, unload_complete_models, load_model_as_complete
from transformers import SiglipImageProcessor, SiglipVisionModel
from diffusers_helper.clip_vision import hf_clip_vision_encode
from diffusers_helper.bucket_tools import find_nearest_bucket


def parse_args():
    parser = argparse.ArgumentParser(description="Generate video from image and prompt using FramePack.")
    parser.add_argument('--image_path', type=str, required=True, help='Path to the input image.')
    parser.add_argument('--prompt', type=str, required=True, help='Text prompt describing the desired video content.')
    parser.add_argument('--output_dir', type=str, default='./outputs/', help='Directory to save the output video and intermediate files.')
    parser.add_argument('--n_prompt', type=str, default="", help='Negative text prompt.')
    parser.add_argument('--seed', type=int, default=31337, help='Random seed for generation.')
    parser.add_argument('--total_second_length', type=float, default=5.0, help='Total length of the video in seconds.')
    parser.add_argument('--latent_window_size', type=int, default=9, help='Latent window size (should not change).')
    parser.add_argument('--steps', type=int, default=25, help='Number of sampling steps.')
    parser.add_argument('--cfg', type=float, default=1.0, help='CFG Scale (should not change).')
    parser.add_argument('--gs', type=float, default=10.0, help='Distilled CFG Scale.')
    parser.add_argument('--rs', type=float, default=0.0, help='CFG Re-Scale (should not change).')
    parser.add_argument('--gpu_memory_preservation', type=float, default=6.0, help='GPU memory preservation in GB.')
    parser.add_argument('--use_teacache', action='store_true', default=True, help='Use TeaCache for potentially faster generation.')
    parser.add_argument('--no_teacache', action='store_false', dest='use_teacache', help='Do not use TeaCache.')

    args = parser.parse_args()
    return args

@torch.no_grad()
def worker(input_image, prompt, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, gpu_memory_preservation, use_teacache, outputs_folder, high_vram, text_encoder, text_encoder_2, tokenizer, tokenizer_2, vae, feature_extractor, image_encoder, transformer):
    total_latent_sections = (total_second_length * 30) / (latent_window_size * 4)
    total_latent_sections = int(max(round(total_latent_sections), 1))

    job_id = generate_timestamp()
    output_filename_final = None

    print("Starting video generation...")

    try:
        # Clean GPU
        if not high_vram:
            unload_complete_models(
                text_encoder, text_encoder_2, image_encoder, vae, transformer
            )

        # Text encoding
        print("Text encoding ...")
        if not high_vram:
            fake_diffusers_current_device(text_encoder, gpu)
            load_model_as_complete(text_encoder_2, target_device=gpu)

        llama_vec, clip_l_pooler = encode_prompt_conds(prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2)

        if cfg == 1:
            llama_vec_n, clip_l_pooler_n = torch.zeros_like(llama_vec), torch.zeros_like(clip_l_pooler)
        else:
            llama_vec_n, clip_l_pooler_n = encode_prompt_conds(n_prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2)

        llama_vec, llama_attention_mask = crop_or_pad_yield_mask(llama_vec, length=512)
        llama_vec_n, llama_attention_mask_n = crop_or_pad_yield_mask(llama_vec_n, length=512)

        # Processing input image
        print("Image processing ...")
        input_image_np = np.array(input_image)
        H, W, C = input_image_np.shape
        height, width = find_nearest_bucket(H, W, resolution=640)
        input_image_np = resize_and_center_crop(input_image_np, target_width=width, target_height=height)

        Image.fromarray(input_image_np).save(os.path.join(outputs_folder, f'{job_id}_input.png'))

        input_image_pt = torch.from_numpy(input_image_np).float() / 127.5 - 1
        input_image_pt = input_image_pt.permute(2, 0, 1)[None, :, None]

        # VAE encoding
        print("VAE encoding ...")
        if not high_vram:
            load_model_as_complete(vae, target_device=gpu)

        start_latent = vae_encode(input_image_pt, vae)

        # CLIP Vision
        print("CLIP Vision encoding ...")
        if not high_vram:
            load_model_as_complete(image_encoder, target_device=gpu)

        image_encoder_output = hf_clip_vision_encode(input_image_np, feature_extractor, image_encoder)
        image_encoder_last_hidden_state = image_encoder_output.last_hidden_state

        # Dtype
        llama_vec = llama_vec.to(transformer.dtype)
        llama_vec_n = llama_vec_n.to(transformer.dtype)
        clip_l_pooler = clip_l_pooler.to(transformer.dtype)
        clip_l_pooler_n = clip_l_pooler_n.to(transformer.dtype)
        image_encoder_last_hidden_state = image_encoder_last_hidden_state.to(transformer.dtype)

        # Sampling
        print("Start sampling ...")
        rnd = torch.Generator("cpu").manual_seed(seed)
        num_frames = latent_window_size * 4 - 3

        history_latents = torch.zeros(size=(1, 16, 1 + 2 + 16, height // 8, width // 8), dtype=torch.float32).cpu()
        history_pixels = None
        total_generated_latent_frames = 0

        latent_paddings = reversed(range(total_latent_sections))

        if total_latent_sections > 4:
            latent_paddings = [3] + [2] * (total_latent_sections - 3) + [1, 0]

        for i, latent_padding in enumerate(latent_paddings):
            is_last_section = latent_padding == 0
            latent_padding_size = latent_padding * latent_window_size

            print(f"Generating section {i+1}/{total_latent_sections} (padding={latent_padding}, last={is_last_section})...")
            # print(f'latent_padding_size = {latent_padding_size}, is_last_section = {is_last_section}')

            indices = torch.arange(0, sum([1, latent_padding_size, latent_window_size, 1, 2, 16])).unsqueeze(0)
            clean_latent_indices_pre, blank_indices, latent_indices, clean_latent_indices_post, clean_latent_2x_indices, clean_latent_4x_indices = indices.split([1, latent_padding_size, latent_window_size, 1, 2, 16], dim=1)
            clean_latent_indices = torch.cat([clean_latent_indices_pre, clean_latent_indices_post], dim=1)

            clean_latents_pre = start_latent.to(history_latents)
            clean_latents_post, clean_latents_2x, clean_latents_4x = history_latents[:, :, :1 + 2 + 16, :, :].split([1, 2, 16], dim=2)
            clean_latents = torch.cat([clean_latents_pre, clean_latents_post], dim=2)

            if not high_vram:
                unload_complete_models()
                move_model_to_device_with_memory_preservation(transformer, target_device=gpu, preserved_memory_gb=gpu_memory_preservation)

            if use_teacache:
                transformer.initialize_teacache(enable_teacache=True, num_steps=steps)
            else:
                transformer.initialize_teacache(enable_teacache=False)

            def callback(d):
                current_step = d['i'] + 1
                percentage = int(100.0 * current_step / steps)
                print(f"  Sampling Step: {current_step}/{steps} ({percentage}%)", end='\r')
                # No preview generation needed for CLI
                return

            generated_latents = sample_hunyuan(
                transformer=transformer,
                sampler='unipc',
                width=width,
                height=height,
                frames=num_frames,
                real_guidance_scale=cfg,
                distilled_guidance_scale=gs,
                guidance_rescale=rs,
                num_inference_steps=steps,
                generator=rnd,
                prompt_embeds=llama_vec,
                prompt_embeds_mask=llama_attention_mask,
                prompt_poolers=clip_l_pooler,
                negative_prompt_embeds=llama_vec_n,
                negative_prompt_embeds_mask=llama_attention_mask_n,
                negative_prompt_poolers=clip_l_pooler_n,
                device=gpu,
                dtype=torch.bfloat16,
                image_embeddings=image_encoder_last_hidden_state,
                latent_indices=latent_indices,
                clean_latents=clean_latents,
                clean_latent_indices=clean_latent_indices,
                clean_latents_2x=clean_latents_2x,
                clean_latent_2x_indices=clean_latent_2x_indices,
                clean_latents_4x=clean_latents_4x,
                clean_latent_4x_indices=clean_latent_4x_indices,
                callback=callback,
            )
            print() # Newline after step progress

            if is_last_section:
                generated_latents = torch.cat([start_latent.to(generated_latents), generated_latents], dim=2)

            total_generated_latent_frames += int(generated_latents.shape[2])
            history_latents = torch.cat([generated_latents.to(history_latents), history_latents], dim=2)

            if not high_vram:
                offload_model_from_device_for_memory_preservation(transformer, target_device=gpu, preserved_memory_gb=8)
                load_model_as_complete(vae, target_device=gpu)

            real_history_latents = history_latents[:, :, :total_generated_latent_frames, :, :]

            print("  Decoding latents...")
            if history_pixels is None:
                history_pixels = vae_decode(real_history_latents, vae).cpu()
            else:
                section_latent_frames = (latent_window_size * 2 + 1) if is_last_section else (latent_window_size * 2)
                overlapped_frames = latent_window_size * 4 - 3

                current_pixels = vae_decode(real_history_latents[:, :, :section_latent_frames], vae).cpu()
                history_pixels = soft_append_bcthw(current_pixels, history_pixels, overlapped_frames)

            if not high_vram:
                unload_complete_models()

            output_filename_final = os.path.join(outputs_folder, f'{job_id}.mp4') # Use a consistent final name
            print(f"  Saving video segment to {output_filename_final}...")
            save_bcthw_as_mp4(history_pixels, output_filename_final, fps=30)

            print(f"  Decoded. Current latent shape {real_history_latents.shape}; pixel shape {history_pixels.shape}")

            if is_last_section:
                break

    except Exception as e:
        traceback.print_exc()
        print(f"Error during generation: {e}")

    finally:
        if not high_vram:
            print("Unloading models...")
            unload_complete_models(
                text_encoder, text_encoder_2, image_encoder, vae, transformer
            )

    print("Video generation finished.")
    return output_filename_final

if __name__ == "__main__":
    args = parse_args()

    print("Arguments:")
    for k, v in vars(args).items():
        print(f"  {k}: {v}")

    print("\nInitializing...")

    free_mem_gb = get_cuda_free_memory_gb(gpu)
    high_vram = free_mem_gb > 60

    print(f'Free VRAM {free_mem_gb} GB')
    print(f'High-VRAM Mode: {high_vram}')

    # Load Models
    print("Loading models...")
    text_encoder = LlamaModel.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder', torch_dtype=torch.float16).cpu()
    text_encoder_2 = CLIPTextModel.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder_2', torch_dtype=torch.float16).cpu()
    tokenizer = LlamaTokenizerFast.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer')
    tokenizer_2 = CLIPTokenizer.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer_2')
    vae = AutoencoderKLHunyuanVideo.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='vae', torch_dtype=torch.float16).cpu()
    feature_extractor = SiglipImageProcessor.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='feature_extractor')
    image_encoder = SiglipVisionModel.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='image_encoder', torch_dtype=torch.float16).cpu()
    transformer = HunyuanVideoTransformer3DModelPacked.from_pretrained('lllyasviel/FramePackI2V_HY', torch_dtype=torch.bfloat16).cpu()
    print("Models loaded.")

    # Configure Models
    vae.eval()
    text_encoder.eval()
    text_encoder_2.eval()
    image_encoder.eval()
    transformer.eval()

    if not high_vram:
        vae.enable_slicing()
        vae.enable_tiling()

    transformer.high_quality_fp32_output_for_inference = True
    # print('transformer.high_quality_fp32_output_for_inference = True') # Less verbose

    transformer.to(dtype=torch.bfloat16)
    vae.to(dtype=torch.float16)
    image_encoder.to(dtype=torch.float16)
    text_encoder.to(dtype=torch.float16)
    text_encoder_2.to(dtype=torch.float16)

    vae.requires_grad_(False)
    text_encoder.requires_grad_(False)
    text_encoder_2.requires_grad_(False)
    image_encoder.requires_grad_(False)
    transformer.requires_grad_(False)

    if not high_vram:
        print("Installing dynamic swap for low VRAM...")
        DynamicSwapInstaller.install_model(transformer, device=gpu)
        DynamicSwapInstaller.install_model(text_encoder, device=gpu)
    else:
        print("Moving models to GPU (High VRAM)...")
        text_encoder.to(gpu)
        text_encoder_2.to(gpu)
        image_encoder.to(gpu)
        vae.to(gpu)
        transformer.to(gpu)

    os.makedirs(args.output_dir, exist_ok=True)

    # Load Input Image
    print(f"Loading input image from: {args.image_path}")
    try:
        input_image = Image.open(args.image_path).convert('RGB')
    except FileNotFoundError:
        print(f"Error: Input image not found at {args.image_path}")
        exit(1)
    except Exception as e:
        print(f"Error loading image: {e}")
        exit(1)

    # Run Worker
    final_video_path = worker(
        input_image=input_image,
        prompt=args.prompt,
        n_prompt=args.n_prompt,
        seed=args.seed,
        total_second_length=args.total_second_length,
        latent_window_size=args.latent_window_size,
        steps=args.steps,
        cfg=args.cfg,
        gs=args.gs,
        rs=args.rs,
        gpu_memory_preservation=args.gpu_memory_preservation,
        use_teacache=args.use_teacache,
        outputs_folder=args.output_dir,
        high_vram=high_vram,
        text_encoder=text_encoder,
        text_encoder_2=text_encoder_2,
        tokenizer=tokenizer,
        tokenizer_2=tokenizer_2,
        vae=vae,
        feature_extractor=feature_extractor,
        image_encoder=image_encoder,
        transformer=transformer
    )

    if final_video_path:
        print(f"\nSuccessfully generated video: {final_video_path}")
    else:
        print("\nVideo generation failed.")
        # Exit with a non-zero status so that calling scripts can detect the failure
        raise SystemExit(1)

Run it with the following command. Change the image path and prompt as needed.

python run.py --image_path ./sample.jpg --prompt "The man dances energetically, leaping mid-air with fluid arm swings and quick footwork."

If all goes well, the generated video is saved in the outputs folder.
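
The `--total_second_length` argument controls how many sections run.py generates. The sketch below reproduces the arithmetic from the `worker` function in isolation, so the number of sections and the padding schedule can be previewed without loading any models. The function name `plan_sections` and the `fps` parameter are introduced here for illustration; run.py itself hard-codes 30 frames per second.

```python
def plan_sections(total_second_length: float, latent_window_size: int = 9, fps: int = 30):
    """Return (number of sections, padding schedule) as computed in run.py's worker."""
    sections = (total_second_length * fps) / (latent_window_size * 4)
    sections = int(max(round(sections), 1))  # always at least one section

    # Default schedule counts down to 0; the section with padding 0 is the last one.
    paddings = list(reversed(range(sections)))
    if sections > 4:
        # Longer videos use a fixed pattern instead of an ever-growing padding.
        paddings = [3] + [2] * (sections - 3) + [1, 0]
    return sections, paddings


if __name__ == "__main__":
    for seconds in (1.0, 5.0, 10.0):
        sections, paddings = plan_sections(seconds)
        print(f"{seconds} s -> {sections} section(s), paddings {paddings}")
```

With the default window size of 9, the default 5-second length maps to 4 sections, and each section runs the full number of sampling steps, so generation time grows roughly in proportion to the section count.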

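Once this works, all that remains is to mass-produce videos with a script. For example, the script below starts from the sample image, generates a video themed on one emotion, and then generates the next video from that video's last frame, expressing five emotions in sequence.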

emotions.py
import subprocess
import os
import time
import sys
import cv2 # Import OpenCV
import shutil # To clean up temporary file if needed

# --- Settings ---
initial_image_path = "./test.jpg" # Starting image for the first prompt
base_output_dir = "continuous" # Single output directory
emulate_script_path = "run.py" # Assuming it's in the same directory (app)
video_segment_length_seconds = 5.0 # Desired length for each segment
last_frame_temp_filename = "_last_frame_for_next_step.png" # Temporary file for the last frame

# Dictionary of emotions and prompts - Order matters now!
# Using a list of tuples to guarantee order explicitly
prompts_ordered = [
    ("joy", "The man dances with joyful energy, spinning lightly and clapping his hands with a big smile"),
    ("anger", "The man stomps with force, arms cutting through the air sharply in an angry rhythm"),
    ("sadness", "The man moves slowly and fluidly, arms drooping as he spins with a somber expression"),
    ("surprise", "The man jolts upright mid-dance, arms flaring outward in a startled motion, then quickly steps back"),
    ("excitement", "The man bursts into rapid footwork and high jumps, pumping his fists with excitement"),
]

# Create the base output directory if it doesn't exist
os.makedirs(base_output_dir, exist_ok=True)

# Get the path to the Python executable in the current environment
python_executable = sys.executable
print(f"Using Python executable: {python_executable}")
print(f"Base output directory: {os.path.abspath(base_output_dir)}")
sys.stdout.flush() # Flush initial messages

# --- Execution Loop ---
current_image_path = initial_image_path # Start with the initial image
last_generated_video_path = None # Keep track of the last video

for i, (emotion, prompt_text) in enumerate(prompts_ordered):
    print(f"\n===== Starting generation {i}: {emotion} =====")
    print(f"Prompt: {prompt_text}")
    print(f"Using input image: {current_image_path}")

    # Determine the output video filename
    output_video_filename = f"{i:02d}_{emotion}.mp4"
    output_video_path = os.path.join(base_output_dir, output_video_filename)
    print(f"Output video: {output_video_path}")

    # Construct the command using the environment's Python
    command = [
        python_executable,
        emulate_script_path,
        "--image_path", current_image_path, # Use the current image path
        "--prompt", prompt_text, # Passed as its own list element, so no extra quoting is needed
        "--output_dir", base_output_dir, # Output to the main directory
        "--total_second_length", str(video_segment_length_seconds), # Set video length
        # Add other arguments here if needed, e.g.:
        # "--seed", "42",
    ]
    # Note: run.py saves the video as <job_id>.mp4 in the output directory.
    # The path is recovered below from run.py's success message, with a
    # directory scan for the newest .mp4 file as a fallback.

    print(f"Executing command: {' '.join(command)}") # Log the command
    sys.stdout.flush() # Flush before starting subprocess output

    generated_video_this_step = None # Reset for this step

    # Run the command and stream output
    try:
        start_time = time.time()
        # Use Popen to stream output in real-time
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, # Redirect stderr to stdout
            text=True,
            encoding='utf-8',
            errors='replace', # Replace characters that cannot be decoded
            bufsize=1, # Line buffered
            cwd=os.path.dirname(__file__) or '.' # Ensure execution in app directory
        )

        print("--- run.py output (streaming) ---")
        sys.stdout.flush() # Flush before streaming
        process_output_lines = []
        # Read and print output line by line, storing it
        while True:
            line = process.stdout.readline()
            if not line:
                break
            stripped_line = line.strip()
            print(stripped_line) # Print the line to the console
            process_output_lines.append(stripped_line) # Store for later parsing
            sys.stdout.flush() # Ensure each line from subprocess is flushed

        process.wait() # Wait for the process to complete
        end_time = time.time()

        if process.returncode == 0:
            print(f"----- Subprocess finished successfully -----")
            print(f"Duration: {end_time - start_time:.2f} seconds")
            sys.stdout.flush()

            # --- Find the generated video ---
            # Option 1: Parse output from run.py (if it prints the filename)
            # This is more robust if run.py prints a specific line like "Successfully generated video: ..."
            found_path = None
            for line in reversed(process_output_lines):
                 # Adjust this marker based on run.py's actual success message
                marker = "Successfully generated video:"
                if marker in line:
                    found_path = line.split(marker)[-1].strip()
                    # Ensure the path exists and is relative to the base_output_dir or absolute
                    if not os.path.isabs(found_path):
                        found_path = os.path.join(base_output_dir, os.path.basename(found_path)) # Resolve the file name against base_output_dir
                    if os.path.exists(found_path):
                        generated_video_this_step = found_path
                        print(f"Parsed generated video path: {generated_video_this_step}")
                        break
                    else:
                         print(f"Warning: Parsed path '{found_path}' does not exist. Falling back to directory scan.")
                         found_path = None # Reset if path doesn't exist

            # Option 2: Fallback - Find the most recently created MP4 file in the output directory
            if not generated_video_this_step:
                print("Parsing failed or video not found in output, trying directory scan...")
                try:
                    list_of_files = [os.path.join(base_output_dir, f) for f in os.listdir(base_output_dir) if f.lower().endswith(".mp4")]
                    if list_of_files:
                        generated_video_this_step = max(list_of_files, key=os.path.getctime)
                        print(f"Found most recent video via scan: {generated_video_this_step}")
                    else:
                        print(f"Error: No MP4 files found in {base_output_dir} after generation.")
                        # Decide how to handle: stop or continue? Let's stop for now.
                        raise FileNotFoundError(f"No MP4 file found in {base_output_dir}")
                except Exception as scan_e:
                    print(f"Error scanning directory {base_output_dir}: {scan_e}")
                    raise scan_e # Propagate error

            # Rename the found video to the desired sequential name
            final_video_path = os.path.join(base_output_dir, output_video_filename)
            if os.path.abspath(generated_video_this_step) != os.path.abspath(final_video_path):
                 print(f"Renaming {generated_video_this_step} to {final_video_path}")
                 try:
                      shutil.move(generated_video_this_step, final_video_path)
                      last_generated_video_path = final_video_path
                 except Exception as move_e:
                      print(f"Error renaming video: {move_e}")
                      # Decide how to proceed, maybe stop?
                      raise move_e
            else:
                 print("Generated video already has the correct name.")
                 last_generated_video_path = final_video_path


            # --- Extract last frame for next iteration ---
            if i < len(prompts_ordered) - 1: # Only if it's not the very last prompt
                print(f"Extracting last frame from: {last_generated_video_path}")
                try:
                    cap = cv2.VideoCapture(last_generated_video_path)
                    if not cap.isOpened():
                        raise IOError(f"Cannot open video file: {last_generated_video_path}")

                    # Get frame count (may be inaccurate for some formats)
                    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                    last_frame_index = frame_count - 1
                    print(f"Total frames (approx): {frame_count}")

                    if last_frame_index < 0:
                         raise ValueError("Video appears to have no frames.")

                    # --- Method 1: Seek to the end (faster but potentially less reliable) ---
                    # cap.set(cv2.CAP_PROP_POS_FRAMES, last_frame_index)
                    # ret, last_frame = cap.read()
                    # if not ret:
                    #    print("Warning: Seeking to last frame failed. Trying frame-by-frame reading.")
                    #    # Fallback to Method 2

                    # --- Method 2: Read all frames (slower but more reliable) ---
                    last_frame = None
                    frame_read_count = 0
                    while True:
                        ret, frame = cap.read()
                        if not ret:
                            break
                        last_frame = frame
                        frame_read_count += 1

                    cap.release()

                    if last_frame is None:
                        raise ValueError(f"Could not read any frames or the last frame from {last_generated_video_path}. Frames read: {frame_read_count}")

                    # Save the last frame
                    next_input_image_path = os.path.join(base_output_dir, last_frame_temp_filename)
                    cv2.imwrite(next_input_image_path, last_frame)
                    print(f"Last frame saved to: {next_input_image_path}")
                    current_image_path = next_input_image_path # Update for the next loop

                except Exception as frame_e:
                    print(f"----- Error extracting last frame for: {emotion} -----")
                    print(f"Error: {frame_e}")
                    print("Cannot continue without the last frame. Stopping.")
                    sys.stdout.flush()
                    raise frame_e # Stop the script

        else:
            # Raise an error if the process failed (will be caught below)
            raise subprocess.CalledProcessError(process.returncode, command)

    except subprocess.CalledProcessError as e:
        print(f"\n----- Error during generation for: {emotion} -----")
        print(f"Return code: {e.returncode}")
        print(f"Command failed: {' '.join(e.cmd)}")
        print("Stopping the script due to generation error.")
        sys.stdout.flush()
        break # Stop the loop on error

    except FileNotFoundError as e:
        print(f"Error: {e}")
        print("Please ensure Python, scripts, and necessary files are accessible.")
        sys.stdout.flush()
        break # Stop if essential files are missing

    except KeyboardInterrupt: # Catch KeyboardInterrupt specifically
        print("\n" + "#"*10 + f" Process interrupted by user during generation for: {emotion} " + "#"*10)
        sys.stdout.flush()
        print("Stopping the script.")
        break # Stop the loop if interrupted

    except Exception as e:
        print(f"\n----- An unexpected error occurred for: {emotion} -----")
        import traceback
        traceback.print_exc()
        print("Stopping the script due to unexpected error.")
        sys.stdout.flush()
        break # Stop the loop on unexpected error

# --- Cleanup ---
final_temp_frame_path = os.path.join(base_output_dir, last_frame_temp_filename)
if os.path.exists(final_temp_frame_path):
    try:
        os.remove(final_temp_frame_path)
        print(f"\nCleaned up temporary file: {final_temp_frame_path}")
    except Exception as clean_e:
        print(f"\nWarning: Could not clean up temporary file {final_temp_frame_path}: {clean_e}")

print("\n===== Continuous generation finished or script stopped =====")
sys.stdout.flush() # Flush final message
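
Chaining is only one way to drive run.py from a script. As a simpler variant, the sketch below generates several videos from the same image and prompt while varying only the seed. It uses only flags that run.py defines (`--image_path`, `--prompt`, `--seed`, `--output_dir`); the helper names `build_command` and `run_seed_sweep` and the `dry_run` switch are introduced here for illustration.

```python
import subprocess
import sys


def build_command(image_path, prompt, seed, output_dir, script="run.py", python=sys.executable):
    """Build the argument list for one run.py invocation."""
    return [
        python, script,
        "--image_path", image_path,
        "--prompt", prompt,  # one list element; subprocess handles the quoting
        "--seed", str(seed),
        "--output_dir", output_dir,
    ]


def run_seed_sweep(image_path, prompt, seeds, output_dir="./outputs/", dry_run=True):
    """Run (or, with dry_run=True, just print) one generation per seed."""
    commands = [build_command(image_path, prompt, seed, output_dir) for seed in seeds]
    for command in commands:
        if dry_run:
            # Show the command line as Windows would receive it.
            print(subprocess.list2cmdline(command))
        else:
            # check=True raises CalledProcessError on a non-zero exit status.
            subprocess.run(command, check=True)
    return commands


if __name__ == "__main__":
    run_seed_sweep("./sample.jpg", "The man dances energetically.", seeds=[1, 2, 3])
```

Since run.py names each output after a timestamp-based job ID, the videos from successive runs do not overwrite one another. Set `dry_run=False` to actually launch the generations.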

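Conclusion

Being able to generate videos locally really is nice. There seem to be many creative ways to use this, so I would like to cover it in another article.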
