Posted at
DiverseDay 4

Treasure Data の集計結果を Elasticsearch に保存して利用する

More than 3 years have passed since last update.

毎日いろんなデータを取り、集計し、可視化し、分析し、日頃の業務改善のアイディアを考えたりすると、仕事してる気になって良いですね。

データの保存と集計を Treasure Data 様にお任せできるのは、本当にすごいし便利です。

最初は hive や presto のクエリを実行するスクリプトを LL で書いて cron で回して自前の MySQL に保存する等やっていたのですが、なんかそれは色々面倒でした。実は Treasure Data のクエリは、定期実行スケジュールや、集計結果の保存先が指定できることを知りました。

そこで、実行結果を HTTP PUT で WebDAV に保存した上で、json を変換して Elasticsearch に送り、最終的に Kibana で見れるようにしてみました。


クエリの設定

毎時実行されるようにスケジューリングする

31d768db806d02dd048c15c0a3e98d5c.png

WebDAV に HTTP PUT する

6a7c125812e589ddb77e40b5c2e2a541.png


受け取ったデータを投げるスクリプト(cronで定期実行)


put-to-elasticsearch.pl

#!/usr/bin/env perl

use strict;
use warnings;
use JSON qw(decode_json);
use LWP::UserAgent;
use JSON::XS;

my $elasticsearch_url = "http://elasticsearch.example.com";
my $webdav_dir = "/path/to/webdav";
my $_json = JSON::XS->new->allow_nonref->convert_blessed->allow_blessed->utf8;
sub as_json {
$_json->encode(shift);
}

sub new_head {
my ($idx, $id) = @_;
return as_json({
index => {
_index => $idx,
_type => 'event',
_id => filter_int($id),
},
});
}

sub filter_datetime {
my $x = shift;
return unless $x;
$x =~ s/ /T/;
$x =~ s/$/+09:00/;
$x;
}

sub filter_int {
my $x = shift;
$x + 0;
}

sub main {
my $idx = shift(@ARGV) or die "require param";

my $ua = LWP::UserAgent->new(agent => 'Importer');

open(my $fh, "< $webdav_dir/$idx") or die "open $idx failed : $!";
my $json = do { local $/; <$fh>; };
close($fh);

my $data = decode_json($json);

my @buffer;
my @names = @{ $data->{column_names} };
my $id = time();
for my $array (@{ $data->{data} }) {
my $row;
@{$row}{@names} = @$array;

# 一時間単位の時間を day_hour で出力しているので、形式を変換
# TD_TIME_FORMAT(time, 'yyyy-MM-dd_HH', 'JST') AS day_hour
my $dt = delete($row->{day_hour});
$dt =~ s/(\d+-\d+-\d+)_(\d+)/$1 $2:00:00/;
push(@buffer, new_head( $idx, ++$id ));
$row->{'@timestamp'} = filter_datetime($dt);
push(@buffer, as_json($row));
}

my $res = $ua->post("$elasticsearch_url/_bulk", Content => join("\n", @buffer));
warn $res->content;
}

main;



おわりに

Elasticsearch にデータを入れてしまえば、使い慣れた kibana でがんがん可視化して新たな発見をするだけですね!

データ分析を楽にしましょう!