9
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Youtube Data APIとGCPでデータ取得からレポート作成までを自動化する

Last updated at Posted at 2020-12-21

はじめに

Youtube Data APIを使用し、データの取得からレポート作成までを自動化しました。
個人的に好きなYoutuberのデータを可視化したり分析したい!という思いがモチベーションとなっております。
Youtube Studioでは自分のチャンネルしか分析できないため、他チャンネルを分析したい時などに使用できるかもしれません。
今回、Youtube Data APIを使用するに当たって、APIキーを使って認証を行ったため全てのデータを取得することはできませんでした。もし自分のチャンネルに関するデータ取得を行う場合はOAuth2.0を使って認証を行うことをお勧めします。

Youtube Data APIを使ったデータの取得の方法

Youtube Data APIではチャンネル情報や動画の情報を取得することができます。
今回初めてYoutube Data APIを使用したため以下のサイトを参考にしていただきました。

構成

今回作成したアーキテクチャは以下の通りです。

image.png

  1. Cloud Schedulerでメッセージを発行する
  2. Pub/SubがCloud Functionsにメッセージを通知
  3. Cloud Functionsでデータを取得し、Cloud Storageに保存
  4. Cloud Storageにファイルがアップロードされたのをトリガーとして、Cloud Functionsが実行されBigQueryにデータをロード
  5. BigQueryと連携しデータポータルでレポート作成

以下でそれぞれのコンポーネントについて詳しく説明していきます。
※GCPを使用するため、プロジェクトを作成しておく必要があります。

Pub/Subのtopicを作成

今回、データ取得のFunctionsのトリガーをPub/Subのtopicとしたいので、事前にtopicを作成しておく必要があります。
以下のコマンドで簡単に作成できます。

$ gcloud pubsub topics create <topic_name>

以下で説明しますが、今回データ取得の関数を2つ定義し、かつschedulerで実行する頻度も異なるためtopicを2つ作成しておく必要があります。

Cloud Functions

次に、Cloud FunctionsでYoutubeのデータを取得する関数とGCSからBigQueryにデータをロードする関数を定義します。
今回は2つのデータ取得関数とGCSからBQにロードする関数の計3つの関数を作成しました。

  • チャンネルの登録者数と総再生回数を取得する関数
  • アップロードされている動画それぞれの詳細データを取得する関数
  • GCSにファイルがアップロードされたのをトリガーとして、BigQueryのテーブルにロードする関数

上2つの関数で取得するデータは一つの関数で取得することもできますが、取得する頻度を変えたかったので、関数を分けました。
以下でそれぞれの関数について見ていきます。

チャンネルの登録者数と総再生回数を取得する関数

このfunctionは以下の6つのファイルで構成されています。
・main.py
・get_channel.py
・load_to_gcs.py
・settings.py
・.env
・requirements.txt

コードは以下の通りです。
※事前にCloud StorageにBucketを作成しておく必要があります。


main.pyには実際に実行したい関数を定義します。

main.py
from apiclient.discovery import build

from get_channel import GetChannel
import settings


def youtube(event, context):
    # 設定
    API_KEY = settings.API_KEY
    YOUTUBE_API_SERVICE_NAME = 'youtube'
    YOUTUBE_API_VERSION = 'v3'    
    channel_name = <チャンネルの名前>

    youtube = build(
        YOUTUBE_API_SERVICE_NAME,
        YOUTUBE_API_VERSION,
        developerKey=API_KEY,
        cache_discovery=False
    )
    channel = GetChannel(youtube, channel_name)
    channel.get_channel_details()

get_channel_basicはチャンネルのidを取ってくるメソッドです。
get_channel_detailsではチャンネルのidを元に、subscriberCountとviewCountを取得し、データ取得時間と合わせてpandasでSeries化し、load_to_gcs.pyのread_from_gcs_and_appendに渡しています。

get_channel.py
from datetime import datetime

from apiclient.discovery import build
import pandas as pd

import load_to_gcs

class GetChannel(object):
    def __init__(self, youtube, channel_name):
        self.youtube = youtube    
        self.channel_name = channel_name

    def get_channel_basic(self):

        response = self.youtube.search()\
                       .list(q=self.channel_name, part='id,snippet', maxResults=1).execute()

        for item in response.get('items', []):
            if item['id']['kind'] == 'youtube#channel':
                return item['id']['channelId']
    
    def get_channel_details(self):   
        response = self.youtube.channels().list(
            part = 'snippet,statistics',
            id = self.get_channel_basic()
            ).execute()

        for item in response.get("items", []):
            if item["kind"] == "youtube#channel":
                series = pd.Series([item['statistics']['subscriberCount'], 
                                   item['statistics']['viewCount'],
                                   datetime.now().strftime('%Y-%m-%d')],
                                   ['subscriberCount', 'viewCount', 'date'])
                load_to_gcs.read_from_gcs_and_append(series)

load_to_gcs.pyではまずbucketにあるcsvファイルを持ってきます。
その後pandasでデータフレーム化し、そのデータフレームに取得したデータを追加します。
最後にそのデータフレームをcsv化し、bucketにアップロードしています。
取得してきた、データをそのままオブジェクトとしてbucketにアップロードしても大丈夫ですが、毎日データを取得するかつ取得データが1行なので一つのオブジェクトを更新していく形で保存しています。

load_to_gcs.py
import datetime
from io import BytesIO

from google.cloud import storage as gcs
import pandas as pd

import settings


def read_from_gcs_and_append(series):
    project_id = settings.project_id
    bucket_name = settings.bucket_name
    gcs_path = <bucket以下のGCSオブジェクトのpath> # csv形式

    client = gcs.Client(project_id)
    bucket = client.get_bucket(bucket_name)

    blob_read = bucket.blob(gcs_path)
    content_read = blob_read.download_as_string()

    df = pd.read_csv(BytesIO(content_read))
    df = df.append(series, ignore_index=True)

    blob_write = bucket.blob(gcs_path)
    blob_write.upload_from_string(df.to_csv(index=False, header=True), content_type='text/csv')

setting.pyではpython-dotenvを使ってAPI_KEYやproject_id、bucket_nameを環境変数として、osライブラリでそれらを取得するという処理を行っています。
これはgithubでコード管理する際を考慮しています。

settings.py
import os
from os.path import join, dirname
from dotenv import load_dotenv

dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)

API_KEY = os.environ.get('API_KEY')
project_id = os.environ.get('project_id')
bucket_name = os.environ.get('bucket_name')

.envにはAPI_KEYやproject_id、bucket_nameを記述しておきます。

.env
API_KEY=<API_KEY>
project_id=<project_id>
bucket_name=<bucket_name>

requirements.pyではイントールしたいパッケージとバージョンを記述します。
Cloud Functionsでサードパーティライブラリを使用する場合は記述が必要です。

requirements.txt
google-api-core==1.23.0
google-api-python-client==1.12.8
google-cloud-storage==1.33.0
google-auth==1.23.0
pandas==1.1.5
python-dotenv==0.15.0

アップロードされた動画それぞれの詳細データを取得する関数

このfunctionは以下の7つのファイルで構成されています。
・main.py
・get_channel.py
・get_video.py
・load_to_gcs.py
・settings.py
・.env
・requirements.txt

コードは以下の通りです。
※ settings.py、.env、requirements.pyは上記と同じコードなので記載してません。

main.pyには実際に実行したい関数を定義します。

main.py
from apiclient.discovery import build

from get_channel import GetChannel
from get_videos_test import GetVideo
import settings


def youtube(event, context):
    # 設定
    API_KEY = settings.API_KEY
    YOUTUBE_API_SERVICE_NAME = 'youtube'
    YOUTUBE_API_VERSION = 'v3'    
    channel_name = <チャンネルの名前>

    youtube = build(
        YOUTUBE_API_SERVICE_NAME,
        YOUTUBE_API_VERSION,
        developerKey=API_KEY,
        cache_discovery=False
    )

    videos = GetVideo(youtube, channel_name)
    videos.get_video_details_csv()

get_channel.pyではチャンネルidを取得する関数を定義します。

get_channel.py
from apiclient.discovery import build

class GetChannel(object):
    def __init__(self, youtube, channel_name):
        self.youtube = youtube    
        self.channel_name = channel_name

    def get_channel_basic(self):

        response = self.youtube.search()\
                       .list(q=self.channel_name, part='id,snippet', maxResults=1).execute()

        for item in response.get('items', []):
            if item['id']['kind'] == 'youtube#channel':
                return item['id']['channelId']

get_video.pyでは上記のGetChannelを継承したGetVideoというクラスを定義しています。
その中のget_video_basicメソッドでは全ての動画のidを取得し、リストで返します。
get_video_details_csvでは動画のidを元に、動画のtitle, viewCount, likeCount, dislikeCount, favoriteCount, commentCount, tags, publishedAt を取得し、データフレーム化しています。
その後、load_to_gcs.pyのload_to_gcs_csv関数にデータフレームを渡しています。

get_video.py
from apiclient.discovery import build
import pandas as pd
from google.cloud import storage as gcs

from get_channel import GetChannel
from load_to_gcs import load_to_gcs_json, load_to_gcs_csv

class GetVideo(GetChannel):
    def get_video_basic(self):
        nextPagetoken = None
        nextpagetoken = None
        video_ids = []
        while True:
            if nextPagetoken != None:
                nextpagetoken = nextPagetoken
            response = self.youtube.search().list(
            part = "snippet",
            channelId = self.get_channel_basic(),
            maxResults = 50,
            order = "date", #日付順にソート
            pageToken = nextpagetoken
            ).execute()

            for item in response.get("items", []):
                if item["id"]["kind"] == "youtube#video":
                    video_ids.append(item["id"]["videoId"])
            try:
                nextPagetoken = response["nextPageToken"]
            except:
                break

        return video_ids
    
    def get_video_details_csv(self):
        videos = []
        for video_id in self.get_video_basic():
            response = self.youtube.videos().list(
                part = 'snippet,statistics',
                id = video_id
            ).execute()
            for item in response.get("items", []):
                if item["kind"] == "youtube#video":
                    videos.append(
                    [item["snippet"]["title"],
                     item["statistics"]["viewCount"],
                     item["statistics"]["likeCount"],
                     item["statistics"]["dislikeCount"],
                     item["statistics"]["favoriteCount"] if "favoriteCount" in item["statistics"].keys() else None,
                     item["statistics"]["commentCount"] if "commentCount" in item["statistics"].keys() else None,
                     item["snippet"]["tags"] if "tags" in item["snippet"].keys() else None,
                     item["snippet"]["publishedAt"]
                     ])
        df_video = pd.DataFrame(videos, 
                                columns=['title', 'viewCount', 'likeCount', 'dislikeCount', 'favoriteCount', 'commentCount', 'tags', 'publishAt'])
        load_to_gcs_csv(df_video)

load_to_gcs.pyのload_to_gcs_csv関数では受け取ったデータフレーム化して、bucketにアップロードしています。
アップロードの際にオブジェクトの名前に日時を付けてわかりやすくしています。
また、Youtubeの動画情報は取得した日時によってデータが異なるので、過去のデータも使用できるようにCloud Storageに保存しておきます。ライフサイクル機能を使用して、一定期間を過ぎたオブジェクトは削除する設定を行った方が良いかもしれません。

load_to_gcs.py
import datetime

from google.cloud import storage as gcs
import pandas as pd

import settings

def load_to_gcs_csv(df):
    now = datetime.datetime.now()
    project_id = settings.project_id
    bucket_name = settings.bucket_name
    gcs_path = 'youtube-test/{}-test.csv'.format(now.strftime('%Y-%m-%d-%H:%M'))

    client = gcs.Client(project_id)
    bucket = client.get_bucket(bucket_name)

    blob = bucket.blob(gcs_path)
    blob.upload_from_string(df.to_csv(index=False, header=True), content_type='text/csv')


GCSからBigQueryにロードする関数

このfunctionは以下の7つのファイルで構成されています。
・main.py
・settings.py
・.env
・requirements.txt

コードは以下の通りです。
※事前にBigQueryでデータセットと2つのテーブルを作成しておく必要があります


main.pyには実際に定義したい関数を定義します。
今回GCSのバケットは同じにしてあるので、取得してきたデータによって場合分けを行う必要があります。
総再生回数と登録者数を保存するオブジェクトの名前は常に同じなのでそれを条件に場合分けを行っています。
その後はbigqueryモジュールを使用してテーブルにデータを書き込んでいます。
その際、レポートに使用するデータは最新にしておきたいため、WRITE_TRUNCATEで最初から書き込みを行うようにしています。

main.py
from google.cloud import bigquery

import settings

def load_data(data, context):
    # check content-type
    if data['contentType'] != 'text/csv':
        print('Not supported file type: {}'.format(data['contentType']))
        return
    # get file info
    bucket_name = data['bucket']
    file_name = data['name']
    uri = 'gs://{}/{}'.format(bucket_name, file_name)
    
    dataset_id = settings.dataset_id
    if file_name == <総再生回数と登録者数のオブジェクト>:
        table_id = settings.table_view_subscriber_id
    else:
        table_id = settings.table_video_id
    client = bigquery.Client()
    dataset_ref =client.dataset(dataset_id)

    # Set Load Config
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = 'WRITE_TRUNCATE'

    # Load data
    load_job = client.load_table_from_uri(
        uri, dataset_ref.table(table_id), job_config=job_config
    )
    print("Starting job {}".format(load_job.job_id))
    load_job.result()
    print("Job finished.")
settings.py
import os
from os.path import join, dirname
from dotenv import load_dotenv

dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)

dataset_id = os.environ.get('dataset_id')
table_view_subscriber_id = os.environ.get('table_view_subscriber_id')
table_video_id = os.environ.get('table_video_id')
..env
dataset_id=<dataset_id>
table_video_id=<動画データテーブルid>
table_view_subscriber_id=<再生回数と登録者数テーブルid>
requirements.txt
google-cloud-bigquery==1.24.0
python-dotenv==0.15.0

Cloud Scheduler

続いてCloud Schedulerでジョブを作成していきます。
Cloud Schedulerではcron形式で実行タイミングを設定できます。(詳しくはこちら)

上記で作成した、2つのデータ取得関数に対して別々にジョブを作成します。
まず、チャンネルの登録者数と総再生回数を取得する関数ですが、毎日00:00に実行するようなジョブを作成します。
設定は以下の通りです。
image.png

続いてアップロードされた動画それぞれの詳細データを取得する関数に対しては、毎月10日の00:00に実行するようなジョブを作成します。
image.png
以上で定期的にデータを取得できていると思います。
テストとして、頻度の設定を「*/15 * * * *」などにして15分間隔で実際に動かしてみても良いと思います。成功しているかどうかは実際にCloud StorageやBigQueryを見ても確認できますが、ロギングでログを確かめた方が良いと思います。

※テストが終了したら必ずジョブを止めるか、元の頻度に戻すかして下さい

データポータルでレポート作成

最後にデータポータルでレポートを作成します。
データポータル(旧Data Stuio)は無料で使用することができます。
こちらからデータポータルでレポートを作成できます。

今回取得したYoutubeデータからは以下のような可視化を行うことが可能です。

・当月の総再生回数の移り変わり(時系列グラフ)
・当月の登録者数の移り変わり(時系列グラフ)
・動画の再生回数ランキング(表形式)
・動画のいいね数ランキング(表形式)
・再生回数の累計グラフ(時系列グラフ)

上記などのグラフや図をを一つのレポートにまとめメール配信のスケジュールを設定することで、定期的にPDFでレポートが送信されます。
以下がスケジュールの設定方法です。
「共有」=>「メール配信のスケジュール」から定期的にレポートをメール配信できます。
今回は毎月10日にレポートが送られてくるようにします。
image.png

以上でデータの取得からレポート作成までの自動化は完了です。
最後に月一レポートの例を載せておきます。
image.png
※現時点では先月のデータが存在しないため、上記のレポートは見本として一部自作のデータを使用しています

おわりに

非常に読みづらかったと思いますが、最後まで読んでいただきありがとうございました。
今回、Youtube Data APIとGCPを使用してデータの取得からレポート作成までの自動化を行いました。
Youtubeのデータ以外にも、functionを書き変えれば適用できると思います。
もし、データ取得時などに前処理などを行いたいなどでコンポーネントが増えていく場合は、Cloud Composerなどでワークフローを定義した方が良いかもしれません。
間違いやわかりづらいところがありましたら、ご指摘のほどよろしくお願い致します。
また、もっと簡単な方法などがあれば教えていただけると幸いです。

9
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?