LoginSignup
103
73

More than 3 years have passed since last update.

Python Elasticsearch 基本的な使い方まとめ

Last updated at Posted at 2020-07-29

はじめに

PythonでElasticsearchを使う機会があったため情報を収集していましたが、サイトで使われているElasticsearchのバージョンが古かったり、そもそも、情報が少なかったりしたので、今回、メモとして簡単な例と共に基本的な使い方をまとめました。

この記事を読むことで、低レベルクライアント(elasticsearch-py)を使ったインデックスおよびドキュメントCRUD操作ができるようになります。

環境

前提として、Elasticsearchはすでに立ち上がっていることとします。

また、実行環境は以下の通りです。特殊なライブラリを使っていないので、どの環境でも基本的には問題ないと思います。

  • 利用環境
Ubuntuバージョン
Ubuntu 20.04 LTS
  • Elasticsearch環境
Elasticserchバージョン
7.7.0
  • Python環境
Pythonバージョン
$ python -V
Python 3.8.4

準備

  • pipでElasticsearchクライアントのインストール
$ python -m pip install elasticsearch

基本操作

ここでは、基本的なPythonからElasticsearchを操作する方法を紹介します。

接続

インストールしたelasticsearchパッケージを利用してElasticsearchが起動しているホスト(localhost)へ接続する方法は以下の通りです。

基本的な接続方法

特にElasticsearch側で認証を設定していなければ、この方法で接続できます。

サンプル
from elasticsearch import Elasticsearch

# Elasticsearchクライアント作成
es = Elasticsearch("http://localhost:9200")

ポートや、httpまたはhttpsを指定することもできます。

サンプル
# Elasticsearchインスタンスを作成
es = Elasticsearch(
    ["localhost", "otherhost"],
    scheme="http",
    port=9200
)

HTTP認証を利用した接続

ElasticsearchにIDやパスワードを設定している場合は、この方法で接続できます。

サンプル
es = Elasticsearch(
    "http://localhost:9200",
    http_auth=("user_id", "password")
)

user_idpasswordは設定されたID、パスワードを表しています。

接続の解除

上記で確立した内部接続をclose()で閉じることができます。

サンプル
# Elasticsearchインスタンスを作成
es = Elasticsearch("http://localhost:9200")

# 内部接続を閉じる
es.close()

余談ですが、close()しないと、インスタンスがガーベージコレクションされた際に、例外が発生します。これを防ぐためにも、明示的に書いたほうが良いでしょう。

This warning is created by aiohttp when an open HTTP connection is garbage collected. You’ll typically run into this when closing your application. To resolve the issue ensure that close() is called before the AsyncElasticsearch instance is garbage collected.
https://elasticsearch-py.readthedocs.io/en/master/async.html?highlight=close#receiving-unclosed-client-session-connector-warning

インデックスの基本操作

インデックスを扱うための基本的な操作について紹介します。

インデックスの操作には、indicesという属性を使います。

インデックスの作成

studentsというインデックスを作成します。
タイプやドキュメントが入っていない空のインデックスです。

サンプル
es.indices.create(index='students')

マッピングを使った方法

データタイプやインデックスの構造を指定して作成できます。

サンプル
mapping = {
    "mappings": {
        "properties": {
            "name": {"type": "text"},
            "age": {"type": "long"},
            "email": {"type": "text"}
        }
    }
}

es.indices.create(index="students", body=mapping)

インデックス情報の収集

ここでは、インデックスの情報を取得する方法を紹介します。

インデックス一覧の取得

接続しているElasticsearchでは、どのようなインデックスがあるのか確認したい場合は、cat属性のindices(index="*", h="index")を利用します。

indicesはインデックス情報を返すメソッドです。
今回は、全インデックスを一覧で取得したいので、引数のindexにはワイルドカードを指定します。

また、引数hには、列名を指定することで列情報が改行区切りで返ってきます。

サンプル
# インデックス一覧の取得
indices = es.cat.indices(index='*', h='index').splitlines()
# インデックスの表示
for index in indices:
    print(index)

実行結果は、以下の通りです。

実行結果
.apm-custom-link
.kibana_task_manager_1
.apm-agent-configuration
students
.kibana_1

作成したstudentsだけではなく、デフォルトで入っているインデックスも表示されました。

インデックスのマッピングの確認

特定のインデックスのマッピングを確認する場合は、get_mapping(index="インデックス名")を利用します。

サンプル
print(es.indices.get_mapping(index="students"))

実行結果は以下の通りです。

実行結果
{'students': {'mappings': {'properties': {'age': {'type': 'long'}, 'email': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}}}

また、引数にindexを指定しない場合、全インデックスのマッピングを取得することができます。

サンプル
print(es.indices.get_mapping())

実行結果は割愛します。

インデックスの更新

マッピングを変更してインデックスの構造を更新する場合は、put_mapping(index="インデックス名", body="変更分のマッピング")を利用します。

例えば、studentsに新たに学籍番号を追加する場合は以下のようになります。

サンプル
mapping = {
    "properties": {
        "student_number": {"type": "long"}
    }
}

es.indices.put_mapping(index="students", body=mapping)

指定するマッピングは全て渡す必要はなくて、差分だけで良いです。

また、インデックス作成の際にはmappings配下にネストしましたが、更新の場合は、そうではないことに注意しましょう。

現在のマッピングは、以下のようになっています。

現在のマッピング
{
  "students": {
    "mappings": {
      "properties": {
        "age": {
          "type": "long"
        },
        "email": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "student_number": {
          "type": "long"
        }
      }
    }
  }
}

見ずらいですが、最後に新たに追加したstudent_numberが表示されています

インデックスの削除

特定のインデックスを削除したい場合は、delete(index="インデックス名")を利用します。

今回は、今まで操作してきたstudentsを消してみます。

サンプル
es.indices.delete(index="students")

インデックスの存在確認

エラーハンドリングを追加するにあたって、インデックスが存在するか確認したい。

そのような場合は、exists(index="インデックス名")を利用します。

サンプル
print(es.indices.exists(index="students"))

実行結果は、以下のようになります

実行結果
False

先程、インデックスを消したのでFalseが返ってきました。
存在する場合は、Trueが返ってきます。

ドキュメントの基本操作

次は、ドキュメントを扱うための基本的な操作について紹介します。

ドキュメントの作成

ドキュメントを新しく登録するには、create(index="インデックス名", id="ドキュメントID", body="新規ドキュメント")を利用します。

※ インデックスが無い場合は、登録するドキュメントから自動的に型を判断して作成されます。

サンプル
# 登録したいドキュメント
student = {
    "name": "Taro",
    "age": 36,
    "email": "taro@example.com"
}

# ドキュメントの登録
es.create(index='students', id=1, body=student)

登録に成功すると、以下のような結果が出力されます。

実行結果
{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}

バルクインサート

createを複数回呼び出すことで、複数ドキュメントを登録できますが、bulk(インスタンス, データ)を使うことで、一度に登録できます。

引数で渡すデータの構造は、以下の通りで、少々複雑です。

{
  "_op_type": "createやdelete、updateといったアクションを指定"
  "_index": "インデックス名"
  "_id": "ドキュメントのIDを指定(createの場合はなくても良い)"
  "_source": "登録したいドキュメント"
}

上記のデータを配列に格納して、bulkに渡すことで複数ドキュメントを操作することができます。

今回は、配列ではなくyieldを使った方法をサンプルとして紹介します。

サンプル
from elasticsearch import Elasticsearch, helpers # bulkを使うために追加


# Elasticsearchインスタンスを作成
es = Elasticsearch("http://localhost:9200")

def gendata():
    # 登録したいドキュメント
    students = [
        {
            "name": "Jiro",
            "age": 25,
            "email": "jiro@example.com"
        },
        {
            "name": "Saburo",
            "age": 20,
            "email": "saburo@example.com"
        }
    ]

    # bulkで扱えるデータ構造に変換します
    for student in students:
        yield {
            "_op_type": "create",
            "_index": "students",
            "_source": student
        }

# 複数ドキュメント登録
helpers.bulk(es, gendata())

100MBを超える大量のドキュメントをバルクインサートを実行する場合、エラーが発生します。このようなときは、記事下部に記載した非同期で大量のドキュメントをバルクインサートを参照ください。

ドキュメントの検索

ドキュメントを検索する方法を2つ紹介します。

クエリを使って検索

登録したドキュメントを検索するには、search(index="インデックス名", body="検索クエリ", size=検索数)を利用します。

bodysizeは指定しない場合は、全件表示されます。

サンプル
# ageの値が20より大きいドキュメントを検索するためのクエリ
query = {
    "query": {
        "range": {
            "age": {
                "gt": 20
            }
        }
    }
}

# ドキュメントを検索
result = es.search(index="students", body=query, size=3)
# 検索結果からドキュメントの内容のみ表示
for document in result["hits"]["hits"]:
    print(document["_source"])

実行結果は以下のようになります。

実行結果
{'name': 'Taro', 'age': 36, 'email': 'taro@example.com'}
{'name': 'Jiro', 'age': 25, 'email': 'jiro@example.com'}

Saburoの歳は20歳なので、検索から外れ表示されていません。

IDを使って検索

ドキュメントIDを指定して直接検索する場合は、get_source(index="インデックス名", id="ドキュメントID")を利用します。

サンプルでは、idが1のドキュメントを検索しています。

サンプル
print(es.get_source(index="students", id=1))

実行結果は以下の通りです。

実行結果
{'name': 'Taro', 'age': 36, 'email': 'taro@example.com'}

idが1のTaroが表示されました。

ドキュメント数の取得

インデックスの中にドキュメント数がいくつあるのか確認したい場合は、count(index="インデックス名")を利用します。

サンプル
print(es.count(index="students"))

実行結果は以下の通りです。

実行結果
{'count': 3, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}

countの値が3となっていいるので、インデックスstudentsには現在3つのドキュメントが存在していることがわかります。

ドキュメントの更新

ドキュメント更新にはupdate(index="インデックス名", id="ドキュメントID", body="変更内容")を利用します。

サンプル
# doc配下に変更したいパラメータを記述
student = {
    "doc": {
        "age": 40
    }
}

# ドキュメントIDを指定して更新
es.update(index="students", id=1, body=student)

# 更新されているか確認
print(es.get_source(index="students", id=1))

get_sourceを使って更新されているか確認しました。

実行結果
{'name': 'Taro', 'age': 40, 'email': 'taro@example.com'}

ageが40に変更されています。

ドキュメントの削除

特定のドキュメントを削除する場合は、delete(index="インデックス名", id="ドキュメントID")を利用します。

サンプルでは、idが1のドキュメントを削除します。

サンプル
es.delete(index="students", id=1)

このドキュメントが本当に削除されているかの確認は次で行います。

ドキュメントの存在確認

ドキュメントの存在を確認するためには、exists(index="インデックス名", id="ドキュメントID")を利用します。

サンプル
print(es.exists(index="students", id=1))
実行結果
False

先程、deleteでidが1のドキュメントを消しているので、Falseが返却され存在しないと表示されました。
存在する場合は、Trueなので、存在と表示されるはずです。

※ 余談ですが、インデックスには存在しているものの、_sourceがあるのか調べるにはexists_sourceを利用します。

アドバンス

ここでは、通常あまり使わないようなメソッドや小ネタの紹介をします。

非同期で検索

ドキュメント数が何万とある場合、検索に時間がかかってしまいます。
このような場合、v7.8.0からサポートされたAsyncElasticsearchを使うことで非同期でリソースを効率的に検索することが出来ます。

準備

機能を使うためにはasyncioをインストールする必要がありあす。

インストール
$ python -m pip install elasticsearch [ async ] > = 7 .8.0

非同期の検索サンプル

非同期で検索するためのサンプルを用意しました。
asyncawaitを付け加えただけで、基本的にはsearchとやることは変わりません。

サンプル
import asyncio
from elasticsearch import AsyncElasticsearch


# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")


async def main():
    # 非同期検索
    result = await es.search(
        index="students",
        body={"query": {"match_all": {}}},
        size=20
    )
    # 検索結果の表示
    for student in result['hits']['hits']:
        print(student['_source'])

    # セッションをクローズ
    await es.close()

# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())

動きとしては、

  • asyncio.get_event_loop()でイベントループを取得
  • 並列で動かしたい関数main()asyncをつけて定義
  • 時間がかかる処理searchawaitをつけて宣言
  • イベントループのrun_until_complete で並列的に実行しつつ終るまで待つ

このような感じです。

非同期でバルクインサート

次は、非同期でバルクインサートを実行する方法を紹介します。
コードはbulkとほとんど変わらず、asyncawaitおよび非同期に対応したasync_bulkを利用するだけです。

サンプル
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")


async def gendata():
    # 登録したいドキュメント
    students = [
        {
            "name": "Siro",
            "age": 19,
            "email": "siro@example.com"
        },
        {
            "name": "Goro",
            "age": 13,
            "email": "goro@example.com"
        }
    ]

    # bulkで扱えるデータ構造に変換します
    for student in students:
        yield {
            "_op_type": "create",
            "_index": "students",
            "_source": student
        }


async def main():
    # 非同期でバルクインサートを実行
    await async_bulk(es, gendata())
    # セッションをクローズ
    await es.close()

# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())

実行後、登録されたドキュメントを確認すると

ドキュメント一覧
{'name': 'Jiro', 'age': 25, 'email': 'jiro@example.com'}
{'name': 'Saburo', 'age': 20, 'email': 'saburo@example.com'}
{'name': 'Siro', 'age': 19, 'email': 'siro@example.com'}
{'name': 'Goro', 'age': 13, 'email': 'goro@example.com'}

SaburoGoroが追加されていることがわかります。

非同期で大量のドキュメントをバルクインサート

100MBを超えるドキュメントを一度にバルクインサートすることは出来ません。
これは、Elasticsearch側で上限が設定されているためです。

http.max_content_length
The max content of an HTTP request. Defaults to 100MB.
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html

このような大量のデータを扱う場合、async_streaming_bulk(client, actions, その他パラメータ)を利用して、非同期でドキュメントを複数(チャンク)に分割して登録を行います。

パラメータ

指定できるパラメータについて紹介します。

  • chunk_size
    • 型:整数
    • Elasticsearchへ一度に送信するドキュメント数(デフォルト:500)
  • max_chunk_bytes
    • 型:整数
    • リクエストの最大バイトサイズ(デフォルト:100MB)
  • raise_on_error
    • 型:Bool
    • exceptでBulkIndexError発生時に、エラーリストを取得できるようになる(デフォルト: True)
  • raise_on_exception
    • 型:Bool
    • Falseにすると、bulk失敗時に例外を発生させず、最後に失敗したアイテムの報告のみ行う(デフォルト:True)
  • max_retries
    • 型:整数
    • ステータスエラー429発生時に再試行を行う回数(デフォルト:0)
  • initial_backoff
    • 型:整数
    • 再試行まで待機する秒数、2回以降は待機秒数 x 2となる(デフォルト:2)
  • max_backoff
    • 型:整数
    • 再試行が待機する最大秒数(デフォルト:600)
  • yield_ok
    • 型:Bool
    • Falseにすると、出力結果からbulkに成功したドキュメントが表示されなくなる(デフォルト:True)

サンプル

以下に簡単なサンプルを紹介します。

サンプル
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk, BulkIndexError

# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")


async def gendata():
    # 登録したいドキュメント
    students = [
        {
            "name": "Siro",
            "age": 19,
            "email": "siro@example.com"
        },
        {
            "name": "Goro",
            "age": 13,
            "email": "goro@example.com"
        }
    ]

    # bulkで扱えるデータ構造に変換します
    for student in students:
        yield {
            "_op_type": "create",
            "_index": "students",
            "_source": student
        }


async def main():
    try:
        # ドキュメントを複数(チャンク)に分けてバルクインサート
        async for ok, result in async_streaming_bulk(client=es,
                                                     actions=gendata(),
                                                     chunk_size=50,  # 一度に扱うドキュメント数
                                                     max_chunk_bytes=52428800  # 一度に扱うバイト数
                                                     ):
            # 各チャンクごとの実行結果を取得
            action, result = result.popitem()
            # バルクインサートに失敗した場合
            if not ok:
                print(f"failed to {result} document {action}")
    # 例外処理
    except BulkIndexError as bulk_error:
        # エラーはリスト形式
        print(bulk_error.errors)

    # セッションのクローズ
    await es.close()

# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())

今回指定した、チャンクに関するパラメータは、

  • チャンク数:50
  • チャンクの最大バイト数:50MB

こんな感じです。

これらは、登録したいデータに応じてチューニングすると良いと思います。

特定のステータスによるエラーを無視

引数ignoreに無視したい特定のステータスコードを指定することでエラーを無視することができます。

例えば、存在しないインデックスを削除しようとした場合、404エラーが発生します。

404エラー
elasticsearch.exceptions.NotFoundError: NotFoundError(404, 'index_not_found_exception', 'no such index [test-index]', test-index, index_or_alias)

サンプルではこれを無視してみます。

サンプル
# 404と400で発生するエラーを無視
es.indices.delete(index='test-index', ignore=[400, 404])

タイムアウト

es側ではデフォルトでタイムアウトが設定されていますが、自分で設定することも出来ます。

方法は簡単で、引数にrequest_timeout=秒数(浮動小数点)を渡すだけです。
あえて短い秒数を指定して、タイムアウトさせるサンプルを用意しました。

サンプル
print(es.cluster.health(wait_for_status='yellow', request_timeout=0.001))

実行すると、指定した0.001秒以内に処理が終了しなかったため、以下のようなタイムアウトのエラーが表示されました。

タイムアウトエラー
elasticsearch.exceptions.ConnectionError: ConnectionError((<urllib3.connection.HTTPConnection object at 0x7f11297c5520>, 'Connection to localhost timed out. (connect timeout=0.001)')) caused by: ConnectTimeoutError((<urllib3.connection.HTTPConnection object at 0x7f11297c5520>, 'Connection to localhost timed out. (connect timeout=0.001)'))

レスポンスの整形

Elasticsearchからのレスポンスをそのまま表示すると

レスポンス
{'students': {'mappings': {'properties': {'age': {'type': 'long'}, 'email': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}}}

このようにとても見づらくなってしまいます。

そこで、jsonパッケージを使って整形することで見やすく表示できます。

サンプル
import json
from elasticsearch import Elasticsearch

# Elasticsearchインスタンスを作成
es = Elasticsearch("http://locahost:9200")

# マッピング情報の取得
response = es.indices.get_mapping(index="students")
# レスポンスの整形
print(json.dumps(response, indent=2))

動きとしては、
json.dumpsに整形したいデータと、インデントのスペース数を指定するだけです。

今回はレスポンスのネストが深くなることが予想されたためインデントは2を指定しました。

実行結果
{
  "students": {
    "mappings": {
      "properties": {
        "age": {
          "type": "long"
        },
        "email": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}

見やすく整形されました。

おわりに

Python Elasticserachのクライアントの基本的な使い方を紹介しました。
直接APIを叩くよりも、用意された便利なメソッドを使うことで、直感的に操作できるのでとても便利に感じました。

使い方が間違っていたり、その他便利な方法があればコメントお願いします!

参考サイト

103
73
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
103
73