python-tcptest で Elasticsearch のテスト

  • 9
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

昨日 python-tcptest のバージョンが上がりPython3系でも使えるようになりました。
(といっても自分でぷるりくん投げたんだけど)

その記念にtcptestを使ってElasticsearchを使ってるPythonのコードをテストする方法を紹介します。

tcptestって?

  1. 空いてるポートを探して
  2. そのポートで指定したサーバーソフトを立ち上げて
  3. テストが終わったらそのサーバーソフトを落とす

というものです。
以下の3点がありがたいんじゃないかなと思います。

  • サーバーソフトの実物を使うのでモックと実物の違いを気にしなくていい
  • DBにテスト用以外のデータが混じってて間違えて一緒に消しちゃったみたいなミスがなくなる
  • わざわざテスト用に自分でサーバーソフトを立てたり落としたりしなくていい (人手の温もりがない)

python-tcptestの入れ方


$ pip install tcptest

python-tcptestの流れ

開始時

  1. TestServer.__init__: タイムアウトとか設定する (起動に時間掛かるものは長めにするとよいです)
  2. TestServer._before_start: ここにサーバー立ち上げ前にやる処理 (一時ディレクトリ作成とか) を書きます (デフォルトでは何もしない)
  3. TestServer.start: 空いてるポートを探して TestServer.build_commandのコマンドを叩きます。
  4. TestServer._after_start: ここにサーバー立ち上げ直後にやる処理 (初期設定をしたりとか) を書きます (デフォルトでは何もしない)

終了時

  1. TestServer._before_stop: ここにサーバー終了前にやる処理を書きます (デフォルトでは何もしない)
  2. TestServer.stop: サーバーを終了します。
  3. TestServer._after_start: ここにサーバー終了後にやる処理 (一時ディレクトリを消したり) を書きます (デフォルトでは何もしない)

Elasticsearch用に書いてみよう

python-tcptest は、最初からRedisとFluentdとmemcachedのテスト用クラスが入ってます。今回はElasticsearchをテストしたいので自分でクラスを作ってみます。


import os
import shutil
import tempfile
import tcptest


class ESTestServer(tcptest.TestServer):

    def build_command(self):
        return ('elasticsearch',
                '-Des.network.bind_host=127.0.0.1',
                '-Des.network.host=127.0.0.1',
                '-Des.http.port=%s' % self.port,
                "-Des.node.master=true",
                "-Des.node.local=true",
                '-Des.path.data=%s' % self.data_dir,
                '-Des.path.logs=%s' % self.logs_dir
               )

    def _before_start(self):
        self.data_dir = tempfile.mkdtemp(prefix='esdata')
        self.logs_dir = tempfile.mkdtemp(prefix='eslogs')

    def _after_stop(self):
        for path in filter(os.path.exists, (self.data_dir, self.logs_dir)):
            shutil.rmtree(path)

tcptest.TestServer を継承して3つメソッドを書き換えるだけ!

確認用コード

elasticsearch-pyのexample をベースに確認してみます。


from datetime import datetime
import os
from elasticsearch import Elasticsearch


with ESTestServer(timeout=30) as server:
    es = Elasticsearch(['localhost:%s' % server.port])
    print(es.transport.hosts)

    # from https://elasticsearch-py.readthedocs.org/en/master/#example-usage
    doc = {
        'author': 'kimchy',
        'text': 'Elasticsearch: cool. bonsai cool.',
        'timestamp': datetime.now(),
    }
    res = es.index(index="test-index", doc_type='tweet', id=1, body=doc)
    print(res['created'])

    res = es.get(index="test-index", doc_type='tweet', id=1)
    print(res['_source'])

    es.indices.refresh(index="test-index")

    res = es.search(index="test-index", body={"query": {"match_all": {}}})
    print("Got %d Hits:" % res['hits']['total'])
    for hit in res['hits']['hits']:
        print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

    print('data_dir contains:', os.listdir(server.data_dir))
    print('logs_dir contains:', os.listdir(server.logs_dir))
    data_dir = server.data_dir
    logs_dir = server.logs_dir

print('data_dir exists:', os.path.exists(data_dir))
print('logs_dir exists:', os.path.exists(logs_dir))

上のコードを実行するとこんな風になります↓

[{'port': 55003, 'host': 'localhost'}]
True
{'timestamp': '2015-10-30T14:10:16.703556', 'author': 'kimchy', 'text': 'Elasticsearch: cool. bonsai cool.'}
Got 1 Hits:
2015-10-30T14:10:16.703556 kimchy: Elasticsearch: cool. bonsai cool.
data_dir contains: ['elasticsearch_brew']
logs_dir contains: ['elasticsearch_brew.log', 'elasticsearch_brew_index_indexing_slowlog.log', 'elasticsearch_brew_index_search_slowlog.log']
data_dir exists: False
logs_dir exists: False

なんかよくわかんない空いてるポートにElasticsearchを立ち上げて、てきとーなところにデータディレクトリとログのディレクトリを作って、ちゃんと動作して、終わったら綺麗になってることがわかりました。

以上です。