2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ElasticsearchにS3のデータを取り込んで検索できるようにする! (パート1)

Posted at

はじめに

AWS s3のデータをElasticsearchにインデックスするためのConnector Client - Elastic S3 ConnectorがElastic Stack v8.7でベータリリースされました。

今後いくつかの記事に渡り、データの取り込みから検索までを紹介していきたいと思います。
まず最初の本記事ではElastic S3 Connectorを使ったS3バケットに含まれるオブジェクトデータの取り込みです。

アーキテクチャー

S3データの取り込み(今回の記事の範囲)
Elasticsearch <= Python Connector Client  => AWS S3

  • Elasticsearchは今回はElastic Cloudを利用していますが、ダウンロード版のElasticsearch v8.7でも動作します。
  • Connector ClientはPython 3.10で動くPythonプログラムです。今回は自分のMacで試していますが、Python 3.10の環境であればどこでも動くと思います。(最初、EC2 Amazon Linuxで試そうとしましがpython 3.10のインストールに苦労したので、今回はMacでやることにしました。

AWS側用意

  1. s3バケットを作成します。今回はConnector Clientが自分のマシンにあるので、パブリックアクセスは許可しておきます。
    image.png

  2. IAMポリシーを作成します。こちらのドキュメントに書いてある通り、ListAllMyBuckets、ListBucket、GetBucketLocation、GetObjectのパーミッションをポリシーに割り当てます。
    image.png

  3. ユーザーを作成し、上記のIAMポリシーを割り当てます。また、アクセスキーを作成し、Access Key IDとSecret access keyをメモします。
    image.png

  4. Connector Clientを実行するマシン上で、AWS CLIのインストールをして、aws configureコマンドで上記アクセスキーを設定してください。

Connector Clientの構築

ここからは以下のConnector Clientのサンプル手順(Postgreコネクタ)を参考に、s3コネクタ用にカスタマイズします。
https://www.elastic.co/guide/en/enterprise-search/8.7/postgresql-connector-client-tutorial.html

わかりやすくするため、上記ドキュメントのステップの題名とこちらのステップの題名を合わせておきました。

Create an Elasticsearch Index

Enteprise Search-> Indicesから
Build a connectorを選択し、Index名を設定します。
image.png

ここでユーザーのアプリケーションが今回構築するこのEnteprise Search APIにアクセスするためのAPI Keyを取得します。
image.png

後でコネクターに設定するAPI KeyとConnector idをメモします。
image.png

Setup the connector

  1. 以下のConnector Client のGitHubのリポジトリを、実行するマシンにダウンロードします。
    https://github.com/elastic/connectors-python/
    今回は8.7のブランチを使いました。git clone https://github.com/elastic/connectors-python -b 8.7と実行しました。

  2. コードのconnectors/kibana.pyのファイルを編集し、is_nativeの箇所をFalseに変更します。

  3. コードのconfig.yamlのファイルを編集し、host, api_key, connector_id, service_type, sourcesを編集します。
    XXXの箇所は自分の環境のものを使います。

elasticsearch:
  host: https://XXX
  # host: https://xxx.es.asia-northeast1.gcp.cloud.es.io # example
  api_key: 'XXX' # Your API key
  ssl: true
  bulk:
    queue_max_size: 1024
    queue_max_mem_size: 25
    display_every: 100
    chunk_size: 1000
    max_concurrency: 5
    chunk_max_mem_size: 5
    concurrent_downloads: 10
  request_timeout: 120
  max_wait_duration: 120
  initial_backoff_duration: 1
  backoff_multiplier: 2
  log_level: info

service:
  idling: 30
  heartbeat: 300
  max_errors: 20
  max_errors_span: 600
  max_concurrent_syncs: 1
  job_cleanup_interval: 300
  log_level: INFO

native_service_types:
  - mongodb
  - mysql
  - network_drive
  - s3
  - google_cloud_storage
  - azure_blob_storage
  - postgresql
  - oracle
  - dir
  # - mssql # temporary not working, migrating it to a different library


# Connector client settings
connector_id: 'XXX' # Your connector ID
service_type: 's3'  # The service type for your connector

sources:
  # mongodb: connectors.sources.mongo:MongoDataSource
  s3: connectors.sources.s3:S3DataSource
  # dir: connectors.sources.directory:DirectoryDataSource
  # mysql: connectors.sources.mysql:MySqlDataSource
  # network_drive: connectors.sources.network_drive:NASDataSource
  # google_cloud_storage: connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource
  # azure_blob_storage: connectors.sources.azure_blob_storage:AzureBlobStorageDataSource
  # postgresql: connectors.sources.postgresql:PostgreSQLDataSource
  # oracle: connectors.sources.oracle:OracleDataSource
  # mssql: connectors.sources.mssql:MSSQLDataSource # temporary not working, migrating it to a different library

Run the connector service

コードのルートフォルダ配下で以下のコマンドを実行します。(必要に応じてmakeのインストールを先にしてください)

make run

以下のようなログが出力されます。Enterprise Searchtの接続が確立され、ジョブの指令をリスニングしているのがわかります。

Finished processing dependencies for elasticsearch-connectors==8.7.0.0
bin/elastic-ingest
[FMWK][14:25:34][INFO] Loading config from /XXX/connectors-python/connectors/../config.yml
[FMWK][14:25:34][INFO] Preflight checks...
[FMWK][14:25:34][INFO] Waiting for NodeConfig(scheme='https', host='XXX.es.asia-northeast1.gcp.cloud.es.io', port=443, path_prefix='', headers={}, connections_per_node=10, request_timeout=10.0, http_compress=False, verify_certs=True, ca_certs=None, client_cert=None, client_key=None, ssl_assert_hostname=None, ssl_assert_fingerprint=None, ssl_version=None, ssl_context=None, ssl_show_warn=True, _extras={}) (so far: 0 secs)
[FMWK][14:25:35][INFO] Service started, listening to events from https://XXX.es.asia-northeast1.gcp.cloud.es.io
[FMWK][14:25:35][INFO] Successfully started Job cleanup task...
[FMWK][14:25:35][INFO] Start cleaning up orphaned jobs...
[FMWK][14:25:35][INFO] No orphaned jobs found. Skipping...
[FMWK][14:25:35][INFO] Start cleaning up idle jobs...
[FMWK][14:25:35][INFO] No idle jobs found. Skipping...
[FMWK][14:30:35][INFO] Start cleaning up orphaned jobs...
[FMWK][14:30:35][INFO] No orphaned jobs found. Skipping...

Sync your PostgreSQL S3 data source

Enter your PostgreSQL S3 data source details

Enterprise Searchの画面に戻り、検索対象のAWS Bucketの名前を設定し、Save Configurationをします。
image.png

最後に定期的なSyncのスケジュールの設定をしますが、先に右上の即時Syncを実行してうまくいくか確認しましょう。
image.png

SyncするとConnectorに設定が伝わり、そこからConnectorがS3バケットにアクセスしに行き、s3のファイルを取得しに行きます。

[FMWK][14:37:07][INFO] Validating Amazon S3 Configuration...
[FMWK][14:37:08][INFO] Successfully connected to AWS Server.
[FMWK][14:37:08][INFO] Starting doc lookups
[FMWK][14:37:09][INFO] Job reporting task is stopped.
[FMWK][14:37:09][INFO] [XXX] Sync done: 2 indexed, 0 deleted. (1 seconds)

最後、好きなスケジュールでSyncされるように設定して終わりです。
image.png

s3とのSyncができると、以下のようにドキュメントのテキストが取り込まれたことが確認できます。
image.png

Connector clientをコンテナで実行する手順

前のステップでは、マシン上でPythonのConnector Clientを動かしましたが、コンテナとして動かすこともできるのでやってみました。

以下の手順を参考にしました。今回はこちらを少しやりやすいようにカスタマイズしています。
https://github.com/elastic/connectors-python/blob/8.7/docs/DOCKER.md

  1. Connector Clientが動くコンテナにAWS CLIをインストールする必要があるので、前の手順でダウンロードしたconnectors-python/Dockerfileを以下のように追記します。#### AWS CLI INSTALL #### 以降が追加分です。(AWS CLIのインストールはこちらの手順を参考にさせていただきました:https://zenn.dev/tokku5552/articles/aws-container)
Dockerfile
FROM python:3.10

RUN git clone https://github.com/elastic/connectors-python /app/
WORKDIR /app
RUN make clean install

#### AWS CLI INSTALL ####
# install pre-req of aws cli
RUN apt-get update && apt-get install -y less vim curl unzip sudo

# install aws cli v2
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
RUN unzip awscliv2.zip
RUN sudo ./aws/install

 
2. Dockerfileと同じ場所に以下のdocker-compose ファイルを新規作成します。前の手順でconfig.ymlは設定しているので、そのままそれをマウントして使っています。

docker-compose.yml
version: "3"
services:
  s3-connector:
    build: .
    container_name: s3-connector
    volumes:
      - ./config.yml:/app/config.yml
    env_file:
      - .env
    environment:
      AWS_DEFAULT_REGION: ap-northeast-1
      AWS_DEFAULT_OUTPUT: json
    command: /app/bin/elastic-ingest -c /app/config.yml

 
3. 同じ場所に.envファイルを新規作成し、s3アクセスキーの設定を行います。

.env
AWS_ACCESS_KEY_ID='YOUR_AWS_ACCESS_KEY_ID'
AWS_SECRET_ACCESS_KEY='YOUR_AWS_SECRET_ACCESS_KEY'

 
4. docker-compose upで起動し、前の手順と同じような出力が出ればOKです。

[+] Running 1/0
 ⠿ Container s3-connector  Created                                                                                                                  0.0s
Attaching to s3-connector
s3-connector  | [FMWK][06:44:26][INFO] Running connector service version 8.8.0.0
s3-connector  | [FMWK][06:44:26][INFO] Loading config from /app/config.yml
s3-connector  | [FMWK][06:44:26][INFO] Preflight checks...
s3-connector  | [FMWK][06:44:26][INFO] Waiting for NodeConfig(scheme='https', host='XXX.es.asia-northeast1.gcp.cloud.es.io', port=443, path_prefix='', headers={}, connections_per_node=10, request_timeout=10.0, http_compress=False, verify_certs=True, ca_certs=None, client_cert=None, client_key=None, ssl_assert_hostname=None, ssl_assert_fingerprint=None, ssl_version=None, ssl_context=None, ssl_show_warn=True, _extras={}) (so far: 0 secs)
s3-connector  | [FMWK][06:44:26][INFO] Service started, listening to events from https://XXX.es.asia-northeast1.gcp.cloud.es.io
s3-connector  | [FMWK][06:44:26][INFO] Service started, listening to events from https://XXX.es.asia-northeast1.gcp.cloud.es.io
s3-connector  | [FMWK][06:44:26][INFO] Successfully started Job cleanup task...
s3-connector  | [FMWK][06:44:26][INFO] Start cleaning up orphaned jobs...
s3-connector  | [FMWK][06:44:26][INFO] No orphaned jobs found. Skipping...
s3-connector  | [FMWK][06:44:26][INFO] Start cleaning up idle jobs...
s3-connector  | [FMWK][06:44:26][INFO] No idle jobs found. Skipping...

どのようにデータがインデックスされているか

  • CSVなどのテキストファイルはほぼそのままbodyフィールドにデータが取り込まれています
    image.png
  • ドキュメントファイル系は中の文字列がインデックスされていました (
    image.png
  • 画像ファイルはファイル名くらいです。
    image.png

注意は、Syncできるファイルは10MBまでとされているので、例えば20MBのpptをS3にアップすると...

[FMWK][08:43:29][INFO] Successfully connected to AWS Server.
[FMWK][08:43:29][INFO] Starting doc lookups
[FMWK][08:43:29][WARNING] File size for Elastic Cloud on GCP.pptx is larger than 10485760 bytes. Discarding the file content
[FMWK][08:43:30][INFO] Job reporting task is stopped.
[FMWK][08:43:30][INFO] [XXX] Sync done: 1 indexed, 0 deleted. (1 seconds)

bodyのないDocumentが作られました。
image.png

終わり

今回は新しいS3 Connector Clientを使い、オブジェクトをElasticsearchに取り込むところまでやりました。
次回はこのインデックスに対する検索を試していきます。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?