0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

Elasticsearchにはbulk insertというデータを一括でindexに登録する仕組みがありますが,登録するためのjsonフォーマットが少々複雑で用意するのがちょっと面倒です.
そこで調べた方法を3つほど共有します.

  • jq
  • yield
  • list

元となるデータ形式や,CLIで使用するのかなどによって使い分けて頂ければと思います.

対象者

この記事は下記のような項目について知りたい人を対象にしています。

  • Elasticsearchのbulk-insertの手法について
  • pythonアプリケーションからのbulk-insertについて
  • pythonアプリケーションとElasticsearchの連携について

1. jq を使用した方法

datasetがjsonファイルであること,cliからファイルを生成したい という場合はこちら

cat dataset.json | jq -c '.[] | ({"index":{"_index": "{index_name}", "_id": {id_data}}}, .)' > bulk_dataset.json

{index_name} などは入力したい形式に合わせて変更してください.

以下の記事を参考にさせていただきました.

2. yield を使用した方法

アプリケーション内で という場合はこちら.
txt,csv,tsvファイル => 読み込んでiterate という流れなのでデータファイルに依存せず使えます.

# bulk_insertのためのフォーマット変換
def yield_bulk_insert_dataset(data: list):
    for idx, text in enumerate(data):
        yield {
            "_op_type": "create",
            "_index": "{作成したindex名称}",
            "_source": {"id": idx, "texts": text},
        }


# txtファイルからの読み込み
def load_txt(filepath: str) -> list:
    with open(filepath, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()
    return lines


# load
data = load_txt("data/filenameXXX.txt")
# bulk_insert実行
helpers.bulk(es, yield_bulk_insert_dataset(data))

今回はtxtファイルを1行ずつ読み込んで,バルクインサートするというcodeになってます.
tsv, csvファイルからの読み込みの場合はDataFrameで読み込んでから整形というのがよさそうです.

def csv2dict(filepath: str) -> list:
    # csvからdataframeで読み込み
    df = pd.read_csv(filepath, header=None)
    # 使用するカラム以外をdrop
    usecols = ["col1", "col2"]
    df = df[usecols]

    # dict型に変換
    col1 = df.col1.values.tolist()
    col2 = df.col2.values.tolist()
    data =[{"col1": c1, "col2": c2 } for c1, c2 in zip(col1, col2)]

    return data

後で思いついたのですが,dict変換はdf.to_dict(orient='records') と書くのが良かったみたいです.
何はともあれ,テキストファイルの時と同様に

data = csv2dict("data/filenameXXX.csv")
helpers.bulk(es, yield_bulk_insert_dataset(data))

のように使える形式になった(はず)です.

以下の記事の バルクインサートについての項目を参考にさせていただきました.

3. list

[{
    "_op_type": "create",
    "_index": "{作成したindex名称}",
    "_source": {"id": idx1, "texts": text1},
},
{
    "_op_type": "create",
    "_index": "{作成したindex名称}",
    "_source": {"id": idx2, "texts": text2},
},
...
]

入力するdict形式のlistを作成する手法です.

  • 大量のデータをlistとして保持するのがあんまり筋が良くない
  • 多分これなら yield でいい

ということで小さめのデータなら使用することもあるかもという印象です.

その他

pythonアプリケーションとElasticsearchをdockerで構築

今回は上の記事を参考に若干versionを上げて実験してみました.

  • elasticsearch 7.4.2 => 7.16.2
elasticsearch/Dockerfile
# Version 7.16.2 を選択してイメージを取得する
FROM docker.elastic.co/elasticsearch/elasticsearch:7.16.2
# 日本語を扱う場合はplugin「analysis-kuromoji」をインストールする
RUN elasticsearch-plugin install analysis-kuromoji
  • python 3.8 => 3.9
python3/Dockerfile
FROM python:3.9
...
# clientである「elasticsearch」をインストール
RUN pip install  numpy \
                 matplotlib \
                 pandas \
                 elasticsearch == 8.1.0 \

elasticsearch client は古めのものをversion指定してあげると上手くいきやすいかもしれません

まとめ

  • jsonファイルからすぐにbulk_insertを試してみたい => jq
  • jsonファイル以外 && アプリケーションに組み込む => yield

ということになるかなと思います.

誤りなどあればご指摘,編集リクエストなどいただけると大変助かります.

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?