1.はじめに
- ①tar.gzファイル(複数のcsvファイルが入ってる)を解凍、②解凍ファイル中から必要なcsvを指定してBigQueryにロードする、という作業を現在colaborator上で行っている
- 毎回手動でやるのが面倒になってきたので、もっとうまい方法ないかと検討したところ、WorkflowとVertex AI Pipelineが候補に挙がる。
- どうもPipelineの方が色々自由度が高そうなので、そちらで実行してみた。
2.コード
- いろいろ試行錯誤して最後に実行できたコード
- 必要なライブラリのインストールや設定は省略するけど、後から見るとすっきりした書き方で便利なのがよくわかった。速度もcolaboratoryの時の5~10倍くらいですごく早く処理が終わる!
- 今持っている手動実行する系のものはこちらに移してしまおうと思えた。
*本来は機械学習のパイプラインも構築できるみたいなので、そちらも試す予定。
# componentを作る
@component(
packages_to_install=[
"google-cloud-storage",
"pandas",
],
base_image="python:3.9",
output_component_file="unzip_data.yaml"
)
def unzip_data(
PROJECT_ID : str,
bucket: str,
url: str,
date: str,
#stat: Output[stat],
)->str:
from google.cloud import storage
# 'Pulling' demo .csv data from a know location in GCS
storage_client = storage.Client(PROJECT_ID)
bucket = storage_client.get_bucket(bucket)
blob = bucket.blob(url)
blob.download_to_filename('temp.tar.gz')
import tarfile
# tar.gzファイルを開く
with tarfile.open('temp.tar.gz', 'r:gz') as tar:
tar.extractall()
#解凍ファイルのうち必要なファイルをGCSにアップロード
# 格納するGCSのPathを指定(/xxx/yyy.csv)的な
gcs_path = str(date) + "/***.csv"
blob_gcs = bucket.blob(gcs_path)
# ローカルのファイルパスを指定
local_path = "***.csv"
blob_gcs.upload_from_filename(local_path)
return date
@component(
packages_to_install=["google-cloud-bigquery","google-cloud-storage"],
base_image="python:3.9",
output_component_file="unzip_data.yaml"
)
def load_data_to_gbq(
inputd: str,
) -> None:
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client(project="***")
# TODO(developer): Set table_id to the ID of the table to create.
date= inputd
table_id = "my-project.my-dataset.my-table_{}".format(date)
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("uuid", "STRING"),
bigquery.SchemaField("name", "STRING"),
#~~色々
],
skip_leading_rows=1,
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE
)
#unzip_dataでuploadしたファイルと同じ場所
uri = "gs://***/"+ str(date) + "/***.csv"
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config,
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
# Building the 'PIPELINE'
@dsl.pipeline(
pipeline_root=PIPELINE_ROOT,
name="***-pipeline", # Your own naming for the pipeline.
)
def my_pipeline(
PROJECT_ID: str = "my-project",
url: str = "***/***.tar.gz",
bucket: str = "***",
date : str = "20230901"
):
dataset_task = unzip_data(PROJECT_ID=PROJECT_ID,bucket=bucket, url=url,date=date)
load_data_to_gbq(inputd=dataset_task.output)
#dimensions = report_data(inputd=dataset_task.output)