※入門的な内容です。
環境
- Windows 7 SP1 Pro x64
- JDK 8u60 x64
- Embulk 0.7.2
- コマンドプロンプト
- Atom
Embulk のインストール
1: Windows PowerShell 4.0 (Windows Management Framework 4.0, Windows6.1-KB2819745-x64-MultiPkg.msu) をインストールします (要再起動)
2: コマンドプロンプトを開きます
3: Embulk を置くフォルダを作成します (ここでは C:\work\embulk
としました)
mkdir C:\work\embulk
cd C:\work\embulk
4: Embulk をダウンロードします
PowerShell -Command "& {Invoke-WebRequest http://dl.embulk.org/embulk-latest.jar -OutFile embulk.bat}"
以上で Embulk が C:\work\embulk\embulk.bat
に保存されます。必要に応じてパスを通してください。本記事ではパスは通さない場合を想定して記載しています。
bundle でテスト環境を作る
通常 embulk gem install
でプラグインをインストールすると C:\User\Dev\.embulk
といったフォルダが作成されてこの中にインストールされます。プラグインを作成する場合は少し面倒なので bundle
コマンドを使ってテスト環境を作成します。
1: 任意のディレクトリに移動します (ここでは C:\work\embulk
としました)
cd C:\work\embulk
2: テスト環境を作成します (ここでは bundle_dir
としました)
embulk bundle new bundle_dir
これで C:\work\embulk\bundle_dir
に bundle 環境の雛型が作成されます。
3: Gemfile を編集します
作成されたフォルダの Gemfile
に使いたいプラグインを記述します。今回は以下の記述を追記しました。
gem 'pry'
gem 'embulk-output-elasticsearch'
gem 'embulk-filter-speedometer'
gem 'embulk-filter-eval'
gem 'embulk-decoder-commons-compress'
4: 環境を更新します
テスト環境フォルダに移動して embulke bundle
コマンドを実行します。今回は embulk.bat
にパスを通していないので以下のようになります。
cd C:\work\embulk\bundle_dir
..\embulk bundle
テスト環境で Embulk を使う
テスト環境を使って Embulk を動かすには各コマンドに -b [bundle ディレクトリ]
オプションを付与します。具体例を以下に挙げます。
具体例
例えば以下のようなログファイルが C:\work\embulk_test_logs\ovpalog_HOSTNAME_cpu_20150801
にあるとします。
Logfile:/var/log/ovpalog HOSTNAME
, ,Intrpt,System, User , ,
Date , Time ,CPU % ,CPU % ,CPU % ,CPU % ,
08/01/2015,00:00:00, 0.08, 0.54, 0.33, 0.87,
08/01/2015,00:05:00, 0.08, 0.58, 0.42, 1.00,
08/01/2015,00:10:00, 0.10, 0.63, 4.81, 5.44,
08/01/2015,00:15:00, 0.29, 1.12, 44.12, 45.24,
これをパースする設定ファイルを C:\work\embulk\config.yml
に作成します。内容は以下の通りです。
in:
type: file
path_prefix: C:\work\embulk_test_logs\ovpalog_HOSTNAME_cpu_
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
skip_header_lines: 3
columns:
- {name: 'Date', type: timestamp, format: '%m/%d/%Y'}
- {name: 'Time', type: timestamp, format: '%H:%M:%S'}
- {name: 'Intrpt CPU %', type: double}
- {name: 'System CPU %', type: double}
- {name: 'User CPU %', type: double}
- {name: 'CPU %', type: double}
- {name: 'Empty Column', type: string}
quote: '"'
escape: '"'
trim_if_not_quoted: true
allow_extra_columns: false
allow_optional_columns: false
default_timezone: 'Asia/Tokyo'
filters:
- type: eval
eval_columns:
- 'Date': value + record['Time'].to_i + Time.now.utc_offset
out:
type: stdout
少し解説をすると embulk-filter-eval を使って Date 列に Time 列の時刻を加算しています。なお Time を数値にして加算するときに時差を考慮しています。
これを使ってパース結果をプレビューするには以下のコマンドを打ちます。
cd C:\work\embulk
embulk preview config.yml -b bundle_dir
parser プラグインを作成する
具体例にあるログの Time 列や末尾にある Empty Column 列は不要なので消したいと思います1。また、ヘッダにある HOSTNAME
部分を各レコードに付与することを考えます。
実現方法としてはプラグインを組み合わせて処理する、というのが正攻法のだと思うのですがここでは parser プラグインを自作して対応します。
embulk-parser-ovpacpulog
本来プラグインを作成するには Creating plugins のやり方に沿ってやるべきですが、ここではより手軽にプラグインを作って試せる方法として bundle_dir
内にプラグインファイルを作成する方法を採りました。
そこで bundle_dir\embulk\parser\ovpacpulog.rb
を以下のように作成します。解説はコメントで入れています。
# coding: utf-8
require 'pry'
module Embulk
module Parser
# これを雛形にパーサを作るときはクラス名の変更を忘れないこと
class OvpaCpuLogParser < ParserPlugin
# プラグイン登録
# - 登録した名前で設定ファイルから呼び出す
Plugin.register_parser("ovpacpulog", self)
# トランザクション制御処理
# - 設定ファイルの情報は config 変数に格納されている
# - yield で &control ブロック (内容は ParserPlugin 参照) に処理を渡す
def self.transaction(config, &control)
# パース処理に渡す情報を格納するハッシュ
task = {
# ホスト名の設定 (必須)
"hostname" => config.param("hostname", :string),
# 先頭何行をスキップするかの設定 (指定なしの場合は 0)
"skip_header_lines" => config.param("skip_header_lines", :integer, default: 0),
# タイムゾーンの設定 (指定なしの場合は UTC)
"default_timezone" => config.param("default_timezone", :string, default: "UTC")
}
# データ構造の定義配列
# - 項番,カラム名,型を指定
columns = [
Column.new(0, "Hostname", :string),
Column.new(1, "Date", :timestamp),
Column.new(2, "Intrpt CPU %", :double),
Column.new(3, "System CPU %", :double),
Column.new(4, "User CPU %", :double),
Column.new(5, "CPU %", :double)
]
# &control ブロックの実施
# - ザックリ言えばこのクラスを new (init) してから run を実行する
yield(task, columns)
end
# 初期化処理
# - 親クラスの initialize から呼び出される
def init
@hostname = task["hostname"]
@skip_header_lines = task["skip_header_lines"]
@default_timezone = task["default_timezone"]
end
# 読み込んだ内容に対するパース処理
def run(file_input)
# タイムゾーンを一時的に切り替える
timezone = ENV["TZ"]
ENV["TZ"] = @default_timezone
# file_input の内容は LineDecoder から取得すること
# - テンプレートのように buffer から直接読み取ると行の途中で途切れることがあるため
decoder_task = task.load_config(Java::LineDecoder::DecoderTask)
decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, decoder_task)
while decoder.nextFile
# ヘッダのスキップ
if @skip_header_lines > 0
@skip_header_lines.times { decoder.poll }
end
while line = decoder.poll
# ここにパース処理を書く
# 今回のログのフォーマット例
# 08/01/2015,00:00:00, 0.08, 0.54, 0.33, 0.87,
# 正規表現でパースする
res = /^(.*?)\/(.*?)\/(.*?),(.*?),(.*?),(.*?),(.*?),(.*?),$/.match(line)
# マッチした場合は nil 以外が返る
if !(res.nil?)
m = res.to_a
# 時刻部分を整形する
# TODO: パースエラー時の処理を入れる
t = Time.parse([[m[3],m[1],m[2]].join("/"),m[4]].join(" "))
# 定義に従ってデータを構築する
record = [
@hostname,
t,
m[5].to_f,
m[6].to_f,
m[7].to_f,
m[8].to_f
]
page_builder.add(record)
# マッチしなかった場合はエラーを表示する
else
Embulk.logger.error("Parsing failed at \'#{line}\'")
end
end
end
# 出力内容を確定する
page_builder.finish
# タイムゾーンを元に戻す
ENV["TZ"] = timezone
end
end
end
end
実行
さて、動かしてみます。
C:\wrok\embulk\config_ovpacpulog.yml
を以下のように作成します。
in:
type: file
path_prefix: C:\work\embulk_test_logs\ovpalog_HOSTNAME_cpu_
parser:
type: ovpacpulog
hostname: 'HOSTNAME'
skip_header_lines: 3
default_timezone: 'Asia/Tokyo'
out:
type: stdout
以下のコマンドで実行結果をプレビューします。
cd C:\work\embulk
embulk preview config_ovpacpulog.yml -b bundle_dir
デバッグ
pry を使って CUI 上でデバッグできます2。使い方は Web ページを参照すれば分かりますが binding.pry
と記述した場所でブレークしてコンソールに移ることができます。
テスト
割愛します。
他のプラグインを見たところ embulk-parser-query_string が Ruby 実装で TravisCI 連携までしているので参考になると思います。ここまでできればいいなぁ。
参考
- Embulk 0.7 documentation
- [Embulk]データを加工するプラグインの開発とデバッグ
- 今更聞けないpryの使い方と便利プラグイン集
- treasure-data/embulk-parser-query_string
以上