一、背景
背景:AdobeAnalyticsのデータをBigqueryに蓄積して活用しようということで、1時間分のCSVデータをZipfiles形式で1時間ごとGoogleCloudStorageに転送されていて、その後Bigqueryにテーブルとして集約します。
元々はDataproc上のSparkJobで処理しているですが、コスパを考えてDataflowへの移行を決まりました。
実行するための権限周りについてはGoogleの公式ドキュメントまでご参照いただければ幸いです。
二、アーキテクチャ
今回のアーキテクチャは下記のようになります。
GCS上に格納されるZIPファイルをトリガーとして、CloudFuntionsを起動し、その後Dataflowに送信してflexTemplatesのJobを実行し、CSVデータをBigqueryのテーブルにアウトプットします。
三、ApacheBeam&Dataflowの話
DataflowのフレームワークはApacheBeamです。
詳しい説明は割愛させていただきます。
自分は下記のサイトで色々を勉強しました。
https://beam.apache.org/get-started/
https://cloud.google.com/dataflow/docs/concepts/beam-programming-model?hl=ja
四、実装
1、DataflowのflexTemplatesを作成
①pipelineOptions&構成
import logging
import argparse
import apache_beam as beam
from apache_beam import DoFn
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
#gcp config in local
job_name = ""
project = ""
class BeamOptions:
def __init__(self,runner,pipeline_args):
self.options = PipelineOptions(pipeline_args)
# Setup Option
self.options.view_as(SetupOptions).save_main_session = True
# Standard Option
self.options.view_as(StandardOptions).runner = runner
# run pipeline
def run(runner="DirectRunner",
pipeline_args=None,
bucket_name = None,
file_name = None,
table_spec=None,
table_schema=None,):
beamoptions = BeamOptions(runner,pipeline_args)
options = beamoptions.options
# パイプラインの生成
with beam.Pipeline(options=options) as pipeline:
unzip_data = (
pipeline
| "read zipfile from gcs" >> beam.io.fileio.MatchFiles(f'gs://{bucket_name}/{file_name}')
| "match zipfile" >> beam.io.fileio.ReadMatches()
| "unzip csv" >> beam.Map(unzip)
)
trans_data = (
unzip_data
| "transform dict" >> beam.ParDo(convert_dict())
)
# BigQueryへデータ登録
trans_data | "Write to BigQuery" >> WriteToBigQuery(
table_spec,
schema=table_schema,
create_disposition=BigQueryDisposition.CREATE_NEVER,
write_disposition=BigQueryDisposition.WRITE_APPEND,
)
#run
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
#Arguments from the command line.
parser = argparse.ArgumentParser()
parser.add_argument('--bucket_name',
dest='bucket_name',
help='Input Bucket for the pipeline',
default='')
parser.add_argument('--file_name',
dest='file_name',
help='Input zipfile(s) for the pipeline',
default='')
parser.add_argument('--table_spec',
dest='table_spec',
help='output table_spec for the pipeline',
default='')
parser.add_argument('--schema_file',
dest='schema_file',
help='schema_file for the pipeline',
default='')
# Parse arguments.
known_args, pipeline_args = parser.parse_known_args()
bucket_name = known_args.bucket_name
file_name = known_args.file_name
table_spec = known_args.table_spec
schema_file = get_schema_from_gcs(project_id=project,
bucket_name=known_args.bucket_name,
gcs_file_name=known_args.schema_file)
#ParDo Functions
convert_dict()
run(
# runnerを指定する
# "DirectRunner" ->ローカル実行, "DataflowRunner" ->Dataflow実行
runner = "DataflowRunner",
pipeline_args = pipeline_args,
bucket_name=bucket_name,
file_name=file_name,
table_spec = table_spec,
table_schema = schema_file
)
②unzip関数
#unzip function
def unzip(zips):
import zipfile
with zipfile.ZipFile(zips.open()) as zf:
with zf.open(zf.namelist()[0]) as f:
elements = f.readlines()[1:]
return elements
③GCSのスキーマファイル取得
def get_schema_from_gcs(project_id, bucket_name, gcs_file_name):
from google.cloud import storage
import json
client = storage.Client(project=project_id)
bucket = client.bucket(bucket_name)
blob = bucket.blob(gcs_file_name)
text = blob.download_as_string().decode()
json_dict = json.loads(text)
return json_dict
④dofn関数:CSV転換
#convert to dict
class convert_dict(DoFn):
def process(self, elements):
from datetime import datetime
import re
for element in elements:
element = element.decode('utf-8').split("\n")[0]
element_f = re.split('","|",',element)
element_s = re.split(',',element_f[-1])
element = element_f[0:-1] + element_s
element = [i.replace('.00','') for i in element]
dic = {
"id": element[0],
"number": element[1],
"pages": element[2],
# ...
}
yield dic
⑤Dockerfile&②reuquirements.txt
Dockerfile
FROM gcr.io/dataflow-templates-base/python39-template-launcher-base
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}
COPY requirements.txt .
COPY run.py .
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/run.py"
RUN apt-get update && apt-get install -y
RUN pip install --upgrade pip
RUN pip install apache-beam[gcp]==2.44.0
RUN pip install -U -r ./requirements.txt
ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
requirements.txt
google-cloud-storage
...(その他必要なライブラリ)
⑥metadata.json
metadata.jsonでは、パイプラインの外部変数コントロールするためのファイルです。
{
"name": "your name",
"description": "your desription",
"parameters": [
{
"name": "bucket_name",
"label": "Input bucket in gcs",
"helpText": "Input Bucket for the pipeline.",
"isOptional": true
},
{
"name": "file_name",
"label": "Input file from gcs",
"helpText": "Input zipfile(s) for the pipeline",
"isOptional": true
},
{
"name": "table_spec",
"label": "Bigquery table_spec",
"helpText": "output table_spec for the pipeline",
"isOptional": true
},
{
"name": "schema_file",
"label": "schema_file for aa in bigquery",
"helpText": "schema_file for the pipeline",
"isOptional": true
}
]
}
2、CloudFuntionsを作成
①main.py
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
from datetime import datetime
import os
import logging
def dataflow_job_start(data, context):
# read from gcs
PROJECTID = os.environ.get("your project_id")
file_name = data['name']
logging.getLogger().setLevel(logging.INFO)
logging.info(f"{file_name} is going to deal...")
jobName = "your name"
template = os.environ.get("your params-name")
parameters = {
'bucket_name': os.environ.get("bucket_name"),
'file_name': file_name,
'table_spec': os.environ.get("your table_spec"),
'schema_file': os.environ.get("your schema_file"),
}
credentials = GoogleCredentials.get_application_default()
service = build("dataflow","v1b3",credentials=credentials, cache_discovery=False)
request = service.projects().locations().flexTemplates().launch(
projectId=PROJECTID,
location=os.environ.get("your region"),
body={
"launch_parameter": {
"jobName": jobName,
"parameters": parameters,
"containerSpecGcsPath": template,
"environment": {
"tempLocation": os.environ.get("your tempLocation"),
"subnetwork": os.environ.get("subnetwork")
"serviceAccountEmail": os.environ.get("serviceAccountEmail")
},
}
}
)
return request.execute()
②reuquirements.txt
google-api-python-client==1.7.8
oauth2client==4.1.3
最後
CLoudBuildとgithubの連携もやりたいですが(毎回Dockerfileをpushするのは面倒...)、それは別投稿させていただきます。
unzipで読み込んだデータでは全てメモリ中で処理するなので、分散処理されていないことがわかっていました。
またBigqueryへの書き込みがかなり制限限られているため、結局時間がかかりました。
改善策を出たらまた追記します。