Python and Treasure Data
トレジャーデータでは、何よりも分析者、意思決定者のデータ分析を楽にすることをビジョンとしています。SQLで生データを触れる当社サービスですが、より込み入った分析をしたり、既存のスクリプトやツール群を活かそうと考えると、どうしてもRやPythonに手を伸ばしたくなります。
Rに関してはJDBCドライバーから接続することができ、実際お客様にもご利用いただいているのですが、そういえばPythonからどうやるんだったっけなと先日ふと思いまして、ちょっと調べてみたところ、衝撃的な事実が発覚しました。
トレジャーデータにはPython Clientがない!(UPDATE: うちのエースpythonistaが作りました)
サードパーティーのがあるかなと探してみたりもしましたがありません。これでは、Python/Pandasを十八番としているデータサイエンティストの方々から背を向けられてしまいます。
しかしここで更なる衝撃的な事実が発覚しました。
たった60数行のpythonコードを書くだけで、トレジャーデータに投げたクエリの結果がPandasのdataframeとして取れることがわかったのです。
以下が、APIの挙動確認、Pythonのおさらい、そしてPandasの学習を含め4時間で書かれた拙作スクリプトとなります。これもトレジャーデータのREST APIのお陰…じゃなかったPythonのrequestsライブラリのおかげです。
td-client-pandas in 60 lines
import requests
import msgpack
from time import sleep
import json
import pandas
class TreasureData:
def __init__(self, apikey):
self._apikey = apikey
self._endpoint = 'https://api.treasuredata.com'
self._base_headers = {"Authorization": "TD1 %s"%self._apikey}
def query(self, database, query, **opts):
job_id = self._query(database, query, **opts)
while True:
job = self._get_job(job_id)
if job["status"] == 'success':
break
sleep(5)
return {"cursor": self.fetch_result(job_id),
"schema": json.loads(job['hive_result_schema'])}
def _get_job(self, job_id):
request_url = "%s/v3/job/show/%d"%(self._endpoint, job_id)
response = requests.get(request_url, headers=self._base_headers)
return response.json()
def fetch_result(self, job_id):
request_url = "%s/v3/job/result/%d"%(self._endpoint, job_id)
response = requests.get(request_url,
params={"format":"msgpack"},
headers=self._base_headers)
unpacker = msgpack.Unpacker()
for chunk in response.iter_content(8192):
unpacker.feed(chunk)
for unpacked in unpacker:
yield unpacked
def _query(self, database, query, **opts):
if opts.has_key("engine") and opts["engine"] == "tqa":
engine = "presto"
else:
engine = "hive"
request_url = "%s/v3/job/issue/%s/%s"%(self._endpoint, engine, database)
response = requests.post(request_url,
params={"query":query},
headers=self._base_headers)
job_id = int(response.json()["job_id"])
return job_id
class TreasureDataConnector:
def __init__(self, td_apikey):
self._td_client = TreasureData(td_apikey)
self._last_job_id = None
def query(self, database, query, **opts):
result = self._td_client.query(database, query, **opts)
columns, _ = zip(*result['schema'])
data = dict(zip(columns, [[] for _ in columns]))
for row in result["cursor"]:
for k, v in zip(columns, row):
data[k].append(v)
return pandas.DataFrame(data)
実際にはこんな感じになります。
In [4]: conn = TreasureDataConnector("a05a6256ec6f32949d55271276777f502b53f7a2")
In [5]: df = conn.query('demo', 'select count(1) as foo, host from syslog group by host')
In [6]: df
Out[6]:
foo host
0 1 pulchritude
1 782 pulchritude.local
ヤッタネ!
Result.next()
正直ぼくはRとExcel人間なので、あまりPandas(というかPython自体4年ぶりに書きました)に馴染みがありません。ただそんな僕でもサクッとできたので、Pandasなデータサイエンティストの方々にも、ぜひトレジャーデータを試していただきたいと思います!