#digdag + Embulkでファイル名を維持したままファイルを圧縮して転送
##はじめに
時刻ごとに吐かれる大量のログデータを再利用やエビデンスも兼ねて圧縮して転送したい
現在は自作のシェルを用いてバッチ処理で行なっているが、負荷が高く時間もかかるのでEmbulkを用いてそれらを軽減したいと思った
Embulkだけでは時刻の入ったファイル名を維持したまま転送することができないが、digdagと組み合わせることで出来る
##概要
digdagでファイル名を取得し、Embulkに変数として渡すことでファイル名を保持したまま圧縮、転送する
##ディレクトリ構造
├ compress.dig
├ compress_config.yml
├ scripts
└ _init_.py
##digdagの設定ファイル
digdagではPythonのAPIを用いて圧縮するファイル名を取得する
compress.dig
##転送元ディレクトリからファイル一覧を取得
+getCompressFiles:
py>: scripts.Compress.getCompressFiles
##ファイルを順番に圧縮して転送
+compress:
for_each>:
file: ${files}
_do:
sh>: embulk compress_config.yml
scripts/__init__.py
import digdag
import os
class Compress(object):
def getCompressFiles(self):
digdag.env.store({'files':os.listdir('転送元ディレクトリ')})
##Embulkの設定ファイル
Embulk側ではguessした後の設定ファイルを、digdagの変数を読み込むように編集する
compress_config.yml
exec:
min_output_tasks: 1 #ファイルを最終的に一つにする
in:
type: file
path_prefix: '転送元ディレクトリ'\${file}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: "\t"
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: dt, type: string} #以降のカラムは省略
out:
type: file
path_prefix: '転送先ディレクトリ'\${file}
sequence_format: ""
file_ext: .gz #今回はgzipに圧縮するので、拡張子にgzをつける
formatter:
type: csv
delimiter: "\t"
newline: CRLF
newline_in_field: LF
charset: UTF-8
quote_policy: MINIMAL
quote: '"'
escape: "\\"
null_string: "\\N"
encoders:
- {type: gzip, level: 6}
##参考
digdag
http://docs.digdag.io/
embulk
http://www.embulk.org/docs/