2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Youtube APIデータをBigQueryへロードする作業を自動化する

Posted at

初めに

初学者である私がこちらの記事をかなり参考にして
データ基盤を構築した際の備忘録となっています。
初めてデータ基盤を構築したい方のためにできるだけ詳細に記載していますので、
周りくどい内容であることをご承知ください。

##前提条件
Google Cloud Platformのアカウントを取得している
・YouTubeAPIを取得している

YouTube APIの取得に関しては以下の記事でわかりやすく説明されています!
【Python】YouTube Data API を使って、いろんな情報を取得してみた!

0.設計したアーキテクチャ

GCP アーキテクチャ.png

  1. Cloud Schedulerでメッセージを発行する
  2. Pub/SubがCloud Functionsにメッセージを通知
  3. Cloud Functionsでデータを取得し、Cloud Storageに保存
  4. Cloud Storageにファイルがアップロードされたのをトリガーに、
    Cloud Functionsが実行されBigQueryにデータをロード
  5. BigQueryと連携しデータポータルでレポート作成

以下でそれぞれのコンポーネントについて詳しく説明していきます。
※GCPを使用するため、プロジェクトを作成しておく必要があります。

1.Pub/Subのトピックおよびサブスクリプションを作成

順番が前後しますが、Cloud Schedulerの設定のために
Pub/Subのトピックとサブスクリプションを作成する必要があります。

公式の手順に沿って作成していきます。
####1.1.GCP上でShellを起動
下図の赤丸の部分をクリックするとShellが起動されます。
スクリーンショット 2021-08-10 13.33.52.png

####1.2.トピックの作成
my-topic を含むトピックを作成されます。

gcloud pubsub topics create <my-topic>

####1.3.サブスクリプションの作成
my-sub を含むサブスクリプションを作成し、先ほど作成した my-topic に添付します。

gcloud pubsub subscriptions create <my-sub> --topic=<my-topic>

これでトピックおよびサブスクリプションの準備が完了です!

2.Cloud Schedulerでメッセージを発行

####2.1.ジョブを作成

※頻度に関してはこちらを参照してください。

Cloud Pub/Sub トピックは「1.2.トピックの作成」で作成したものを選択してください。

3.Cloud FunctionsでYouTubeデータを取得する関数を作成

####3.1.1. 事前に行っておくこと
1. GCS(Google Cloud Storage)にバケットを作成
2. 作成したバケットの中に**「'subscriberCount', 'viewCount', 'date'」**
のカラム名が入った空データのcsvファイルを保存(このcsvファイルにデータが保存されていく)

###3.1.「チャンネルの登録者数、総再生回数」を取得する関数
関数を作成から以下の通りに記載します(関数名は好きなもので結構です)。

Cloud Pub/Sub トピックは「1.2.トピックの作成」で作成したものを選択してください。

ランチタイム:python3.9(Pythonであればなんでも大丈夫です)
ソースコードは:インラインエディタ
エントリポイント:youtube(main.pyにある関数を定義します)
ちなみに関数youtubeの引数にはPub/Subから通知されたメッセージが入ります。

Cloud Functions内に以下の5つのファイルを作成します
・main.py
・requirements.txt
・get_channel.py
・load_to_gcs.py
・settings.py

main.py
from apiclient.discovery import build
from get_channel import GetChannel
import settings

def youtube(event, context):
    # 設定
    API_KEY = settings.API_KEY
    YOUTUBE_API_SERVICE_NAME = 'youtube'
    YOUTUBE_API_VERSION = 'v3'    
    channel_name = '<ここに取得したいチャンネル名を記載>'

    youtube = build(
        YOUTUBE_API_SERVICE_NAME,
        YOUTUBE_API_VERSION,
        developerKey=API_KEY,
        cache_discovery=False
    )
    channel = GetChannel(youtube, channel_name)
    channel.get_channel_details()
requirements.txt
google-api-core==1.23.0
google-api-python-client==1.12.8
google-cloud-storage==1.33.0
google-auth==1.23.0
pandas==1.1.5
python-dotenv==0.15.0
get_channel.py
from datetime import datetime
from apiclient.discovery import build
import pandas as pd
import load_to_gcs

class GetChannel(object):
    def __init__(self, youtube, channel_name):
        self.youtube = youtube    
        self.channel_name = channel_name

    def get_channel_basic(self):

        response = self.youtube.search()\
                       .list(q=self.channel_name, part='id,snippet', maxResults=1).execute()

        for item in response.get('items', []):
            if item['id']['kind'] == 'youtube#channel':
                return item['id']['channelId']

    def get_channel_details(self):   
        response = self.youtube.channels().list(
            part = 'snippet,statistics',
            id = self.get_channel_basic()
            ).execute()

        for item in response.get("items", []):
            if item["kind"] == "youtube#channel":
                series = pd.Series([item['statistics']['subscriberCount'], 
                                   item['statistics']['viewCount'],
                                   datetime.now().strftime('%Y-%m-%d')],
                                   ['subscriberCount', 'viewCount', 'date'])
                load_to_gcs.read_from_gcs_and_append(series)

load_to_gcs.py
import datetime
from io import BytesIO
from google.cloud import storage as gcs
import pandas as pd
import settings

def read_from_gcs_and_append(series):
    project_id = settings.project_id
    bucket_name = settings.bucket_name
    gcs_path = '<GCSに保存したファイル名>.csv' # csv形式

    client = gcs.Client(project_id)
    bucket = client.get_bucket(bucket_name)

    blob_read = bucket.blob(gcs_path)
    content_read = blob_read.download_as_string()

    df = pd.read_csv(BytesIO(content_read))
    df = df.append(series, ignore_index=True)

    blob_write = bucket.blob(gcs_path)
    blob_write.upload_from_string(df.to_csv(index=False, header=True), content_type='text/csv')
settings.py
API_KEY = '<自分のYouTube API を入力>'
project_id = '<自分のproject_idを入力>'
bucket_name = '<load_to_gcs.pyでcsvファイルを保存したbucket_nameを入力>'

###3.2.「アップされた動画の詳細情報」を取得する関数
Cloud Pub/Sub トピックは「1.2.トピックの作成」で作成したものを選択してください。
ランチタイム:python3.9(Pythonであればなんでも大丈夫です)
ソースコードは:インラインエディタ
エントリポイント:youtube(main.pyにある関数を定義します)
ちなみに関数youtubeの引数にはPub/Subから通知されたメッセージが入ります。

今回は以下の6つのファイルを作成します
・main.py
・requirements.txt
・get_channel.py
・load_to_gcs.py
・settings.py
・get_video.py

※ settings.py、requirements.pyは上記と同じコードなので記載してません。

main.py
from apiclient.discovery import build
from get_channel import GetChannel
from get_video import GetVideo
import settings

def youtube(event, context):
    # 設定
    API_KEY = settings.API_KEY
    YOUTUBE_API_SERVICE_NAME = 'youtube'
    YOUTUBE_API_VERSION = 'v3'    
    channel_name = '<ここに取得したいチャンネル名を記載>'

    youtube = build(
        YOUTUBE_API_SERVICE_NAME,
        YOUTUBE_API_VERSION,
        developerKey=API_KEY,
        cache_discovery=False
    )

    videos = GetVideo(youtube, channel_name)
    videos.get_video_details_csv()
get_channnel.py
from apiclient.discovery import build

class GetChannel(object):
    def __init__(self, youtube, channel_name):
        self.youtube = youtube    
        self.channel_name = channel_name

    def get_channel_basic(self):
        response = self.youtube.search().list(q=self.channel_name, 
                                              part='id,snippet',
                                              maxResults=1).execute()

        for item in response.get('items', []):
            if item['id']['kind'] == 'youtube#channel':
                return item['id']['channelId']
load_to_gcs.py
import datetime
from google.cloud import storage as gcs
import pandas as pd
import settings

def load_to_gcs_csv(df):
    now = datetime.datetime.now()
    project_id = settings.project_id
    bucket_name = settings.bucket_name
    gcs_path = '{}-test.csv'.format(now.strftime('%Y-%m-%d-%H:%M'))

    client = gcs.Client(project_id)
    bucket = client.get_bucket(bucket_name)

    blob = bucket.blob(gcs_path)
    blob.upload_from_string(df.to_csv(index=False, header=True), content_type='text/csv')
get_video.py
from apiclient.discovery import build
import pandas as pd
from google.cloud import storage as gcs
from get_channel import GetChannel
from load_to_gcs import load_to_gcs_csv

class GetVideo(GetChannel):
    def get_video_basic(self):
        nextPagetoken = None
        video_ids = []
        while True:
            if nextPagetoken != None:
                nextpagetoken = nextPagetoken
            response = self.youtube.search().list(
                             part = "snippet",
                             channelId = self.get_channel_basic(),
                             maxResults = 50,
                             order = "date", #日付順にソート
                             pageToken = nextpagetoken
                       ).execute()

            for item in response.get("items", []):
                if item["id"]["kind"] == "youtube#video":
                    video_ids.append(item["id"]["videoId"])
            try:
                nextPagetoken = response["nextPageToken"]
            except:
                break
        return video_ids

    def get_video_details_csv(self):
        videos = []
        for video_id in self.get_video_basic():
            response = self.youtube.videos().list(
                part = 'snippet,statistics',
                id = video_id
            ).execute()
            for item in response.get("items", []):
                if item["kind"] == "youtube#video":
                    videos.append(
                    [item["snippet"]["title"],
                     item["statistics"]["viewCount"],
                     item["statistics"]["likeCount"],
                     item["statistics"]["dislikeCount"],
                     item["statistics"]["favoriteCount"] if "favoriteCount" in item["statistics"].keys() else None,
                     item["statistics"]["commentCount"] if "commentCount" in item["statistics"].keys() else None,
                     item["snippet"]["tags"] if "tags" in item["snippet"].keys() else None,
                     item["snippet"]["publishedAt"]
                     ])
        df_video = pd.DataFrame(videos, 
                                columns=['title', 'viewCount', 'likeCount', 'dislikeCount', 'favoriteCount', 'commentCount', 'tags', 'publishAt'])
        load_to_gcs_csv(df_video)

###3.3.「GCSからBigQueryへロード」する関数
####事前に行っておくこと
BigQueryにdetasetおよびtable(2つ)を作成します。

Cloud Shell上から以下のコマンドでdatasetとtableを作成できます

PROJECT=<自分のproject ID>
DATASET=<作成するDetaset名>
bq --project_id ${PROJECT} mk ${DATASET}
bq --project_id ${PROJECT} mk --time_partitioning_type=DAY ${DATASET}.<作成するtable名1>
bq --project_id ${PROJECT} mk --time_partitioning_type=DAY ${DATASET}.<作成するtable名2>

作成できれば、以下のようにtableのスキーマを設定します
######・<上記で作成したtable名1>のスキーマ

######・<上記で作成したtable名2>のスキーマ

####関数の定義
Storageへの保存をトリガーにするため、
以下の通りに設定する。

ランチタイム:python3.9(Pythonであればなんでも大丈夫です)
ソースコードは:インラインエディタ
エントリポイント:youtube(main.pyにある関数を定義します)
ちなみに関数youtubeの引数にはPub/Subから通知されたメッセージが入ります。

今回は以下の3つのファイルを作成します
・main.py
・requirements.txt
・settings.py

main.py
from google.cloud import bigquery
import settings

def load_data(data, context):
    # check content-type
    if data['contentType'] != 'text/csv':
        print('Not supported file type: {}'.format(data['contentType']))
        return
    # get file info
    bucket_name = data['bucket']
    file_name = data['name']
    uri = 'gs://{}/{}'.format(bucket_name, file_name)

    dataset_id = settings.dataset_id
    if file_name == '<GCSに保存してあるcsvファイル名>':
        table_id = settings.<上記で作成したtable名1>
    else:
        table_id = settings.<上記で作成したtable名2>
    client = bigquery.Client()
    dataset_ref =client.dataset(dataset_id)

    # Set Load Config
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = 'WRITE_TRUNCATE'

    # Load data
    load_job = client.load_table_from_uri(
        uri, dataset_ref.table(table_id), job_config=job_config
    )
    print("Starting job {}".format(load_job.job_id))
    load_job.result()
    print("Job finished.")
requirements.txt
google-cloud-bigquery==1.24.0
python-dotenv==0.15.0
settings.py
dataset_id = 'test_dataset'
table_view_subscriber_id = 'table_view_subscriber_id'
table_video_id = 'table_video_id'

##最後に
これでBigQueryへのロードまでを自動化することができたと思います!

データを取得するタイミングをずらしたい場合は、
Pub/Subのトピック・サブスクリプションを1つ追加して、
Cloud Schedulerの時間を指定することで簡単に変更することができます。

また、得られたデータはCloud Data Portalで簡単に可視化することができます。
興味のある方は是非チャレンジしてください!

初学者であるが故に至らぬ点が多くあると思いますので
ご指摘・修正点があればコメントください!
また、ご不明点があれば、お気軽にコメントしてください!

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?