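Background
- A classic template lets you reuse the same pipeline by changing only its parameters
- Jobs can be launched from a template in several ways, including API calls
- This post uses a module named shiftjis_to_utf8, which converts Shift_JIS text files to UTF-8
Template
Implementation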
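import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.coders.coders import Coder


class ShiftJisCoder(Coder):
    """Converts between Shift_JIS-encoded bytes and Python str."""

    def encode(self, value):
        # str -> Shift_JIS bytes
        return value.encode('shift_jis')

    def decode(self, value):
        # One line of Shift_JIS bytes read by ReadFromText -> str
        return value.decode('shift_jis')

    def is_deterministic(self):
        return True


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # ValueProvider arguments are resolved when the job runs,
        # so they can be supplied later as template parameters.
        parser.add_value_provider_argument(
            '--input',
            help='Path of the file to read from')
        parser.add_value_provider_argument(
            '--output',
            help='Output file to write results to.')


def run():
    # Parse the command-line flags (runner, project, ...) once and
    # view the same options as CustomOptions to get --input/--output.
    pipeline_options = PipelineOptions()
    custom_options = pipeline_options.view_as(CustomOptions)

    p = beam.Pipeline(options=pipeline_options)
    # lines is a PCollection of str decoded from Shift_JIS
    lines = p | 'Read' >> ReadFromText(
        custom_options.input, coder=ShiftJisCoder())
    # WriteToText's default coder writes str as UTF-8
    lines | 'Write' >> WriteToText(custom_options.output)
    # With --template_location set, DataflowRunner stages the template
    # instead of launching a job.
    p.run()


if __name__ == '__main__':
    run()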
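Before staging the template, you can check the per-line conversion locally without Beam. The sketch below mirrors what the pipeline does to each line: ShiftJisCoder.decode turns the Shift_JIS bytes into str, and WriteToText writes that str as UTF-8 followed by a newline. The helper name convert_lines is hypothetical and is not part of the pipeline. It splits on newline bytes before decoding. This is safe for Shift_JIS because the second byte of a double-byte character is always in the range 0x40 to 0xFC, so it can never be \n (0x0A) or \r (0x0D).

```python
def convert_lines(sjis_data: bytes) -> bytes:
    """Hypothetical local helper: Shift_JIS text -> UTF-8 text, line by line."""
    out_lines = []
    # bytes.splitlines() breaks only at \n, \r and \r\n; none of these bytes
    # occur inside a Shift_JIS double-byte character, so splitting first is safe.
    for raw in sjis_data.splitlines():
        text = raw.decode('shift_jis')          # same as ShiftJisCoder.decode
        out_lines.append(text.encode('utf-8'))  # UTF-8 bytes, as written on output
    # Append a newline after every line, like WriteToText does by default
    return b''.join(line + b'\n' for line in out_lines)


if __name__ == '__main__':
    sample = 'こんにちは\r\n世界\r\n'.encode('shift_jis')
    print(convert_lines(sample).decode('utf-8'))
```

The test string 'ソ表' is a useful edge case. Both characters have 0x5C (a backslash in ASCII) as their second byte in Shift_JIS. A byte-oriented tool that is unaware of the encoding can mishandle them, but decoding per line handles them correctly.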
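Creating the template
Run the module with DataflowRunner and --template_location. This writes the template to Cloud Storage instead of starting a job: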
python -m shiftjis_to_utf8 \
--runner DataflowRunner \
--project YOUR_PROJECT \
--staging_location gs://BUCKET/staging \
--temp_location gs://BUCKET/tmp \
--template_location gs://BUCKET/templates/shiftjis_to_utf8 \
--region asia-northeast1
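Running a job from the template
Pass the template parameters to --parameters as comma-separated key=value pairs. Dataflow job names may contain only lowercase letters, digits, and hyphens:
gcloud dataflow jobs run test-template-job \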
--gcs-location gs://BUCKET/templates/shiftjis_to_utf8 \
--parameters input=gs://BUCKET/input/some_file,output=gs://BUCKET/output/sample_result
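As noted in the background section, a classic template can also be launched through the API instead of gcloud. The sketch below is a minimal example under these assumptions:

- The google-api-python-client package is installed.
- Application Default Credentials are configured.
- It calls the Dataflow v1b3 projects.locations.templates.launch method with the same template path and parameters as the gcloud command above.

YOUR_PROJECT, BUCKET, and the region asia-northeast1 are placeholders. The helper names build_launch_body and launch_template are hypothetical.

```python
from typing import Dict


def build_launch_body(job_name: str, params: Dict[str, str]) -> dict:
    """Hypothetical helper: request body (LaunchTemplateParameters) for templates.launch."""
    # The keys of "parameters" are the ValueProvider options defined in
    # CustomOptions (--input, --output), written without the leading dashes.
    return {
        'jobName': job_name,  # only [-a-z0-9], starting with a letter
        'parameters': dict(params),
    }


def launch_template(project: str, region: str, template_path: str, body: dict) -> dict:
    """Hypothetical helper: launch a classic template via the Dataflow REST API."""
    # Imported lazily so build_launch_body works without the client library
    from googleapiclient.discovery import build

    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().locations().templates().launch(
        projectId=project,
        location=region,
        gcsPath=template_path,
        body=body,
    )
    return request.execute()


if __name__ == '__main__':
    body = build_launch_body(
        'test-template-job',
        {
            'input': 'gs://BUCKET/input/some_file',
            'output': 'gs://BUCKET/output/sample_result',
        },
    )
    print(body)
    # To actually launch (needs credentials and a real project/bucket):
    # launch_template('YOUR_PROJECT', 'asia-northeast1',
    #                 'gs://BUCKET/templates/shiftjis_to_utf8', body)
```

Only the parameters change between runs; the staged template stays the same. This is what makes a classic template reusable.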