過去のログをEmbulk(エンバルク)を使ってバルクロードしよう

More than 1 year has passed since last update.

はじめに

ここ一ヶ月ほど触ってみてたEmbulkの利用方法の一例を示します。

もっと良い方法があるかもしれません。(是非教えてください)

古橋さんをはじめとしたトレジャーデータの皆様便利なツールを作っていただきありがとうございます。

Twitterで様々な質問に答えてくださった古橋さんに感謝いたします。

2016年6月29日追記: 参照してくださっている方が多いので最新のembulkの表記に修正しました。間違いがありましたらお気軽にお問い合わせください。

動機

  • いろいろなログが、メールやらファイルやらで蓄積されている。
  • RDBやElastic Search/Kibanaに簡単に突っ込んで可視化をしたい。
  • Embulk良さそう。
  • ログをCSVやTSVにするスクリプトは自前・ネットのものがある。

想定読者

  • 圧縮された過去のログファイルが沢山ある(今回はPostfixのログ)
  • ログを整形して加工するスクリプトが既にある。(あるいはネット上に転がっている)
  • Embulkを使ってRDBMS、Elastic Search、クラウドサービスにデータを登録したい。

流用できそうなケース

  • MySQLのslowクエリログの取り込み(EmbulkTwitterで活用したいという話があった)

ファイル構成

次のようなディレクトリ構成です。

.
|-- EMBULK_FILES  # Embulkで読み込むデータを保存するディレクトリ
|   |-- maillog-20150101.csv # EmBulkで読み込むデータ1
|   `-- maillog-20150102.csv # EmBulkで読み込むデータ2
|   
|-- LOG_FILES
|   |-- maillog-20150101.gz # Postfixのログファイル1
|   `-- maillog-20150102.gz # Postfixのログファイル2
|
|-- bin
|   `-- convert_embulk.sh # Embulkで読み込むデータに変換するスクリプト
`-- conf
    |-- embulk_guess.yml # Embulkでguessを実行するための最低限のファイル
    `-- embulk_load.yml  # Embulkでguessを実行して生成するファイル

Embulkへデータを投入するまでの流れ

  • ログファイルをcsvへ変換
  • Embulkの導入します。
  • Embulkプラグインの導入
  • Embulk guessコマンドによる設定ファイル生成
  • Embulkのpreviewコマンドで事前確認
  • Embulkのrunコマンドで実際にデータを投入
  • 次回のデータ投入
  • last_pathの補足説明

ログファイルをcsvへ変換

Postfixのログファイルを、Embulkで読込みできるCSVファイルに変換します。

何の変哲もないスクリプトです。gzipコマンドでPostfixのログファイルを伸張し、maillog-hashnize.plコマンドでCSV形式に変換します。

convert_embulk.sh
#!/bin/bash

LOG_FILES=$( cd $( dirname $0 )/../LOG_FILES ; pwd )
EMBULK_FILES=$( cd $( dirname $0 )/../EMBULK_FILES  ; pwd )

for i in ${LOG_FILES}/*.gz ; do 
  file=$( basename $( basename $i ) .gz )
  #echo ${EMBULK_FILES}/${file}.csv
  gzip -dc $i | maillog-hashnize.pl -y 2015 > ${EMBULK_FILES}/${file}.csv 2> /dev/null
done

変換スクリプトの実行例

./convert_embulk.sh 
maillog-20150101.gz --> maillog-20150101.csv
maillog-20150102.gz --> maillog-20150102.csv
maillog-20150103.gz --> maillog-20150103.csv
maillog-20150104.gz --> maillog-20150104.csv
maillog-20150105.gz --> maillog-20150105.csv
maillog-20150106.gz --> maillog-20150106.csv
maillog-20150107.gz --> maillog-20150107.csv
maillog-20150108.gz --> maillog-20150108.csv
maillog-20150109.gz --> maillog-20150109.csv
maillog-20150110.gz --> maillog-20150110.csv
maillog-20150111.gz --> maillog-20150111.csv
maillog-20150112.gz --> maillog-20150112.csv
maillog-20150113.gz --> maillog-20150113.csv
maillog-20150114.gz --> maillog-20150114.csv
maillog-20150115.gz --> maillog-20150115.csv
maillog-20150116.gz --> maillog-20150116.csv
maillog-20150117.gz --> maillog-20150117.csv
maillog-20150118.gz --> maillog-20150118.csv

コマンド実行後、EMBULK_FILESにたくさんのログファイルが生成されます。

Embulkの導入

Treasure Dataの新データ転送ツールEmbulkを触ってみた #dtm_meetup を読んでください。

Embulkプラグインの導入

読み込んだファイルは、PostgreSQLに格納します。Embulkの組み込みプラグインには、PostgreSQL用のプラグインが含まれていないので、別途プラグインを導入します。

現在Outputプラグインにはどのようなプラグインがあるのか確認をします。

List of Plugins by Categoryをみて確認することもできます。

embulk gem search embulk-output
2015-02-28 19:09:21,290 +0900: Embulk v0.4.10

*** REMOTE GEMS ***

embulk-output-command (0.1.2)
embulk-output-elasticsearch (0.1.3)
embulk-output-jdbc (0.2.0)
embulk-output-mysql (0.2.0)
embulk-output-parquet (0.1.0)
embulk-output-postgres-json (0.2.0)
embulk-output-postgresql (0.2.0)
embulk-output-redshift (0.2.0)

PostgreSQLプラグインがあることがわかります。

embulk gem install embulk-output-postgresql
2015-02-28 19:11:20,831 +0900: Embulk v0.4.10
Successfully installed embulk-output-postgresql-0.2.0
1 gem installed

プラグインが導入できたかgem listコマンドで確認をしましょう。

postgresqlプラグインが導入できていることが確認できました。

embulk gem list 
2015-02-28 19:15:08,663 +0900: Embulk v0.4.10

*** LOCAL GEMS ***

embulk-input-apache-dummy-log (0.1.0)
embulk-input-command (0.1.0)
embulk-output-postgresql (0.2.0, 0.1.2, 0.1.1)
embulk-plugin-input-pcapng-files (0.0.2)

※ なおプラグインは標準ではホームディレクトリ上の.embulkディレクトリの下にインストールされます。

プラグインの使い方は、現状プラグインのREADMEを読むのが唯一の方法です。わからないことは著者の人にTwitter等で質問をしてください。

Embulk guessコマンドによる設定ファイル生成

次のようなYAML形式のファイルを用意します。inの中に書くのは、読み込みしたいファイルの保存されているパスのみです。

embulk_guess.yml
in:
  type: file
  path_prefix: /pat/to/mlog/EMBULK_FILES/maillog

#out:
#  type: stdout  
out:
  type: postgresql
  host: localhost
  user: user
  password: '****'
  database: pg_test
  table: pg_log
#  mode: insert
  mode: replace

データの形式を推定するguessコマンド

embulk guessコマンドは入力ファイルの中身をちょっと読み込んで設定ファイルを出力する機能です。

embulk guess conf/embulk_guess.yml -o conf/embulk_load.yml

Embulkはカラムの情報を読み込んでデータの内容を推定します。とても便利な機能です。1から手で設定ファイルを書いていたのを考えるとなぜ、いままでguessがなかったのだろうと思います。特にカラム数が多い場合など!!

しかし残念ながら完全ではありません。たまに誤検出をすることもあります。

また検出されたカラム情報のname欄、RDBMSなどに導入する際のカラム名として使われます。通常カラム名にはダブルクォートは使えませんが、guessコマンドはだぶるクォートで文字列が囲まれていると、設定ファイルの名称にも使ってしまうようです。また、カラムが「queue id」とスペースが入っている場合もそのまま出力します。

今回はそんなだめなケースの一例です。ここは、手で修正します。

embulk_load.yml
in:
  type: file
  path_prefix: /path/to/mlog/EMBULK_FILES/maillog
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    header_line: true
    columns:
    - {name: '"queue id"', type: string}
    - {name: '"arrived time"', type: string}
    - {name: '"processed time"', type: string}
    - {name: '"smtp client hostname / uid"', type: string}
    - {name: '"smtp client IP address / username"', type: string}
    - {name: '"envelope from"', type: string}
    - {name: '"envelope to"', type: string}
    - {name: '"message-id"', type: string}
    - {name: '"status"', type: string}
    - {name: '"relay to"', type: string}
    - {name: '"delay time"', type: string}
    - {name: '"size"', type: string}
    - {name: '"information (reason of defered', type: string} # 本当は
    - {name: ' local mailbox name', type: string}             # このカラムは
    - {name: ' successful message...)"', type: string}        # 一つのカラムです。
exec: {}
out: {type: postgresql, host: localhost, user: user, password: '***',
  database: pg_test, table: pg_log, mode: replace}

修正後の設定ファイルは次のとおりです。

  • ダブルクォートを除去
  • カラムの名前に空白を含まないように修正(PostgreSQLのカラム名に使われるのでスペースは消すこと)
  • 誤検出してしまった二つのカラムを削除
embulk_load.yml
in:
  type: file
  path_prefix: /path/to/mlog/EMBULK_FILES/maillog
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    header_line: true
    columns:
    - {name: 'queue_id',          type: string}
    - {name: 'arrived_time',      type: timestamp, format: "%Y/%m/%d %H:%M:%S" }
    - {name: 'processed_time',    type: timestamp, format: "%Y/%m/%d %H:%M:%S" }
    - {name: 'smtp_cli_host_uid', type: string}
    - {name: 'smtp_cli_user',     type: string}
    - {name: 'envelope_from',     type: string}
    - {name: 'envelope_to',       type: string}
    - {name: 'message_id',        type: string}
    - {name: 'status',            type: string}
    - {name: 'relay_to',          type: string}
    - {name: 'delay_time',        type: string}
    - {name: 'size',              type: string}
    - {name: 'information',       type: string}
exec: {}
out: {type: postgresql, host: localhost, user: user, password: '***',
  database: pg_test, table: pg_log, mode: replace}

Embulkのpreviewコマンドで事前確認

Embulkには、データがちゃんと読み込みできるか確認をするpreviewがあります。実際にはデータベースにデータを投入しないで読み込みが正しくできるか確認をする機能です。

初期のバージョンでは、previewは表形式のみでの出力でした。Embulk 0.4.7からpreviewで縦表示ができるようになりました。

この作業は、@yaggytterさんが実施されました。ありがとうございます。(※私もちょっとだけコードを書きました)

※ また現在のEmbulkはFilterプラグインの実行結果までを確認できます。

embulk preview -G conf/embulk_load.yml
...
*************************** 1 ***************************
         queue_id (   string) : 379ED2300012
     arrived_time (timestamp) : 2014-12-31 04:02:56 UTC
   processed_time (timestamp) : 2014-12-31 04:12:09 UTC
smtp_cli_host_uid (   string) : unknown
    smtp_cli_user (   string) : xxx.xxx.xxx.xx
    envelope_from (   string) : user@example.com
      envelope_to (   string) : foo@example.com
       message_id (   string) : <1419966125.685001.1878@example.com>
           status (   string) : sent,sent,sent
         relay_to (   string) : local,local,local
       delay_time (   string) : 536
             size (   string) : 2,261
      information (   string) : delivered to command: /usr/bin/procmail
*************************** 2 ***************************

うまく読み込みができました。大丈夫そうです。

Embulkのrunコマンドで実際にデータを投入

実際にデータを投入しましょう。しかしログファイルは一回投入すれば終わりというものではありません。

日々ログファイルは増えていきます。そんな増えていくログファイルをEmbulkは簡単に取り扱いできます。

Embulkはデータを投入したあとに、次回データをロードする時の為に設定ファイルを出力する機能があります。

次回実行用の設定ファイルを生成することで、まだ読み込みをしていないログファイルだけをロードさせることができます。

次の例では、-oオプションを使って次回実行用の設定ファイルを同じファイル名で生成します。 (-oオプションは非推奨になりました。代わりに-cを使ってください。参考)

次の例では、-cオプションを使って次回実行用の設定ファイルを同じファイル名で生成します。

つまり、次回も実行の際も全く同じコマンドを再度実行するだけです。

前置きが長くなりした。次のコマンドでPostgreSQLにデータを追加します。

embulk run conf/embulk_load.yml -c conf/diff.yml 

参考: Embulk 0.8.3より前の場合は次のようにします。

embulk run conf/embulk_load.yml -o conf/embulk_load.yml

テーブルがない場合は、新しくテーブルが自動的に作成されます。
その後読み込んだデータが作成されたテーブルに挿入されます。

以上でデータベースにデータ投入ができました。

次回データ投入

実行結果の設定ファイルを確認すると、次の設定が追加されていることがわかります。

-cオプションをつけてコマンドを実行した場合

conf/diff.yml
  last_path: /path/to/mlog/EMBULK_FILES/maillog-20150219.csv

これは今回のデータ投入ではログファイルを、maillog-20150219.csvまで読み込みしたことを示しています。
次回は、maillog-20150219.csvより後のファイルだけを読み込んでくれます。

試しにやってみましょう。

EMBULK_FILESディレクトリにmaillog-201502XXファイルを追加します。前に読み込んだファイルは同じディレクトリにあるままです。

ls -1
maillog-20150101.csv
maillog-20150102.csv
maillog-20150103.csv
maillog-20150104.csv
….
maillog-20150219.csv # last_pathに設定されているファイル名
maillog-20150220.csv
maillog-20150221.csv
maillog-20150222.csv
maillog-20150223.csv
maillog-20150224.csv
maillog-20150225.csv
maillog-20150226.csv
maillog-20150227.csv

再度コマンドを実行します。コマンドのLoading filesのところで、last_pathに設定されているのファイル以降がデータとして読み取りされたことがわかります。

embulk run conf/embulk_load.yml -c conf/diff.yml  
2015-02-28 19:39:06,890 +0900: Embulk v0.8.3
2015-02-28 19:39:08.802 +0900 [INFO] (transaction): Listing local files at directory '/path/to/mlog/EMBULK_FILES' filtering filename by prefix 'maillog'
2015-02-28 19:39:08.812 +0900 [INFO] (transaction): Loading files [/path/to/mlog/EMBULK_FILES/maillog-20150220.csv, /path/to/mlog/EMBULK_FILES/maillog-20150221.csv, /path/to/mlog/EMBULK_FILES/maillog-20150222.csv, /path/to/mlog/EMBULK_FILES/maillog-20150223.csv, /path/to/mlog/EMBULK_FILES/maillog-20150224.csv, /path/to/mlog/EMBULK_FILES/maillog-20150225.csv, /path/to/mlog/EMBULK_FILES/maillog-20150226.csv, /path/to/mlog/EMBULK_FILES/maillog-20150227.csv]

再度設定ファイルを確認します。last_pathが更新されています。

  last_path: /path/to/mlog/mlog/EMBULK_FILES/maillog-20150227.csv

last_pathの補足説明

*last_pathはlsコマンドで表示したファイルの順序でデータを読み込みます。新しいファイルはlsの表示順序last_pathで表示されたより後に表示したファイルを読み込みします。

ファイル名を設定する際には注意をしてください。

この作業のここが嫌だ。

一言で言うなら、「どうせならEmbulkを実行したらgzipを伸張して変換するところからやってほしい」です。

それに対してEmbulkは次の解決策が用意されています。(がもうちょっと簡単に既存のデータを取り込みしたいなと思います)

  • パーサープラグインを(Ruby/Java)で書く
  • 一部制限付きだがコマンドプラグインを使う。

上記の方法でもできるのはわかっているのですが、プラグインを書かなくてももっと簡単にデータを取り込みしたいなというのが願望です。

パーサープラグインを書くことについて

  • ログ変換スクリプトで変換した結果はタブ区切りや、CSV形式でCSVパーサがそのまま使える。
  • この業界の中にはシェルスクリプトはかけるけど、RubyやJavaはかけないという人は一定数存在する。

input-commandコマンドプラグインを使う

  • guess, previewが使えない。ファイルがコミットしたとが現状わからない。

最後に

  • 自分のメモとこれからEmbulkを使われる人のための情報共有を目的で書きました。
  • 皆様のお役に立てば幸いです。
  • 質問疑問があればお気軽にご連絡ください。
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.