LoginSignup
1
1

More than 1 year has passed since last update.

[python] youtubeダウンロード+mp3変換+タグ付け+アートワーク

Last updated at Posted at 2022-09-18

youtubeダウンロード+mp3変換+タグ付け+アートワーク
メモに近い。

# -*- config: utf-8 -*-
#------------------------------------------------------------------------------
# import
#------------------------------------------------------------------------------
import os
import glob
import json
from pickle import TRUE
from mutagen.easyid3 import EasyID3
from mutagen.id3 import APIC, ID3
import requests
import time
import concurrent.futures
import argparse
import inspect
import custom_logger

loglevel = 'DEBUG'                               # ログレベル(DEBUG,INFO,WARNING,ERROR,CRITICAL)
logger = custom_logger.setup(loglevel)           # ロガー作成

#------------------------------------------------------------------------------
# variable
#------------------------------------------------------------------------------
logfilename = os.path.basename(__file__).replace('.py', '.log')

#------------------------------------------------------------------------------
# function
#------------------------------------------------------------------------------
# メイン
def main(args):
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
    url = args.url
    futures = []

    dwld_json(url)
    find_json()
    extract_json()

    max = 30
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max)
    # concurrent execution
    for i in range(max):
        # wapper_enc_x(max, i)
        futures.append(executor.submit(wapper_enc_x, max, i))
        time.sleep(0.1)

    # wait
    for future in concurrent.futures.as_completed(futures):
        True

    return

# ダウンロード
def dwld_json(url):
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name + url)
    global playlist

    if '/playlist' in url:
        format = "%(playlist_uploader)s\\%(playlist_title)s\\%(playlist_index)s - %(title)s.%(ext)s"
        playlist = "true"
    else:
        format = "%(uploader)s\\%(title)s.%(ext)s"
        playlist = "false"
    opt = "--skip-download --write-info-json --no-overwrites"
    cmd = "yt-dlp " + opt + " -o " + "\"" + format + "\"" + " " + url
    cmd = cmd + ' > ' + logfilename + ' 2>&1'

    logger.info("[CMD]> " + str(cmd))
    os.system(cmd)

    return

# jsonファイル検索
def find_json():
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
    global json_list
    json_list = []

    # ['.//ハック大学\\仕事ができない人から学ぼう!\\0 - 仕事ができない人から学ぼう!.info.json', './/ハック大学\\仕事ができない人から学ぼう!\\1 - デキない人から学ぶ、伝える力.info.json', './/ハック大学\\仕事ができない人から学ぼう!\\2 - デキな い人から学ぶ、質問力.info.json', './/ハック大学\\仕事ができない人から学ぼう!\\2 - 仕事ができない人から学ぼう!.info.json']
    for f_json in glob.glob('**//*.json', recursive=True):
        json_list.append([f_json])

    # print(json_list)

    return

# jsonファイルから情報抽出
def extract_json():
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
    global json_list_exist

    json_list_exist = []
    k = 0

    for i in range(len(json_list)):
        #print(json_list[i])
        with open(json_list[i][0], 'r', encoding="utf-8_sig") as f:
            j = json.load(f)
            if 'webpage_url' in j:
                webpage_url = j['webpage_url']
            else:
                logger.debug('webpage_url: ' + json_list[i][0])
                continue
            if 'playlist_uploader' in j:
                artist = j['playlist_uploader'].rstrip()
            else:
                logger.debug('playlist_uploader: ' + json_list[i][0])
                continue
            if 'playlist' in j:
                album = j['playlist']
            else:
                album = 'single'
            if 'title' in j:
                title = j['title']
            else:
                logger.debug('title: ' + json_list[i][0])
                continue
            if 'thumbnails' in j:
                thumbnail_url = j['thumbnails'][-2]['url']
            else:
                logger.debug('thumbanail: ' + json_list[i][0])
                continue
            if not 'format' in j:
                # this is playlistjson
                logger.debug('format: ' + json_list[i][0])
                continue

            mp3_file = json_list[i][0].replace('.info.json', '.mp3')
            jpg_file = json_list[i][0].replace('.info.json', '.jpg')
            tmp_file = json_list[i][0].replace('.info.json', '.tmp')

            json_list_exist.append(json_list[i])
            json_list_exist[k].append(webpage_url)
            json_list_exist[k].append(artist)
            json_list_exist[k].append(album)
            json_list_exist[k].append(title)
            json_list_exist[k].append(thumbnail_url)
            json_list_exist[k].append(mp3_file)
            json_list_exist[k].append(jpg_file)
            json_list_exist[k].append(tmp_file)
            #print(json_list_exist)
            k += 1
    #print(json_list_exist)

    return

# サムネイルダウンロード
def dwld_thumbnail(jpg_file, thumbnail_url):
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + jpg_file + " " + thumbnail_url)
    urldata = requests.get(thumbnail_url).content

    with open(jpg_file ,mode='wb') as f:
        f.write(urldata)

    return

# mp3ダウンロード
def dwld_mp3(tmp_file, webpage_url):
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + tmp_file + " " + webpage_url)
    format = tmp_file
    opt = "-ix --audio-format mp3 --quiet"
    cmd = "yt-dlp " + opt + " -o " + "\"" + format + "\"" + " " + webpage_url

    print("[CMD]> " + str(cmd))
    os.system(cmd)

    return

# アートワーク
def update_coverart(mp3_file, jpg_file):
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + str(mp3_file) + " " + str(jpg_file))
    tags = ID3(mp3_file)
    with open(jpg_file, "rb") as img_file:
        cover_img_byte_str = img_file.read()
        tags.add(APIC(mime="image/jpeg", type=3, desc=u'Cover', data=cover_img_byte_str))
    tags.save()

    return

# 不要ファイル削除
def rm_file():
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
    for i in range(len(json_list_exist)):
        artist = json_list_exist[i][2]

        f_list = glob.glob(artist.strip() + "**/**", recursive=True)
        for f in f_list:
            if '.mp3' not in f:
                if os.path.isfile(f):
                    os.remove(f)

    return

# 並列用のラッパー
def wapper_enc_x(max, b):
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + str(max) + " " + str(b))
    json_len = len(json_list_exist)
    for i in range(json_len):
        if i % max == b:
            # print (i,b)
            json = json_list_exist[i][0]
            webpage_url = json_list_exist[i][1]
            artist = json_list_exist[i][2]
            album = json_list_exist[i][3]
            title = json_list_exist[i][4]
            thumbnail_url = json_list_exist[i][5]
            mp3_file = json_list_exist[i][6]
            jpg_file = json_list_exist[i][7]
            tmp_file = json_list_exist[i][8]
            #print(json, webpage_url, artist, album, title, thumbnail_url, mp3_file, jpg_file, tmp_file, playlist)

            try:
                playlist
            except:
                print(json)
                continue
            try:
                album
            except:
                print(json)
                continue

            if playlist == "true" and album == "single":
                 print(playlist, album)
                 continue

            if playlist == "false" and album != "single":
                 print(playlist, album)
                 continue

            if not os.path.isfile(mp3_file):
                dwld_mp3(tmp_file, webpage_url)
                dwld_thumbnail(jpg_file, thumbnail_url)
                update_metadata(mp3_file, title, album, artist)
                update_coverart(mp3_file, jpg_file)

    return

# メタデータ更新・追加・削除
def update_metadata(mp3_file, title, album, artist):
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name + " " + str(mp3_file) + " " + str(title) + " " + str(album) + " " + str(artist))
    #print(file, title, album, artist)
    tags = EasyID3(mp3_file)
    tags["title"] = title
    tags["album"] = album
    tags["artist"] = artist
    tags.save()

    return

# 引数確認
def check_args():
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name)

    global playlist
    argparser = argparse.ArgumentParser()
    argparser.add_argument('-u', '--url', type=str, required=TRUE)
    argparser.add_argument('--basedir', type=str, default='D:\\Dropbox\\02.プライベート\\音楽\\Youtube')

    args = argparser.parse_args()
    os.chdir(args.basedir)

    if '/playlist' in args.url:
        playlist = "true"
    else:
        playlist = "false"

    return(args)

#------------------------------------------------------------------------------
# main
#------------------------------------------------------------------------------
if __name__ == "__main__":
    logger.debug("[Do] " + inspect.currentframe().f_code.co_name)
    args = check_args()
    main(args)
    rm_file()

exit()
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1