エロ動画のシーンを音声から分析するシリーズ 3: 転移学習のためのデータセットを作る

  1. まず、何個かエロ動画を持ってきて、学習済み YAMNet でざっくり Speech, Speech-ish, Moan, Other に分類し、それを手動で聞いていって「あえぎ声」「会話」「その他」に分ける
  2. 1. で作った音声データセットを使って「あえぎ声」「会話」と「その他」の二つに分ける転移学習モデルを作る
  3. 1. で作った音声データセットを使って「あえぎ声」と「会話」の二つに分ける転移学習モデルを作る

この計画の 3 Step の 1 までやる

まず、エロ動画を Speech, Speech-ish, Moan, EnvSound, Other に分けてみる

import sys
import ffmpeg
import csv
from scipy.io import wavfile
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import os
import datetime
import math
import shutil
from pytimeparse.timeparse import timeparse

# Command-line arguments: <video_id> <video_file> <start_time>
#   video_id   -- integer identifying the video
#   video_file -- path of the video file to analyse
#   start_time -- position to start from, in a format pytimeparse understands
# Explicit checks are used instead of `assert` so that the validation still
# runs when Python is started with -O.
if len(sys.argv) != 4:
    sys.exit(f'usage: {sys.argv[0]} <video_id> <video_file> <start_time>')
video_id = int(sys.argv[1])
video_file = sys.argv[2]
time_str = sys.argv[3]
# timeparse() returns None when it cannot parse the string; check for that
# before converting so the user gets a readable message instead of a
# TypeError raised by float(None).
parsed_start = timeparse(time_str)
if parsed_start is None:
    sys.exit(f'cannot parse start time: {time_str!r}')
video_start = float(parsed_start)
if video_start < 0:
    sys.exit(f'start time must not be negative: {time_str!r}')

# Directory layout: a root directory plus one sub-directory for temporary
# chunks and one per category.
chunk_dir = os.path.join('chunk')
(
    chunk_tmp_dir,
    speech_dir,
    speechish_dir,
    moanish_dir,
    env_sound_dir,
    other_dir,
) = (
    os.path.join(chunk_dir, leaf)
    for leaf in ('tmp', 'speech', 'speechish', 'moan', 'env_sound', 'other')
)

# Create every directory up front; already existing ones are left untouched.
for d in (
    chunk_dir,
    chunk_tmp_dir,
    speech_dir,
    speechish_dir,
    moanish_dir,
    env_sound_dir,
    other_dir,
):
    os.makedirs(d, exist_ok=True)

# Determine the duration of the video.
# ffprobe does not report a per-stream "duration" for every container, so
# streams without one are skipped instead of raising KeyError. The
# container-level duration is used only when no stream reports one.
video_data = ffmpeg.probe(video_file)
stream_durations = [
    float(stream['duration'])
    for stream in video_data['streams']
    if 'duration' in stream
]
if stream_durations:
    video_duration = max(stream_durations)
else:
    container_duration = video_data.get('format', {}).get('duration')
    if container_duration is None:
        sys.exit(f'could not determine the duration of {video_file!r}')
    video_duration = float(container_duration)
# The start position has to lie inside the video.
if video_duration <= video_start:
    sys.exit(
        f'start time ({video_start} s) is not before the end of the video '
        f'({video_duration} s)'
    )

# Load the pre-trained YAMNet model from TensorFlow Hub
model = hub.load('https://tfhub.dev/google/yamnet/1')

# Read the class names
def class_names_from_csv(class_map_csv_text):
    """Return the class display names listed in a class-map CSV file.

    Args:
        class_map_csv_text: Path of the CSV file. It is opened with
            ``tf.io.gfile.GFile`` and must have a ``display_name`` column.

    Returns:
        A list holding the ``display_name`` value of every row, in file
        order.
    """
    with tf.io.gfile.GFile(class_map_csv_text) as csvfile:
        return [row['display_name'] for row in csv.DictReader(csvfile)]

# Ask the model for the path of its class-map CSV and read the class names
# from that file.
class_map_path = model.class_map_path().numpy()
class_names = class_names_from_csv(class_map_path)

# Define how the model's class names are grouped into coarse categories.
# NOTE(review): the closing brackets of the list literals below were missing;
# the entries are exactly the ones that were present -- confirm that no
# entries were lost together with the brackets.
speech_class_names = ['Speech']
speechish_class_names = [
    'Child speech, kid speaking',
    'Crying, sobbing',
    'Baby cry, infant cry',
    'Baby laughter',
    'Child singing',
    'Belly laugh',
]
moanish_class_names = [
    'Domestic animals, pets',
    'Livestock, farm animals, working animals',
    'Wail, moan',
]
env_sound_class_names = [
    'Inside, small room',
    'Inside, large room or hall',
    'Drawer open or close',
    'Sliding door',
]
# Every class name that is not claimed by one of the categories above, kept
# in the order in which it appears in class_names.
other_class_names = [
    class_name
    for class_name in class_names
    if class_name not in speech_class_names
    and class_name not in speechish_class_names
    and class_name not in moanish_class_names
    and class_name not in env_sound_class_names
]

# Sanity check: every name listed above must be one of the model's class
# names, otherwise the index lookups below would raise ValueError.
assert all(class_name in class_names for class_name in speech_class_names)
assert all(class_name in class_names for class_name in speechish_class_names)
assert all(class_name in class_names for class_name in moanish_class_names)
assert all(class_name in class_names for class_name in env_sound_class_names)

# Translate the names of each category into positions in class_names.
speech_classes = [class_names.index(class_name) for class_name in speech_class_names]
speechish_classes = [class_names.index(class_name) for class_name in speechish_class_names]
moanish_classes = [class_names.index(class_name) for class_name in moanish_class_names]
env_sound_classes = [class_names.index(class_name) for class_name in env_sound_class_names]
other_classes = [class_names.index(class_name) for class_name in other_class_names]

def calc_scores(original_scores):
    """Collapse per-class scores into one score per coarse category.

    Args:
        original_scores: Scores indexable by class id.

    Returns:
        A tuple ``(speech, speechish, moanish, env_sound, other)``. Each item
        is the sum of the scores at the class ids held in the corresponding
        module-level ``*_classes`` list.
    """
    def total(class_ids):
        return sum([original_scores[class_id] for class_id in class_ids])

    category_class_ids = (
        speech_classes,
        speechish_classes,
        moanish_classes,
        env_sound_classes,
        other_classes,
    )
    return tuple(total(class_ids) for class_ids in category_class_ids)

# Debug subtitles are accumulated in this variable
subtitle_content = ''

# Start of the wall-clock time measurement
start_time = datetime.datetime.now()

def format_sec(sec):
    """Format a number of seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Hours are zero-padded and are not wrapped at 24, so the result is always
    a valid SRT timestamp. Negative input is clamped to zero.
    """
    total_ms = max(0, int(round(float(sec) * 1000)))
    hours, rest = divmod(total_ms, 3600 * 1000)
    minutes, rest = divmod(rest, 60 * 1000)
    seconds, millis = divmod(rest, 1000)
    return f'{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}'


# Process the audio in chunks of 0.48 seconds
chunk_duration = 0.48
index = 0
for chunk_start in np.arange(video_start, video_duration, chunk_duration):
    chunk_end = min(chunk_start + chunk_duration, video_duration)
    assert chunk_start < chunk_end

    chunk_file = os.path.join(chunk_tmp_dir, f'{video_id:03d}-{index:08d}.wav')

    # Extract the chunk with ffmpeg as a 16 kHz mono WAV file.
    # NOTE(review): `ss` is applied as an output option here, so ffmpeg
    # decodes from the beginning of the file for every chunk; passing it to
    # ffmpeg.input() instead would probably be much faster -- verify before
    # changing.
    input_stream = ffmpeg.input(video_file)
    output_stream = ffmpeg.output(input_stream.audio, chunk_file, ss=chunk_start, t=chunk_duration, ar=16000, ac=1)
    ffmpeg.run(output_stream, overwrite_output=True, quiet=True)

    # Read the audio back. The second positional parameter of wavfile.read
    # is `mmap`, so the former 'rb' argument silently memory-mapped a file
    # that is moved away a few lines further down; read it normally instead.
    sample_rate, wav_data = wavfile.read(chunk_file)
    assert sample_rate == 16000
    waveform = wav_data / tf.int16.max  # normalise to -1.0 ~ 1.0

    # Run the classifier
    scores_result, embeddings, spectrogram = model(waveform)
    scores_result_np = scores_result.numpy()
    assert scores_result_np.shape == (1, 521)
    scores = scores_result_np[0]

    # Category scores
    speech_score, speechish_score, moanish_score, env_sound_score, other_score = calc_scores(scores)

    # Debug information: the four grouped categories compete with every
    # individual "other" class; keep the five best.
    ranked_scores = [
        ('会話', speech_score),
        ('会話-ish', speechish_score),
        ('喘ぎ-ish', moanish_score),
        ('環境音-ish', env_sound_score),
    ]
    ranked_scores.extend(zip(other_class_names, (scores[class_id] for class_id in other_classes)))
    ranked_scores.sort(key=lambda pair: pair[1], reverse=True)
    top_scores = ranked_scores[:5]
    print([(class_name, f'{round(score * 100)}%') for class_name, score in top_scores])

    # Debug subtitle entry for this chunk
    subtitle_lines = [
        f'{index + 1}',
        f'{format_sec(chunk_start)} --> {format_sec(chunk_end)}',
    ]
    subtitle_lines.extend(f'{class_name} ({int(score * 100)}%)' for class_name, score in top_scores)
    subtitle_content += '\n'.join(subtitle_lines) + '\n\n'

    # File the chunk under the first category whose score exceeds 0.8;
    # everything else goes to the "other" directory. Exactly one move happens
    # per chunk.
    if speech_score > 0.8:
        destination_dir = speech_dir
    elif speechish_score > 0.8:
        destination_dir = speechish_dir
    elif moanish_score > 0.8:
        destination_dir = moanish_dir
    elif env_sound_score > 0.8:
        destination_dir = env_sound_dir
    else:
        destination_dir = other_dir
    shutil.move(chunk_file, destination_dir)

    # Progress report: extrapolate the remaining time linearly from the time
    # spent on the part processed so far.
    execution_time = datetime.datetime.now() - start_time
    execution_sec = execution_time.total_seconds()
    remaining_sec = execution_sec / (chunk_end - video_start) * (video_duration - video_start) - execution_sec
    print(f'current_position={format_sec(chunk_start)}, video_duration={format_sec(video_duration)} , execution_sec={format_sec(execution_sec)}, remaining_sec={format_sec(remaining_sec)}, inferred_class={top_scores[0][0]} ({top_scores[0][1]})')

    index += 1

# Write the subtitle file. The encoding is given explicitly because the
# subtitle text contains Japanese characters and the platform default
# encoding may not be able to represent them.
with open('subtitles.srt', 'w', encoding='utf-8') as subtitle_file:
    subtitle_file.write(subtitle_content)




  • dataset
    • speech: 会話と思われる音声を入れる
    • moan: あえぎ声と思われる音声を入れる
    • env_sound: 無音とか雑音とか環境音を入れる




