LoginSignup
14
8

More than 5 years have passed since last update.

CloudDataflowをPythonで触ったので、その時のメモ

Last updated at Posted at 2018-07-01

ちょっとGCPを触る機会があったので、そのメモです。
結構よくあるケースだから記事がなかったのか、
自分のスキルが雑魚なのか。
あまり日本語の記事もなかったので、きっといつか誰かの役に立つはず。
クエリとかは実際からはシンプルに修正してます。

やったこと

  1. GCSからデータを取得してちょっと加工する
  2. 加工したデータをBigQueryにアップロードする
  3. GCSのデータをアーカイブする
  4. BigQueryからSQLでデータを取得
  5. BigQueryの別のテーブルにロードする

実行環境

PC:MacBook Pro
OS:Mac OS High Sierra
言語:Python2.7
フレームワーク:Apache Beam SDK

環境セットアップ

セットアップとか、サンプルの実行は
こちらを参考にしました。

メイン処理と引数

parserで引数を設定します。
PipelineOptionsを生成して、

main

    argv = sys.argv
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default=default_input,
                        help='Input file to process.')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # PipelineOptionを設定
    options = PipelineOptions(pipeline_args)

    # googleで実行するためのオプションを追加で設定
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = projectid
    google_cloud_options.staging_location = staging_location
    google_cloud_options.temp_location = temp_location
    # GoogleDataflowで実行する場合はこのように指定します。
    options.view_as(StandardOptions).runner = 'DataflowRunner'

1.GCSからデータを取得してちょっと加工する→2.加工したデータをBigQueryにアップロードする

サンプルではCSVデータを加工していないのですが、
メソッドcsvParseを修正したら、加工できます。加工した値をreturnしてくださいませ。
今回は、変数rowsからBQのみに書き出していますが、
ここでGCSにも出力みたいなことも可能です。そうすると、dataflow上でフローチャートが分岐するイメージです。

pipeline1

def pipeline1(options=None, known_args=None):
    """
    GCSからCSVを読み込み、BQに投げ込むパイプライン

    :param options:
    :param known_args:
    :return:
    """

    # CSVデータのパース
    def csvParse(elem):
        import csv
        text = elem.encode('utf-8')
        reader = csv.reader(text.splitlines(), delimiter=',')
        row = next(reader)

        return {
            'id': int(row[0]),
            'name': row[1],
            'age': row[2]
        }

    # パイプラインを生成
    p = beam.Pipeline(options=options)

    # GCSから読み込んで、CSVをパースする
    # skip_header_linesで、ヘッダーをスキップ可能
    # 読み込んだCSVはrows変数に格納される
    rows = p | 'Read From GCS' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1) \
           | 'parse data' >> beam.Map(csvParse)

    # rowsをBQに格納する
    # schemaで、bqのテーブルのスキーマを指定する。先に作ってある場合は、
    # bq show --format=prettyjson projectname:datasetname.tablename | jq '.schema.fields.name' > schema.txt
    # で確認する
    # create_dispositionはCREATE_NEVER、CREATE_IF_NEEDED
    # から選択。write_dispositionは、WRITE_TRUNCATE、WRITE_APPEND 、WRITE_EMPTY
    # から選択
    rows | 'Insert to BQ.master' >> beam.io.WriteToBigQuery(project=projectid, dataset=datasetname, table=tablename,
                                                            schema='id:INTEGER,name:STRING,age:INTEGER',
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    ## 実行する
    p.run().wait_until_finish()

3.GCSのデータをアーカイブする

BQに取り込んだら別のバケットにアーカイブするのを想定しています。
「BQへ書き込んだあと」という表現はできないので、
パイプラインが完了したあとにアーカイブの処理を記述します。
ちなみにここに記述した処理は、GoogleCloudDataflowには表示されません。

archive_gcs2gcs

    def gcs_move(src, dest):
        gcsio.GcsIO().copy(src, dest)

    def gcs_delete(fileName):
        gcsio.GcsIO().delete(fileName)


    # csvファイル一覧を取得
    fileList = gcsio.GcsIO().glob(known_args.input)
    if len(fileList) == 0:
        exit()

    # GCSから読み込んだファイルのアーカイブ
    timestamp = datetime.now()
    for file in fileList:
        filename = file.encode('utf-8')
        bucket, name = gcsio.parse_gcs_path(filename)
        newFileName = 'gs://{}/{}/{}_{}'.format(
            archiveBucket,
            timestamp.strftime("%Y/%m/%d"),
            timestamp.strftime("%H%M%S"), name)

        gcs_move(filename, newFileName)
        gcs_delete(filename)

4.BigQueryからSQLでデータを取得→5.BigQueryの別のテーブルにロードする

BigQuerySourceを使って、クエリを実行します。
取得したデータは変数linesに入るので、
今度は、それをBigQuerySinkで別のテーブルにロードします。

pipeline2

def pipeline2(options=None, known_args=None):
    """
    BQにクエリを投げ、別のテーブルに出力するパイプライン

    :param options:
    :param known_args:
    :return:
    """
    query = """
SELECT
  id,
  name,
  age
from {}:{}.{}
""".format(projectid, datasetname, tablename)

    ## 新たなパイプラインを生成
    q = beam.Pipeline(options=options)

    # クエリを実行
    lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))

    # 別のテーブルに書き込む
    lines | 'WriteToBQ' >> beam.io.Write(
        beam.io.BigQuerySink(project=projectid, dataset=datasetname, table=tablename2,
                             schema='id:INTEGER,name:STRING,age:INTEGER',
                             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    # 実行する
    q.run().wait_until_finish()

全体

上記合わせて、こんな感じになりました。

dataflowsample.py

import argparse
import sys
from datetime import datetime
from apache_beam.io.gcp import gcsio
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions

projectid = 'myproject'
datasetname = 'mydataset'
tablename = 'mytable'
tablename2 = 'mytable2'
default_input = 'gs://gcs_input_bucket/*.csv'
archiveBucket = 'gs://gcs_archive-bucket'
staging_location = 'gs://gcs_dataflow_bucket/code/'
temp_location = 'gs://gcs_dataflow_bucket/tmp/'


def pipeline1(options=None, known_args=None):
    """
    GCSからCSVを読み込み、BQに投げ込むパイプライン

    :param options:
    :param known_args:
    :return:
    """

    # CSVデータのパース
    def csvParse(elem):
        import csv
        text = elem.encode('utf-8')
        reader = csv.reader(text.splitlines(), delimiter=',')
        row = next(reader)

        return {
            'id': int(row[0]),
            'name': row[1],
            'age': row[2]
        }

    # パイプラインを生成
    p = beam.Pipeline(options=options)

    # GCSから読み込んで、CSVをパースする
    # skip_header_linesで、ヘッダーをスキップ可能
    # 読み込んだCSVはrows変数に格納される
    rows = p | 'Read From GCS' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1) \
           | 'parse data' >> beam.Map(csvParse)

    # rowsをBQに格納する
    # schemaで、bqのテーブルのスキーマを指定する。先に作ってある場合は、
    # bq show --format=prettyjson projectname:datasetname.tablename | jq '.schema.fields.name' > schema.txt
    # で確認する
    # create_dispositionはCREATE_NEVER、CREATE_IF_NEEDED
    # から選択。write_dispositionは、WRITE_TRUNCATE、WRITE_APPEND 、WRITE_EMPTY
    # から選択
    rows | 'Insert to BQ.master' >> beam.io.WriteToBigQuery(project=projectid, dataset=datasetname, table=tablename,
                                                            schema='id:INTEGER,name:STRING,age:INTEGER',
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    ## 実行する
    p.run().wait_until_finish()


def pipeline2(options=None, known_args=None):
    """
    BQにクエリを投げ、別のテーブルに出力するパイプライン

    :param options:
    :param known_args:
    :return:
    """
    query = """
SELECT
  id,
  name,
  age
from {}:{}.{}
""".format(projectid, datasetname, tablename)

    ## 新たなパイプラインを生成
    q = beam.Pipeline(options=options)

    # クエリを実行
    lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))

    # 別のテーブルに書き込む
    lines | 'WriteToBQ' >> beam.io.Write(
        beam.io.BigQuerySink(project=projectid, dataset=datasetname, table=tablename2,
                             schema='id:INTEGER,name:STRING,age:INTEGER',
                             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    # 実行する
    q.run().wait_until_finish()


def gcs_move(src, dest):
    gcsio.GcsIO().copy(src, dest)


def gcs_delete(fileName):
    gcsio.GcsIO().delete(fileName)


if __name__ == '__main__':
    argv = sys.argv
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default=default_input,
                        help='Input file to process.')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # PipelineOptionを設定
    options = PipelineOptions(pipeline_args)

    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = projectid
    google_cloud_options.staging_location = staging_location
    google_cloud_options.temp_location = temp_location
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    # csvファイル一覧を取得
    fileList = gcsio.GcsIO().glob(known_args.input)
    if len(fileList) == 0:
        exit()

    # CSVデータの取り込み
    pipeline1(options, known_args)

    # GCSから読み込んだファイルのアーカイブ
    timestamp = datetime.now()
    for file in fileList:
        filename = file.encode('utf-8')
        bucket, name = gcsio.parse_gcs_path(filename)
        newFileName = '{}/{}/{}_{}'.format(
            archiveBucket,
            timestamp.strftime("%Y/%m/%d"),
            timestamp.strftime("%H%M%S"), name)

        gcs_move(filename, newFileName)
        gcs_delete(filename)

    # BQからBQ
    pipeline2(options, known_args)

実行してみます。
実行時にはGOOGLE_APPLICATION_CREDENTIALSをちゃんと指定しましょう。

実行結果

dataflowの確認

pipeline1の実行結果が、dataflowで表示されています。
image.png
pipeline2の実行結果も。
image.png
パイプラインを2つ生成しているので、jobが2本実行されています。
image.png

CloudStorageの確認

gcsからも、実行前に存在していたデータが
image.png
inputから消えていて。
image.png
指定しているバケットに移動されています。
image.png

BigQueryの確認

BQにもテーブルが2つ存在していて
image.png
table1にデータがロードされています。
image.png
table2も。
image.png

まとめ

上記のソースでDataflowでCloudStorageからのデータ読み込み,バケットの移動、BQへのロードができました。

ただ、今回のソースだと、厳密には
取得したGCSのオブジェクトのリスト=ApacheBeamで読み込んだファイルのリストではないので、
データ到着と、処理実行の時間が明確なら、まぁ問題ないかもしれませんが考慮が必要ですね。

14
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
8