LoginSignup
16
18

More than 5 years have passed since last update.

Python Pandasからトレジャーデータを叩く

Last updated at Posted at 2014-12-23

Python and Treasure Data

トレジャーデータでは、何よりも分析者、意思決定者のデータ分析を楽にすることをビジョンとしています。SQLで生データを触れる当社サービスですが、より込み入った分析をしたり、既存のスクリプトやツール群を活かそうと考えると、どうしてもRやPythonに手を伸ばしたくなります。

Rに関してはJDBCドライバーから接続することができ、実際お客様にもご利用いただいているのですが、そういえばPythonからどうやるんだったっけなと先日ふと思いまして、ちょっと調べてみたところ、衝撃的な事実が発覚しました。

トレジャーデータにはPython Clientがない!(UPDATE: うちのエースpythonistaが作りました)

サードパーティーのがあるかなと探してみたりもしましたがありません。これでは、Python/Pandasを十八番としているデータサイエンティストの方々から背を向けられてしまいます。

しかしここで更なる衝撃的な事実が発覚しました。

たった60数行のpythonコードを書くだけで、トレジャーデータに投げたクエリの結果がPandasのdataframeとして取れることがわかったのです。

以下が、APIの挙動確認、Pythonのおさらい、そしてPandasの学習を含め4時間で書かれた拙作スクリプトとなります。これもトレジャーデータのREST APIのお陰…じゃなかったPythonのrequestsライブラリのおかげです。

td-client-pandas in 60 lines

import requests
import msgpack
from time import sleep
import json
import pandas

class TreasureData:
    def __init__(self, apikey):
        self._apikey = apikey
        self._endpoint = 'https://api.treasuredata.com'
        self._base_headers = {"Authorization": "TD1 %s"%self._apikey}

    def query(self, database, query, **opts):
        job_id = self._query(database, query, **opts)

        while True:
            job = self._get_job(job_id)
            if job["status"] == 'success':
                break
            sleep(5)
        return {"cursor": self.fetch_result(job_id),
                "schema": json.loads(job['hive_result_schema'])}

    def _get_job(self, job_id):
        request_url = "%s/v3/job/show/%d"%(self._endpoint, job_id)
        response = requests.get(request_url, headers=self._base_headers)
        return response.json()

    def fetch_result(self, job_id):
        request_url = "%s/v3/job/result/%d"%(self._endpoint, job_id)
        response = requests.get(request_url,
                                params={"format":"msgpack"},
                                headers=self._base_headers)

        unpacker = msgpack.Unpacker()
        for chunk in response.iter_content(8192):
            unpacker.feed(chunk)
        for unpacked in unpacker:
            yield unpacked

    def _query(self, database, query, **opts):
        if opts.has_key("engine") and opts["engine"] == "tqa":
            engine = "presto"
        else:
            engine = "hive"
        request_url = "%s/v3/job/issue/%s/%s"%(self._endpoint, engine, database)
        response = requests.post(request_url,
                                 params={"query":query},
                                 headers=self._base_headers)
        job_id = int(response.json()["job_id"])
        return job_id

class TreasureDataConnector:
    def __init__(self, td_apikey):
        self._td_client = TreasureData(td_apikey)
        self._last_job_id = None

    def query(self, database, query, **opts):
        result = self._td_client.query(database, query, **opts)
        columns, _ = zip(*result['schema'])
        data = dict(zip(columns, [[] for _ in columns]))
        for row in result["cursor"]:
            for k, v in zip(columns, row):
                data[k].append(v)
        return pandas.DataFrame(data)

実際にはこんな感じになります。

In [4]: conn = TreasureDataConnector("a05a6256ec6f32949d55271276777f502b53f7a2")

In [5]: df = conn.query('demo', 'select count(1) as foo, host from syslog group by host')

In [6]: df
Out[6]:
   foo               host
0    1        pulchritude
1  782  pulchritude.local

ヤッタネ!

Result.next()

正直ぼくはRとExcel人間なので、あまりPandas(というかPython自体4年ぶりに書きました)に馴染みがありません。ただそんな僕でもサクッとできたので、Pandasなデータサイエンティストの方々にも、ぜひトレジャーデータを試していただきたいと思います!

16
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
18