前提
Shift-JISのファイルをUTF-8に変換したい。BigQueryはShift-JISに対応していないため、BigQueryに取り込む場合などにUTF-8へ変換したい需要は結構あるはず。
さらに、ファイルが大きい場合、分散処理を行いたい。
そこで、Dataflowでサクッと変換できるものを作った。
実装
- python3.9
- beam 2.37.0
converting_to_shiftjis.py
import argparse
import logging
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.coders.coders import Coder
class ShiftJisCoder(Coder):
    """Beam coder that converts between text and Shift-JIS encoded bytes."""

    def encode(self, value):
        """Return ``value`` encoded as Shift-JIS bytes."""
        encoded = value.encode(encoding='shift_jis')
        return encoded

    def decode(self, value):
        """Return the text obtained by decoding ``value`` as Shift-JIS."""
        decoded = value.decode(encoding='shift_jis')
        return decoded

    def is_deterministic(self):
        """Report this coder as deterministic."""
        return True
def run(argv=None, save_main_session=True):
    """Build and run the pipeline that re-encodes a Shift-JIS text file.

    Lines are read from ``--input`` and decoded with ``ShiftJisCoder``, then
    written to ``--output`` using ``WriteToText`` with its default coder
    (UTF-8 for ``str`` elements according to the Beam documentation).

    Args:
        argv: Command-line arguments. ``--input`` and ``--output`` are
            consumed here; every other argument is forwarded to
            ``PipelineOptions``. ``None`` lets argparse read ``sys.argv``.
        save_main_session: Value assigned to
            ``SetupOptions.save_main_session``.
            NOTE(review): presumably needed so that ``ShiftJisCoder``,
            defined in the main module, can be unpickled on remote
            workers - confirm against the Beam documentation.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='./shift-jis.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    # Unknown arguments (runner, project, region, ...) go to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(
        SetupOptions
    ).save_main_session = save_main_session

    # Leaving the ``with`` block runs the pipeline.
    with beam.Pipeline(options=pipeline_options) as p:
        # Applying a transform to the pipeline yields a PCollection, not
        # the transform itself, so the annotation is ``beam.PCollection``.
        lines: beam.PCollection = p | 'Read' >> ReadFromText(
            known_args.input, coder=ShiftJisCoder())
        lines | 'Write' >> WriteToText(known_args.output)
if __name__ == '__main__':
    # Set the root logger to INFO before starting the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
実行
python -m converting_to_shiftjis \
--region $DATAFLOW_REGION \
--input gs://dataflow-samples/input.txt \
--output gs://$STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project $PROJECT_ID \
--temp_location gs://$STORAGE_BUCKET/tmp/
参照