3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

PipelineDBをセットアップして触ってみよう

Posted at

前回は、PipelineDBが何者かざっくりと書いてみましたが、今回はひとまずPipelineDBをセットアップしてサンプル・データを流し込むところまでをやってみましょう。

Installation

参考: http://docs.pipelinedb.com/installation.html

Linuxの場合

PipelineDBのインストールは主要なLinuxOSそれぞれに対してパッケージが用意されているのですぐに完了します。
AmazonLinuxの場合であれば、

$ wget https://www.pipelinedb.com/download/0.8.4/amazon-linux -O pipelinedb-0.8.4.rpm
$ sudo rpm -ivh pipelinedb-0.8.4.rpm

上記でインストール完了です。インストール後、データの保存場所の初期化のため、下記を実行して下さい。

$ pipeline-init -D <データを保存したいDirectory>

Macの場合

Macで使いたい方はダウンロードページよりpkgファイルを落としてダブルクリックするだけで利用開始できます。

Dockerの場合

Dockerでもイメージが公開されているため、そちらでの利用も可能です。


$ docker run -v /dev/shm:/dev/shm pipelinedb/pipelinedb

起動・停止

起動・停止はpipline-ctlコマンドで実行します。

$ pipeline-ctl -D <データ保存Directory> -l <ログファイル名> start
$ cat <ログファイル名>
    ____  _            ___            ____  ____
   / __ \(_)___  ___  / (_)___  ___  / __ \/ __ )
  / /_/ / / __ \/ _ \/ / / __ \/ _ \/ / / / __  |
 / ____/ / /_/ /  __/ / / / / /  __/ /_/ / /_/ /
/_/   /_/ .___/\___/_/_/_/ /_/\___/_____/_____/
       /_/

LOG:  database system was shut down at 2015-10-28 00:17:51 JST
LOG:  MultiXact member wraparound protections are now enabled
LOG:  autovacuum launcher started
LOG:  database system is ready to accept connections
LOG:  continuous query scheduler started
LOG:  registering background worker "worker0 [pipeline]"
LOG:  registering background worker "combiner0 [pipeline]"
LOG:  starting background worker process "combiner0 [pipeline]"
LOG:  starting background worker process "worker0 [pipeline]"
LOG:  continuous query process "combiner0 [pipeline]" running with pid 65289
LOG:  continuous query process "worker0 [pipeline]" running with pid 65290

停止時はstopを指定するだけです。

$ pipeline-ctl -D pipelinedb stop
waiting for server to shut down.... done
server stopped

Create Continuous View

PipelineDB上のデータストリームはContinuousViewという形で表現されますが、これはSQLとして扱う上ではテーブルと考えてもらって結構です。
ひとまず今回は後述するサンプルデータに合わせたContinuousViewを設定します。

$ pipline pipeline
pipeline (9.4.4)
Type "help" for help.

pipeline=# CREATE CONTINUOUS VIEW wiki_stats AS
pipeline-# SELECT hour::timestamp, project::text,
pipeline-#         count(*) AS total_pages,
pipeline-#         sum(view_count::bigint) AS total_views,
pipeline-#         min(view_count) AS min_views,
pipeline-#         max(view_count) AS max_views,
pipeline-#         avg(view_count) AS avg_views,
pipeline-#         percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
pipeline-#         sum(size::bigint) AS total_bytes_served
pipeline-# FROM wiki_stream
pipeline-# GROUP BY hour, project;
CREATE CONTINUOUS VIEW

ContinuousView自体の解説はまた別な投稿にしたいと思いますが、ざっくり言うとストリームデータを一定条件でサマライズしたデータを保持するテーブルです。FROM句の部分のwiki_streamはstreamとよばれ、実際のストリームデータ一行一行と対応するテーブルのようなものになります。これは事前に定義する必要自体はありません(実際は事前に型定義した方がパフォーマンスがでるとのことですが)。このstreamにはINSERTCOPYコマンドを通じてデータを投入していきます。

Sample Data

PipelineDBの公式がサンプルデータとしてWikipediaのページビューデータを利用しており、こちらを利用することである程度簡単にストリーム処理の中身を体感していただけるようになっています。
参考: http://dumps.wikimedia.org/other/pagecounts-raw/

先ほど作成したCONTINUOUS VIEWで元にしているwiki_streamというstreamにこれを流してみましょう。

$ curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
        psql -h localhost -p 5432 -d pipeline -c "
        COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"

ちなみにPostgreSQL互換なのでpsqlコマンドからもアクセス可能です。
サンプルデータ => 標準出力 => COPYコマンドという流れで、wiki_streamというstreamにタブ区切りのデータを流し込んでいます。

Query

いよいよクエリを流してみましょう。
またpipelineコマンドで繋いでみます。

$ pipeline pipeline
pipeline (9.4.4)
Type "help" for help.

pipeline=# select * from wiki_stats ORDER BY total_views DESC limit 3;
        hour         | project | total_pages | total_views | min_views | max_views |      avg_views       |    p99_views     | total_bytes_served
---------------------+---------+-------------+-------------+-----------+-----------+----------------------+------------------+--------------------
 2015-06-01 01:00:00 | en      |     2590639 |     7640042 |         1 |    368264 |   2.9490955706294856 | 28.8838758913487 |       247434016274
 2015-06-01 00:00:00 | en      |     2556148 |     7554038 |         1 |    406121 |   2.9552428106666750 | 29.1063268187446 |       243707663997
 2015-06-01 01:00:00 | en.mw   |           1 |     5560465 |   5560465 |   5560465 | 5560465.000000000000 |          5560465 |       143619712266
(3 rows)

pipeline=# select * from wiki_stats ORDER BY total_views DESC limit 3;
        hour         | project | total_pages | total_views | min_views | max_views |     avg_views      |    p99_views     | total_bytes_served
---------------------+---------+-------------+-------------+-----------+-----------+--------------------+------------------+--------------------
 2015-06-01 02:00:00 | en      |     2602704 |     7871005 |         1 |    371626 | 3.0241644843209216 | 29.2771876395043 |       255541204297
 2015-06-01 01:00:00 | en      |     2590639 |     7640042 |         1 |    368264 | 2.9490955706294856 | 28.8838758913487 |       247434016274
 2015-06-01 00:00:00 | en      |     2556148 |     7554038 |         1 |    406121 | 2.9552428106666750 | 29.1063268187446 |       243707663997
(3 rows)

pipeline=# select count(*) from wiki_stats;
 count
-------
  3970
(1 row)

データを流しながら簡単に同じクエリを何度かselectしてたり、全行数をカウントしてみましたが、実際に流しているアクセスログの行数に対して、保持しているデータサイズは圧倒的に少ない状態です。
ここまで来るとなんとなくPipelineDBのContinuousViewを通したストリーム集計についてわかってくるのではないでしょうか。

最後に

PipelineDB自体はセットアップも簡単ですし、ひとまず触れて見つつStream処理の中身に触れてみてはいかがでしょうか。

次回は、ContinuousViewの解説 or fluentdからPipelineDBへ接続しつつデータを流してみるところまでやってみます。

3
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?