0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

VertexAI pipelineでtar.gzファイルからデータ抽出&BigQueryに投入

Posted at

1.はじめに

  • ①tar.gzファイル(複数のcsvファイルが入ってる)を解凍、②解凍ファイル中から必要なcsvを指定してBigQueryにロードする、という作業を現在colaborator上で行っている
  • 毎回手動でやるのが面倒になってきたので、もっとうまい方法ないかと検討したところ、WorkflowとVertex AI Pipelineが候補に挙がる。
  • どうもPipelineの方が色々自由度が高そうなので、そちらで実行してみた。

2.コード

  • いろいろ試行錯誤して最後に実行できたコード
  • 必要なライブラリのインストールや設定は省略するけど、後から見るとすっきりした書き方で便利なのがよくわかった。速度もcolaboratoryの時の5~10倍くらいですごく早く処理が終わる!
  • 今持っている手動実行する系のものはこちらに移してしまおうと思えた。

*本来は機械学習のパイプラインも構築できるみたいなので、そちらも試す予定。

# componentを作る
@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="unzip_data.yaml"
)
def unzip_data(
    PROJECT_ID : str,
    bucket: str,
    url: str,
    date: str,
    #stat: Output[stat],
)->str:
    from google.cloud import storage

    # 'Pulling' demo .csv data from a know location in GCS

    storage_client = storage.Client(PROJECT_ID)
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('temp.tar.gz')

    import tarfile

    # tar.gzファイルを開く
    with tarfile.open('temp.tar.gz', 'r:gz') as tar:
        tar.extractall()


    #解凍ファイルのうち必要なファイルをGCSにアップロード
    # 格納するGCSのPathを指定(/xxx/yyy.csv)的な
    gcs_path = str(date) + "/***.csv"
    blob_gcs = bucket.blob(gcs_path)
    # ローカルのファイルパスを指定
    local_path = "***.csv"
    blob_gcs.upload_from_filename(local_path)

    return date
    


@component(
    packages_to_install=["google-cloud-bigquery","google-cloud-storage"],
    base_image="python:3.9",
    output_component_file="unzip_data.yaml"
    )
def load_data_to_gbq(
    inputd: str,
    ) -> None:
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client(project="***")

    # TODO(developer): Set table_id to the ID of the table to create.

    date= inputd
    table_id = "my-project.my-dataset.my-table_{}".format(date)

    job_config = bigquery.LoadJobConfig(
    schema=[
    bigquery.SchemaField("uuid", "STRING"),
    bigquery.SchemaField("name", "STRING"),
    #~~色々
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE

    )

    #unzip_dataでuploadしたファイルと同じ場所
    uri = "gs://***/"+ str(date) + "/***.csv"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config,
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)  # Make an API request.



# Building the 'PIPELINE'

@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="***-pipeline",  # Your own naming for the pipeline.
    )
def my_pipeline(
    PROJECT_ID: str = "my-project",
    url: str = "***/***.tar.gz",
    bucket: str = "***",
    date : str = "20230901"
    ):
    dataset_task = unzip_data(PROJECT_ID=PROJECT_ID,bucket=bucket, url=url,date=date)
    load_data_to_gbq(inputd=dataset_task.output)

    #dimensions = report_data(inputd=dataset_task.output)



0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?