Python
Elasticsearch

PythonでElasticSearchのSearch APIを使って検索するときのめも

ElasticSearchのAPIで10000件以上のデータを検索するとなると 一回じゃデータを取れないから分割して検索するのか〜面倒だなということでpythonで書きました。

これですねー

#!/usr/bin/env python3

import urllib.request
import json
import sys
from optparse import OptionParser

def print_message(msg):
    print(msg, file=sys.stderr)

def do_request(url, query):

    json_data = json.dumps(query).encode('utf-8')

    req = urllib.request.Request(url, data=json_data, method='GET')
    with urllib.request.urlopen(req) as res:
        body = res.read()
        return json.loads(body)

def dump_to_file(data, prefix, index):
    name = "%s_%02d.json" % (prefix, index)

    with open(name, mode="w") as f:
        json.dump(data, f, indent=2)

def build_query(options, scroll_id):
    if scroll_id is not None:
        return {
            "scroll_id": scroll_id,
            "scroll": "10m",
        }

    with open(options.jsonfile, "r") as f:
        return json.load(f)

def build_url(options, scroll_id):
    if scroll_id is not None:
        return "{}/_search/scroll".format(options.url)

    params = {
        "size": 10000,
        "scroll": "10m",
    }
    return "{}/{}/_search?{}".format(options.url, options.index, urllib.parse.urlencode(params))

def parse_arguments(args):
    parser = OptionParser()
    parser.add_option("-j", "--json", dest="jsonfile",
                      help="query file", metavar="JSON")
    parser.add_option("-u", "--url", dest="url",
                      help="e.g. https://es.example.com", metavar="URL")
    parser.add_option("-i", "--index", dest="index",
                      help="index name", metavar="INDEX")
    parser.add_option("-p", "--prefix", dest="prefix",
                      help="output file name prefix", metavar="PREFIX",
                      default="log")
    parser.add_option("-m", "--maxpages", dest="maxpages",
                      help="max pages", metavar="MAXPAGES",
                      default=20)

    (options, args) = parser.parse_args()

    if options.jsonfile is None or options.url is None or options.index is None:
        print("invalid options: run %s -h to see more information" % sys.argv[0])
        sys.exit(0)

    return options

if __name__ == "__main__":
    scroll_id = None

    options = parse_arguments(sys.argv)

    for i in range(1, options.maxpages):
        print_message("[*]fetch log:%d" % i)
        url = build_url(options, scroll_id)
        query = build_query(options, scroll_id)

        result = do_request(url, query)
        if len(result["hits"]["hits"]) == 0:
            print_message("[*]no more data")
            break

        dump_to_file(result, options.prefix, i)

        scroll_id = result["_scroll_id"]
        result = None

    print_message("[*]Done")

1回目のURLは /index名/_search/ なんだけど2回目からは /_search/scroll にしないといけないんですね。あと、bodyで渡すデータも1回目は通常のjson形式の検索条件だけど2回目からはscroll_idとscrollを渡すだけになる。

scroll_idは1回目の検索結果に_scroll_idというキーにデータがあるのでこれを使う。
上記のスクリプトで試した感じ値はforループが何回か回っても変化なかったのでscrollで指定した期間内は同じ値なんでしょうね(未確認)。