Edited at

Python Pandasからトレジャーデータを叩く

More than 3 years have passed since last update.


Python and Treasure Data

トレジャーデータでは、何よりも分析者、意思決定者のデータ分析を楽にすることをビジョンとしています。SQLで生データを触れる当社サービスですが、より込み入った分析をしたり、既存のスクリプトやツール群を活かそうと考えると、どうしてもRやPythonに手を伸ばしたくなります。

Rに関してはJDBCドライバーから接続することができ、実際お客様にもご利用いただいているのですが、そういえばPythonからどうやるんだったっけなと先日ふと思いまして、ちょっと調べてみたところ、衝撃的な事実が発覚しました。

トレジャーデータにはPython Clientがない!(UPDATE: うちのエースpythonistaが作りました)

サードパーティーのがあるかなと探してみたりもしましたがありません。これでは、Python/Pandasを十八番としているデータサイエンティストの方々から背を向けられてしまいます。

しかしここで更なる衝撃的な事実が発覚しました。

たった60数行のpythonコードを書くだけで、トレジャーデータに投げたクエリの結果がPandasのdataframeとして取れることがわかったのです。

以下が、APIの挙動確認、Pythonのおさらい、そしてPandasの学習を含め4時間で書かれた拙作スクリプトとなります。これもトレジャーデータのREST APIのお陰…じゃなかったPythonのrequestsライブラリのおかげです。


td-client-pandas in 60 lines

import requests

import msgpack
from time import sleep
import json
import pandas

class TreasureData:
def __init__(self, apikey):
self._apikey = apikey
self._endpoint = 'https://api.treasuredata.com'
self._base_headers = {"Authorization": "TD1 %s"%self._apikey}

def query(self, database, query, **opts):
job_id = self._query(database, query, **opts)

while True:
job = self._get_job(job_id)
if job["status"] == 'success':
break
sleep(5)
return {"cursor": self.fetch_result(job_id),
"schema": json.loads(job['hive_result_schema'])}

def _get_job(self, job_id):
request_url = "%s/v3/job/show/%d"%(self._endpoint, job_id)
response = requests.get(request_url, headers=self._base_headers)
return response.json()

def fetch_result(self, job_id):
request_url = "%s/v3/job/result/%d"%(self._endpoint, job_id)
response = requests.get(request_url,
params={"format":"msgpack"},
headers=self._base_headers)

unpacker = msgpack.Unpacker()
for chunk in response.iter_content(8192):
unpacker.feed(chunk)
for unpacked in unpacker:
yield unpacked

def _query(self, database, query, **opts):
if opts.has_key("engine") and opts["engine"] == "tqa":
engine = "presto"
else:
engine = "hive"
request_url = "%s/v3/job/issue/%s/%s"%(self._endpoint, engine, database)
response = requests.post(request_url,
params={"query":query},
headers=self._base_headers)
job_id = int(response.json()["job_id"])
return job_id

class TreasureDataConnector:
def __init__(self, td_apikey):
self._td_client = TreasureData(td_apikey)
self._last_job_id = None

def query(self, database, query, **opts):
result = self._td_client.query(database, query, **opts)
columns, _ = zip(*result['schema'])
data = dict(zip(columns, [[] for _ in columns]))
for row in result["cursor"]:
for k, v in zip(columns, row):
data[k].append(v)
return pandas.DataFrame(data)

実際にはこんな感じになります。

In [4]: conn = TreasureDataConnector("a05a6256ec6f32949d55271276777f502b53f7a2")

In [5]: df = conn.query('demo', 'select count(1) as foo, host from syslog group by host')

In [6]: df
Out[6]:
foo host
0 1 pulchritude
1 782 pulchritude.local

ヤッタネ!


Result.next()

正直ぼくはRとExcel人間なので、あまりPandas(というかPython自体4年ぶりに書きました)に馴染みがありません。ただそんな僕でもサクッとできたので、Pandasなデータサイエンティストの方々にも、ぜひトレジャーデータを試していただきたいと思います!