LoginSignup
3
5

More than 3 years have passed since last update.

[GCP]Cloud ShellでDataFlowをデプロイするまでの手順(Python利用)

Last updated at Posted at 2020-01-14

はじめに

DataFlowのデプロイ方法がパッと見つからなかったため、また単純に動かせるプログラムが見当たらなかったため、備忘録としてまとめました。

手順

1. apache_beamインストール

Cloud Shellで以下のコマンドを実行。

sudo pip3 install apache_beam[gcp]

以下のインストール方法ではbeam.io.ReadFromTextでエラーがでるのでダメです。

sudo pip install apache_beam

仮想環境を用いたapache_beamインストール方法は以下の通りです。

# フォルダ作成
mkdir python2
cd python2

# 仮想環境作成
python -m virtualenv env

# アクティベート
source env/bin/activate

# apache-beamインストール
pip install apache-beam[gcp]

2. プログラム作成

今回は以下のような単純なものを作成しました。
指定したバケットの直下にあるread.txtというファイルを読み込んで、write.txtというファイルに出力するだけです。

実際に試したい方はPROJECTID, JOB_NAME, BUCKET_NAME に適当な内容を入力してください。

gcs_readwrite.py
# coding:utf-8
import apache_beam as beam

# ジョブ名、プロジェクトID、バケット名を指定
PROJECTID = '<PROJECTID>'
JOB_NAME = '<JOB_NAME>'  # DataFlowのジョブ名を入力
BUCKET_NAME = '<BUCKET_NAME>'

# ジョブ名、プロジェクトID、一時ファイルの置き場を設定
options = beam.options.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(
    beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = JOB_NAME
gcloud_options.project = PROJECTID
gcloud_options.staging_location = 'gs://{}/staging'.format(BUCKET_NAME)
gcloud_options.temp_location = 'gs://{}/tmp'.format(BUCKET_NAME)

# Workerの最大数や、マシンタイプ等を指定
worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
# worker_options.disk_size_gb = 100
# worker_options.max_num_workers = 2
# worker_options.num_workers = 2
# worker_options.machine_type = 'n1-standard-8'
# worker_options.zone = 'asia-northeast1-a'

# 実行環境の切替
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # ローカルマシンで実行
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Dataflow上で実行

# パイプライン
p = beam.Pipeline(options=options)

(p | 'read' >> beam.io.ReadFromText('gs://{}/read.txt'.format(BUCKET_NAME))
    | 'write' >> beam.io.WriteToText('gs://{}/write.txt'.format(BUCKET_NAME))
 )
p.run().wait_until_finish()

3. GCSの準備

  1. 上記プログラム内のBUCKET_NAMEで指定したバケット名を作成してください
  2. 作成したバケット直下にstaging, tmp というフォルダを作成してください
  3. ローカルでread.txtというファイルを作成してください。中身はなんでも良いです
  4. 作成したバケット直下にread.txtをアップロードしてください

4. ローカルで実行

まず上記プログラムの「実行環境の切替」で、以下のようにコメントアウトを切り替えます。

options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # ローカルマシンで実行
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Dataflow上で実行

次に以下のコマンドを実行します。

python gcs_readwrite.py

これでバケット内にwrite.txt-00000-of-00001というファイルが作成されます。

5. デプロイ

まず上記プログラムの「実行環境の切替」で、以下のようにコメントアウトを切り替えます。

# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # ローカルマシンで実行
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Dataflow上で実行

次に以下のコマンドを実行します。

python gcs_readwrite.py

これでバケット内にwrite.txt-00000-of-00001というファイルが作成されます。
DataFlowのGUIで作成したジョブを選択すると、readwriteが「完了しました」になっていることがわかります。

image.png

おまけ(カスタムテンプレートの作成方法)

以下のような1行を追加して実行するだけでカスタムテンプレートが作成されます。
保存先やテンプレート名は自由に選択できます。

gcloud_options.template_location = 'gs://{}/template/template_name'.format(BUCKET_NAME)

カスタムテンプレートの利用は、
カスタムテンプレートからジョブを作成 -> テンプレートを選択 -> カスタムテンプレート -> テンプレートの GCS パスを指定
とするだけです。

参考

Python を使用したクイックスタート
https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python?hl=ja

パイプラインの実行パラメータを指定する
https://cloud.google.com/dataflow/docs/guides/specifying-exec-params

Cloud Dataflow 超入門
https://qiita.com/hayatoy/items/987658490a69c7d24635

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5