Treasure Data の集計結果を Elasticsearch に保存して利用する

  • 11
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

毎日いろんなデータを取り、集計し、可視化し、分析し、日頃の業務改善のアイディアを考えたりすると、仕事してる気になって良いですね。
データの保存と集計を Treasure Data 様にお任せできるのは、本当にすごいし便利です。
最初は hive や presto のクエリを実行するスクリプトを LL で書いて cron で回して自前の MySQL に保存する等やっていたのですが、なんかそれは色々面倒でした。実は Treasure Data のクエリは、定期実行スケジュールや、集計結果の保存先が指定できることを知りました。
そこで、実行結果を HTTP PUT で WebDAV に保存した上で、json を変換して Elasticsearch に送り、最終的に Kibana で見れるようにしてみました。

クエリの設定

毎時実行されるようにスケジューリングする
31d768db806d02dd048c15c0a3e98d5c.png

WebDAV に HTTP PUT する
6a7c125812e589ddb77e40b5c2e2a541.png

受け取ったデータを投げるスクリプト(cronで定期実行)

put-to-elasticsearch.pl
#!/usr/bin/env perl
use strict;
use warnings;
use JSON qw(decode_json);
use LWP::UserAgent;
use JSON::XS;

my $elasticsearch_url = "http://elasticsearch.example.com";
my $webdav_dir = "/path/to/webdav";
my $_json = JSON::XS->new->allow_nonref->convert_blessed->allow_blessed->utf8;
sub as_json {
    $_json->encode(shift);
}

sub new_head {
    my ($idx, $id) = @_;
    return as_json({
        index => {
            _index => $idx,
            _type => 'event',
            _id => filter_int($id),
        },
    });
}

sub filter_datetime {
    my $x = shift;
    return unless $x;
    $x =~ s/ /T/;
    $x =~ s/$/+09:00/;
    $x;
}

sub filter_int {
    my $x = shift;
    $x + 0;
}

sub main {
    my $idx = shift(@ARGV) or die "require param";

    my $ua = LWP::UserAgent->new(agent => 'Importer');

    open(my $fh, "< $webdav_dir/$idx") or die "open $idx failed : $!";
    my $json = do { local $/; <$fh>; };
    close($fh);

    my $data = decode_json($json);

    my @buffer;
    my @names = @{ $data->{column_names} };
    my $id = time();
    for my $array (@{ $data->{data} }) {
        my $row;
        @{$row}{@names} = @$array;

        # 一時間単位の時間を day_hour で出力しているので、形式を変換
        # TD_TIME_FORMAT(time, 'yyyy-MM-dd_HH', 'JST') AS day_hour
        my $dt = delete($row->{day_hour});
        $dt =~ s/(\d+-\d+-\d+)_(\d+)/$1 $2:00:00/;
        push(@buffer, new_head( $idx, ++$id ));
        $row->{'@timestamp'} = filter_datetime($dt);
        push(@buffer, as_json($row));
    }

    my $res = $ua->post("$elasticsearch_url/_bulk", Content => join("\n", @buffer));
    warn $res->content;
}

main;

おわりに

Elasticsearch にデータを入れてしまえば、使い慣れた kibana でがんがん可視化して新たな発見をするだけですね!
データ分析を楽にしましょう!

この投稿は Diverse Advent Calendar 20154日目の記事です。