
Creating a Cloud Dataflow classic template


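Background

  • A classic template lets you reuse the same pipeline with different parameters.
  • A staged template can also be launched through API calls and similar means.
  • The example here is a module named shiftjis_to_utf8, which reads Shift_JIS text and writes it out as UTF-8.

Template

Implementation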

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.coders.coders import Coder


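class ShiftJisCoder(Coder):
    """Coder that converts between Shift_JIS bytes and Python str."""

    def encode(self, value):
        # str -> Shift_JIS bytes
        return value.encode('shift_jis')

    def decode(self, value):
        # Shift_JIS bytes -> str; ReadFromText calls this for each line read
        return value.decode('shift_jis')

    def is_deterministic(self):
        # The same str always yields the same bytes.
        return True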


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            help='Path of the file to read from')
        parser.add_value_provider_argument(
            '--output',
            help='Output file to write results to.')


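# Parse the command-line flags once, then view them as CustomOptions so that
# --input and --output are exposed as ValueProviders resolved at job run time.
pipeline_options = PipelineOptions()
custom_options = pipeline_options.view_as(CustomOptions)

p = beam.Pipeline(options=pipeline_options)
# Each line is decoded from Shift_JIS into str by ShiftJisCoder.
lines = p | 'Read' >> ReadFromText(
    custom_options.input, coder=ShiftJisCoder())
# WriteToText encodes str elements as UTF-8 by default.
lines | 'Write' >> WriteToText(custom_options.output)
p.run()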
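
To make the conversion itself concrete, here is a minimal sketch in plain Python, without Beam, of what happens to each line: the coder decodes Shift_JIS bytes into a str, and the text sink then writes that str as UTF-8. The function name shiftjis_line_to_utf8 and the sample text are hypothetical and exist only for illustration.

```python
def shiftjis_line_to_utf8(raw: bytes) -> bytes:
    """Decode one Shift_JIS-encoded line and re-encode it as UTF-8."""
    text = raw.decode('shift_jis')  # same call as ShiftJisCoder.decode
    return text.encode('utf-8')     # what a UTF-8 text sink would write


# Hypothetical sample line, encoded as Shift_JIS the way an input file would be.
sample = 'こんにちは'.encode('shift_jis')
converted = shiftjis_line_to_utf8(sample)
print(converted.decode('utf-8'))
```

The byte sequences differ between the two encodings even though the text is the same, which is why the read side needs an explicit coder.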

Creating the template


python -m shiftjis_to_utf8 \
    --runner DataflowRunner \
    --project YOUR_PROJECT \
    --staging_location gs://BUCKET/staging \
    --temp_location gs://BUCKET/tmp \
    --template_location gs://BUCKET/templates/shiftjis_to_utf8 \
    --region asia-northeast1

Running a job from the template

gcloud dataflow jobs run test-template-job \
        --gcs-location gs://BUCKET/templates/shiftjis_to_utf8 \
        --region asia-northeast1 \
        --parameters input=gs://BUCKET/input/some_file,output=gs://BUCKET/output/sample_result
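
A classic template can also be launched through the Dataflow REST API, using the templates.launch method. The sketch below describes a comparable job as such a request. It only builds the URL and JSON body and sends nothing; authentication and the HTTP call itself are left out. The helper name build_launch_request is hypothetical, and the project, bucket and region values are the same placeholders as in the commands above.

```python
import json
from urllib.parse import urlencode


def build_launch_request(project, region, template_path, job_name, parameters):
    """Return (url, body) for a templates.launch call; nothing is sent."""
    base = (
        'https://dataflow.googleapis.com/v1b3/'
        f'projects/{project}/locations/{region}/templates:launch'
    )
    # The template's Cloud Storage path goes in the gcsPath query parameter.
    query = urlencode({'gcsPath': template_path})
    # The body carries the job name and the runtime parameters.
    body = json.dumps({'jobName': job_name, 'parameters': parameters})
    return base + '?' + query, body


url, body = build_launch_request(
    project='YOUR_PROJECT',
    region='asia-northeast1',
    template_path='gs://BUCKET/templates/shiftjis_to_utf8',
    job_name='test-template-job',
    parameters={
        'input': 'gs://BUCKET/input/some_file',
        'output': 'gs://BUCKET/output/sample_result',
    },
)
print(url)
print(body)
```

The parameters dictionary plays the same role as the --parameters flag: its keys must match the names registered with add_value_provider_argument.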

