7
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Embulk の独自パーサを作成する事始め

Last updated at Posted at 2015-08-23

※入門的な内容です。

環境

  • Windows 7 SP1 Pro x64
  • JDK 8u60 x64
  • Embulk 0.7.2
  • コマンドプロンプト
  • Atom

Embulk のインストール

1: Windows PowerShell 4.0 (Windows Management Framework 4.0, Windows6.1-KB2819745-x64-MultiPkg.msu) をインストールします (要再起動)

2: コマンドプロンプトを開きます

3: Embulk を置くフォルダを作成します (ここでは C:\work\embulk としました)

mkdir C:\work\embulk
cd C:\work\embulk

4: Embulk をダウンロードします

PowerShell -Command "& {Invoke-WebRequest http://dl.embulk.org/embulk-latest.jar -OutFile embulk.bat}"

以上で Embulk が C:\work\embulk\embulk.bat に保存されます。必要に応じてパスを通してください。本記事ではパスは通さない場合を想定して記載しています。

bundle でテスト環境を作る

通常 embulk gem install でプラグインをインストールすると C:\User\Dev\.embulk といったフォルダが作成されてこの中にインストールされます。プラグインを作成する場合は少し面倒なので bundle コマンドを使ってテスト環境を作成します。

1: 任意のディレクトリに移動します (ここでは C:\work\embulk としました)

cd C:\work\embulk

2: テスト環境を作成します (ここでは bundle_dir としました)

embulk bundle new bundle_dir

これで C:\work\embulk\bundle_dir に bundle 環境の雛型が作成されます。

3: Gemfile を編集します
作成されたフォルダの Gemfile に使いたいプラグインを記述します。今回は以下の記述を追記しました。

Gemfile
gem 'pry'
gem 'embulk-output-elasticsearch'
gem 'embulk-filter-speedometer'
gem 'embulk-filter-eval'
gem 'embulk-decoder-commons-compress'

4: 環境を更新します
テスト環境フォルダに移動して embulke bundle コマンドを実行します。今回は embulk.bat にパスを通していないので以下のようになります。

cd C:\work\embulk\bundle_dir
..\embulk bundle

テスト環境で Embulk を使う

テスト環境を使って Embulk を動かすには各コマンドに -b [bundle ディレクトリ] オプションを付与します。具体例を以下に挙げます。

具体例

例えば以下のようなログファイルが C:\work\embulk_test_logs\ovpalog_HOSTNAME_cpu_20150801 にあるとします。

ovpalog_HOSTNAME_cpu_20150801
Logfile:/var/log/ovpalog HOSTNAME
          ,        ,Intrpt,System, User ,      ,
   Date   ,  Time  ,CPU % ,CPU % ,CPU % ,CPU % ,
08/01/2015,00:00:00,  0.08,  0.54,  0.33,  0.87,
08/01/2015,00:05:00,  0.08,  0.58,  0.42,  1.00,
08/01/2015,00:10:00,  0.10,  0.63,  4.81,  5.44,
08/01/2015,00:15:00,  0.29,  1.12, 44.12, 45.24,

これをパースする設定ファイルを C:\work\embulk\config.yml に作成します。内容は以下の通りです。

config.yml
in:
  type: file
  path_prefix: C:\work\embulk_test_logs\ovpalog_HOSTNAME_cpu_
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    skip_header_lines: 3
    columns:
    - {name: 'Date', type: timestamp, format: '%m/%d/%Y'}
    - {name: 'Time', type: timestamp, format: '%H:%M:%S'}
    - {name: 'Intrpt CPU %', type: double}
    - {name: 'System CPU %', type: double}
    - {name: 'User CPU %', type: double}
    - {name: 'CPU %', type: double}
    - {name: 'Empty Column', type: string}
    quote: '"'
    escape: '"'
    trim_if_not_quoted: true
    allow_extra_columns: false
    allow_optional_columns: false
    default_timezone: 'Asia/Tokyo'
filters:
  - type: eval
    eval_columns:
      - 'Date': value + record['Time'].to_i + Time.now.utc_offset
out:
  type: stdout

少し解説をすると embulk-filter-eval を使って Date 列に Time 列の時刻を加算しています。なお Time を数値にして加算するときに時差を考慮しています。

これを使ってパース結果をプレビューするには以下のコマンドを打ちます。

cd C:\work\embulk
embulk preview config.yml -b bundle_dir

このような出力が出れば成功です。
embulk_preview_result.png

parser プラグインを作成する

具体例にあるログの Time 列や末尾にある Empty Column 列は不要なので消したいと思います1。また、ヘッダにある HOSTNAME 部分を各レコードに付与することを考えます。
実現方法としてはプラグインを組み合わせて処理する、というのが正攻法のだと思うのですがここでは parser プラグインを自作して対応します。

embulk-parser-ovpacpulog

本来プラグインを作成するには Creating plugins のやり方に沿ってやるべきですが、ここではより手軽にプラグインを作って試せる方法として bundle_dir 内にプラグインファイルを作成する方法を採りました。

そこで bundle_dir\embulk\parser\ovpacpulog.rb を以下のように作成します。解説はコメントで入れています。

ovpacpulog.rb
# coding: utf-8
require 'pry'

module Embulk
  module Parser
    # これを雛形にパーサを作るときはクラス名の変更を忘れないこと
    class OvpaCpuLogParser < ParserPlugin

      # プラグイン登録
      #   - 登録した名前で設定ファイルから呼び出す
      Plugin.register_parser("ovpacpulog", self)

      # トランザクション制御処理
      #   - 設定ファイルの情報は config 変数に格納されている
      #   - yield で &control ブロック (内容は ParserPlugin 参照) に処理を渡す
      def self.transaction(config, &control)

        # パース処理に渡す情報を格納するハッシュ
        task = {
          # ホスト名の設定 (必須)
          "hostname" => config.param("hostname", :string),
          # 先頭何行をスキップするかの設定 (指定なしの場合は 0)
          "skip_header_lines" => config.param("skip_header_lines", :integer, default: 0),
          # タイムゾーンの設定 (指定なしの場合は UTC)
          "default_timezone" => config.param("default_timezone", :string, default: "UTC")
        }

        # データ構造の定義配列
        #   - 項番,カラム名,型を指定
        columns = [
          Column.new(0, "Hostname", :string),
          Column.new(1, "Date", :timestamp),
          Column.new(2, "Intrpt CPU %", :double),
          Column.new(3, "System CPU %", :double),
          Column.new(4, "User CPU %", :double),
          Column.new(5, "CPU %", :double)
        ]

        # &control ブロックの実施
        #  - ザックリ言えばこのクラスを new (init) してから run を実行する
        yield(task, columns)
      end

      # 初期化処理
      #   - 親クラスの initialize から呼び出される
      def init
        @hostname = task["hostname"]
        @skip_header_lines = task["skip_header_lines"]
        @default_timezone = task["default_timezone"]
      end

      # 読み込んだ内容に対するパース処理
      def run(file_input)
        # タイムゾーンを一時的に切り替える
        timezone = ENV["TZ"]
        ENV["TZ"] = @default_timezone

        # file_input の内容は LineDecoder から取得すること
        #   - テンプレートのように buffer から直接読み取ると行の途中で途切れることがあるため
        decoder_task = task.load_config(Java::LineDecoder::DecoderTask)
        decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, decoder_task)

        while decoder.nextFile

          # ヘッダのスキップ
          if @skip_header_lines > 0
            @skip_header_lines.times { decoder.poll }
          end

          while line = decoder.poll
            # ここにパース処理を書く
            # 今回のログのフォーマット例
            # 08/01/2015,00:00:00,  0.08,  0.54,  0.33,  0.87,

            # 正規表現でパースする
            res = /^(.*?)\/(.*?)\/(.*?),(.*?),(.*?),(.*?),(.*?),(.*?),$/.match(line)

            # マッチした場合は nil 以外が返る
            if !(res.nil?)
              m = res.to_a

              # 時刻部分を整形する
              # TODO: パースエラー時の処理を入れる
              t = Time.parse([[m[3],m[1],m[2]].join("/"),m[4]].join(" "))

              # 定義に従ってデータを構築する
              record = [
                @hostname,
                t,
                m[5].to_f,
                m[6].to_f,
                m[7].to_f,
                m[8].to_f
              ]
              page_builder.add(record)
            # マッチしなかった場合はエラーを表示する
            else
              Embulk.logger.error("Parsing failed at \'#{line}\'")
            end
          end
        end

        # 出力内容を確定する
        page_builder.finish
        # タイムゾーンを元に戻す
        ENV["TZ"] = timezone
      end

    end
  end
end

実行

さて、動かしてみます。
C:\wrok\embulk\config_ovpacpulog.yml を以下のように作成します。

config_ovpacpulog.yml
in:
  type: file
  path_prefix: C:\work\embulk_test_logs\ovpalog_HOSTNAME_cpu_
  parser:
    type: ovpacpulog
    hostname: 'HOSTNAME'
    skip_header_lines: 3
    default_timezone: 'Asia/Tokyo'
out:
  type: stdout

以下のコマンドで実行結果をプレビューします。

cd C:\work\embulk
embulk preview config_ovpacpulog.yml -b bundle_dir

このような出力が出れば成功です。
ovpacpulog_preview_result.png

デバッグ

pry を使って CUI 上でデバッグできます2。使い方は Web ページを参照すれば分かりますが binding.pry と記述した場所でブレークしてコンソールに移ることができます。

テスト

割愛します。
他のプラグインを見たところ embulk-parser-query_string が Ruby 実装で TravisCI 連携までしているので参考になると思います。ここまでできればいいなぁ。

参考

  1. Embulk 0.7 documentation
  2. [Embulk]データを加工するプラグインの開発とデバッグ
  3. 今更聞けないpryの使い方と便利プラグイン集
  4. treasure-data/embulk-parser-query_string

以上

  1. embulk-filter-eval の out_columns オプションはうまく動作しないようです。

  2. JRuby だと pry-nav や pry-byebug が使えないようなので、少し不便ですが致し方ないです。

7
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?