
Analyzing Adult Video Scenes from Audio, Part 6: Extracting Dialogue from Adult Videos (Complete)


The story so far

Two posts ago, I used transfer learning to build a model that separates audio into "voice" and "other". Last time, I built a model that separates "voice" into "speech" and "moans".
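As a refresher, both models are small classifier heads trained on top of YAMNet's 1024-dimensional embeddings. Here is a minimal sketch of what such a head might look like; the hidden layer is an assumption for illustration, and only the 1024-dimensional input and the two-class softmax output are implied by the script below.

import tensorflow as tf

# Hypothetical two-class head over YAMNet embeddings (one 1024-dim vector per patch).
classifier_head = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(1024,)),
    tf.keras.layers.Dense(512, activation='relu'),   # hidden size is an assumption
    tf.keras.layers.Dense(2, activation='softmax'),  # e.g. ('voice', 'other')
])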

This time, I'll use these two models to build the part that actually extracts the dialogue from an adult video.

Let's write the code

import sys
import ffmpeg
from scipy.io import wavfile
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import os
import datetime
import math

# Take the video file path as a command-line argument
assert len(sys.argv) == 2
video_file = sys.argv[1]
file_path_without_ext = os.path.join(os.path.dirname(video_file), os.path.splitext(os.path.basename(video_file))[0])

# Output files: the extracted-dialogue video and a debug subtitle track
output_file = file_path_without_ext + '.会話.mp4'
subtitle_file = file_path_without_ext + '.デバッグ用字幕.srt'

# Load the trained models
yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
yamnet_model = hub.load(yamnet_model_handle)
voice_and_other_model = tf.keras.models.load_model('voice_and_other_model')
speech_and_moan_model = tf.keras.models.load_model('speech_and_moan_model')

# Determine the video duration (take the longest of the streams' durations)
video_data = ffmpeg.probe(video_file)
video_duration = max([float(stream['duration']) for stream in video_data['streams']])
assert video_duration > 0

def load_wav_16k_mono(filename):
    sample_rate, wav_data = wavfile.read(filename)
    assert sample_rate == 16000
    waveform = wav_data / tf.int16.max # normalize to -1.0 .. 1.0
    return waveform

# Debug subtitles are accumulated in this variable
subtitle_content = ''

# For measuring elapsed time
start_time = datetime.datetime.now()

# Format seconds as an SRT-style timestamp (H:MM:SS,mmm)
def format_sec(sec):
    return f'{str(datetime.timedelta(seconds=int(math.floor(sec))))},{int(math.floor(sec % 1.0 * 1000.0)):03d}'
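# (For reference: format_sec(3723.456) == '1:02:03,456')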

# Process the video in 0.48-second chunks (0.48 s is the hop YAMNet uses between patches)
chunk_duration = 0.48
chunk_class_name_list = []
index = 0
for chunk_start in np.arange(0, video_duration, chunk_duration):
    chunk_end = chunk_start + chunk_duration

    chunk_end = min(chunk_end, video_duration)
    assert chunk_start < chunk_end

    chunk_file = f'tmp-{index:08d}.wav'

    # Use ffmpeg to extract this chunk as a 16 kHz mono wav
    input_stream = ffmpeg.input(video_file, ss=chunk_start, t=(chunk_end - chunk_start))
    output_stream = ffmpeg.output(input_stream.audio, chunk_file, ar=16000, ac=1)
    ffmpeg.run(output_stream, overwrite_output=True, quiet=True)

    # Load the audio data
    try:
        wav_data = load_wav_16k_mono(chunk_file)
    finally:
        os.remove(chunk_file)

    # Apply the voice_and_other model to YAMNet's per-patch embeddings
    yamnet_scores, embeddings, spectrogram = yamnet_model(wav_data)
    vao_scores = voice_and_other_model(embeddings).numpy().mean(axis=0) # average over patches

    if vao_scores[0] > vao_scores[1]: # classified as 'voice'

        # Apply the speech_and_moan model
        sam_scores = speech_and_moan_model(embeddings).numpy().mean(axis=0)
        if sam_scores[0] > sam_scores[1]:
            chunk_class_name_list.append('speech')
            debug_class_name = f'speech ({round(sam_scores[0] * 100)}, {round(sam_scores[1] * 100)})'
        else:
            chunk_class_name_list.append('moan')
            debug_class_name = f'moan ({round(sam_scores[0] * 100)}, {round(sam_scores[1] * 100)})'
    else:
        chunk_class_name_list.append('other')
        debug_class_name = f'other ({round(vao_scores[0] * 100)}, {round(vao_scores[1] * 100)})'

    # Build the debug subtitle entry
    subtitle_content += f'{index + 1}\n'
    subtitle_content += f'{format_sec(chunk_start + (0.01 * chunk_duration))} --> {format_sec(chunk_end - (0.01 * chunk_duration))}\n'
    subtitle_content += f'[{index:08d}] {debug_class_name}\n'
    subtitle_content += f'\n'

    # Print progress (with a naive linear estimate of the remaining time)
    execution_time = datetime.datetime.now() - start_time
    execution_sec = execution_time.total_seconds()
    remaining_sec = execution_sec / chunk_end * video_duration - execution_sec
    print(f'current_position={format_sec(chunk_start)}, video_duration={format_sec(video_duration)} , execution_sec={format_sec(execution_sec)}, remaining_sec={format_sec(remaining_sec)}, scores={debug_class_name}')

    index += 1

# Write out the subtitle file
with open(subtitle_file, 'w') as f:
    f.write(subtitle_content)

# Take a list of per-chunk class names and return a list of
# (start_index, length) ranges where consecutive class names satisfy matcher_fn
def make_range_list_to_chunk_class_name_list(chunk_class_name_list, matcher_fn):
    in_range = False
    range_list = []
    for i, class_name in enumerate(chunk_class_name_list):
        if not in_range and matcher_fn(class_name):
            in_range = True
            start_index = i
        if in_range and not matcher_fn(class_name):
            in_range = False
            range_list.append((start_index, i - start_index))
    if in_range:
        range_list.append((start_index, len(chunk_class_name_list) - start_index))
    return range_list
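
# (Worked example: with matcher_fn = lambda c: c == 'speech',
#  ['other', 'speech', 'speech', 'other', 'speech'] returns [(1, 2), (4, 1)])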

# If a single non-speech chunk is isolated, relabel it as speech,
# e.g. speech, moan, speech becomes speech, speech, speech (avoids choppy cuts)
for index, length in make_range_list_to_chunk_class_name_list(chunk_class_name_list, lambda class_name: class_name != 'speech'):
    if length <= 1:
        chunk_class_name_list[index] = 'speech'

# If a single speech chunk is isolated, relabel it as other
for index, length in make_range_list_to_chunk_class_name_list(chunk_class_name_list, lambda class_name: class_name == 'speech'):
    if length <= 1:
        chunk_class_name_list[index] = 'other'
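
# (Worked example of the two passes above:
#  ['other', 'other', 'speech', 'moan', 'speech', 'other', 'other']
#  pass 1 fills the isolated 'moan' -> ['other', 'other', 'speech', 'speech', 'speech', 'other', 'other'];
#  pass 2 changes nothing, since the resulting speech run is longer than one chunk.)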

expanded_chunk_class_name_list = chunk_class_name_list.copy()

# Expand each speech region so the dialogue is easier to follow
# (pad with up to 4 chunks, about 1.9 s of context, on each side)
expansion_count = 4
for i, class_name in enumerate(chunk_class_name_list):
    if class_name == 'speech':
        for j in range(0, expansion_count):
            index = i - (j + 1)
            if index >= 0:
                expanded_chunk_class_name_list[index] = class_name
            index = i + (j + 1)
            if index < len(chunk_class_name_list):
                expanded_chunk_class_name_list[index] = class_name

# Turn each speech range into an ffmpeg input stream
input_stream_list = []
for index, length in make_range_list_to_chunk_class_name_list(expanded_chunk_class_name_list, lambda class_name: class_name == 'speech'):
    input_stream = ffmpeg.input(video_file, ss=chunk_duration * index, t=chunk_duration * length)
    input_stream_list.append(input_stream['v'])
    input_stream_list.append(input_stream['a'])

# Concatenate the input streams and write the output
concatenated_stream = ffmpeg.concat(*input_stream_list, v=1, a=1, unsafe=True)
output_stream = ffmpeg.output(concatenated_stream, output_file)
ffmpeg.run(output_stream)
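
If you save the script as, say, extract_speech.py (the name is arbitrary), the whole pipeline runs with a single argument:

python extract_speech.py some_video.mp4

This writes some_video.会話.mp4 (the extracted dialogue) and some_video.デバッグ用字幕.srt (the debug subtitles) next to the input file.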

Thoughts

Writing filter_complex graphs with ffmpeg-python is ridiculously easy. I love it.
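
Incidentally, if you want to see the command line (including the generated filter_complex) that a stream graph compiles to, an output stream has a get_args() method. A minimal sketch, separate from the script above:

import ffmpeg

# Build a tiny two-segment concat graph and print the ffmpeg arguments it compiles to.
a = ffmpeg.input('in.mp4', ss=0, t=1)
b = ffmpeg.input('in.mp4', ss=5, t=1)
joined = ffmpeg.concat(a['v'], a['a'], b['v'], b['a'], v=1, a=1)
print(ffmpeg.output(joined, 'out.mp4').get_args())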

Results

I'm quite happy with how this turned out.

It's hot!

Being able to cut out the "ahh, ahh, ahh" parts and watch just the lines, the "I'm coming!" and "Does it feel good?" "It feels so good" moments, all in one place is really nice.

That's the end of this series

I might write up some closing thoughts later.
