とあるfluent pluginのバグを踏んでしまって、フローの途中で処理されずにエラーになってしまっていた事象がありました。
しかし幸いにもout_fileでテキストファイルに退避していたためこれを再処理できるかなー?と思い以下のような書き捨てスクリプトを書いてみました。
# -*- coding: utf-8 -*-
import sys
import json
import pdb
from fluent import sender
from datetime import datetime
# vars
send_tag = 'okuritai.tagdesu'
label = ''
extz = '+00:00'
# logger init
logger = sender.FluentSender(send_tag, host='localhost', port=24224)
#logger = sender.FluentSender(send_tag, host='unix:///path/to/td-agent.sock')
def parse(line):
dt, tag, message = line.split("\t")
dt = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S'+extz)
message = json.loads(message)
return dt, tag, message
def send(dt, message):
epoch = int(dt.strftime('%s'))
if not logger.emit_with_time(label, epoch, message):
print(logger.last_error)
def main(fname):
with open(fname) as f:
for line in f:
dt, tag, message = parse(line)
send(dt, message)
# argv
if len(sys.argv) < 2:
print "usage: fluentd_reprocessing.py <filename>"
sys.exit(1)
fname = sys.argv[1]
# run main
main(fname)
<timestamp>TAB<tag>TAB<message>
というフォーマットでファイルに出力されるので、それをパースしてsendし直しているだけ。
問題なく再ロード完了し、事なきを得ました :)