LoginSignup
0
0

PandasのData FrameとElasticsearchのindexを相互変換する

Last updated at Posted at 2023-11-17

はじめに

Elasticsearchの特定のインデックスにサンプルデータを投入したい。でも手元にあるのはキー項目は共通な別々のCSVファイル。えーこれ全部手でマージすんの? それかスクリプト書く? めんどくさ。。 そんな経験みなさんにもありますよね?

そんな時、(結局Pythonスクリプトは書きますが)Elandを使うとPandasのData FrameをそのままElasticsearchとやりとりできるので便利です。というかPandasが便利です。

依存ライブラリ

今回、Pythonのpandas, elasticsearch, elandの各ライブラリを使いますので、インストールされていない場合は以下のコマンドでインストールします。

$ pip install pandas elasticsearch eland

Elandの詳細についてはドキュメントを確認してください。

CSVデータをData Frameに読み込んで結合する

手元に次のようなCSVがあるとします。

dau.csv

date,daily_active_users
2023/08/01,1000
2023/08/02,1002
2023/08/03,980

newusers.csv

date,new_users
2023/08/01,23
2023/08/02,31
2023/08/03,7

この状態から日付ごとのdaily_active_usersとnew_usersをフィールドとしてもつElasticsearchのインデックスを作ることを考えます。

まずはこれらのCSVをData Frameとして読み込みます。

import pandas as pd

dau_df = pd.read_csv("dau.csv", encoding='utf-8', index_col=0, header=0, parse_dates=True)
newusers_df = pd.read_csv("newusers.csv", encoding='utf-8', index_col=0, header=0, parse_dates=True)

Pandasのread_csvは色々なオプションがあり、それだけですごい長い記事を書かれている方もいるようですのでここで詳しく説明はできませんが、使っているものだけかいつまんで説明します。

  • header: 数値をいじることで、例えば最初の行に余計な説明文とかが入っている場合に読み飛ばしたりできるので便利です。
  • index_col: どのカラムをData Frameのインデックス(ID)にするかを指定します。ここでは日付ごとにまとめるので最初のカラムを指定しています。
  • parse_dates: 日付を自動的にパースしてくれるオプションです。便利。

さて、読み込んだData Frameはconcat関数を使えば結合できます。今回はカラムを横に追加するので、axis=1を指定します。

all = pd.concat([dau_df, newusers_df], axis=1)
daily_active_users new_users
date
2023-08-01 1000 23
2023-08-02 1002 31
2023-08-03 980 7

おお、結合できました。Pandas便利ですね。

Elasticsearchに投入する

それではこのData FrameをElasticsearchのIndexに投入してみましょう。PandasのData FrameをElasticsearchに投入するにはElasticが公開しているライブラリのElandを使います。

ElasticsearchにData Frameを投入する関数はed.pandas_to_elandです。

import eland as ed
from elasticsearch import Elasticsearch

es = Elasticsearch(
    'http://localhost:9200',
    basic_auth=('username', 'password')
)
ed.pandas_to_eland(
  all,
  es,
  'daily_user_index',
  es_if_exists="replace",
  es_refresh=True,
  use_pandas_index_for_es_ids=True
)

Elasticsearch側で投入されたか調べてみます。KibanaのDev Toolsで以下のリクエストを実行します。

GET daily_user_index/_search

Response

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "daily_user_index",
        "_id": "2023-08-01 00:00:00",
        "_score": 1,
        "_source": {
          "daily_active_users": 1000,
          "new_users": 23,
          "date": "2023-08-01T00:00:00"
        }
      },
      {
        "_index": "daily_user_index",
        "_id": "2023-08-02 00:00:00",
        "_score": 1,
        "_source": {
          "daily_active_users": 1002,
          "new_users": 31,
          "date": "2023-08-02T00:00:00"
        }
      },
      {
        "_index": "daily_user_index",
        "_id": "2023-08-03 00:00:00",
        "_score": 1,
        "_source": {
          "daily_active_users": 980,
          "new_users": 7,
          "date": "2023-08-03T00:00:00"
        }
      }
    ]
  }
}

投入できています。ただここで一つ注意なのが、Data Frameのindexになっていた日付情報が、ドキュメントIDとして格納されている点です。use_pandas_index_for_es_ids=Trueの設定(これがデフォルト)が効いているわけですが、これをFalseにすると、indexの項目(日付)が完全に失われてしまいます。

そこで通常のElasticsearchでのデータの持ち方に近くするのであれば、以下のように日付を通常のフィールドに変換して投入する方が良いかもしれません。

all['date'] = all.index
ed.pandas_to_eland(
  all,
  es,
  'daily_user_index',
  es_if_exists="replace",
  es_refresh=True,
  use_pandas_index_for_es_ids=False
)

この状態で検索すると、Elasticsearch上では以下のように登録されています。

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "daily_user_index",
        "_id": "I2Cp3IsBCU3flpJPM4Go",
        "_score": 1,
        "_source": {
          "daily_active_users": 1000,
          "new_users": 23,
          "date": "2023-08-01T00:00:00"
        }
      },
      {
        "_index": "daily_user_index",
        "_id": "JGCp3IsBCU3flpJPM4Go",
        "_score": 1,
        "_source": {
          "daily_active_users": 1002,
          "new_users": 31,
          "date": "2023-08-02T00:00:00"
        }
      },
      {
        "_index": "daily_user_index",
        "_id": "JWCp3IsBCU3flpJPM4Go",
        "_score": 1,
        "_source": {
          "daily_active_users": 980,
          "new_users": 7,
          "date": "2023-08-03T00:00:00"
        }
      }
    ]
  }
}

Elasticsearch的にはこの方が自然ですね。

ElasticsearchのIndexをData Frameとしてロードする

ElandにはElasticsearchのインデックスをData Frameとして読み出す関数が用意されています。今登録したデータを再度読み出してみましょう。ed.DataFrame関数を使います。

ed_df = ed.DataFrame(es, 'daily_user_index')
ed_df
daily_active_users date new_users
I2Cp3IsBCU3flpJPM4Go 1000 2023-08-01 23
JGCp3IsBCU3flpJPM4Go 1002 2023-08-02 31
JWCp3IsBCU3flpJPM4Go 980 2023-08-03 7

3 rows × 3 columns

おお、ナチュラルにインデックスのデータをData Frameに変換できました。便利ですね。

この例ではインデックスの全件を読み込んでいますが、条件を絞り込んで読み込むなどの操作も可能なので一度ドキュメントを確認してください。

ただこのed_dfeland.dataframe.DataFrameクラスのオブジェクトになっています。場合によっては都合が悪いので、PandasのData Frameに変換してみましょう。eland_to_pandasを使います。

pd_df = ed.eland_to_pandas(ed_df)

こうするとこのpd_dfpandas.core.frame.DataFrameクラスのオブジェクトになります。

おわりに

Elandを使うとPandasのData FrameとElasticsearchのIndexを簡単に相互変換できることがわかりました。これで今回のように手元のCSVをPandasで読み込んで操作してからElasticsearchに流し込んだり、Elasticsearchに溜まっているデータをPandasに読み込んで分析するなど、さまざまな利用方法が考えられますね。

さらに進んだ使い方として、ElasticsearchのデータをPandasで読み込んだ後にScikit LearnのXGBoostなどで学習させて、出来上がった予測モデルをElasticsearchにアップロードして利用する、などの使い方もできます。機械学習が得意な方はぜひ試してみてください。

またElasticsearch上でデータフレーム分析を実装することも可能です。そちらについては別の記事を書きましたので興味がある方は参照してください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0