More than 1 year has passed since last update.

Jupyter + Pandas-TD について何か書こうと思っていたところ、Cookpad の有賀さんによる素晴らしい紹介記事が!流れに便乗して、ここでは Pandas-TD の使い方をいくつか紹介したいと思います。

データに素早くアクセスするために

Pandas と Treasure Data を組み合わせるためにスタートした Pandas-TD ですが、最近はどちらかというとインタラクティブなデータ探索を楽にするために開発を続けています。その典型がマジック関数で、Jupyter を開いてすぐクエリを実行したいときに重宝します。

Screen Shot 2015-12-13 at 10.35.44 PM.png

時間を掛けてデータ分析するなら、素の Pandas 関数を使ってプログラミングする方がいいのですが、ちょっとしたログの調査のたびに Python でコードを書くのも面倒です。自動化できるところは自動化し、なるべく簡潔に欲しい結果を得られるようにするのがマジック関数の目標です。

最近は HipChat や Slack による通知にも対応し、簡易的なワークフローを記述できるようにもなってきました。タスクを定期的にスケジュール実行する必要がないのであれば、最初からデータ処理の全てをノートブックに記述しておくことが増えてきました。

notification

例えば、Treasure Data 社では毎月一回、ストレージシステムの利用状況を確認しながら、近く問題になりそうな点や、次に改善すべき箇所を探っています。このプロセスは一つの Jupyter ノートブックになっていて、システムの様々なコンポーネントからデータを集めて集計、可視化するためのロジックが記述されています。

プロジェクト毎に環境を切り替える

Jupyter の起動直後にクエリを発行できるようにするには、あらかじめいくつかの準備が必要です。

まずは API キーの登録ですが、目的に応じて複数のプロジェクトを切り替えられるよう、direnv で環境変数を整理しておくと便利です。例えば、"love-pandas" というプロジェクトがあるなら、love-pandas/.envrc というファイルを作成して、次のような内容を記述しておきます。

# Python のパス設定。プロジェクトのルートを指定する
export PYTHONPATH="/Users/k24d/love-pandas"

# 各種の環境変数を設定
export TD_API_KEY="..."

# Python 仮想環境のアクティベート
source /Users/k24d/miniconda3/bin/activate love-pandas

これで "love-pandas" ディレクトリに移動する度に、指定した環境変数、及び Python 仮想環境がセットされ、ディレクトリを抜けるとクリアされます。ここでは Miniconda で仮想環境を切り替えていますが、この辺りはお好みのやり方で。

Direnv は Homebrew でインストールできます。Miniconda はオフィシャルサイトからダウンロードするか、pyenv 経由でインストールできます。

Conda と pip によるパッケージ管理

Python のパッケージ管理は pip コマンドで行なうのが基本ですが、ソースコードからビルドしようとしてエラーになったり、ビルドに時間が掛かることがあります。Pandas を利用するようなプロジェクトでは、Miniconda に含まれる conda コマンドを用いるのがお勧めです。いつでも環境を再構築できるよう、"conda-requirements.txt" や "requirements.txt" にパッケージのリストを記述しておきましょう。以下は Pandas-TD と共によく用いられるパッケージの例です。

# conda-requirements.txt

# Jupyter と Pandas
pandas
matplotlib
jupyter

# Excel フィアルの読み込み
xlrd
openpyxl

# MySQL や PostgreSQL への接続
sqlalchemy
pymysql
psycopg2
# requirements.txt

# 各プロジェクトで利用するパッケージ
tdclient
pandas-td
...
# パッケージのインストール
$ conda install --file conda-requirements.txt
$ pip install -r requirements.txt

IPython のカスタマイズ

Jupyter でよく利用する拡張モジュールやパッケージは、ノートブックを開いてすぐ使えるようにカスタマイズしておくと楽になります。~/.ipython/profile_default/ipython_config.py を作成し、例えば次のように記述しておきます。

c = get_config()

c.InteractiveShellApp.extensions = [
    # Pandas-TD のマジック関数
    'pandas_td.ipython',
]

c.InteractiveShellApp.exec_lines = [
    # 画像の埋め込みを有効にする
    '%matplotlib inline',
    # よく使うモジュールをロードする
    'import numpy as np',
    'import pandas as pd',
    'import pandas_td as td',
]

以上の準備を整えたら、次のコマンドで Jupyter を起動します。

$ cd love-pandas
direnv: loading .envrc
discarding /Users/k24d/miniconda3/bin from PATH
prepending /Users/k24d/miniconda3/envs/love-pandas/bin to PATH
direnv: export +PYTHONPATH +TD_API_KEY ~PATH

$ jupyter notebook
...
[I 01:23:45.678 NotebookApp] The IPython Notebook is running at: http://localhost:8888/

時系列データの集計

Treasure Data に入れらるのはほとんどが時系列データなので、データの時間推移を集計することがよくあります。マジック関数を使う場合、次のように記述するのが定番です。

In [1]: %%td_presto sample_datasets
   ...: select
   ...:     -- 最初のカラムは "time" にする。td_date_trunc() で切り捨て
   ...:     td_date_trunc('year', time) time,
   ...:
   ...:     -- 二つ目以降のカラムで集計
   ...:     count(1) cnt
   ...: from
   ...:     -- 対象となるテーブル名
   ...:     nasdaq
   ...: where
   ...:     -- td_time_range() で集計期間を指定
   ...:     td_time_range(time, '2000-01-01', '2005-01-01', 'UTC')
   ...: group by
   ...:     -- 一つ目のカラム(= time)でグループ化する
   ...:     1
   ...:
Out[1]:
               cnt
time
2000-01-01  298783
2004-01-01  367748
2001-01-01  311386
2002-01-01  325212
2003-01-01  351573

クエリの結果は Pandas のデータフレームとなり、後から Out[1] (または _1) のようにして参照できます。Pandas で更に集計を続けたいときに用います。

In [2]: df = _1

In [3]: df.sort_index()
Out[3]:
               cnt
time
2000-01-01  298783
2001-01-01  311386
2002-01-01  325212
2003-01-01  351573
2004-01-01  367748

時系列グラフの表示

集計した結果をグラフにするには、--plot オプションを使います。時系列のグラフを生成するには、一つ目のカラム名を time、二つ目のカラムを数値にします。

plot1

カラムが三つ以上ある場合、カラムの型や名前によって動作が変わります。二つ目以降が全て数値の場合、それらが全て一つのグラフに重ねて表示されます。

plot2

一方で、次のように timesymbol の二つで GROUP BY するようなケースでは、二つ目のカラムの中身で色分けされたグラフが表示されます。(暗黙的に pivot() が呼ばれます)

plot3

二つ目のカラムが数値の場合でも、カラム名が _id で終わっていれば後者の pivot グラフを生成します。あるいは cast(... AS varchar) などで明示的に文字列に変換した場合も pivot が実行されます。

クエリ実行結果の保存

データを細かく見ようとすると、毎回クエリを実行するのではなくて、まとまったデータを抽出してから手元で集計する方が高速です。その場合、次のように -o オプションでローカル変数に値を格納できます。

In [1]: %%td_presto sample_datasets -o df1
   ...: select
   ...:     *
   ...: from
   ...:     nasdaq
   ...: where
   ...:     td_time_range(time, '2000-01-01', '2005-01-01')

In [2]: df1.head()
Out[2]:
                    symbol     open  volume     high      low    close
time
2000-01-03 16:00:00   RIVR   6.0625     600   6.3125   6.0625   6.3125
2000-01-03 16:00:00   SMTC  25.9375  689400  28.8750  24.0000  28.6250
2000-01-03 16:00:00   REIS  16.7500    7100  16.7500  16.0000  16.2500
2000-01-03 16:00:00   SNAK   1.5156    6900   1.5156   1.3750   1.5000
2000-01-03 16:00:00   SGMA   6.5625    5800   6.6250   6.0000   6.0000

あまり大量のデータを抽出すると、ダウンロードに長い時間が掛かるため、td_time_range()LIMIT による絞り込みを忘れないで下さい。もし途中で処理を中断したければ Jupyter の Stop ボタンで停止できます。

マジック関数で取り出したデータは、time カラムの値が index に格納されます。デフォルトでは UTC ですが、タイムゾーンを指定したければ -T オプションで指定できます。

In [3]: %%td_presto -T Asia/Tokyo
   ...: select * from nasdaq limit 3
   ...:
Out[3]:
                          symbol  open  volume       high        low      close
time
1978-04-07 01:00:00+09:00   WSCI     0    1300     1.2500     1.0313     1.0313
1978-04-07 01:00:00+09:00   ZION     0     800     1.5821     1.5235     1.5235
1978-04-07 01:00:00+09:00   YRCW     0      98  3937.5000  3825.0000  3825.0000

index ではなくカラムとして扱いたい場合は、reset_index() で変換して下さい。

In [4]: _3.reset_index()
Out[4]:
                       time symbol  open  volume       high        low      close
0 1978-04-07 01:00:00+09:00   WSCI     0    1300     1.2500     1.0313     1.0313
1 1978-04-07 01:00:00+09:00   ZION     0     800     1.5821     1.5235     1.5235
2 1978-04-07 01:00:00+09:00   YRCW     0      98  3937.5000  3825.0000  3825.0000
...

ファイルへの保存

クエリの実行結果をファイルに保存したい場合は、-O オプションを利用します。

In [5]: %%td_presto sample_datasets -O ./nasdaq.csv
   ...: select
   ...:     *
   ...: from
   ...:     nasdaq
   ...: where
   ...:     td_time_range(time, '2000-01-01', '2005-01-01')
INFO: Saved to './nasdaq.csv'

あるいは変数に格納した内容をファイルに保存するなら、to_csv() を使います。

In [6]: df1.to_csv('./nasdaq.csv')

--plot オプションよりも複雑なデータの可視化では、自分はしばしば BI ツールを利用します。個人的には Tableau を愛用しており、CSV ファイルなら無償版の Tableau Public でも扱えます。

tableau.png

Treasure Data から抽出したデータを、Pandas で加工して CSV ファイルにいったん保存し、それをそのまま Jupyter 上で、あるいは外部の BI ツールで細かく見ていくのは一つの定番のパターンです。この方法だと、途中で中間テーブルを作成したりスキーマを定義する必要が一切ないので、アドホックな分析作業がはかどります。抽出するデータがざっと数百万レコードまでなら、このパターンで対応できます。

デフォルトデータベースの指定

%%td_presto sample_datasets のように毎回データベース名を入力する代わりに、%td_use を用いてデフォルトのデータベースを指定できます。

In [1]: %td_use sample_datasets
INFO: import nasdaq
INFO: import www_access

In [2]: %%td_presto
   ...: select count(1) cnt from nasdaq
   ...: 
Out[2]:
    cnt
0  5000

%td_use を利用すると、テーブル名がノートブックの名前空間に取り込まれ、カラム名の補完なども行なえるようになります。

completion

変数の代入

何度も実行するクエリでは、SQL に変数で値を埋め込めると便利です。マジック関数では、{name} という書式で変数の代入に対応しています。(内部的には、Python 標準の format() メソッドを利用しています。)


In [1]: TIME_START = '2000-01-01'

In [2]: TIME_END = '2005-01-01'

In [3]: %%td_presto sample_datasets
   ...: select
   ...:     td_date_trunc('year', time) time,
   ...:     count(1) cnt
   ...: from
   ...:     nasdaq
   ...: where
   ...:     -- 変数を代入するには {name} 表記を用いる
   ...:     td_time_range(time, '{TIME_START}', '{TIME_END}')
   ...: group by
   ...:     1

バックグラウンドジョブの実行

Hive を使うときは待ち時間が長くなりがちなので、キューを作成してバックグラウンドで走らせておくと、じっと待ち続けないで済みます。オプション -a に続けてキューの名前を指定します。

In [1]: %%td_hive sample_datasets -a q1
   ...: select * from www_access
Queued as q1[0]

status() でキューに格納されたジョブの状態を確認できます。wait() で全てのジョブが終了するまで待ちます。一度 wait() を呼ぶと、そのキューに新たなジョブを登録することは出来なくなります。

In [2]: q1.wait()

In [3]: q1.status()
Out[3]:
                  created_at      status    job_id
0  2015-12-13 01:23:45+09:00  downloaded  42492766

クエリの実行結果は、後から result() を呼ぶことで取得できます。

In [4]: q1[0].result()
Out[4]:
                     user             host                   path  ...
time
2014-10-03 23:59:53  None    76.45.175.151      /item/sports/4642  ...
2014-10-03 23:59:44  None  184.162.105.153      /category/finance  ...
2014-10-03 23:59:35  None    144.30.45.112  /item/electronics/954  ...

あるいは -o オプションと組み合わせると、ジョブの完了後に変数にアクセスできるようになります。

In [5]: %%td_hive sample_datasets -a q2 -o df1
   ...: select * from www_access
Queued as q2[0]

In [6]: # しばらく待つ

In [7]: df1
Out[7]:
                     user             host                   path  ...
time
2014-10-03 23:59:53  None    76.45.175.151      /item/sports/4642  ...
2014-10-03 23:59:44  None  184.162.105.153      /category/finance  ...
2014-10-03 23:59:35  None    144.30.45.112  /item/electronics/954  ...

Notification を設定すると、ジョブのエラーや完了の通知を受け取ることが出来ます。現時点では HipChatSlack に対応しています。

hipchat

キューの管理や通知は、ノートブックのプロセス内でマルチスレッド実行しているため、Jupyter を停止しないようにして下さい。

ワークフローの実行

Jupyter のメニューから "Cell > Run All" を選択すると、全てのセルを最初から順に実行してくれます。Treasure Data のクエリは、時間の掛かるものはキューに登録するようにしておくと、それらが並列で実行されて時間を短縮できます。

In [1]: date = '2015-01-01'

In [2]: %%td_hive -a q1 -o df1
        SELECT ... FROM table1 WHERE td_time_range(time, '{date}')

In [3]: %%td_hive -a q1 -o df2
        SELECT ... FROM table2 WHERE td_time_range(time, '{date}')

In [4]: q1.wait()

In [5]: # df1 と df2 の結果を表示
        ...

毎日スケジュール実行する必要のないワークフローは、こうしてノートブックにまとめておいて、必要なときにだけ "Run All" するのが簡単です。

まとめ

Jupyter を使うと、自分が実行したクエリをノートブックという形で保存しておけるので、よく使うクエリの整理や再実行に役立ちます。特にデータの探索や、試行錯誤しながらのアドホッククエリの実行が楽になります。お試しください。