LoginSignup
0
0

More than 1 year has passed since last update.

【GCP】GCSにあるZipfilesをBigqueryに~Dataflowによる実装

Posted at

一、背景

背景:AdobeAnalyticsのデータをBigqueryに蓄積して活用しようということで、1時間分のCSVデータをZipfiles形式で1時間ごとGoogleCloudStorageに転送されていて、その後Bigqueryにテーブルとして集約します。
元々はDataproc上のSparkJobで処理しているですが、コスパを考えてDataflowへの移行を決まりました。
実行するための権限周りについてはGoogleの公式ドキュメントまでご参照いただければ幸いです。

二、アーキテクチャ

今回のアーキテクチャは下記のようになります。
AAtoGCP-Dataflow_ver.jpg
GCS上に格納されるZIPファイルをトリガーとして、CloudFuntionsを起動し、その後Dataflowに送信してflexTemplatesのJobを実行し、CSVデータをBigqueryのテーブルにアウトプットします。

三、ApacheBeam&Dataflowの話

DataflowのフレームワークはApacheBeamです。
詳しい説明は割愛させていただきます。
自分は下記のサイトで色々を勉強しました。
https://beam.apache.org/get-started/
https://cloud.google.com/dataflow/docs/concepts/beam-programming-model?hl=ja

四、実装

1、DataflowのflexTemplatesを作成

①pipelineOptions&構成

import logging
import argparse

import apache_beam as beam
from apache_beam import DoFn
from apache_beam.io import fileio 
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition

#gcp config in local
job_name = "" 
project = ""  

class BeamOptions:
    def __init__(self,runner,pipeline_args):
        self.options = PipelineOptions(pipeline_args)
        # Setup Option
        self.options.view_as(SetupOptions).save_main_session = True
        # Standard Option
        self.options.view_as(StandardOptions).runner = runner

# run pipeline
def run(runner="DirectRunner", 
        pipeline_args=None,
        bucket_name = None,
        file_name = None,
        table_spec=None,
        table_schema=None,):
    beamoptions = BeamOptions(runner,pipeline_args)
    options = beamoptions.options
    # パイプラインの生成
    with beam.Pipeline(options=options) as pipeline:
        unzip_data = (
        pipeline
        | "read zipfile from gcs" >> beam.io.fileio.MatchFiles(f'gs://{bucket_name}/{file_name}')
        | "match zipfile" >> beam.io.fileio.ReadMatches()
        | "unzip csv" >> beam.Map(unzip)
        )
        trans_data = (
        unzip_data
        | "transform dict" >> beam.ParDo(convert_dict())
        )
        # BigQueryへデータ登録
        trans_data | "Write to BigQuery" >> WriteToBigQuery(
            table_spec,
            schema=table_schema,
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
        )       


#run
if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    #Arguments from the command line.
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket_name',
                dest='bucket_name',
                help='Input Bucket for the pipeline',
                default='')
    parser.add_argument('--file_name',
                dest='file_name',
                help='Input zipfile(s) for the pipeline',
                default='')
    parser.add_argument('--table_spec',
                dest='table_spec',
                help='output table_spec for the pipeline',
                default='')
    parser.add_argument('--schema_file',
                dest='schema_file',
                help='schema_file for the pipeline',
                default='')

    # Parse arguments.
    known_args, pipeline_args = parser.parse_known_args()
    bucket_name = known_args.bucket_name
    file_name = known_args.file_name
    table_spec = known_args.table_spec
    schema_file = get_schema_from_gcs(project_id=project,
                                  bucket_name=known_args.bucket_name,
                                  gcs_file_name=known_args.schema_file)

    #ParDo Functions
    convert_dict() 

    run(
        # runnerを指定する
        # "DirectRunner" ->ローカル実行, "DataflowRunner" ->Dataflow実行
        runner = "DataflowRunner", 
        pipeline_args = pipeline_args,
        bucket_name=bucket_name,
        file_name=file_name,
        table_spec = table_spec,
        table_schema = schema_file
    )

②unzip関数

#unzip function
def unzip(zips):
    import zipfile
    with zipfile.ZipFile(zips.open()) as zf:
        with zf.open(zf.namelist()[0]) as f:
            elements = f.readlines()[1:]
            return elements

③GCSのスキーマファイル取得

def get_schema_from_gcs(project_id, bucket_name, gcs_file_name):
    from google.cloud import storage
    import json
    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(gcs_file_name)
    text = blob.download_as_string().decode()
    json_dict = json.loads(text)
    return json_dict

④dofn関数:CSV転換

#convert to dict
class convert_dict(DoFn):
  def process(self, elements):
    from datetime import datetime
    import re
    for element in elements:
        element = element.decode('utf-8').split("\n")[0]
        element_f = re.split('","|",',element)
        element_s = re.split(',',element_f[-1])
        element = element_f[0:-1] + element_s
        element = [i.replace('.00','') for i in element]
        dic = {
           "id": element[0],
           "number": element[1],
           "pages": element[2],
           # ...
            }
        yield dic

⑤Dockerfile&②reuquirements.txt
Dockerfile

FROM gcr.io/dataflow-templates-base/python39-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY  requirements.txt .
COPY  run.py .

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/run.py"

RUN apt-get update && apt-get install -y
RUN pip install --upgrade pip
RUN pip install apache-beam[gcp]==2.44.0
RUN pip install -U -r ./requirements.txt

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

requirements.txt

google-cloud-storage
...(その他必要なライブラリ)

⑥metadata.json
metadata.jsonでは、パイプラインの外部変数コントロールするためのファイルです。

{
  "name": "your name",
  "description": "your desription",
  "parameters": [
  {
    "name": "bucket_name",
    "label": "Input bucket in gcs",
    "helpText": "Input Bucket for the pipeline.",
    "isOptional": true
  },
  {
    "name": "file_name",
    "label": "Input file from gcs",
    "helpText": "Input zipfile(s) for the pipeline",
    "isOptional": true
  },
  {
    "name": "table_spec",
    "label": "Bigquery table_spec",
    "helpText": "output table_spec for the pipeline",
    "isOptional": true
  },
  {
    "name": "schema_file",
    "label": "schema_file for aa in bigquery",
    "helpText": "schema_file for the pipeline",
    "isOptional": true
  }
]
}

2、CloudFuntionsを作成

①main.py

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
from datetime import datetime
import os
import logging

def dataflow_job_start(data, context):
    # read from gcs
    PROJECTID = os.environ.get("your project_id") 
    file_name = data['name'] 
    logging.getLogger().setLevel(logging.INFO)
    logging.info(f"{file_name} is going to deal...")
    
    jobName = "your name"
    template = os.environ.get("your params-name")
    parameters = {
        'bucket_name': os.environ.get("bucket_name"), 
        'file_name': file_name,
        'table_spec': os.environ.get("your table_spec"), 
        'schema_file': os.environ.get("your schema_file"), 
    }

    credentials = GoogleCredentials.get_application_default()
    service  = build("dataflow","v1b3",credentials=credentials, cache_discovery=False)

    request = service.projects().locations().flexTemplates().launch(
        projectId=PROJECTID,
        location=os.environ.get("your region"),
        body={
            "launch_parameter": {
            "jobName": jobName,
            "parameters": parameters,
            "containerSpecGcsPath": template,
            "environment": {
                "tempLocation": os.environ.get("your tempLocation"),
                "subnetwork": os.environ.get("subnetwork")
                "serviceAccountEmail": os.environ.get("serviceAccountEmail")
            },
          }
        }
    )
    return request.execute()

②reuquirements.txt

google-api-python-client==1.7.8
oauth2client==4.1.3

最後

CLoudBuildとgithubの連携もやりたいですが(毎回Dockerfileをpushするのは面倒...)、それは別投稿させていただきます。
unzipで読み込んだデータでは全てメモリ中で処理するなので、分散処理されていないことがわかっていました。
またBigqueryへの書き込みがかなり制限限られているため、結局時間がかかりました。
改善策を出たらまた追記します。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0