LoginSignup
14
12

More than 5 years have passed since last update.

Embulkを使ってS3上のELBのログをTreasureDataにインポート

Last updated at Posted at 2015-06-27

先日、TreasureDataからDataConnectorの発表があり、S3からのデータの取り込みがクライアントレスで定期的に実行できるようになりました。
これにより、ユーザはインポート用サーバを持たずに、いろいろなデータソースからデータを取り込むことができるようになりました。
(新機能)「Data Connector for Amazon S3」によるデータロード革命
dataconnector

その一方で、自前のバッチ処理がある場合には、自分たちの環境下でコントロールしたい場合もあります。
そうしたケースにも対応するために、embulk-output-tdも合わせてリリースが行われました。

今回は、embulk-output-tdをつかって、S3からTreasureDataにデータを取り込むまでを実施します。

実行環境

  • Mac version 10.10.3
  • Embulk v0.6.13
  • embulk-output-td v0.1.0
  • embulk-input-s3 v0.2.0

インストール

参考: Embulk on Github

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar" -k
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
$ embulk gem install embulk-output-td
$ embulk gem install embulk-input-s3

インポート対象のファイル

ELBのログの仕様は下記に記載されています。
Elastic Load Balancing アクセスログ

ログの有効化については、こちら

フィールド名 フィールド値の例
timestamp 2014-02-15T23:39:43.945958Z
elb my-test-loadbalancer
client:port 192.168.131.39:2817
backend:port 10.0.0.1:80
request_processing_time 0.000073
backend_processing_time 0.001048
response_processing_time 0.000057
elb_status_code 200
backend_status_code 200
received_bytes 0
sent_bytes 29
"request" "GET http://www.example.com: 80/HTTP/1.1"

ファイル生成のルール: "{Bucket}/{Prefix}/AWSLogs/{AWS AccountID}/elasticloadbalancing/{Region}/{Year}/{Month}/{Day}/{AWS Account ID}elasticloadbalancing{Region}{Load Balancer Name}{End Time}{Load Balancer IP}{Random String}.log"

上記のデータをトレジャーデータでアップロードする際の注意点としては、
カラム名の対応がTreasuredataでは、英数字小文字または'_'のみとなっています。
また、TreasureDataのパーティショニングのキーであるtimeカラムはunixtimeのため、timestampのフィールドを使う際にも丸め込みが必要となります。

この点に注意して、次のGuessコマンドをしていきましょう。

embulk-input-s3の設定

それでは、ELBのログをまずはGuessしてみます。
そのためにはまずは必要な設定ファイルを
embulk-input-s3でチェックして、必要な項目を修正します。

elb.yml
in:
  type: s3
  bucket: my-s3-bucket
  path_prefix: AWSLogs/9192993993/elasticloadbalancing/
  access_key_id: ABCXYZ123ABCXYZ123
  secret_access_key: AbCxYz123aBcXyZ123
$ embulk guess elb.yml -o config.yml
2015-06-26 17:36:25.714 -0700: Embulk v0.6.13
2015-06-26 17:36:26.805 -0700 [INFO] (guess): Loaded plugin embulk-input-s3 (0.2.0)
2015-06-26 17:36:28.973 -0700 [INFO] (guess): Loaded plugin embulk-input-s3 (0.2.0)
in:
  type: s3
  bucket: my-s3-bucket
  path_prefix: AWSLogs/9192993993/elasticloadbalancing/
  access_key_id: ABCXYZ123ABCXYZ123
  secret_access_key: AbCxYz123aBcXyZ123
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: ''
    escape: ''
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: '2015-06-10T23:50:48.421631Z production-event-collector 66.249.82.184:63668
        10.153.176.66:5140 0.000041 0.013111 0.000021 200 200 1289 16 "POST http://uredata.com:80/js/v3/event/
        HTTP/1.1" "Mozilla/5.0 (Linux; Android 4.4.4; 401SO Build/23.0.H.0.334) AppleWebKit/537.36
        (KHTML', type: string}
    - {name: ' like Gecko) Chrome/43.0.2357.92 Mobile Safari/537.36" - -', type: string}
exec: {}

Created 'config.yml' file.

columnsはあまりうまくguessれてないようです。もう一度ELBのドキュメントをみてみると、
ELBのログはヘッダーがないことや、ログエントリのすべてのフィールドはスペースで区切られているようです。

2014-02-15T23:39:43.945958Z my-test-loadbalancer 192.168.131.39:2817 10.0.0.1:80 0.000073 0.001048 0.000057 200 200 0 29 "GET http://www.example.com:80/HTTP/1.1"

それでは、新たに分かったことと、前節で分かったことを組み合わせて、生成されたconfig.ymlを編集してみます。
主な編集点としては、

  • delimiterとtrim_if_not_quotedとskip_header_linesを編集しています。
  • columnsもTreasureDataに入れやすい形に変更しています。
  • requestがTCPリスナーの場合にはURLが入力されない代わりに、スペースで区切られ、末尾がスペースである 3 個のダッシュが引用符で囲まれて表示されます(例: "- - - ")。これを無視するためにallow_optional_columnsをtrueにし、カラムが足りない行をスキップする代わりにnullをセットさせています。
config.yml
in:
  type: s3
  bucket: my-s3-bucket
  path_prefix: AWSLogs/9192993993/elasticloadbalancing/
  access_key_id: ABCXYZ123ABCXYZ123
  secret_access_key: AbCxYz123aBcXyZ123
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ' '
    quote: ''
    escape: ''
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: true
    allow_optional_columns: false
    columns:
      - name: timestamp
        type: timestamp
        format: '%Y-%m-%dT%H:%M:%S.%NZ'
      - name: elb
        type: string
      - name: client_port
        type: string
      - name: backend_port
        type: string
      - name: request_processing_time
        type: double
      - name: backend_processing_time
        type: double
      - name: response_processing_time
        type: double
      - name: elb_status_code
        type: string
      - name: backend_status_code
        type: string
      - name: received_bytes
        type: double
      - name: sent_bytes
        type: double
      - name: request
        type: string

なんとなく設定ファイルができたような気がします。それではPreviewコマンドで見てみましょう。
なんとなーくよさそうですね。

$ embulk preview config.yml
2015-06-26 17:57:00.210 -0700: Embulk v0.6.13
2015-06-26 17:57:01.910 -0700 [INFO] (preview): Loaded plugin embulk-input-s3 (0.2.0)
+-------------+---------+----+--+-------------+-------------+--------------+-----+---------+----++------------------+
|  timestamp:timestamp |  elb:string |    client_port:string | backend_port:string | request_processing_time:double | backend_processing_time:double | response_processing_time:double | elb_status_code:string | backend_status_code:string | received_bytes:double | sent_bytes:double |    request:string |
+-------------+---------+----+--+-------------+-------------+--------------+-----+---------+----++------------------+
| 2015-06-10 23:50:48.421631 UTC | plector |   66.249.82.184:63668 |  10.153.176.66:5140 |4.1E-5 |   0.013111 | 2.1E-5 |200 |    200 | 1289.0 |    16.0 |  POST http://treasuredata.com/js/v3/event/ HTTP/1.1 |

+-------------+---------+----+--+-------------+-------------+--------------+-----+---------+----++------------------+

embulk-output-tdの設定

次にembulk-output-tdの設定方法を見てみます。

out:
  type: td
  apikey: <your apikey>
  endpoint: api.treasuredata.com
  database: my_db
  table: my_table
  time_column: created_at

それではこれを元に、config.ymlに追記をします。

config.yml
in:
  type: s3
  bucket: event-collect-access
  path_prefix: AWSLogs/9192993993/elasticloadbalancing/
  access_key_id: ABCXYZ123ABCXYZ123
  secret_access_key: AbCxYz123aBcXyZ123
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ' '
    quote: ''
    escape: ''
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: true
    allow_optional_columns: false
    columns:
      - name: timestamp
        type: timestamp
        format: '%Y-%m-%dT%H:%M:%S.%NZ'
      - name: elb
        type: string
      - name: client_port
        type: string
      - name: backend_port
        type: string
      - name: request_processing_time
        type: double
      - name: backend_processing_time
        type: double
      - name: response_processing_time
        type: double
      - name: elb_status_code
        type: string
      - name: backend_status_code
        type: string
      - name: received_bytes
        type: double
      - name: sent_bytes
        type: double
      - name: request
        type: string
out:
  type: td
  apikey: b0747190f~4b4dbbd43c32a
  database: support
  table: embulk
  time_column: timestamp

実行

$ embulk run config.yml
2015-06-26 18:04:17.628 -0700: Embulk v0.6.13
2015-06-26 18:04:19.673 -0700 [INFO] (transaction): Loaded plugin embulk-input-s3 (0.2.0)
2015-06-26 18:04:19.720 -0700 [INFO] (transaction): Loaded plugin embulk-input-s3 (0.2.0)
2015-06-26 18:04:19.721 -0700 [INFO] (transaction): Loaded plugin embulk-output-td (0.1.0)
2015-06-26 18:04:21.636 -0700 [INFO] (transaction): Logging initialized @7310ms
2015-06-26 18:04:22.843 -0700 [INFO] (transaction): Duplicating timestamp:timestamp column to 'time' column for the data partitioning
2015-06-26 18:04:22.848 -0700 [INFO] (transaction): Create bulk_import session embulk_20150627_010419_568000000
2015-06-26 18:04:23.201 -0700 [INFO] (transaction): {done:  0 / 4, running: 0}
2015-06-26 18:04:23.219 -0700 [INFO] (task-0001): Duplicating timestamp:timestamp column to 'time' column for the data partitioning
2015-06-26 18:04:23.219 -0700 [INFO] (task-0002): Duplicating timestamp:timestamp column to 'time' column for the data partitioning
2015-06-26 18:04:23.222 -0700 [INFO] (task-0000): Duplicating timestamp:timestamp column to 'time' column for the data partitioning
2015-06-26 18:04:23.228 -0700 [INFO] (task-0003): Duplicating timestamp:timestamp column to 'time' column for the data partitioning
2015-06-26 18:04:57.753 -0700 [INFO] (task-0000): {uploading: {rows: 100000, size: 12,314,336 bytes (compressed)}}
2015-06-26 18:04:58.084 -0700 [INFO] (task-0001): {uploading: {rows: 100000, size: 12,486,764 bytes (compressed)}}
2015-06-26 18:04:58.156 -0700 [INFO] (task-0003): {uploading: {rows: 100000, size: 12,446,634 bytes (compressed)}}
2015-06-26 18:04:58.202 -0700 [INFO] (task-0002): {uploading: {rows: 100000, size: 12,434,280 bytes (compressed)}}
2015-06-26 18:05:13.977 -0700 [INFO] (transaction): {done:  3 / 4, running: 1}
2015-06-26 18:05:13.977 -0700 [INFO] (transaction): {done:  3 / 4, running: 1}
2015-06-26 18:05:13.977 -0700 [INFO] (transaction): {done:  3 / 4, running: 1}
2015-06-26 18:05:18.304 -0700 [INFO] (transaction): {done:  4 / 4, running: 0}
2015-06-26 18:05:18.892 -0700 [INFO] (transaction): Performing bulk import session 'embulk_20150627_010419_568000000'
2015-06-26 18:05:53.696 -0700 [INFO] (transaction):     job id: 27749060
2015-06-26 18:05:53.697 -0700 [INFO] (transaction): Committing bulk import session 'embulk_20150627_010419_568000000'
2015-06-26 18:05:53.697 -0700 [INFO] (transaction):     valid records: 400000
2015-06-26 18:05:53.697 -0700 [INFO] (transaction):     error records: 0
2015-06-26 18:05:53.697 -0700 [INFO] (transaction):     valid parts: 4
2015-06-26 18:05:53.697 -0700 [INFO] (transaction):     error parts: 0
2015-06-26 18:06:03.733 -0700 [INFO] (transaction): Deleting bulk import session 'embulk_20150627_010419_568000000'
2015-06-26 18:06:04.447 -0700 [INFO] (main): Committed.
2015-06-26 18:06:04.447 -0700 [INFO] (main): Next config diff: {"in":{"last_path":"AWSLogs/9192993993/elasticloadbalancing/us-east-1/2015/06/10/52390_elasticloadbalancing_us-east-1_production-event-collector_2015064.10.178_xVZNKUd1.log"},"out":{"last_session":"embulk_20150627_010419_568000000"}}

ログの途中でembulkからbulkimportのジョブが実行され、
joblist

インポート完了後にテーブルにインポートができていることがわかります。
Preview window

これにより様々なデータソースと組み合わせて、TreasureDataにもデータをインポートすることが可能になりますね!

おわりに

これでELBのログをEmbulkをつかって簡単にTreasureDataにインポートすることができましたね。
様々なデータソースからTreasureDataにデータを入れてみたいと思います。

14
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
12