search
LoginSignup
48

More than 5 years have passed since last update.

posted at

updated at

Cloud Dataflow 超入門

Cloud Dataflow 超入門

by hayatoy
1 / 9

これはGCPUG Beginners Tokyo #3 のハンズオン資料です。


このスライドはQiitaにあります


bit.ly/dataflow-ho


Google Cloud Dataflowとは

  • データ処理を簡単にやってくれる
  • ワーカーのワークロードバランスを勝手にとってくれる
  • GCEで動いてるので何でもできる
  • Datalab (Jupyter) 上からdeployできる!

簡単に言うとMapReduceのスーパーイケてる版


チュートリアル用リポジトリ

git clone https://github.com/hayatoy/dataflow-tutorial.git


事前準備

That's it!


以降はコード説明です


Apache Beamのインポート

import apache_beam as beam

Dataflowの基本設定

ジョブ名、プロジェクト名、一時ファイルの置き場を指定します。

options = beam.utils.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(
    beam.utils.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'dataflow-tutorial1'
gcloud_options.project = 'PROJECTID'
gcloud_options.staging_location = 'gs://PROJECTID/staging'
gcloud_options.temp_location = 'gs://PROJECTID/temp'

Dataflowのスケール設定

Workerの最大数や、マシンタイプ等を設定します。

WorkerのDiskサイズはデフォルトで250GB(Batch)、420GB(Streaming)と大きいので、ここで必要サイズを指定する事をオススメします。

worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.disk_size_gb = 20
worker_options.max_num_workers = 2
# worker_options.num_workers = 2
# worker_options.machine_type = 'n1-standard-8'
# worker_options.zone = 'asia-northeast1-a'

実行環境の切り替え

  • DirectRunner: ローカルマシンで実行します
  • DataflowRunner: Dataflow上で実行します
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
# options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'

準備は完了、以下パイプラインの例












パイプラインその1

GCSからファイルを読み込み、GCSにその内容を書き込むだけ

+----------------+
|                |
| Read GCS File  |
|                |
+-------+--------+
        |
        v
+-------+--------+
|                |
| Write GCS File |
|                |
+----------------+
p1 = beam.Pipeline(options=options)

(p1 | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
    | 'write' >> beam.io.WriteToText('gs://PROJECTID/test.txt', num_shards=1)
 )

p1.run().wait_until_finish()

パイプラインその2

BigQueryからデータを読み込み、GCSにその内容を書き込むだけ

BigQueryのデータセットは以下

https://bigquery.cloud.google.com/table/bigquery-public-data:samples.shakespeare

+----------------+
|                |
| Read BigQuery  |
|                |
+-------+--------+
        |
        v
+-------+--------+
|                |
| Write GCS File |
|                |
+----------------+
p2 = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(p2 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | 'write' >> beam.io.WriteToText('gs://PROJECTID/test2.txt', num_shards=1)
 )

p2.run().wait_until_finish()

パイプラインその3

BigQueryからデータを読み込み、BigQueryにデータを書き込む

+----------------+
|                |
| Read BigQuery  |
|                |
+-------+--------+
        |
        v
+-------+--------+
|                |
| Write BigQuery |
|                |
+----------------+
p3 = beam.Pipeline(options=options)

# 注意:データセットを作成しておく
query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(p3 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'testdataset.testtable1',
        schema='corpus_date:INTEGER, corpus:STRING, word:STRING, word_count:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p3.run().wait_until_finish()

パイプラインその4

  • BigQueryからデータを読み込み
  • データを加工して
  • BigQueryに書き込む
+----------------+
|                |
| Read BigQuery  |
|                |
+-------+--------+
        |
        v
+-------+--------+
|                |
| Modify Element |
|                |
+----------------+
        |
        v
+-------+--------+
|                |
| Write BigQuery |
|                |
+----------------+
def modify_data1(element):
    # beam.Mapは1行の入力に対し1行の出力をする場合に使う
    # element = {u'corpus_date': 0, u'corpus': u'sonnets', u'word': u'LVII', u'word_count': 1}

    corpus_upper = element['corpus'].upper()
    word_len = len(element['word'])

    return {'corpus_upper': corpus_upper,
            'word_len': word_len
            }


p4 = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(p4 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | 'modify' >> beam.Map(modify_data1)
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'testdataset.testtable2',
        schema='corpus_upper:STRING, word_len:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p4.run().wait_until_finish()

パイプラインその5

ブランチを分ける例

+----------------+
|                |
| Read BigQuery  |
|                |
+-------+--------+
        |
        +---------------------+
        |                     |
+-------v--------+    +-------v--------+
|                |    |                |
| Modify Element |    | Modify Element |
|                |    |                |
+-------+--------+    +-------+--------+
        |                     |
        +---------------------+
        |
+-------v--------+
|                |
| Flatten        |
|                |
+-------+--------+
        |
        |
+-------v--------+
|                |
| Save BigQuery  |
|                |
+----------------+
def modify1(element):
    # element = {u'corpus_date': 0, u'corpus': u'sonnets', u'word': u'LVII', u'word_count': 1}
    word_count = len(element['corpus'])
    count_type = 'corpus only'

    return {'word_count': word_count,
            'count_type': count_type
            }


def modify2(element):
    # element = {u'corpus_date': 0, u'corpus': u'sonnets', u'word': u'LVII', u'word_count': 1}
    word_count = len(element['word'])
    count_type = 'word only'

    return {'word_count': word_count,
            'count_type': count_type
            }


p5 = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
query_results = p5 | 'read' >> beam.io.Read(beam.io.BigQuerySource(
    project='PROJECTID', use_standard_sql=False, query=query))

# BigQueryの結果を二つのブランチに渡す
branch1 = query_results | 'modify1' >> beam.Map(modify1)
branch2 = query_results | 'modify2' >> beam.Map(modify2)

# ブランチからの結果をFlattenでまとめる
((branch1, branch2) | beam.Flatten()
                    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
                        'testdataset.testtable3',
                        schema='word_count:INTEGER, count_type:STRING',
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p5.run().wait_until_finish()

パイプラインその6

Groupbyを使う

def modify_data2(kvpair):
    # groupbyによりkeyとそのkeyを持つデータのリストのタプルが渡される
    # kvpair = (u'word only', [4, 4, 6, 6, 7, 7, 7, 7, 8, 9])

    return {'count_type': kvpair[0],
            'sum': sum(kvpair[1])
            }


p6 = beam.Pipeline(options=options)

query = 'SELECT * FROM [PROJECTID:testdataset.testtable3] LIMIT 20'
(p6 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | 'pair' >> beam.Map(lambda x: (x['count_type'], x['word_count']))
    | "groupby" >> beam.GroupByKey()
    | 'modify' >> beam.Map(modify_data2)
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'testdataset.testtable4',
        schema='count_type:STRING, sum:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p6.run().wait_until_finish()

パイプラインその7

WindowGroupByの区間を区切る

def assign_timevalue(v):
    # pcollectionのデータにタイムスタンプを付加する
    # 後段のwindowはこのタイムスタンプを基準に分割される
    # ここでは適当に乱数でタイムスタンプを入れている
    import apache_beam.transforms.window as window
    import random
    import time
    return window.TimestampedValue(v, int(time.time()) + random.randint(0, 1))


def modify_data3(kvpair):
    # groupbyによりkeyとそのkeyを持つデータのリストのタプルが渡される
    # windowで分割されているのでデータ数が少なくなる
    # kvpair = (u'word only', [4, 4, 6, 6, 7])

    return {'count_type': kvpair[0],
            'sum': sum(kvpair[1])
            }


p7 = beam.Pipeline(options=options)

query = 'SELECT * FROM [PROJECTID:testdataset.testtable3] LIMIT 20'
(p7 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | "assign tv" >> beam.Map(assign_timevalue)
    | 'window' >> beam.WindowInto(beam.window.FixedWindows(1))
    | 'pair' >> beam.Map(lambda x: (x['count_type'], x['word_count']))
    | "groupby" >> beam.GroupByKey()
    | 'modify' >> beam.Map(modify_data3)
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'testdataset.testtable5',
        schema='count_type:STRING, sum:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p7.run().wait_until_finish()

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
48