イントロダクション
先日「手話男、現る」というイベントが開催された。
手話に20代の人生を捧げる手話男を中心にさまざまな手話関連の話題を会話するというものだ。ちなみに手話男とは私のことではない。
その中である参加者がこのような話題を展開した。
「実は私、手話ニュースを見ているんです」
「手話ニュースですか、私も見ていました」
「手話ニュースというのは手話、VTR、手話という繰り返しの構成になっていますよね」
「そうですね」
「あの手話部分のみを編集して集中して勉強できるようにしています」
「それはすばらしいですね」
「でも編集の手間がかかるので、なかなか進められていません」
「確かに…、私ならプログラムで自動編集できるようにすると思います」
「なるほど、私もやってみます」
どっちが私なのかはさておき
手話ニュースを手話通訳教材の一部として利用するのは理解できるものだなと思った。
VTR箇所は案外長いこともあるため、それをカットしたいというもまたしかりである。
概要
やりたいこと
手話ニュース動画をプログラムに渡して手話部分のみを取り出せるようにしたい。
成果物のイメージ
①プログラムを起動
②編集対象の動画ファイルを選択
③手話部分のみが取り出された動画ファイルを出力
手話ニュースの構成
さまざまなパターンが存在することが確認できた。
■パターン①
- 手話パートA
- 手話パートB
- エンディング
■パターン②
- 手話パートA
- VTR
- 手話パートA
- エンディング
※手話パートA、Bについて
現在確認できている範囲だと手話ニュースには2つの背景が存在する。
手話パートAは赤背景、手話パートBは緑背景である。
手話ニュースサイトからサンプルを確認できる。
https://www.nhk.or.jp/shuwa/
プログラムについて
構築方法
当初ChatGPTに丸投げしてみたがうまく作成できなかった。
よって、よりシンプルな実装を考えてから構築した。
- 動画を読み込む
- 背景の左下部分の色情報をフレーム単位で読み込む
- 事前に定義した手話パートA,Bと色情報が一致している部分を「手話パート」として識別する
- 手話パートとして認識された部分を切り出して結合する
使い方
- プログラムに動画ファイルをドラッグアンドドロップする、ドラッグアンドドロップしなかった場合はファイル選択ダイアログが開くので動画ファイルを選択する
- しばらく待つと<ファイル名>_sign-part-only.mp4が出力される
コード
from __future__ import annotations
import sys, argparse, math, subprocess, tempfile, shutil
from pathlib import Path
from typing import List, Tuple, Iterable, Optional
import numpy as np
import av # pip install av
# --- 1) 切り出し対象の基準色(拡張しやすい配列) ---
TARGET_COLORS = [
{"name": "red_bg", "rgb": [170, 109, 123]}, # 赤背景
{"name": "green_bg", "rgb": [154, 181, 144]}, # 緑背景
]
# --- 2) ROIヘルパ ---
def norm_roi_to_px(x: float, y: float, w: float, h: float, width: int, height: int) -> Tuple[int,int,int,int]:
x0 = max(0, int(round(x * width)))
y0 = max(0, int(round(y * height)))
ww = max(1, int(round(w * width)))
hh = max(1, int(round(h * height)))
if x0 + ww > width: ww = width - x0
if y0 + hh > height: hh = height - y0
return x0, y0, ww, hh
# --- 3) フレーム列挙(RGB + PTS秒) ---
def iter_frames(path: Path) -> Iterable[Tuple[np.ndarray, Optional[float], int, float]]:
container = av.open(str(path))
v = next((s for s in container.streams if s.type == "video"), None)
if v is None:
container.close()
raise RuntimeError("映像ストリームが見つかりません")
time_base = float(v.time_base) if v.time_base else None
avg_rate = float(v.average_rate) if v.average_rate else None # フォールバック用
idx = 0
last_sec = 0.0
for packet in container.demux(v):
for frame in packet.decode():
pts_sec = None
if frame.pts is not None and time_base is not None:
pts_sec = frame.pts * time_base
last_sec = pts_sec
else:
# PTSが取れない場合は nominal FPS から擬似秒数
if avg_rate and avg_rate > 0:
pts_sec = idx / avg_rate
last_sec = pts_sec
else:
pts_sec = None
img = frame.to_ndarray(format="rgb24")
yield img, pts_sec, idx, last_sec
idx += 1
container.close()
# --- 4) ROI平均RGB ---
def mean_rgb(img_rgb: np.ndarray, roi_px: Tuple[int,int,int,int]) -> Tuple[int,int,int]:
x0,y0,w,h = roi_px
roi = img_rgb[y0:y0+h, x0:x0+w, :]
if roi.size == 0:
return (0,0,0)
m = roi.reshape(-1,3).mean(axis=0)
return (int(round(float(m[0]))), int(round(float(m[1]))), int(round(float(m[2]))))
# --- 5) 基準色との最短距離 ---
def min_distance_to_targets(rgb: Tuple[int,int,int]) -> float:
r,g,b = rgb
best = float("inf")
for t in TARGET_COLORS:
tr,tg,tb = t["rgb"]
d = math.sqrt((r-tr)**2 + (g-tg)**2 + (b-tb)**2)
if d < best:
best = d
return best
# --- 6) 連続区間の抽出(flag=1の連なりを開始・終了の秒に変換) ---
def detect_segments(
path: Path,
roi_norm: Tuple[float,float,float,float],
tol: float
) -> List[Tuple[float, float]]:
segments: List[Tuple[float,float]] = []
in_seg = False
seg_start: Optional[float] = None
last_pts_fallback = 0.0
for img_rgb, pts_sec, idx, last_sec in iter_frames(path):
H, W = img_rgb.shape[:2]
roi_px = norm_roi_to_px(*roi_norm, width=W, height=H)
r,g,b = mean_rgb(img_rgb, roi_px)
dist = min_distance_to_targets((r,g,b))
flag = 1 if dist <= tol else 0
cur_time = pts_sec if pts_sec is not None else last_sec
last_pts_fallback = cur_time
if flag and not in_seg:
# start new segment
in_seg = True
seg_start = cur_time
elif (not flag) and in_seg:
# close segment at current time(フレーム境界で十分)
in_seg = False
if seg_start is not None:
segments.append((seg_start, cur_time))
seg_start = None
# 終端が開いたままならクローズ
if in_seg and seg_start is not None:
segments.append((seg_start, last_pts_fallback))
return segments
# --- 7) ffmpeg で各区間を切り出し→結合 ---
def ensure_ffmpeg() -> str:
exe = shutil.which("ffmpeg")
if not exe:
raise RuntimeError("ffmpeg が見つかりません。先にインストールしてください。")
return exe
def cut_and_concat(
src: Path,
segments: List[Tuple[float,float]],
out_path: Path,
reencode: bool = True
):
ffmpeg = ensure_ffmpeg()
tmpdir = Path(tempfile.mkdtemp(prefix="sign_parts_"))
try:
part_files: List[Path] = []
# 1) 各区間を切り出し
for i, (ss, ee) in enumerate(segments):
if ee <= ss:
continue
part = tmpdir / f"part_{i:04d}.mp4"
if reencode:
# 正確さ優先(再エンコード)
cmd = [
ffmpeg, "-y",
"-ss", f"{ss:.6f}",
"-to", f"{ee:.6f}",
"-i", str(src),
"-c:v", "libx264",
"-preset", "veryfast",
"-crf", "20",
"-c:a", "aac", "-b:a", "128k",
"-movflags", "+faststart",
str(part)
]
else:
# 速さ優先(キーフレーム境界前提:ずれる可能性あり)
cmd = [
ffmpeg, "-y",
"-ss", f"{ss:.6f}",
"-to", f"{ee:.6f}",
"-i", str(src),
"-c", "copy",
str(part)
]
subprocess.run(cmd, check=True)
part_files.append(part)
if not part_files:
raise RuntimeError("切り出し対象の区間が見つかりませんでした。")
# 2) concat demuxer 用リスト作成
list_file = tmpdir / "list.txt"
with open(list_file, "w", encoding="utf-8") as f:
for p in part_files:
# パスに空白があってもOKな書式
f.write(f"file '{p.as_posix()}'\n")
# 3) 結合
cmd_concat = [
ffmpeg, "-y",
"-f", "concat", "-safe", "0",
"-i", str(list_file),
"-c", "copy",
str(out_path)
]
# 同一コーデックの結合なので copy でOK(全パートを同じエンコード設定で出力している)
subprocess.run(cmd_concat, check=True)
finally:
# 一時ファイル削除
shutil.rmtree(tmpdir, ignore_errors=True)
# --- 8) メイン ---
def parse_args(argv: List[str]) -> argparse.Namespace:
p = argparse.ArgumentParser(description="切り出し対象(=1)だけを抽出して結合し、<ファイル名>_sign-part-only.mp4を出力")
# 実行情報を指定
p.add_argument("--roi", nargs=4, type=float, metavar=("X","Y","W","H"),
default=[0.03, 0.75, 0.16, 0.22], # 左下を認識領域として定義
help="正規化ROI(0~1の割合で X,Y,W,H)")
p.add_argument("--tol", type=float, default=20.0, help="色ズレ許容量(RGB距離)")
p.add_argument("--min-sec", type=float, default=0.0, help="この秒数未満の短い区間は除外")
p.add_argument("--copy", action="store_true", help="再エンコードせずcopy(境界がずれる可能性あり)")
p.add_argument("paths", nargs="*", help="動画ファイル(複数可:DnD/引数/ダイアログ)")
return p.parse_args(argv)
def main():
args = parse_args(sys.argv[1:])
# 入力収集(ドラッグアンドドロップ/引数優先、無ければダイアログ)
targets = [Path(p) for p in args.paths if str(p).strip()]
if not targets:
try:
import tkinter as tk
from tkinter import filedialog
root = tk.Tk(); root.withdraw()
sel = filedialog.askopenfilenames(
title="処理する動画を選択(複数可)",
filetypes=[("Video files","*.mp4 *.mov *.mkv *.m2ts *.mts *.avi *.wmv *.flv *.webm"), ("All files","*.*")]
)
targets = [Path(p) for p in sel]
except Exception:
pass
if not targets:
print("[INFO] 対象ファイルが選択されませんでした。")
return
roi_norm = tuple(args.roi)
for src in targets:
try:
print(f"[INFO] 区間検出中: {src.name}")
segs = detect_segments(src, roi_norm=roi_norm, tol=args.tol)
# 短すぎる区間の除外(必要なら)
if args.min_sec > 0:
segs = [(s,e) for (s,e) in segs if (e - s) >= args.min_sec]
if not segs:
print(f"[WARN] 切り出し対象(=1)の区間が見つかりませんでした: {src.name}")
continue
out_path = src.with_name(f"{src.stem}_sign-part-only.mp4")
print(f"[INFO] {len(segs)} 区間を切り出し&結合 → {out_path.name}")
cut_and_concat(src, segs, out_path, reencode=(not args.copy))
print(f"[OK] 出力完了: {out_path}")
except Exception as e:
print(f"[NG] {src}: {e}")
if __name__ == "__main__":
main()
今後
事実として勢いで作成したのみであり、サンプルとして用意した2本にしか実験できていない。
よって、現在のコードが不完全な可能性は大いにある、
完璧なものは自分で所持し、この記事で記載した機能はアイデアベースの下書きとして書き記しておく。
このアイデア自体を出した企画参加者は手話ニュースを録画して動画ファイルに起こす術を持っているようだが、私自身はテレビすら持っていないため、私自身がこのプログラムを利用する想定はない。
もちろん、手話ニュースを編集してどうのこうの…というのは個人利用の範囲に限られるのでこのプログラムを完成させたところで自分自身にいいことはとくにない。
作成時間は約1時間程度であった。