youtubeダウンロード+mp3変換+タグ付け+アートワーク
メモに近い。
# -*- config: utf-8 -*-
#------------------------------------------------------------------------------
# import
#------------------------------------------------------------------------------
import os
import glob
import json
from pickle import TRUE
from mutagen.easyid3 import EasyID3
from mutagen.id3 import APIC, ID3
import requests
import time
import concurrent.futures
import argparse
import inspect
import custom_logger
loglevel = 'DEBUG' # ログレベル(DEBUG,INFO,WARNING,ERROR,CRITICAL)
logger = custom_logger.setup(loglevel) # ロガー作成
#------------------------------------------------------------------------------
# variable
#------------------------------------------------------------------------------
logfilename = os.path.basename(__file__).replace('.py', '.log')
#------------------------------------------------------------------------------
# function
#------------------------------------------------------------------------------
# メイン
def main(args):
logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
url = args.url
futures = []
dwld_json(url)
find_json()
extract_json()
max = 30
executor = concurrent.futures.ThreadPoolExecutor(max_workers=max)
# concurrent execution
for i in range(max):
# wapper_enc_x(max, i)
futures.append(executor.submit(wapper_enc_x, max, i))
time.sleep(0.1)
# wait
for future in concurrent.futures.as_completed(futures):
True
return
# ダウンロード
def dwld_json(url):
logger.debug("[Do] " + inspect.currentframe().f_code.co_name + url)
global playlist
if '/playlist' in url:
format = "%(playlist_uploader)s\\%(playlist_title)s\\%(playlist_index)s - %(title)s.%(ext)s"
playlist = "true"
else:
format = "%(uploader)s\\%(title)s.%(ext)s"
playlist = "false"
opt = "--skip-download --write-info-json --no-overwrites"
cmd = "yt-dlp " + opt + " -o " + "\"" + format + "\"" + " " + url
cmd = cmd + ' > ' + logfilename + ' 2>&1'
logger.info("[CMD]> " + str(cmd))
os.system(cmd)
return
# jsonファイル検索
def find_json():
logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
global json_list
json_list = []
# ['.//ハック大学\\仕事ができない人から学ぼう!\\0 - 仕事ができない人から学ぼう!.info.json', './/ハック大学\\仕事ができない人から学ぼう!\\1 - デキない人から学ぶ、伝える力.info.json', './/ハック大学\\仕事ができない人から学ぼう!\\2 - デキな い人から学ぶ、質問力.info.json', './/ハック大学\\仕事ができない人から学ぼう!\\2 - 仕事ができない人から学ぼう!.info.json']
for f_json in glob.glob('**//*.json', recursive=True):
json_list.append([f_json])
# print(json_list)
return
# jsonファイルから情報抽出
def extract_json():
logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
global json_list_exist
json_list_exist = []
k = 0
for i in range(len(json_list)):
#print(json_list[i])
with open(json_list[i][0], 'r', encoding="utf-8_sig") as f:
j = json.load(f)
if 'webpage_url' in j:
webpage_url = j['webpage_url']
else:
logger.debug('webpage_url: ' + json_list[i][0])
continue
if 'playlist_uploader' in j:
artist = j['playlist_uploader'].rstrip()
else:
logger.debug('playlist_uploader: ' + json_list[i][0])
continue
if 'playlist' in j:
album = j['playlist']
else:
album = 'single'
if 'title' in j:
title = j['title']
else:
logger.debug('title: ' + json_list[i][0])
continue
if 'thumbnails' in j:
thumbnail_url = j['thumbnails'][-2]['url']
else:
logger.debug('thumbanail: ' + json_list[i][0])
continue
if not 'format' in j:
# this is playlistjson
logger.debug('format: ' + json_list[i][0])
continue
mp3_file = json_list[i][0].replace('.info.json', '.mp3')
jpg_file = json_list[i][0].replace('.info.json', '.jpg')
tmp_file = json_list[i][0].replace('.info.json', '.tmp')
json_list_exist.append(json_list[i])
json_list_exist[k].append(webpage_url)
json_list_exist[k].append(artist)
json_list_exist[k].append(album)
json_list_exist[k].append(title)
json_list_exist[k].append(thumbnail_url)
json_list_exist[k].append(mp3_file)
json_list_exist[k].append(jpg_file)
json_list_exist[k].append(tmp_file)
#print(json_list_exist)
k += 1
#print(json_list_exist)
return
# サムネイルダウンロード
def dwld_thumbnail(jpg_file, thumbnail_url):
logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + jpg_file + " " + thumbnail_url)
urldata = requests.get(thumbnail_url).content
with open(jpg_file ,mode='wb') as f:
f.write(urldata)
return
# mp3ダウンロード
def dwld_mp3(tmp_file, webpage_url):
logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + tmp_file + " " + webpage_url)
format = tmp_file
opt = "-ix --audio-format mp3 --quiet"
cmd = "yt-dlp " + opt + " -o " + "\"" + format + "\"" + " " + webpage_url
print("[CMD]> " + str(cmd))
os.system(cmd)
return
# アートワーク
def update_coverart(mp3_file, jpg_file):
logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + str(mp3_file) + " " + str(jpg_file))
tags = ID3(mp3_file)
with open(jpg_file, "rb") as img_file:
cover_img_byte_str = img_file.read()
tags.add(APIC(mime="image/jpeg", type=3, desc=u'Cover', data=cover_img_byte_str))
tags.save()
return
# 不要ファイル削除
def rm_file():
logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
for i in range(len(json_list_exist)):
artist = json_list_exist[i][2]
f_list = glob.glob(artist.strip() + "**/**", recursive=True)
for f in f_list:
if '.mp3' not in f:
if os.path.isfile(f):
os.remove(f)
return
# 並列用のラッパー
def wapper_enc_x(max, b):
logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + str(max) + " " + str(b))
json_len = len(json_list_exist)
for i in range(json_len):
if i % max == b:
# print (i,b)
json = json_list_exist[i][0]
webpage_url = json_list_exist[i][1]
artist = json_list_exist[i][2]
album = json_list_exist[i][3]
title = json_list_exist[i][4]
thumbnail_url = json_list_exist[i][5]
mp3_file = json_list_exist[i][6]
jpg_file = json_list_exist[i][7]
tmp_file = json_list_exist[i][8]
#print(json, webpage_url, artist, album, title, thumbnail_url, mp3_file, jpg_file, tmp_file, playlist)
try:
playlist
except:
print(json)
continue
try:
album
except:
print(json)
continue
if playlist == "true" and album == "single":
print(playlist, album)
continue
if playlist == "false" and album != "single":
print(playlist, album)
continue
if not os.path.isfile(mp3_file):
dwld_mp3(tmp_file, webpage_url)
dwld_thumbnail(jpg_file, thumbnail_url)
update_metadata(mp3_file, title, album, artist)
update_coverart(mp3_file, jpg_file)
return
# メタデータ更新・追加・削除
def update_metadata(mp3_file, title, album, artist):
logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + str(mp3_file) + " " + str(title) + " " + str(album) + " " + str(artist))
#print(file, title, album, artist)
tags = EasyID3(mp3_file)
tags["title"] = title
tags["album"] = album
tags["artist"] = artist
tags.save()
return
# 引数確認
def check_args():
logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
global playlist
argparser = argparse.ArgumentParser()
argparser.add_argument('-u', '--url', type=str, required=TRUE)
argparser.add_argument('--basedir', type=str, default='D:\\Dropbox\\02.プライベート\\音楽\\Youtube')
args = argparser.parse_args()
os.chdir(args.basedir)
if '/playlist' in args.url:
playlist = "true"
else:
playlist = "false"
return(args)
#------------------------------------------------------------------------------
# main
#------------------------------------------------------------------------------
if __name__ == "__main__":
logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
args = check_args()
main(args)
rm_file()
exit()