はじめに
Elasticsearchには、ドキュメント更新を1クエリで実行できるBulk APIなんてものがあります。テストデータを投入するときとかに便利なので、使っている方も多いかと思います。
アプリからElasticsearchにデータを投入するとき、Fluentdとかを経由すると自動でBulk APIを使ってくれるのですが、直にデータを投入するときは面倒だから1つずつで良いや、とかなったりします。性能差が大きいようなら積極的にBulk APIを使った方が良いよねということで、測ってみました。
測定条件
適当に、こんな感じでやってみます。
- インデックスは1つだけ。
 - ドキュメントは10フィールド * 50文字。
 
テストコード
Pythonと公式クライアントの組み合わせで、テストコードを書きます。
Bulkサイズを変えながら、10,000件のドキュメントを登録します。
import time
from elasticsearch.client import Elasticsearch
from elasticsearch.exceptions import TransportError
from elasticsearch.helpers import bulk
def generate_test_doc():
    """
    すごい適当にデータを作る。
    """
    return {
        "field1": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field2": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field3": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field4": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field5": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field6": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field7": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field8": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field9": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
        "field10": "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd",
    }
class PerformanceTimer(object):
    def __init__(self, message):
        self._message = message
    def __enter__(self):
        self._start = time.clock()
    def __exit__(self, exc_type, exc_val, exc_tb):
        print("| {0:30} | {1:>6.3f} |".format(
                self._message,
                (time.clock() - self._start)))
if __name__ == "__main__":
    INDEX_NAME = "test_index"
    TYPE_NAME = "test_type"
    NUM_DOCS = 10000
    es = Elasticsearch([{"host": "192.168.99.100", "port": 9200}])
    try:
        es.indices.delete(INDEX_NAME)
        es.indices.flush()
    except TransportError:
        pass
    # Index API (create)
    with PerformanceTimer("Index API (create)"):
        for i in range(0, NUM_DOCS):
            es.create(INDEX_NAME, TYPE_NAME, generate_test_doc())
        es.indices.flush()
    assert es.count(INDEX_NAME, TYPE_NAME)["count"] == NUM_DOCS
    es.indices.delete(INDEX_NAME)
    es.indices.flush()
    # Bulk API (create)
    for bulk_size in [1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]:
        with PerformanceTimer("Bulk API {0} (create)".format(bulk_size)):
            for i in range(0, NUM_DOCS - (NUM_DOCS % bulk_size), bulk_size):
                actions = []
                for j in range(0, bulk_size):
                    actions.append({
                        "_op_type": "create",
                        "_index": INDEX_NAME,
                        "_type": TYPE_NAME,
                        '_source': generate_test_doc()
                    })
                bulk(es, actions)
            # あまり。
            if NUM_DOCS % bulk_size != 0:
                actions = []
                for j in range(0, NUM_DOCS % bulk_size):
                    actions.append({
                        "_op_type": "create",
                        "_index": INDEX_NAME,
                        "_type": TYPE_NAME,
                        '_source': generate_test_doc()
                    })
                bulk(es, actions)
            es.indices.flush()
        assert es.count(INDEX_NAME, TYPE_NAME)["count"] == NUM_DOCS
        es.indices.delete(INDEX_NAME)
        es.indices.flush()
測定結果
結果はこんな感じ。
| op | time | 
|---|---|
| Index API (create) | 23.365 | 
| Bulk API 1 (create) | 20.791 | 
| Bulk API 10 (create) | 6.414 | 
| Bulk API 20 (create) | 4.148 | 
| Bulk API 30 (create) | 3.403 | 
| Bulk API 40 (create) | 3.017 | 
| Bulk API 50 (create) | 2.638 | 
| Bulk API 60 (create) | 2.474 | 
| Bulk API 70 (create) | 2.424 | 
| Bulk API 80 (create) | 2.258 | 
| Bulk API 90 (create) | 2.244 | 
| Bulk API 100 (create) | 2.071 | 
| Bulk API 110 (create) | 2.073 | 
| Bulk API 120 (create) | 1.998 | 
結論
Elasticsearchにデータを入れるときはBulk APIを使いましょう。
一度に入れるドキュメント数は、100くらいで十分。
