はじめに
Youtube Data APIを使用し、データの取得からレポート作成までを自動化しました。
個人的に好きなYoutuberのデータを可視化したり分析したい!という思いがモチベーションとなっております。
Youtube Studioでは自分のチャンネルしか分析できないため、他チャンネルを分析したい時などに使用できるかもしれません。
今回、Youtube Data APIを使用するに当たって、APIキーを使って認証を行ったため全てのデータを取得することはできませんでした。もし自分のチャンネルに関するデータ取得を行う場合はOAuth2.0を使って認証を行うことをお勧めします。
Youtube Data APIを使ったデータの取得の方法
Youtube Data APIではチャンネル情報や動画の情報を取得することができます。
今回初めてYoutube Data APIを使用したため以下のサイトを参考にしていただきました。
- 【Python】YouTube Data API v3を利用した特定チャンネルの動画情報取得 | Analytics Board | python・Reactを勉強したい初心者のための入門サイト
- 【Python】YouTube Data API を使って、いろんな情報を取得してみた! - Qiita
構成
今回作成したアーキテクチャは以下の通りです。
- Cloud Schedulerでメッセージを発行する
- Pub/SubがCloud Functionsにメッセージを通知
- Cloud Functionsでデータを取得し、Cloud Storageに保存
- Cloud Storageにファイルがアップロードされたのをトリガーとして、Cloud Functionsが実行されBigQueryにデータをロード
- BigQueryと連携しデータポータルでレポート作成
以下でそれぞれのコンポーネントについて詳しく説明していきます。
※GCPを使用するため、プロジェクトを作成しておく必要があります。
Pub/Subのtopicを作成
今回、データ取得のFunctionsのトリガーをPub/Subのtopicとしたいので、事前にtopicを作成しておく必要があります。
以下のコマンドで簡単に作成できます。
$ gcloud pubsub topics create <topic_name>
以下で説明しますが、今回データ取得の関数を2つ定義し、かつschedulerで実行する頻度も異なるためtopicを2つ作成しておく必要があります。
Cloud Functions
次に、Cloud FunctionsでYoutubeのデータを取得する関数とGCSからBigQueryにデータをロードする関数を定義します。
今回は2つのデータ取得関数とGCSからBQにロードする関数の計3つの関数を作成しました。
- チャンネルの登録者数と総再生回数を取得する関数
- アップロードされている動画それぞれの詳細データを取得する関数
- GCSにファイルがアップロードされたのをトリガーとして、BigQueryのテーブルにロードする関数
上2つの関数で取得するデータは一つの関数で取得することもできますが、取得する頻度を変えたかったので、関数を分けました。
以下でそれぞれの関数について見ていきます。
チャンネルの登録者数と総再生回数を取得する関数
このfunctionは以下の6つのファイルで構成されています。
・main.py
・get_channel.py
・load_to_gcs.py
・settings.py
・.env
・requirements.txt
コードは以下の通りです。
※事前にCloud StorageにBucketを作成しておく必要があります。
main.pyには実際に実行したい関数を定義します。
from apiclient.discovery import build
from get_channel import GetChannel
import settings
def youtube(event, context):
# 設定
API_KEY = settings.API_KEY
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'
channel_name = <チャンネルの名前>
youtube = build(
YOUTUBE_API_SERVICE_NAME,
YOUTUBE_API_VERSION,
developerKey=API_KEY,
cache_discovery=False
)
channel = GetChannel(youtube, channel_name)
channel.get_channel_details()
get_channel_basicはチャンネルのidを取ってくるメソッドです。
get_channel_detailsではチャンネルのidを元に、subscriberCountとviewCountを取得し、データ取得時間と合わせてpandasでSeries化し、load_to_gcs.pyのread_from_gcs_and_appendに渡しています。
from datetime import datetime
from apiclient.discovery import build
import pandas as pd
import load_to_gcs
class GetChannel(object):
def __init__(self, youtube, channel_name):
self.youtube = youtube
self.channel_name = channel_name
def get_channel_basic(self):
response = self.youtube.search()\
.list(q=self.channel_name, part='id,snippet', maxResults=1).execute()
for item in response.get('items', []):
if item['id']['kind'] == 'youtube#channel':
return item['id']['channelId']
def get_channel_details(self):
response = self.youtube.channels().list(
part = 'snippet,statistics',
id = self.get_channel_basic()
).execute()
for item in response.get("items", []):
if item["kind"] == "youtube#channel":
series = pd.Series([item['statistics']['subscriberCount'],
item['statistics']['viewCount'],
datetime.now().strftime('%Y-%m-%d')],
['subscriberCount', 'viewCount', 'date'])
load_to_gcs.read_from_gcs_and_append(series)
load_to_gcs.pyではまずbucketにあるcsvファイルを持ってきます。
その後pandasでデータフレーム化し、そのデータフレームに取得したデータを追加します。
最後にそのデータフレームをcsv化し、bucketにアップロードしています。
取得してきた、データをそのままオブジェクトとしてbucketにアップロードしても大丈夫ですが、毎日データを取得するかつ取得データが1行なので一つのオブジェクトを更新していく形で保存しています。
import datetime
from io import BytesIO
from google.cloud import storage as gcs
import pandas as pd
import settings
def read_from_gcs_and_append(series):
project_id = settings.project_id
bucket_name = settings.bucket_name
gcs_path = <bucket以下のGCSオブジェクトのpath> # csv形式
client = gcs.Client(project_id)
bucket = client.get_bucket(bucket_name)
blob_read = bucket.blob(gcs_path)
content_read = blob_read.download_as_string()
df = pd.read_csv(BytesIO(content_read))
df = df.append(series, ignore_index=True)
blob_write = bucket.blob(gcs_path)
blob_write.upload_from_string(df.to_csv(index=False, header=True), content_type='text/csv')
setting.pyではpython-dotenvを使ってAPI_KEYやproject_id、bucket_nameを環境変数として、osライブラリでそれらを取得するという処理を行っています。
これはgithubでコード管理する際を考慮しています。
import os
from os.path import join, dirname
from dotenv import load_dotenv
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
API_KEY = os.environ.get('API_KEY')
project_id = os.environ.get('project_id')
bucket_name = os.environ.get('bucket_name')
.envにはAPI_KEYやproject_id、bucket_nameを記述しておきます。
API_KEY=<API_KEY>
project_id=<project_id>
bucket_name=<bucket_name>
requirements.pyではイントールしたいパッケージとバージョンを記述します。
Cloud Functionsでサードパーティライブラリを使用する場合は記述が必要です。
google-api-core==1.23.0
google-api-python-client==1.12.8
google-cloud-storage==1.33.0
google-auth==1.23.0
pandas==1.1.5
python-dotenv==0.15.0
アップロードされた動画それぞれの詳細データを取得する関数
このfunctionは以下の7つのファイルで構成されています。
・main.py
・get_channel.py
・get_video.py
・load_to_gcs.py
・settings.py
・.env
・requirements.txt
コードは以下の通りです。
※ settings.py、.env、requirements.pyは上記と同じコードなので記載してません。
main.pyには実際に実行したい関数を定義します。
from apiclient.discovery import build
from get_channel import GetChannel
from get_videos_test import GetVideo
import settings
def youtube(event, context):
# 設定
API_KEY = settings.API_KEY
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'
channel_name = <チャンネルの名前>
youtube = build(
YOUTUBE_API_SERVICE_NAME,
YOUTUBE_API_VERSION,
developerKey=API_KEY,
cache_discovery=False
)
videos = GetVideo(youtube, channel_name)
videos.get_video_details_csv()
get_channel.pyではチャンネルidを取得する関数を定義します。
from apiclient.discovery import build
class GetChannel(object):
def __init__(self, youtube, channel_name):
self.youtube = youtube
self.channel_name = channel_name
def get_channel_basic(self):
response = self.youtube.search()\
.list(q=self.channel_name, part='id,snippet', maxResults=1).execute()
for item in response.get('items', []):
if item['id']['kind'] == 'youtube#channel':
return item['id']['channelId']
get_video.pyでは上記のGetChannelを継承したGetVideoというクラスを定義しています。
その中のget_video_basicメソッドでは全ての動画のidを取得し、リストで返します。
get_video_details_csvでは動画のidを元に、動画のtitle, viewCount, likeCount, dislikeCount, favoriteCount, commentCount, tags, publishedAt を取得し、データフレーム化しています。
その後、load_to_gcs.pyのload_to_gcs_csv関数にデータフレームを渡しています。
from apiclient.discovery import build
import pandas as pd
from google.cloud import storage as gcs
from get_channel import GetChannel
from load_to_gcs import load_to_gcs_json, load_to_gcs_csv
class GetVideo(GetChannel):
def get_video_basic(self):
nextPagetoken = None
nextpagetoken = None
video_ids = []
while True:
if nextPagetoken != None:
nextpagetoken = nextPagetoken
response = self.youtube.search().list(
part = "snippet",
channelId = self.get_channel_basic(),
maxResults = 50,
order = "date", #日付順にソート
pageToken = nextpagetoken
).execute()
for item in response.get("items", []):
if item["id"]["kind"] == "youtube#video":
video_ids.append(item["id"]["videoId"])
try:
nextPagetoken = response["nextPageToken"]
except:
break
return video_ids
def get_video_details_csv(self):
videos = []
for video_id in self.get_video_basic():
response = self.youtube.videos().list(
part = 'snippet,statistics',
id = video_id
).execute()
for item in response.get("items", []):
if item["kind"] == "youtube#video":
videos.append(
[item["snippet"]["title"],
item["statistics"]["viewCount"],
item["statistics"]["likeCount"],
item["statistics"]["dislikeCount"],
item["statistics"]["favoriteCount"] if "favoriteCount" in item["statistics"].keys() else None,
item["statistics"]["commentCount"] if "commentCount" in item["statistics"].keys() else None,
item["snippet"]["tags"] if "tags" in item["snippet"].keys() else None,
item["snippet"]["publishedAt"]
])
df_video = pd.DataFrame(videos,
columns=['title', 'viewCount', 'likeCount', 'dislikeCount', 'favoriteCount', 'commentCount', 'tags', 'publishAt'])
load_to_gcs_csv(df_video)
load_to_gcs.pyのload_to_gcs_csv関数では受け取ったデータフレーム化して、bucketにアップロードしています。
アップロードの際にオブジェクトの名前に日時を付けてわかりやすくしています。
また、Youtubeの動画情報は取得した日時によってデータが異なるので、過去のデータも使用できるようにCloud Storageに保存しておきます。ライフサイクル機能を使用して、一定期間を過ぎたオブジェクトは削除する設定を行った方が良いかもしれません。
import datetime
from google.cloud import storage as gcs
import pandas as pd
import settings
def load_to_gcs_csv(df):
now = datetime.datetime.now()
project_id = settings.project_id
bucket_name = settings.bucket_name
gcs_path = 'youtube-test/{}-test.csv'.format(now.strftime('%Y-%m-%d-%H:%M'))
client = gcs.Client(project_id)
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(gcs_path)
blob.upload_from_string(df.to_csv(index=False, header=True), content_type='text/csv')
GCSからBigQueryにロードする関数
このfunctionは以下の7つのファイルで構成されています。
・main.py
・settings.py
・.env
・requirements.txt
コードは以下の通りです。
※事前にBigQueryでデータセットと2つのテーブルを作成しておく必要があります
main.pyには実際に定義したい関数を定義します。
今回GCSのバケットは同じにしてあるので、取得してきたデータによって場合分けを行う必要があります。
総再生回数と登録者数を保存するオブジェクトの名前は常に同じなのでそれを条件に場合分けを行っています。
その後はbigqueryモジュールを使用してテーブルにデータを書き込んでいます。
その際、レポートに使用するデータは最新にしておきたいため、WRITE_TRUNCATEで最初から書き込みを行うようにしています。
from google.cloud import bigquery
import settings
def load_data(data, context):
# check content-type
if data['contentType'] != 'text/csv':
print('Not supported file type: {}'.format(data['contentType']))
return
# get file info
bucket_name = data['bucket']
file_name = data['name']
uri = 'gs://{}/{}'.format(bucket_name, file_name)
dataset_id = settings.dataset_id
if file_name == <総再生回数と登録者数のオブジェクト>:
table_id = settings.table_view_subscriber_id
else:
table_id = settings.table_video_id
client = bigquery.Client()
dataset_ref =client.dataset(dataset_id)
# Set Load Config
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.CSV
job_config.write_disposition = 'WRITE_TRUNCATE'
# Load data
load_job = client.load_table_from_uri(
uri, dataset_ref.table(table_id), job_config=job_config
)
print("Starting job {}".format(load_job.job_id))
load_job.result()
print("Job finished.")
import os
from os.path import join, dirname
from dotenv import load_dotenv
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
dataset_id = os.environ.get('dataset_id')
table_view_subscriber_id = os.environ.get('table_view_subscriber_id')
table_video_id = os.environ.get('table_video_id')
dataset_id=<dataset_id>
table_video_id=<動画データテーブルid>
table_view_subscriber_id=<再生回数と登録者数テーブルid>
google-cloud-bigquery==1.24.0
python-dotenv==0.15.0
Cloud Scheduler
続いてCloud Schedulerでジョブを作成していきます。
Cloud Schedulerではcron形式で実行タイミングを設定できます。(詳しくはこちら)
上記で作成した、2つのデータ取得関数に対して別々にジョブを作成します。
まず、チャンネルの登録者数と総再生回数を取得する関数ですが、毎日00:00に実行するようなジョブを作成します。
設定は以下の通りです。
続いてアップロードされた動画それぞれの詳細データを取得する関数に対しては、毎月10日の00:00に実行するようなジョブを作成します。
以上で定期的にデータを取得できていると思います。
テストとして、頻度の設定を「*/15 * * * *」などにして15分間隔で実際に動かしてみても良いと思います。成功しているかどうかは実際にCloud StorageやBigQueryを見ても確認できますが、ロギングでログを確かめた方が良いと思います。
※テストが終了したら必ずジョブを止めるか、元の頻度に戻すかして下さい
データポータルでレポート作成
最後にデータポータルでレポートを作成します。
データポータル(旧Data Stuio)は無料で使用することができます。
こちらからデータポータルでレポートを作成できます。
今回取得したYoutubeデータからは以下のような可視化を行うことが可能です。
・当月の総再生回数の移り変わり(時系列グラフ)
・当月の登録者数の移り変わり(時系列グラフ)
・動画の再生回数ランキング(表形式)
・動画のいいね数ランキング(表形式)
・再生回数の累計グラフ(時系列グラフ)
上記などのグラフや図をを一つのレポートにまとめメール配信のスケジュールを設定することで、定期的にPDFでレポートが送信されます。
以下がスケジュールの設定方法です。
「共有」=>「メール配信のスケジュール」から定期的にレポートをメール配信できます。
今回は毎月10日にレポートが送られてくるようにします。
以上でデータの取得からレポート作成までの自動化は完了です。
最後に月一レポートの例を載せておきます。
※現時点では先月のデータが存在しないため、上記のレポートは見本として一部自作のデータを使用しています
おわりに
非常に読みづらかったと思いますが、最後まで読んでいただきありがとうございました。
今回、Youtube Data APIとGCPを使用してデータの取得からレポート作成までの自動化を行いました。
Youtubeのデータ以外にも、functionを書き変えれば適用できると思います。
もし、データ取得時などに前処理などを行いたいなどでコンポーネントが増えていく場合は、Cloud Composerなどでワークフローを定義した方が良いかもしれません。
間違いやわかりづらいところがありましたら、ご指摘のほどよろしくお願い致します。
また、もっと簡単な方法などがあれば教えていただけると幸いです。