前回までのあらすじ
前々回は転移学習で、「声」と「その他」に分けるモデルを作った。前回は、「声」を「会話」と「あえぎ声」に分けるモデルを作った。
今回はこの二つのモデルを使って、エロ動画から会話を切り出す部分を作っていく
早速コードを書く
import sys
import ffmpeg
import csv
from scipy.io import wavfile
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import os
import datetime
import math
import shutil
from pytimeparse.timeparse import timeparse
import tensorflow_io as tfio
import glob
# video ファイルのパスを受け取る
assert len(sys.argv) == 2
video_file = sys.argv[1]
file_path_without_ext = os.path.join(os.path.dirname(video_file), os.path.splitext(os.path.basename(video_file))[0])
# 出力するファイル
output_file = file_path_without_ext + '.会話.mp4'
subtitle_file = file_path_without_ext + '.デバッグ用字幕.srt'
# 学習済みモデルをロード
yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
yamnet_model = hub.load(yamnet_model_handle)
voice_and_other_model = tf.keras.models.load_model('voice_and_other_model')
speech_and_moan_model = tf.keras.models.load_model('speech_and_moan_model')
# 動画の尺を求める
video_data = ffmpeg.probe(video_file)
video_duration = max([float(stream['duration']) for stream in video_data['streams']])
assert video_duration > 0
def load_wav_16k_mono(filename):
sample_rate, wav_data = wavfile.read(chunk_file, 'rb')
assert sample_rate == 16000
waveform = wav_data / tf.int16.max # -1.0 ~ 1.0 に正規化
return waveform
# この変数にデバッグ用の字幕を入れる
subtitle_content = ''
# 時間計測
start_time = datetime.datetime.now()
# 時刻のフォーマット
def format_sec(sec):
return f'{str(datetime.timedelta(seconds=int(math.floor(sec))))},{int(math.floor(sec % 1.0 * 1000.0)):03d}'
# 0.48 秒ごとに処理
chunk_duration = 0.48
chunk_class_name_list = []
index = 0
for chunk_start in np.arange(0, video_duration, chunk_duration):
chunk_end = chunk_start + chunk_duration
chunk_end = min(chunk_end, video_duration)
assert chunk_start < chunk_end
chunk_file = os.path.join(f'tmp-{index:08d}.wav')
# ffmpeg で wav ファイルを生成
input_stream = ffmpeg.input(video_file, ss=chunk_start, t=(chunk_end - chunk_start))
output_stream = ffmpeg.output(input_stream.audio, chunk_file, ar=16000, ac=1)
ffmpeg.run(output_stream, overwrite_output=True, quiet=True)
# 音声データを読み込み読み込み
try:
wav_data = load_wav_16k_mono(chunk_file)
finally:
os.remove(chunk_file)
# voice_and_other モデルの適用
yamnet_scores, embeddings, spectrogram = yamnet_model(wav_data)
vao_scores = voice_and_other_model(embeddings).numpy().mean(axis=0)
if vao_scores[0] > vao_scores[1]:
# speech_and_moan モデルを適用
sam_scores = speech_and_moan_model(embeddings).numpy().mean(axis=0)
if sam_scores[0] > sam_scores[1]:
chunk_class_name_list.append('speech')
debug_class_name = f'speech ({round(sam_scores[0] * 100)}, {round(sam_scores[1] * 100)})'
else:
chunk_class_name_list.append('moan')
debug_class_name = f'moan ({round(sam_scores[0] * 100)}, {round(sam_scores[1] * 100)})'
else:
chunk_class_name_list.append('other')
debug_class_name = f'other ({round(vao_scores[0] * 100)}, {round(vao_scores[1] * 100)})'
# デバッグ用の字幕を作る
subtitle_content += f'{index + 1}\n'
subtitle_content += f'{format_sec(chunk_start + (0.01 * chunk_duration))} --> {format_sec(chunk_end - (0.01 * chunk_duration))}\n'
subtitle_content += f'[{index:08d}] {debug_class_name}\n'
subtitle_content += f'\n'
# 進捗の表示
execution_time = datetime.datetime.now() - start_time
execution_sec = execution_time.total_seconds()
remaining_sec = execution_sec / chunk_end * video_duration - execution_sec
print(f'current_position={format_sec(chunk_start)}, video_duration={format_sec(video_duration)} , execution_sec={format_sec(execution_sec)}, remaining_sec={format_sec(remaining_sec)}, scores={debug_class_name}')
index += 1
# 字幕ファイルの書き出し
with open(subtitle_file, 'w') as f:
f.write(subtitle_content)
# chunk ごとに class_name が与えられているリストを受け取って
# 条件に合う class_name が連続した範囲をリストにして返す
def make_range_list_to_chunk_class_name_list(chunk_class_name_list, matcher_fn):
in_range = False
range_list = []
for i, class_name in enumerate(chunk_class_name_list):
if not(in_range) and matcher_fn(class_name):
in_range = True
start_index = i
if in_range and not(matcher_fn(class_name)):
in_range = False
range_list.append((start_index, i - start_index))
if in_range:
range_list.append((start_index, len(chunk_class_name_list) - start_index))
return range_list
# speech 以外の chunk が一個だけ孤立してる場合は、 speech にする
# speech, moan, speech みたいな chunk が並んでる場合に speech, speech, speech に変える(細切れにならないように)
for index, length in make_range_list_to_chunk_class_name_list(chunk_class_name_list, lambda class_name: class_name != 'speech'):
if length <= 1:
chunk_class_name_list[index] = 'speech'
# speech の chunk が一個だけ孤立してる場合は、 other にする
for index, length in make_range_list_to_chunk_class_name_list(chunk_class_name_list, lambda class_name: class_name == 'speech'):
if length <= 1:
chunk_class_name_list[index] = 'other'
expanded_chunk_class_name_list = chunk_class_name_list.copy()
# 会話が聞き取りやすいように speech の領域を拡張する
expansion_count = 4
for i, class_name in enumerate(chunk_class_name_list):
if class_name == 'speech':
for j in range(0, expansion_count):
index = i - (j + 1)
if index >= 0:
expanded_chunk_class_name_list[index] = class_name
index = i + (j + 1)
if index < len(chunk_class_name_list):
expanded_chunk_class_name_list[index] = class_name
# speech の range を ffmpeg の input stream として定義
input_stream_list = []
for index, length in make_range_list_to_chunk_class_name_list(chunk_class_name_list, lambda class_name: class_name == 'speech'):
input_stream = ffmpeg.input(video_file, ss=chunk_duration * index, t=chunk_duration * length)
input_stream_list.append(input_stream['v'])
input_stream_list.append(input_stream['a'])
# input stream を結合して、出力
concatenated_stream = ffmpeg.concat(*input_stream_list, v=1, a=1, unsafe=True)
output_stream = ffmpeg.output(concatenated_stream, output_file)
ffmpeg.run(output_stream)
思ったこと
ffmpeg-python で filter_complex 書くのめちゃくちゃ簡単で良いな〜これ。
結果
かなり満足のいくものができた
エロい!
なんか、「あん、あん、あん」みたいなのをカットして「イクイクイク」とか「気持ちいいの?」「気持ちいいよぉ」みたいなセリフ部分だけまとめて観れるので、とても良い。
このシリーズはおしまい
完
後で感想だけ書くかも