概要
目標:男女別の音声データセットを作成したい
全体の流れ
- キーワードを指定してYouTubeの動画をダウンロード
- 動画から音声データを抽出
- 動画から1秒ごとの画像を取得
- 3で取得した画像から男女を判別
- 顔の特徴量を計算し、1秒前の画像と比較
- 音声の切り出し
0. 必要なライブラリとディレクトリの準備
from pytube import YouTube
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.tools import argparser
import os
import shutil
import cv2
import glob
import dlib
import matplotlib.pyplot as plt
import numpy as np
import keras
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image
import matplotlib.pyplot as plt
from keras.models import Sequential, Model
from keras.layers import Dense, Input, Flatten, Dropout
from keras.applications.vgg16 import VGG16
import pandas as pd
import sys
from PIL import Image
import moviepy.editor as mp
from pydub import AudioSegment
keyword = "NHKニュース7"
data_path = os.path.join("../data/", keyword)
if os.path.exists("../data/") == False:
os.mkdir("../data/")
if os.path.exists(data_path) == False:
os.mkdir(data_path)
1. キーワードを指定してYouTubeの動画をダウンロード
YouTube APIを使用して、この記事を参考に動画をダウンロード。
download_movies.py
def download_video(keyword, video_num, dl_path, DEVELOPER_KEY,
YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION):
if os.path.exists(dl_path) == False:
os.mkdir(dl_path)
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,
developerKey=DEVELOPER_KEY)
# maxResultsは50以下のみ指定可能
search_response = youtube.search().list(
q=keyword,
part="id,snippet",
maxResults=video_num).execute()
videos = []
for search_result in search_response.get("items", []):
if search_result["id"]["kind"] == "youtube#video":
videos.append("%s" % (search_result["id"]["videoId"]))
print("Downloading {} videos.".format(len(videos)))
for ID in videos:
try:
query = 'https://www.youtube.com/watch?v=' + ID
yt = YouTube(query)
yt.streams.filter(subtype='mp4').first().download(dl_path)
print("download succeeded")
except:
pass
video_num = 10
dl_path = os.path.join(data_path, "video")
DEVELOPER_KEY = "***"
YOUTUBE_API_SERVICE_NAME = "***"
YOUTUBE_API_VERSION = "v3"
download_video(keyword=keyword,
video_num=video_num,
dl_path=dl_path,
DEVELOPER_KEY=DEVELOPER_KEY,
YOUTUBE_API_SERVICE_NAME=YOUTUBE_API_SERVICE_NAME,
YOUTUBE_API_VERSION=YOUTUBE_API_VERSION)
2. 動画から音声データを抽出
VideoFileClipを使用し、ダウンロードした動画から音声のみを抽出。
movie_to_audio.py
def all_movie_to_audio(input_dir, output_dir):
file_list = glob.glob("".join([input_dir, "/*"]))
for input_movie in file_list:
clip_input = mp.VideoFileClip(input_movie)
audio_name = input_movie.split("/")[-1][:-1] + "3"
clip_input.audio.write_audiofile(os.path.join(output_dir, audio_name))
print("done")
audio_path = os.path.join(data_path, "audio")
if os.path.exists(audio_path) == False:
os.mkdir(audio_path)
all_movie_to_audio(input_dir=dl_path, output_dir=audio_path)
3-1. 動画から1秒ごとの画像を取得
この記事を参考に、1秒ごとのキャプチャを取得。秒数をファイル名として保存する。
画像はaudio以下の動画名に応じたディレクトリに保存される。
movie_to_pic.py
def extract_pic(video_file, images_dir):
#動画ファイルを読み込む
video = cv2.VideoCapture(video_file)
if not os.path.exists(images_dir):
os.mkdir(images_dir)
#フレーム数を取得
frame_count = int(video.get(7))
frame_rate = int(video.get(5))
for i in range(0, frame_count):
is_read, frame = video.read()
if i % frame_rate == 0:
cv2.imwrite(images_dir+ "/" + str(i // frame_rate).zfill(6) + ".png", frame)
def all_movie_to_frames(dl_path, data_path):
video_files = glob.glob(dl_path + "/*")
frame_dir_name = os.path.join(data_path, "frames")
if os.path.exists(frame_dir_name) == False:
os.mkdir(frame_dir_name)
frame_rate_list = []
for file in video_files:
frame_dir = frame_dir_name + "/ " + file.split("/")[-1][:-4]
os.mkdir(frame_dir)
frame_rate = extract_pic(video_file=file, images_dir=frame_dir)
frame_rate_list.append(frame_rate)
return(frame_rate_list)
dl_path = os.path.join(data_path, "video")
frame_rate_list = all_movie_to_frames(dl_path=dl_path, data_path=data_path)
3-2. 画像から顔部分を切り出し
4で男女判別をする際、画像全体ではなく顔部分に適切なマージンをつけて切り出した画像の方が精度がよかったので、この工程を追加。
def facedetector_dlib(img, image_path):
try:
detector = dlib.get_frontal_face_detector()
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
dets, scores, idx = detector.run(img_rgb, 0)
color = (0, 0, 255)
s = len(dets)
rect_list = []
if len(dets) == 1:
# 顔画像が1つ以上と判定された時だけ顔の切り出し画像を作成する
for i, rect in enumerate(dets):
rect_list.append([rect.left(), rect.top(), rect.right(), rect.bottom()])
return(s, rect_list[0])
except:
# メモリエラーの時など
return (img, '')
def crop_frames(data_path, frames_dir):
dir_list = glob.glob(frames_dir + "/*")
for directory in dir_list:
file_list = sorted(glob.glob(directory + "/*"))
crop_dir = os.path.join(data_path, "crop_frames")
if os.path.exists(crop_dir) == False:
os.mkdir(crop_dir)
each_crop_directory = os.path.join(crop_dir, directory.split("/")[-1])
os.mkdir(each_crop_directory)
for pic in file_list:
im = cv2.imread(pic)
s, rect_list= facedetector_dlib(im, pic)
im = Image.open(pic)
file_path = os.path.join(each_crop_directory, pic.split("/")[-1])
if type(s) == int:
if s == 1:
axis_1 = rect_list[2] - rect_list[0]
axis_2 = rect_list[3] - rect_list[1]
im_crop = im.crop((rect_list[0] - axis_1 * 0.35, rect_list[1] - axis_2 * 0.35,
rect_list[2] + axis_1 * 0.35, rect_list[3] + axis_2 * 0.35))
im_crop.save(file_path)
# 顔が無ければ s='' が返る
frames_dir = os.path.join(data_path, "frames")
crop_frames(data_path=data_path, frames_dir=frames_dir)
4. 3で取得した画像から男女を判別
※男女を判別するモデルを学習する過程については、長くなるのでこの記事に書きました。
predict_mw.py
img_width, img_height = 224, 224
input_tensor = Input(shape=(img_height, img_width, 3))
vgg16_model = VGG16(include_top=False, input_tensor=input_tensor)
for layer in vgg16_model.layers[:15]:
layer.trainable = False
# define FC layer
fc_model = Sequential()
fc_model.add(Flatten(input_shape=vgg16_model.output_shape[1:]))
fc_model.add(Dense(256, activation='relu'))
fc_model.add(Dropout(0.5))
fc_model.add(Dense(1, activation='sigmoid'))
model = Model(inputs=vgg16_model.input, outputs=fc_model(vgg16_model.output))
model.compile(
optimizer=keras.optimizers.SGD(lr=1e-4, momentum=0.9),
loss='binary_crossentropy',
metrics=['accuracy']
)
model_path = "../models/margin_30per_close_50.hdf5"
model.load_weights(model_path)
def predict_mw(file_name):
# 引数で指定した画像ファイルを読み込む
# サイズはVGG16のデフォルトである224x224にリサイズされる
img = image.load_img(file_name, target_size=(224, 224))
x = image.img_to_array(img)
x = np.expand_dims(x, axis=0)
preds = model.predict(preprocess_input(x))
if preds > 0.5:
mw_pred = "W"
else:
mw_pred = "M"
return(mw_pred)
5. 顔の特徴量を計算し、1秒前の画像と比較
openfaceを使用し、顔の特徴量を計算。
1秒前の画像と、特徴ベクトル間の距離を計算して保存する。
calc_feature_value.py
fileDir = "***"
modelDir = os.path.join(fileDir, 'models')
dlibModelDir = os.path.join(modelDir, 'dlib')
openfaceModelDir = os.path.join(modelDir, 'openface')
dlibFacePredictor = os.path.join(dlibModelDir, "shape_predictor_68_face_landmarks.dat")
align = openface.AlignDlib(dlibFacePredictor)
networkModel = os.path.join(openfaceModelDir, 'nn4.small2.v1.t7')
imgDim = 96
net = openface.TorchNeuralNet(networkModel, imgDim)
def getRep(imgPath):
try:
bgrImg = cv2.imread(imgPath)
if bgrImg is None:
raise Exception("Unable to load image: {}".format(imgPath))
rgbImg = cv2.cvtColor(bgrImg, cv2.COLOR_BGR2RGB)
bb = align.getLargestFaceBoundingBox(rgbImg)
if bb is None:
raise Exception("Unable to find a face: {}".format(imgPath))
alignedFace = align.align(imgDim, rgbImg, bb,
landmarkIndices=openface.AlignDlib.OUTER_EYES_AND_NOSE)
if alignedFace is None:
raise Exception("Unable to align image: {}".format(imgPath))
rep = net.forward(alignedFace)
return rep
except:
return 0
def write_feature_csv(data_path):
dir_list= glob.glob(os.path.join(data_path, "crop_frames") + "/*")
for directory in dir_list:
file_feature_value_list = []
file_list = sorted(glob.glob(directory + "/*"))
for file in file_list:
# それぞれの画像に対して特徴量を計算
file_name_feature_pair = [file.split("/")[-1], getRep(file), predict_mw(file_name=file)]
file_feature_value_list.append(file_name_feature_pair)
diff_list = []
diff_list.append(0)
for num in range(len(file_feature_value_list) - 1):
d = file_feature_value_list[num][1] - file_feature_value_list[num + 1][1]
diff_list.append(np.dot(d,d))
df = pd.DataFrame(file_feature_value_list, columns = ["PNG File", "Feature Value", "M or W"])
# ベクトルの距離を計算してDiff列に保存する
df["Diff"] = diff_list
csv_dir = os.path.join(data_path, "mw_feature_csv")
if os.path.exists(csv_dir) == False:
os.mkdir(csv_dir)
df.to_csv(os.path.join(csv_dir, directory.split("/")[-1] + ".csv"))
write_feature_csv(data_path = data_path)
6. 音声の切り出し
同じ人が連続して写っている箇所を2の音声から切り出し、男女のラベルを付加する
1秒前の画像と比較して、
・特徴ベクトル間の距離が1未満
・男女の判定が同じ
場合のみ、1つの音声データとして保存可能、という条件で音声を切り出す。
trim_audio.py
def one_sequence(audio_path, sec_1, sec_2, export_file):
sound = AudioSegment.from_file(audio_path, format="mp3")
sec_1 = sec_1 * 1000
sec_2 = sec_2 * 1000
sound = sound[sec_1:sec_2]
sound.export(export_file)
if os.path.exists(os.path.join(data_path, "trimmed_audio")) == False:
os.mkdir(os.path.join(data_path, "trimmed_audio"))
csv_list = glob.glob(os.path.join(data_path, "mw_feature_csv") + "/*")
for csv_path in csv_list:
csv_name = csv_path.split("/")[-1][1:-4]
if os.path.exists(os.path.join(data_path, "trimmed_audio", csv_name)) == False:
os.mkdir(os.path.join(data_path, "trimmed_audio", csv_name))
df = pd.read_csv(csv_path)
df["OK/NG"] = 0
for i in range(0, len(df)):
if ((int(df.iloc[i, 1][:-4]) - int(df.iloc[i - 1, 1][:-4])) > 1) \
or (df.iloc[i, 3] != df.iloc[i - 1, 3]) \
or (df.iloc[i, 4] > 0.8):
df.iloc[i, 5] = "NG"
else:
df.iloc[i, 5] = "OK"
starts = []
ends = []
mws = []
status = "off"
for i in range(0, len(df)):
if (status == "off"):
if (df.iloc[i, 5] == "OK"):
starts.append(int(df.iloc[i - 1, 1][:-4]))
mws.append(df.iloc[i, 3])
status = "on"
else:
pass
else:
if (df.iloc[i, 5] == "NG"):
ends.append(int(df.iloc[i - 1, 1][:-4]))
status = "off"
else:
pass
for (start, end, mw) in zip(starts, ends, mws):
if (start + 5 < end):
one_sequence(audio_path = os.path.join(data_path, "audio", csv_name + ".mp3"),
sec_1=int(start),
sec_2=int(end),
export_file=os.path.join(data_path, "trimmed_audio", csv_name,
mw + "_" + str(start) + "_" + str(end) + ".mp3"))