LoginSignup
3
2

GCPでデータ基盤を構築し、可視化アプリを作成する

Last updated at Posted at 2023-06-21

はじめに

GCP環境で株価情報のスクレイピングを定期実行させ、データを GCS,BQ に保存し、株価をグラフ化するアプリの開発を行ったので備忘のためにも投稿します。

前提

  • GCP を利用できる環境があり、Google Cloud SDK を使用できる。
  • GCP でデータ基盤を構築する方法をメインで記載しますので、詳しいコーディングについて触れていないケースが多いです。最下部で参考サイトを紹介しているので、触れていない内容についてはそちらを参照ください。
  • 任意で変更できる箇所(ファイル名等)は「<>」で囲っていますので、任意で変更してください。
  • API が有効でない場合は、コマンド実行時に API を有効化するか聞かれるので、都度有効化してください。

前準備

前準備として、以下の作成を行います。

  • プロジェクト
  • config
  • GCS バケット
  • BQ データセット
  • サービスアカウント
  • インスタンス
  1. プロジェクトの作成
    local
    $ gcloud projects create <make-app-202306> -name <make-stock-chart>
    
    Create in progress for[https://cloudresourcemanager.googleapis.com/v1/projects/make-app-202306].
    Waiting for [operations/cp.5652924550004069807] to finish...done.         
    Enabling service [cloudapis.googleapis.com] on project [make-app-202306]...
    Operation "operations/acat.p2-28844578454-207c95a6-abd6-4735-af9d-d22b3d97bcab" finished successfully.
    
    
  2. config の作成

    今回の実装のための config を作成しておきます。

    local
    $ gcloud config configurations create <config-stock-app-qiita>
    
    Created [config-stock-app-qiita].
    Activated [config-stock-app-qiita].
    
  3. config の設定
    1. プロジェクトの紐づけ

      local
      $ gcloud config set projects <make-app-202306>
      
      Updated property [core/project].
      
    2. アカウントの紐づけ

      local
      $ gcloud config set account <abc@gmail.com>
      
      Updated property [core/account].
      
    3. デフォルトゾーンの設定

      local
      $ gcloud config set compute/zone asia-northeast1-a
      
      Updated property [compute/zone].
      

    以下のコマンドで config の一覧を取得することができます。IS_ACTIVETrueになっている config が現在ログインしている config です。

    local
    $ gcloud config configurations list
    
    NAME                    IS_ACTIVE  ACCOUNT                     PROJECT           COMPUTE_DEFAULT_ZONE  COMPUTE_DEFAULT_REGION
    config-stock-app-qiita  True       abc@gmail.com  make-app-202306   asia-northeast1-a
    
  4. 支払いアカウントの紐付け
    local
    $ gcloud alpha billing accounts projects link <make-app-202306> \
     --billing-accounts <010000-AAAAAA-BBBBBB>
    
    billingAccountName: billingAccounts/010000-AAAAAA-BBBBBB
    billingEnabled: true
    name: projects/make-app-202306/billingInfo
    projectId: make-app-202306
    

    アカウントIDについては以下のコマンドで取得することができます。

    local
    $ gcloud alpha billing accounts list
    
    ACCOUNT_ID            NAME              OPEN  MASTER_ACCOUNT_ID
    010000-AAAAAA-BBBBBB  請求先アカウント  True
    
    
  5. GCS バケットの作成

    スクレイピングしたデータを保存しておくためのバケットを作成します。

    local
    $ gsutil mb -l asia-northeast1 gs://<stock-datalake>
    
    Creating gs://stock-datalake/...
    

    以下のコマンドでバケットの一覧を取得することができます。

    $ gsutil ls
    
  6. BQ データセットの作成

    スクレイピングしたデータを保存しておくためのデータセットを作成します。

    local
    $ bq mk --dataset <make-app-202306>:<stock_tables>
    
    Dataset 'make-app-202306:stock_tables' successfully created.
    
  7. ファイアウォールの作成

    ブラウザから Jupyter Lab にアクセスできるようにするためファイアウォールを作成しておきます。

    local
    $ gcloud compute firewall-rules create <fw-jupyter> \
     --direction ingress \
     --action allow \
     --target-tags <tcp8000> \
     --rules tcp:8000
    
    Creating firewall...⠹Created [https://www.googleapis.com/compute/v1/projects/make-app-202306/global/firewalls/fw-jupyter].
    Creating firewall...done.                                                 
    NAME        NETWORK  DIRECTION  PRIORITY  ALLOW     DENY  DISABLED
    fw-jupyter  default  INGRESS    1000      tcp:8000        False
    
    
  8. インスタンスの作成
    local
    $ gcloud compute instances create <stock-app-vm> \
     --machine-type e2-micro \
     --scopes cloud-platform \
     --tags <tcp8000>
    
    
    Created [https://www.googleapis.com/compute/v1/projects/make-app-202306/zones/asia-northeast1-a/instances/stock-app-vm].
    NAME          ZONE               MACHINE_TYPE  PREEMPTIBLE  INTERNAL_IP  EXTERNAL_IP     STATUS
    stock-app-vm  asia-northeast1-a  e2-micro                   10.146.0.2   35.187.217.248  RUNNING
    

    インスタンスの作成時は同時に起動もされていますが、以下のコマンドでインスタンスの起動と停止を行うことができます。

    local
    $ gcloud compute instances <start or stop> <stock-app-vm>
    
    Starting instance(s) stock-app-vm...done.                                 
    Updated [https://compute.googleapis.com/compute/v1/projects/make-app-202306/zones/asia-northeast1-a/instances/stock-app-vm].
    Instance internal IP is 10.146.0.2
    Instance external IP is 35.187.217.248
    or
    Stopping instance(s) stock-app-vm...done.                                 
    Updated [https://compute.googleapis.com/compute/v1/projects/make-app-202306/zones/asia-northeast1-a/instances/stock-app-vm].
    
    

インスタンスの環境構築

基盤構築やアプリ開発には不要なのですが、pythonファイルの挙動を確認するためにも Jupyter が使える環境があると便利なので、インスタンス内に Docker をインストールし、Jupyter 環境を構築します。

  1. インスタンスへSSH接続
    local
    $ gcloud compute ssh --project <make-app-202306> <stock-app-vm>
    
  2. パッケージの更新
    instance
    sato@stock-app-vm:~$ sudo apt update
    
    Hit:1 https://deb.debian.org/debian bullseye InRelease
    ~中略~
    Get:11 https://packages.cloud.google.com/apt cloud-sdk-bullseye/main amd64 Packages [313 kB]
    Fetched 1075 kB in 1s (1234 kB/s)
    Reading package lists... Done
    Building dependency tree... Done
    Reading state information... Done
    
  3. Docker のインストール
    instance
    sato@stock-app-vm:~$ sudo apt install -y docker.io
    
    Reading package lists... Done
    Building dependency tree... Done
    Reading state information... Done
    The following additional packages will be installed:
    ~中略~
    Processing triggers for man-db (2.9.4-2) ...
    Processing triggers for libc-bin (2.31-13+deb11u6) ...
    

    Docker インストール直後は dockerコマンドが使用できないため、一度 exit でインスタンスから抜けて、再度 SSH接続後に以下手順を行うようにしてください。

  4. ユーザーを docker group へ追加
    instance
    sato@stock-app-vm:~$ sudo gpasswd -a <sato> docker
    
    Adding user sato to group docker
    

    ユーザー名については以下のコマンドで取得することができます。

    sato@stock-app-vm:~$ who
    
    sato  pts/0        2023-06-20 06:06 (0.0.0.0)
    
  5. ディレクトリ環境の作成

    インスタンス内に以下のようなディレクトリ構造を作成します。<main>ディレクトリは空ディレクトリで良いです。環境構築していく中でインストールしたライブラリ等をこのディレクトリに保管し、Docker を立ち上げるたびにマウントさせます。

    instance
    ~/<docker_env>/
        ├ <main>/
        └ Dockerfile
    
  6. Dockerfile の作成
    Dockerfile
    FROM ubuntu:latest
    RUN mkdir /work
    RUN apt update  && apt install -y python3 python3-pip \
        && pip3 install jupyterlab urllib3 pandas streamlit altair yfinance \
    	db-dtypes google-api-core google-cloud-storage google-cloud-bigquery 
    WORKDIR /work
    CMD ["jupyter", "lab", "--ip=0.0.0.0", "--allow-root", "--LabApp.token=''", "--port=8080"]
    
  7. Dockerイメージの作成
    instance
    sato@stock-app-vm:~/docker_env$ docker build -t <py_env> .
    
    Sending build context to Docker daemon   2.56kB
    Step 1/5 : FROM ubuntu:latest
    ~中略~
    Successfully built 6590430bcf3c
    Successfully tagged py_env:latest
    
  8. Dockerコンテナの起動
    instance
    sato@stock-app-vm:~/docker_env$ docker run -d -p 8000:8080 -v ~/<docker_env>/<main>/:/work/ <py_env>
    
    2151383f8c2e2d8b9a0dc6ca0a15cf9adaa443b9d5468491bbe0308ed4e132ea
    

    Dockerコンテナは以下のコマンドで停止させることができます。

    instance
    sato@stock-app-vm:~/docker_env$ docker stop <2151383f8c2e>
    

    コンテナIDは以下のコマンドで取得することができます(本来は稼働しているコンテナを表示するコマンドです)。

    instance
    sato@stock-app-vm:~/docker_env$ docker ps
    
  9. Jupyter Labの起動

    ブラウザで<35.200.4.12>:8000にアクセスすると、Jupyter Lab が開きます。
    外部IPアドレスについては以下のコマンドで取得することができます。

    local
    $ gcloud compute instances list
    
    NAME          ZONE               MACHINE_TYPE  PREEMPTIBLE  INTERNAL_IP  EXTERNAL_IP  STATUS
    stock-app-vm  asia-northeast1-a  e2-micro                   10.146.0.3   35.200.4.12  RUNNING
    

関数の実装

スクレイピングを行い GCS にデータを保存する関数①と、GCS のデータに更新があったら BQ のテーブルを更新する関数②を GCF にデプロイします。前者の関数については定期実行されるように設定します。

  1. 関数①の実装

    yfinance API を使用して、GAFA の株価を取得します。毎週土曜日9時に過去5日間の株価を取得し、企業ごとのCSVファイルを GCS に保存するようにします。
    関数の作成やデプロイはローカル環境で行いますが、挙動を確認するために Docker で Jupyter Lab 環境を作成したので、そちらでエラーが出ないか等確認してみてください。

    1. ディレクトリ環境の作成
      以下のようなディレクトリ構造をローカルに作成します。

      local
      ~/<pubsub>/
          ├ main.py
          └ requirements.txt
      
    2. pythonファイルの作成

      main.py
      import datetime
      import yfinance as yf
      from google.cloud import storage as gcs
      
      project_id = "<make-app-202306>"
      bucket_name = "<stock-datalake>"
      
      tckr_list = ["aapl", "amzn", "googl", "meta"]
      
      def <data_to_gcs>(event,context):
          for tckr in tckr_list:
              company = yf.Ticker(tckr)
              company_hist = company.history(period = "5d")
              company_hist = company_hist.reset_index()
          
              today = datetime.date.today()
              monday = today - datetime.timedelta(days = 5)
              friday = today - datetime.timedelta(days = 1)
              
              client = gcs.Client(project_id)
              bucket = client.get_bucket(bucket_name)
              blob_gcs = bucket.blob("{}_data_{}_{}".format(tckr,monday,friday))
          
              blob_gcs.upload_from_string(
                  data = company_hist.to_csv(index = False)
              )
      
    3. requirements.txt の作成

      requirements.txt
      google-cloud-storage==<2.9.0>
      yfinance==<0.2.18>
      
    4. GCF へデプロイ

      local
      pubsub $ gcloud functions deploy <data_to_gcs> \
       --runtime python39 \
       --trigger-topic <upload-gcs> \
       --region asia-northeast1
      
      Deploying function (may take a while - up to 2 minutes)...⠼
      For Cloud Build Logs, visit: https://console.cloud.google.com/cloud-build/builds;region=asia-northeast1/d55a9db1-46b6-41ca-aad6-269e8c4ec8aa?project=28844578454
      Deploying function (may take a while - up to 2 minutes)...done. 
      ~中略~
      updateTime: '2023-06-21T02:41:57.105Z'
      versionId: '1'
      
    5. Cloud Scheduler で定期実行

      local
      $ gcloud scheduler jobs create pubsub <scraping-stock> \
       --schedule "0 9 * * 6" \
       --time-zone Asia/Tokyo \
       --topic <upload-gcs> \
       --message-body "<get stock data>" \
       --location asia-northeast1
      
      name: projects/make-app-202306/locations/asia-northeast1/jobs/scraping-stock
      pubsubTarget:
        data: Z2V0IHN0b2NrIGRhdGE=
        topicName: projects/make-app-202306/topics/upload-gcs
      retryConfig:
        maxBackoffDuration: 3600s
        maxDoublings: 16
        maxRetryDuration: 0s
        minBackoffDuration: 5s
      schedule: 0 9 * * 6
      state: ENABLED
      timeZone: Asia/Tokyo
      userUpdateTime: '2023-06-21T02:50:18Z'
      
  2. 関数②の実装

    GCS に新しい CSV が保存された際に、その CSV を読み込んで BQ テーブルにデータを追加します。
    こちらについても関数の作成やデプロイはローカル環境で行いますが、挙動を確認するために Jupyter Lab でエラーが出ないか等確認してみてください。

    1. ディレクトリ環境の作成
      以下のようなディレクトリ構造をローカルに作成します。

      local
      ~/<trigger>/
          ├ main.py
          └ requirements.txt
      
    2. pythonファイルの作成

      main.py
      import pandas as pd
      import datetime
      from io import BytesIO
      from google.cloud import storage as gcs
      from google.cloud import bigquery as bq
      from google.api_core.exceptions import Conflict
      
      project_id = "<make-app-202306>"
      bucket_name = "<stock-datalake>"
      dataset_name = "<stock_tables>"
      
      
      def data_to_bq(data,context):
          bucket_name = data["bucket"]
          file_name = data["name"]
          tckr = file_name.split("_")[0]
          
          table_name = project_id + "." + dataset_name + "." + "{}_table".format(tckr)
          schema = [
              bq.SchemaField("Date", "TIMESTAMP", mode="REQUIRED"),
              bq.SchemaField("Open", "FLOAT", mode="REQUIRED"),
              bq.SchemaField("High", "FLOAT", mode="REQUIRED"),
              bq.SchemaField("Low", "FLOAT", mode="REQUIRED"),
              bq.SchemaField("Close", "FLOAT", mode="REQUIRED"),
              bq.SchemaField("Volume", "FLOAT", mode="REQUIRED"),
              bq.SchemaField("Dividends", "FLOAT", mode="REQUIRED"),
              bq.SchemaField("Stock Splits", "FLOAT", mode="REQUIRED"),
          ]
      
          client_bq = bq.Client(project_id)
          table = bq.Table(table_name, schema = schema)
          
          try:
              client_bq.create_table(table)
          except Conflict:
              pass
      
          
          today = datetime.date.today()
          monday = today - datetime.timedelta(days = 5)
          friday = today - datetime.timedelta(days = 1)
      
          client_gcs = gcs.Client(project_id)
          bucket = client_gcs.get_bucket(bucket_name)
          blob_gcs = bucket.blob("{}_data_{}_{}".format(tckr,monday,friday))
          df_data = blob_gcs.download_as_bytes()
          df = pd.read_csv(BytesIO(df_data))
      
          client_bq.insert_rows(table, df.values.tolist())
      
    3. requirements.txt の作成

      requirements.txt
      google-api-core==<2.11.0>
      google-cloud-bigquery==<3.11.1>
      google-cloud-storage==<2.9.0>
      pandas==<2.0.2>
      
    4. GCF へデプロイ

      local
      trigger $ gcloud functions deploy <data_to_bq> \
       --runtime python39 \
       --trigger-resource <stock-datalake> \
       --trigger-event google.storage.object.finalize \
       --region asia-northeast1
      
      Deploying function (may take a while - up to 2 minutes)...⠼                    
      For Cloud Build Logs, visit: https://console.cloud.google.com/cloud-build/builds;region=asia-northeast1/fb4b823d-9584-40d7-96de-c5062cbb1e91?project=28844578454
      Deploying function (may take a while - up to 2 minutes)...done.
      ~中略~
      updateTime: '2023-06-21T05:00:51.519Z'
      versionId: '1'
      

    Jupyter Lab 内で以下の test.jpynb を実行し、GCS に CSVファイル、BQ にテーブルが作成されていれば上手く実装できています。

    ```local:test.jpynb
     # [1]
     import datetime
     import yfinance as yf
     from google.cloud import storage as gcs
    
     # [2]
     project_id = "make-app-202306"
     bucket_name = "stock-datalake"
     tckr_list = ["aapl", "amzn", "googl", "meta"]
    
     # [3]
     for tckr in tckr_list:
         company = yf.Ticker(tckr)
         company_hist = company.history(period = "5d")
         company_hist = company_hist.reset_index()
     
         today = datetime.date.today()
         monday = today - datetime.timedelta(days = 5)
         friday = today - datetime.timedelta(days = 1)
         
         client = gcs.Client(project_id)
         bucket = client.get_bucket(bucket_name)
         blob_gcs = bucket.blob("{}_data_{}_{}".format(tckr,monday,friday))
     
         blob_gcs.upload_from_string(
             data = company_hist.to_csv(index = False)
         )
    
    
    

アプリの実装

BQ のテーブルを参照し、データを可視化するアプリを Cloud Run で作成します。

  1. ディレクトリ環境の作成

    以下のようなディレクトリ構造をローカルに作成します。

    local
    ~/<app_deploy>/
        ├ app.py
        ├ requirements.txt
        └ Prockfile
    
  2. app.py の作成
    app.py
    import pandas as pd
    import altair as alt
    import streamlit as st
    from google.cloud import bigquery as bq
    
    
    @st.cache_data
    def get_data(tckrs):
        df_all = pd.DataFrame()
        
        for company in tckrs:
            query = """
                select
                    Date
                    ,Close as {}
                from
                    `{}.{}_table`
                order by Date
            """.format(
                    company.upper()
                    ,dataset_name
                    ,company
                )
            df = client_bq.query(query).to_dataframe()
            df.drop_duplicates(inplace = True)
            df.set_index("Date", inplace = True)
            df.index = df.index.strftime("%Y/%m/%d")
            df = df.T
            df.index.name = "Ticker"
            
            df_all = pd.concat([df_all, df], axis = 0)
            df_all.dropna(axis = 1, inplace = True)
            
        return df_all
    
    try:
        project_id = "<make-app-202306>"
        client_bq = bq.Client(project_id)
        dataset_name = "<stock_tables>"
        tckrs = ["googl", "aapl", "meta", "amzn"]
        
        df = get_data(tckrs)
        st.title("米国株価可視化アプリ")
        st.sidebar.write('''
        # GAFA株価
        こちらは株価可視化ツールです。以下のオプションから表示日数を指定
        ''')
        max_cloumn = len(df.columns) - 1
        st.sidebar.write('''
        ## 表示日数選択
        ''')
        days = st.sidebar.slider("日数", 1, max_cloumn, min(max_cloumn, 30))
        
        
        st.write(f'''
        過去**{days}日間**のGAFA株価
        ''')
        
        st.sidebar.write('''
        ## 株価の範囲指定
        ''')
        ymin, ymax = st.sidebar.slider("範囲を指定してください", 0.0, 500.0, (0.0, 300.0))
        
        companies = st.multiselect(
            "銘柄を選択してください。",
            list(df.index),
            ["GOOGL","AAPL", "META", "AMZN"]
        )
        if not companies:
            st.error("少なくとも1銘柄は選んでください。")
        else:
            data = df.loc[companies]
            data = df.iloc[:, -days:]
            st.write("### 株価(USD)", data.sort_index())
            data = data.T.reset_index()
            data = pd.melt(data, id_vars = ["Date"]).rename(columns = {"value" : "Stock Prices(USD)"})
            chart = (
                alt.Chart(data)
                .mark_line(opacity = 0.8, clip = True)
                .encode(x = "Date:T",
                        y = alt.Y("Stock Prices(USD):Q", stack = None, scale = alt.Scale(domain = [ymin, ymax])),
                        color = "Ticker:N"
                ) 
            )
        st.altair_chart(chart, use_container_width = True)
    except:
        st.error(
            "エラーが起きています。"
        )
    
  3. requirements.txt の作成
    requirements.txt
    pandas == 2.0.2
    altair == 5.0.1
    streamlit == 1.23.1
    db-dtypes == 1.1.1
    google-cloud-bigquery == 3.11.1
    
  4. Procfile の作成
    Procfile
    web: streamlit run app.py --server.port ${PORT:-8080}
    
  5. Cloud Runへ デプロイ
    local
    app_deploy $ gcloud run deploy <stock-chart-app>
     --region asia-northeast1
     --source .
     --allow-unauthenticated --quiet
    

    Artifact Registry APICloud Run Admin APIが有効である必要があるため、これらのリソースの API を有効化してから実行してください。

    Cloud Run コンソール上に記載されている URL にアクセスし、以下のようなページが表示されれば実装完了です。
    スクリーンショット 2023-06-21 15.22.21.png

参考サイト

データ分析のための基盤構築
Cloud Functions で Python のプログラムを定期実行する
爆速で5つのPython Webアプリを開発
Streamlit with Google Cloud: Hello, world!

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2