前回のあらすじ
前回は転移学習のやり方を学んだ
今回は、転移学習をどうやるか計画を立てて、データセットを作る
ちなみに、このシリーズの一貫した目的は「あえぎ声」と「セックス中の会話」を分離すること
計画
1. まず、エロ動画を何個か持ってきて、学習済み YAMNet でざっくり Speech, Speech-ish, Moan, Other に分類して、それを手動で聞いていって、「あえぎ声」「会話」「その他」に分ける
2. 1. で作った音声データセットを使って「あえぎ声」「会話」と「その他」の二つに分ける転移学習モデルを作る
3. 1. で作った音声データセットを使って「あえぎ声」と「会話」の二つに分ける転移学習モデルを作る
今回は、この計画の 3 ステップのうち 1. までやる
まず、エロ動画を Speech, Speech-ish, Moan, EnvSound に分けてみる
import sys
import ffmpeg
import csv
from scipy.io import wavfile
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import os
import datetime
import math
import shutil
from pytimeparse.timeparse import timeparse
# Command-line arguments: VIDEO_ID VIDEO_FILE START_TIME
#   VIDEO_ID    integer, used as the prefix of every chunk file name
#   VIDEO_FILE  path of the video to process
#   START_TIME  position to start from, in a format pytimeparse understands
# Explicit checks are used instead of `assert` so that the validation still
# runs under `python -O`.
if len(sys.argv) != 4:
    sys.exit(f'usage: {sys.argv[0]} VIDEO_ID VIDEO_FILE START_TIME')
video_id = int(sys.argv[1])
video_file = sys.argv[2]
time_str = sys.argv[3]
# timeparse() returns None when it cannot parse the string; report that
# directly instead of letting float(None) fail with a TypeError.
parsed_start = timeparse(time_str)
if parsed_start is None:
    sys.exit(f'cannot parse START_TIME: {time_str!r}')
video_start = float(parsed_start)
if video_start < 0:
    sys.exit(f'START_TIME must not be negative: {time_str!r}')
# Output layout: one root directory holding a scratch directory for freshly
# extracted chunks plus one directory per coarse category.
chunk_dir = 'chunk'
(
    chunk_tmp_dir,
    speech_dir,
    speechish_dir,
    moanish_dir,
    env_sound_dir,
    other_dir,
) = (
    os.path.join(chunk_dir, leaf)
    for leaf in ('tmp', 'speech', 'speechish', 'moan', 'env_sound', 'other')
)

# Create every directory up front; already existing ones are left untouched.
for directory in (
    chunk_dir,
    chunk_tmp_dir,
    speech_dir,
    speechish_dir,
    moanish_dir,
    env_sound_dir,
    other_dir,
):
    os.makedirs(directory, exist_ok=True)
# Determine the duration of the video.
video_data = ffmpeg.probe(video_file)
# A stream entry does not necessarily carry a 'duration' key (presumably this
# depends on the container format -- confirm for the files in use), so only
# the streams that report one are considered, with the container-level
# duration as the fallback.
stream_durations = [
    float(stream['duration'])
    for stream in video_data['streams']
    if 'duration' in stream
]
if stream_durations:
    video_duration = max(stream_durations)
else:
    video_duration = float(video_data['format']['duration'])
# Explicit check instead of `assert` so it is not stripped under `python -O`.
if video_duration <= video_start:
    sys.exit(
        f'start position ({video_start}s) is not inside the video '
        f'(duration {video_duration}s)'
    )

# Load the pre-trained YAMNet model from TensorFlow Hub.
model = hub.load('https://tfhub.dev/google/yamnet/1')
# クラス名を取得
def class_names_from_csv(class_map_csv_text):
    """Return the `display_name` column of a class-map CSV, in file order.

    Despite its name, `class_map_csv_text` is handed to `tf.io.gfile.GFile`
    to be opened, i.e. it is used as the location of the CSV, not its text.
    """
    with tf.io.gfile.GFile(class_map_csv_text) as csvfile:
        return [row['display_name'] for row in csv.DictReader(csvfile)]
# Read the class names that ship with the loaded model.
class_map_path = model.class_map_path().numpy()
class_names = class_names_from_csv(class_map_path)

# Assignment of model classes to the coarse categories used by this script.
speech_class_names = ['Speech']
speechish_class_names = [
    'Child speech, kid speaking',
    'Crying, sobbing',
    'Baby cry, infant cry',
    'Baby laughter',
    'Babbling',
    'Laughter',
    'Whimper',
    'Singing',
    'Child singing',
    'Belly laugh',
    'Giggle',
]
moanish_class_names = [
    'Animal',
    'Dog',
    'Cat',
    'Domestic animals, pets',
    'Livestock, farm animals, working animals',
    'Pant',
    'Goat',
    'Hiccup',
    'Wail, moan',
]
env_sound_class_names = [
    'Silence',
    'Inside, small room',
    'Inside, large room or hall',
    'Drawer open or close',
    'Tap',
    'Bouncing',
    'Door',
    'Sliding door',
    'Writing',
    'Snort',
    'Squeak',
    'Breathing',
    'Music',
]

_grouped_class_names = (
    speech_class_names,
    speechish_class_names,
    moanish_class_names,
    env_sound_class_names,
)

# Every class that no group above claims is treated as "other".
other_class_names = [
    class_name
    for class_name in class_names
    if not any(class_name in group for group in _grouped_class_names)
]

# Fail early if a name listed above is not one of the model's class names.
for _group in _grouped_class_names:
    assert all(class_name in class_names for class_name in _group)


def _class_ids(names):
    """Map class names to their positions in `class_names`."""
    return [class_names.index(name) for name in names]


speech_classes = _class_ids(speech_class_names)
speechish_classes = _class_ids(speechish_class_names)
moanish_classes = _class_ids(moanish_class_names)
env_sound_classes = _class_ids(env_sound_class_names)
other_classes = _class_ids(other_class_names)
def calc_scores(original_scores):
    """Collapse per-class scores into five coarse category scores.

    Each category score is the sum of `original_scores[i]` over the class
    indices of that category (`speech_classes`, `speechish_classes`,
    `moanish_classes`, `env_sound_classes`, `other_classes`).

    Returns:
        A 5-tuple in the order
        (speech, speechish, moanish, env_sound, other).
    """
    def total(class_ids):
        return sum([original_scores[class_id] for class_id in class_ids])

    return (
        total(speech_classes),
        total(speechish_classes),
        total(moanish_classes),
        total(env_sound_classes),
        total(other_classes),
    )
def format_sec(sec):
    """Format a number of seconds as an SRT timestamp, `HH:MM:SS,mmm`.

    The value is rounded to whole milliseconds first so that floating-point
    representation error cannot drop a millisecond (truncating 1.44 s would
    give `,439`), and hours are zero-padded to at least two digits.
    """
    total_ms = int(round(sec * 1000))
    total_sec, ms = divmod(total_ms, 1000)
    total_min, seconds = divmod(total_sec, 60)
    hours, minutes = divmod(total_min, 60)
    return f'{hours:02d}:{minutes:02d}:{seconds:02d},{ms:03d}'


# Debug subtitles: one SRT entry per chunk, joined once after the loop
# instead of growing a string with `+=` on every iteration.
subtitle_entries = []

# Wall-clock start, used for the progress / remaining-time estimate.
start_time = datetime.datetime.now()

# Process the audio in 0.48-second chunks.
chunk_duration = 0.48
for index, chunk_start in enumerate(np.arange(video_start, video_duration, chunk_duration)):
    chunk_start = float(chunk_start)
    chunk_end = min(chunk_start + chunk_duration, video_duration)
    assert chunk_start < chunk_end
    chunk_name = f'{video_id:03d}-{index:08d}.wav'
    chunk_file = os.path.join(chunk_tmp_dir, chunk_name)

    # Extract the chunk as 16 kHz mono WAV.  `ss` is passed as an *input*
    # option so that ffmpeg seeks to the position; as an output option it
    # makes ffmpeg decode and discard everything before `chunk_start`, for
    # every single chunk.
    input_stream = ffmpeg.input(video_file, ss=chunk_start)
    output_stream = ffmpeg.output(input_stream.audio, chunk_file, t=chunk_duration, ar=16000, ac=1)
    ffmpeg.run(output_stream, overwrite_output=True, quiet=True)

    # Read the audio back.  The second positional parameter of
    # wavfile.read() is `mmap`, not a file mode, so no 'rb' is passed.
    sample_rate, wav_data = wavfile.read(chunk_file)
    assert sample_rate == 16000
    waveform = wav_data / tf.int16.max  # normalize to -1.0 .. 1.0

    # Run the classifier; only the scores are used here.
    scores_result, _embeddings, _spectrogram = model(waveform)
    scores_result_np = scores_result.numpy()
    # One chunk is expected to yield exactly one row of 521 class scores.
    assert scores_result_np.shape == (1, 521)
    scores = scores_result_np[0]

    # Aggregate the raw class scores into the coarse categories.
    speech_score, speechish_score, moanish_score, env_sound_score, other_score = calc_scores(scores)

    # Debug ranking: the four aggregated categories compete with every
    # individual "other" class; the five highest scores are reported.
    pseudo_class_name_score_pairs_for_debug = [
        ('会話', speech_score),
        ('会話-ish', speechish_score),
        ('喘ぎ-ish', moanish_score),
        ('環境音-ish', env_sound_score),
    ]
    pseudo_class_name_score_pairs_for_debug.extend(
        zip(other_class_names, (scores[class_id] for class_id in other_classes))
    )
    pseudo_class_name_score_pairs_for_debug.sort(key=lambda pair: pair[1], reverse=True)
    top_pairs = pseudo_class_name_score_pairs_for_debug[0:5]
    print([(class_name, f'{round(score * 100)}%') for class_name, score in top_pairs])

    # Build the debug subtitle entry for this chunk.
    entry_lines = [
        f'{index + 1}',
        f'{format_sec(chunk_start)} --> {format_sec(chunk_end)}',
    ]
    entry_lines.extend(f'{class_name} ({int(score * 100)}%)' for class_name, score in top_pairs)
    subtitle_entries.append('\n'.join(entry_lines) + '\n\n')

    # File the chunk under the first category whose score exceeds 0.8.
    if speech_score > 0.8:
        target_dir = speech_dir
    elif speechish_score > 0.8:
        target_dir = speechish_dir
    elif moanish_score > 0.8:
        target_dir = moanish_dir
    elif env_sound_score > 0.8:
        target_dir = env_sound_dir
    else:
        target_dir = other_dir
    # Move to an explicit file path: shutil.move() into a *directory* raises
    # when a file of the same name is already there (e.g. on a re-run),
    # whereas ffmpeg above is already told to overwrite its output.
    shutil.move(chunk_file, os.path.join(target_dir, chunk_name))

    # Progress report with a linear estimate of the remaining time.
    execution_time = datetime.datetime.now() - start_time
    execution_sec = execution_time.total_seconds()
    remaining_sec = execution_sec / (chunk_end - video_start) * (video_duration - video_start) - execution_sec
    print(f'current_position={format_sec(chunk_start)}, video_duration={format_sec(video_duration)} , execution_sec={format_sec(execution_sec)}, remaining_sec={format_sec(remaining_sec)}, inferred_class={pseudo_class_name_score_pairs_for_debug[0][0]} ({pseudo_class_name_score_pairs_for_debug[0][1]})')

# Write the subtitle file.  The entries contain Japanese text, so the
# encoding is fixed to UTF-8 rather than left to the platform default.
subtitle_content = ''.join(subtitle_entries)
with open('subtitles.srt', 'w', encoding='utf-8') as subtitle_file:
    subtitle_file.write(subtitle_content)
こんな感じで分けて、それを耳で聞いてデータを作った
ディレクトリ構成
以下のような感じ
- dataset
- speech: 会話と思われる音声を入れる
- moan: あえぎ声と思われる音声を入れる
- env_sound: 無音とか雑音とか環境音を入れる
次回
次回は今回作ったデータセットで転移学習