前回は、PipelineDBが何者かざっくりと書いてみましたが、今回はひとまずPipelineDBをセットアップしてサンプル・データを流し込むところまでをやってみましょう。
Installation
参考: http://docs.pipelinedb.com/installation.html
Linuxの場合
PipelineDBのインストールは主要なLinuxOSそれぞれに対してパッケージが用意されているのですぐに完了します。
AmazonLinuxの場合であれば、
$ wget https://www.pipelinedb.com/download/0.8.4/amazon-linux -O pipelinedb-0.8.4.rpm
$ sudo rpm -ivh pipelinedb-0.8.4.rpm
上記でインストール完了です。インストール後、データの保存場所の初期化のため、下記を実行して下さい。
$ pipeline-init -D <データを保存したいDirectory>
Macの場合
Macで使いたい方はダウンロードページよりpkgファイルを落としてダブルクリックするだけで利用開始できます。
Dockerの場合
Dockerでもイメージが公開されているため、そちらでの利用も可能です。
$ docker run -v /dev/shm:/dev/shm pipelinedb/pipelinedb
起動・停止
起動・停止はpipline-ctl
コマンドで実行します。
$ pipeline-ctl -D <データ保存Directory> -l <ログファイル名> start
$ cat <ログファイル名>
____ _ ___ ____ ____
/ __ \(_)___ ___ / (_)___ ___ / __ \/ __ )
/ /_/ / / __ \/ _ \/ / / __ \/ _ \/ / / / __ |
/ ____/ / /_/ / __/ / / / / / __/ /_/ / /_/ /
/_/ /_/ .___/\___/_/_/_/ /_/\___/_____/_____/
/_/
LOG: database system was shut down at 2015-10-28 00:17:51 JST
LOG: MultiXact member wraparound protections are now enabled
LOG: autovacuum launcher started
LOG: database system is ready to accept connections
LOG: continuous query scheduler started
LOG: registering background worker "worker0 [pipeline]"
LOG: registering background worker "combiner0 [pipeline]"
LOG: starting background worker process "combiner0 [pipeline]"
LOG: starting background worker process "worker0 [pipeline]"
LOG: continuous query process "combiner0 [pipeline]" running with pid 65289
LOG: continuous query process "worker0 [pipeline]" running with pid 65290
停止時はstop
を指定するだけです。
$ pipeline-ctl -D pipelinedb stop
waiting for server to shut down.... done
server stopped
Create Continuous View
PipelineDB上のデータストリームはContinuousViewという形で表現されますが、これはSQLとして扱う上ではテーブルと考えてもらって結構です。
ひとまず今回は後述するサンプルデータに合わせたContinuousViewを設定します。
$ pipline pipeline
pipeline (9.4.4)
Type "help" for help.
pipeline=# CREATE CONTINUOUS VIEW wiki_stats AS
pipeline-# SELECT hour::timestamp, project::text,
pipeline-# count(*) AS total_pages,
pipeline-# sum(view_count::bigint) AS total_views,
pipeline-# min(view_count) AS min_views,
pipeline-# max(view_count) AS max_views,
pipeline-# avg(view_count) AS avg_views,
pipeline-# percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
pipeline-# sum(size::bigint) AS total_bytes_served
pipeline-# FROM wiki_stream
pipeline-# GROUP BY hour, project;
CREATE CONTINUOUS VIEW
ContinuousView自体の解説はまた別な投稿にしたいと思いますが、ざっくり言うとストリームデータを一定条件でサマライズしたデータを保持するテーブルです。FROM句の部分のwiki_streamはstreamとよばれ、実際のストリームデータ一行一行と対応するテーブルのようなものになります。これは事前に定義する必要自体はありません(実際は事前に型定義した方がパフォーマンスがでるとのことですが)。このstreamにはINSERT
やCOPY
コマンドを通じてデータを投入していきます。
Sample Data
PipelineDBの公式がサンプルデータとしてWikipediaのページビューデータを利用しており、こちらを利用することである程度簡単にストリーム処理の中身を体感していただけるようになっています。
参考: http://dumps.wikimedia.org/other/pagecounts-raw/
先ほど作成したCONTINUOUS VIEWで元にしているwiki_stream
というstreamにこれを流してみましょう。
$ curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
psql -h localhost -p 5432 -d pipeline -c "
COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
ちなみにPostgreSQL互換なのでpsqlコマンドからもアクセス可能です。
サンプルデータ => 標準出力 => COPY
コマンドという流れで、wiki_stream
というstreamにタブ区切りのデータを流し込んでいます。
Query
いよいよクエリを流してみましょう。
またpipeline
コマンドで繋いでみます。
$ pipeline pipeline
pipeline (9.4.4)
Type "help" for help.
pipeline=# select * from wiki_stats ORDER BY total_views DESC limit 3;
hour | project | total_pages | total_views | min_views | max_views | avg_views | p99_views | total_bytes_served
---------------------+---------+-------------+-------------+-----------+-----------+----------------------+------------------+--------------------
2015-06-01 01:00:00 | en | 2590639 | 7640042 | 1 | 368264 | 2.9490955706294856 | 28.8838758913487 | 247434016274
2015-06-01 00:00:00 | en | 2556148 | 7554038 | 1 | 406121 | 2.9552428106666750 | 29.1063268187446 | 243707663997
2015-06-01 01:00:00 | en.mw | 1 | 5560465 | 5560465 | 5560465 | 5560465.000000000000 | 5560465 | 143619712266
(3 rows)
pipeline=# select * from wiki_stats ORDER BY total_views DESC limit 3;
hour | project | total_pages | total_views | min_views | max_views | avg_views | p99_views | total_bytes_served
---------------------+---------+-------------+-------------+-----------+-----------+--------------------+------------------+--------------------
2015-06-01 02:00:00 | en | 2602704 | 7871005 | 1 | 371626 | 3.0241644843209216 | 29.2771876395043 | 255541204297
2015-06-01 01:00:00 | en | 2590639 | 7640042 | 1 | 368264 | 2.9490955706294856 | 28.8838758913487 | 247434016274
2015-06-01 00:00:00 | en | 2556148 | 7554038 | 1 | 406121 | 2.9552428106666750 | 29.1063268187446 | 243707663997
(3 rows)
pipeline=# select count(*) from wiki_stats;
count
-------
3970
(1 row)
データを流しながら簡単に同じクエリを何度かselectしてたり、全行数をカウントしてみましたが、実際に流しているアクセスログの行数に対して、保持しているデータサイズは圧倒的に少ない状態です。
ここまで来るとなんとなくPipelineDBのContinuousViewを通したストリーム集計についてわかってくるのではないでしょうか。
最後に
PipelineDB自体はセットアップも簡単ですし、ひとまず触れて見つつStream処理の中身に触れてみてはいかがでしょうか。
次回は、ContinuousViewの解説 or fluentdからPipelineDBへ接続しつつデータを流してみるところまでやってみます。