自動取得した株価データをBigQueryに保存
取得の方法はこちら → Seleniumを使い株価データを自動取得
はじめに
CSVデータはバックアップの意味も含め Cloud Storageに保存
Cloud StorageからCSVデータを一時テーブルとして作成
作成した一時テーブルのレコードを加工して本テーブルを作成
Cloud Storageに保存
Cloud Storage、BigQueryを使うためパッケージをインストール
$ pip install google-cloud-bigquery
$ pip install google-cloud-storage
パッケージのインポート、Clientを作成
...
from google.cloud import bigquery, storage
from google.cloud.exceptions import NotFound
...
class YahooDownload:
def __init__(self):
self.str_client = storage.Client()
self.bq_client = bigquery.Client()
CSVアップロード
GCPコンソールでバケットを作成しておく、今回はyahoo-finance-downloadという名前でバケットを作成
...
def upload(self):
bucket = str_client.bucket("weekend-hackathon-stock")
for _, v in self.download_list.iterrows():
stock_no = v["stock_no"]
blob = bucket.blob(f"yahoo-finance-download/{stock_no}.T.csv")
blob.upload_from_filename(f"{self.download_dir}/{stock_no}.T.csv")
処理を追加
...
def main(self):
try:
self.login()
self.download()
self.upload()
pass
except TimeoutException as ex:
print(ex)
pass
except Exception as e:
print(e)
pass
finally:
time.sleep(5)
self.driver.close()
self.driver.quit()
BigQueryに保存
BigQueryにbqコマンドを使ってバケットを作成
$ bq --location=US mk --dataset --description "yahoo_finance_download dataset using" \
weekend-hackathon:yahoo_finance_download
ポイントとしては自動取得したCSVデータの日付フォーマットがBigQueryのDATEフォーマットと一致しないため一時テーブルに日付をSTRINGで作成した後に日付を PARSE_DATE('%Y/%m/%e', trade_at)
でパースして本テーブルにデータ追加しないといけない
取得CSV
日付,始値,高値,安値,終値,出来高,調整後終値
2021/1/22,423,425,423,424,5200,424
2021/1/21,425,425,422,422,2500,422
BigQuery追加
...
def save(self):
for _, v in self.download_list.iterrows():
stock_no = v["stock_no"]
# 一時テーブル準備
external_config = bigquery.ExternalConfig("CSV")
external_config.source_uris = [
f"gs://weekend-hackathon-stock/yahoo-finance-download/{stock_no}.T.csv"
]
external_config.schema = [
bigquery.SchemaField("trade_at", "STRING", "REQUIRED"),
bigquery.SchemaField("open", "FLOAT", "REQUIRED"),
bigquery.SchemaField("high", "FLOAT", "REQUIRED"),
bigquery.SchemaField("low", "FLOAT", "REQUIRED"),
bigquery.SchemaField("close", "FLOAT", "REQUIRED"),
bigquery.SchemaField("volume", "INTEGER", "REQUIRED"),
bigquery.SchemaField("adj_close", "FLOAT", "REQUIRED"),
]
external_config.options.skip_leading_rows = 1
table_id = f"tmp_{stock_no}"
# 一時テーブル参照結果を本テーブルに追加
job_config = bigquery.QueryJobConfig(
destination = (
f"weekend-hackathon.yahoo_finance_download.{stock_no}"
),
write_disposition="WRITE_TRUNCATE",
)
job_config.table_definitions = {table_id: external_config}
sql = f"SELECT PARSE_DATE('%Y/%m/%e', trade_at) AS trad_at, open, high, low, close, volume, adj_close FROM {table_id}"
query_job = self.bq_client.query(sql, job_config=job_config)
query_job.result()
処理を追加
...
def main(self):
try:
self.login()
self.download()
self.upload()
self.save()
pass
except TimeoutException as ex:
print(ex)
pass
except Exception as e:
print(e)
pass
finally:
time.sleep(5)
self.driver.close()
self.driver.quit()
次回はBigQueryのデータを加工、分析しStreamlitを使いアプリっぽくします
いいね!と思ったら LGTM お願いします
【PR】週末ハッカソンというイベントやってます! → https://weekend-hackathon.toyscreation.jp/about/