はじめに
AWS s3のデータをElasticsearchにインデックスするためのConnector Client - Elastic S3 ConnectorがElastic Stack v8.7でベータリリースされました。
今後いくつかの記事に渡り、データの取り込みから検索までを紹介していきたいと思います。
まず最初の本記事ではElastic S3 Connectorを使ったS3バケットに含まれるオブジェクトデータの取り込みです。
アーキテクチャー
S3データの取り込み(今回の記事の範囲)
Elasticsearch <= Python Connector Client => AWS S3
- Elasticsearchは今回はElastic Cloudを利用していますが、ダウンロード版のElasticsearch v8.7でも動作します。
- Connector ClientはPython 3.10で動くPythonプログラムです。今回は自分のMacで試していますが、Python 3.10の環境であればどこでも動くと思います。(最初、EC2 Amazon Linuxで試そうとしましがpython 3.10のインストールに苦労したので、今回はMacでやることにしました。
AWS側用意
-
s3バケットを作成します。今回はConnector Clientが自分のマシンにあるので、パブリックアクセスは許可しておきます。
-
IAMポリシーを作成します。こちらのドキュメントに書いてある通り、ListAllMyBuckets、ListBucket、GetBucketLocation、GetObjectのパーミッションをポリシーに割り当てます。
-
ユーザーを作成し、上記のIAMポリシーを割り当てます。また、アクセスキーを作成し、Access Key IDとSecret access keyをメモします。
-
Connector Clientを実行するマシン上で、AWS CLIのインストールをして、
aws configure
コマンドで上記アクセスキーを設定してください。
Connector Clientの構築
ここからは以下のConnector Clientのサンプル手順(Postgreコネクタ)を参考に、s3コネクタ用にカスタマイズします。
https://www.elastic.co/guide/en/enterprise-search/8.7/postgresql-connector-client-tutorial.html
わかりやすくするため、上記ドキュメントのステップの題名とこちらのステップの題名を合わせておきました。
Create an Elasticsearch Index
Enteprise Search-> Indicesから
Build a connectorを選択し、Index名を設定します。
ここでユーザーのアプリケーションが今回構築するこのEnteprise Search APIにアクセスするためのAPI Keyを取得します。
後でコネクターに設定するAPI KeyとConnector idをメモします。
Setup the connector
-
以下のConnector Client のGitHubのリポジトリを、実行するマシンにダウンロードします。
https://github.com/elastic/connectors-python/
今回は8.7のブランチを使いました。git clone https://github.com/elastic/connectors-python -b 8.7
と実行しました。 -
コードのconnectors/kibana.pyのファイルを編集し、is_nativeの箇所をFalseに変更します。
-
コードのconfig.yamlのファイルを編集し、host, api_key, connector_id, service_type, sourcesを編集します。
XXXの箇所は自分の環境のものを使います。
elasticsearch:
host: https://XXX
# host: https://xxx.es.asia-northeast1.gcp.cloud.es.io # example
api_key: 'XXX' # Your API key
ssl: true
bulk:
queue_max_size: 1024
queue_max_mem_size: 25
display_every: 100
chunk_size: 1000
max_concurrency: 5
chunk_max_mem_size: 5
concurrent_downloads: 10
request_timeout: 120
max_wait_duration: 120
initial_backoff_duration: 1
backoff_multiplier: 2
log_level: info
service:
idling: 30
heartbeat: 300
max_errors: 20
max_errors_span: 600
max_concurrent_syncs: 1
job_cleanup_interval: 300
log_level: INFO
native_service_types:
- mongodb
- mysql
- network_drive
- s3
- google_cloud_storage
- azure_blob_storage
- postgresql
- oracle
- dir
# - mssql # temporary not working, migrating it to a different library
# Connector client settings
connector_id: 'XXX' # Your connector ID
service_type: 's3' # The service type for your connector
sources:
# mongodb: connectors.sources.mongo:MongoDataSource
s3: connectors.sources.s3:S3DataSource
# dir: connectors.sources.directory:DirectoryDataSource
# mysql: connectors.sources.mysql:MySqlDataSource
# network_drive: connectors.sources.network_drive:NASDataSource
# google_cloud_storage: connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource
# azure_blob_storage: connectors.sources.azure_blob_storage:AzureBlobStorageDataSource
# postgresql: connectors.sources.postgresql:PostgreSQLDataSource
# oracle: connectors.sources.oracle:OracleDataSource
# mssql: connectors.sources.mssql:MSSQLDataSource # temporary not working, migrating it to a different library
Run the connector service
コードのルートフォルダ配下で以下のコマンドを実行します。(必要に応じてmakeのインストールを先にしてください)
make run
以下のようなログが出力されます。Enterprise Searchtの接続が確立され、ジョブの指令をリスニングしているのがわかります。
Finished processing dependencies for elasticsearch-connectors==8.7.0.0
bin/elastic-ingest
[FMWK][14:25:34][INFO] Loading config from /XXX/connectors-python/connectors/../config.yml
[FMWK][14:25:34][INFO] Preflight checks...
[FMWK][14:25:34][INFO] Waiting for NodeConfig(scheme='https', host='XXX.es.asia-northeast1.gcp.cloud.es.io', port=443, path_prefix='', headers={}, connections_per_node=10, request_timeout=10.0, http_compress=False, verify_certs=True, ca_certs=None, client_cert=None, client_key=None, ssl_assert_hostname=None, ssl_assert_fingerprint=None, ssl_version=None, ssl_context=None, ssl_show_warn=True, _extras={}) (so far: 0 secs)
[FMWK][14:25:35][INFO] Service started, listening to events from https://XXX.es.asia-northeast1.gcp.cloud.es.io
[FMWK][14:25:35][INFO] Successfully started Job cleanup task...
[FMWK][14:25:35][INFO] Start cleaning up orphaned jobs...
[FMWK][14:25:35][INFO] No orphaned jobs found. Skipping...
[FMWK][14:25:35][INFO] Start cleaning up idle jobs...
[FMWK][14:25:35][INFO] No idle jobs found. Skipping...
[FMWK][14:30:35][INFO] Start cleaning up orphaned jobs...
[FMWK][14:30:35][INFO] No orphaned jobs found. Skipping...
Sync your PostgreSQL S3 data source
Enter your PostgreSQL S3 data source details
Enterprise Searchの画面に戻り、検索対象のAWS Bucketの名前を設定し、Save Configurationをします。
最後に定期的なSyncのスケジュールの設定をしますが、先に右上の即時Syncを実行してうまくいくか確認しましょう。
SyncするとConnectorに設定が伝わり、そこからConnectorがS3バケットにアクセスしに行き、s3のファイルを取得しに行きます。
[FMWK][14:37:07][INFO] Validating Amazon S3 Configuration...
[FMWK][14:37:08][INFO] Successfully connected to AWS Server.
[FMWK][14:37:08][INFO] Starting doc lookups
[FMWK][14:37:09][INFO] Job reporting task is stopped.
[FMWK][14:37:09][INFO] [XXX] Sync done: 2 indexed, 0 deleted. (1 seconds)
最後、好きなスケジュールでSyncされるように設定して終わりです。
s3とのSyncができると、以下のようにドキュメントのテキストが取り込まれたことが確認できます。
Connector clientをコンテナで実行する手順
前のステップでは、マシン上でPythonのConnector Clientを動かしましたが、コンテナとして動かすこともできるのでやってみました。
以下の手順を参考にしました。今回はこちらを少しやりやすいようにカスタマイズしています。
https://github.com/elastic/connectors-python/blob/8.7/docs/DOCKER.md
- Connector Clientが動くコンテナにAWS CLIをインストールする必要があるので、前の手順でダウンロードしたconnectors-python/Dockerfileを以下のように追記します。#### AWS CLI INSTALL #### 以降が追加分です。(AWS CLIのインストールはこちらの手順を参考にさせていただきました:https://zenn.dev/tokku5552/articles/aws-container)
FROM python:3.10
RUN git clone https://github.com/elastic/connectors-python /app/
WORKDIR /app
RUN make clean install
#### AWS CLI INSTALL ####
# install pre-req of aws cli
RUN apt-get update && apt-get install -y less vim curl unzip sudo
# install aws cli v2
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
RUN unzip awscliv2.zip
RUN sudo ./aws/install
2. Dockerfileと同じ場所に以下のdocker-compose ファイルを新規作成します。前の手順でconfig.ymlは設定しているので、そのままそれをマウントして使っています。
version: "3"
services:
s3-connector:
build: .
container_name: s3-connector
volumes:
- ./config.yml:/app/config.yml
env_file:
- .env
environment:
AWS_DEFAULT_REGION: ap-northeast-1
AWS_DEFAULT_OUTPUT: json
command: /app/bin/elastic-ingest -c /app/config.yml
3. 同じ場所に.envファイルを新規作成し、s3アクセスキーの設定を行います。
AWS_ACCESS_KEY_ID='YOUR_AWS_ACCESS_KEY_ID'
AWS_SECRET_ACCESS_KEY='YOUR_AWS_SECRET_ACCESS_KEY'
4. docker-compose up
で起動し、前の手順と同じような出力が出ればOKです。
[+] Running 1/0
⠿ Container s3-connector Created 0.0s
Attaching to s3-connector
s3-connector | [FMWK][06:44:26][INFO] Running connector service version 8.8.0.0
s3-connector | [FMWK][06:44:26][INFO] Loading config from /app/config.yml
s3-connector | [FMWK][06:44:26][INFO] Preflight checks...
s3-connector | [FMWK][06:44:26][INFO] Waiting for NodeConfig(scheme='https', host='XXX.es.asia-northeast1.gcp.cloud.es.io', port=443, path_prefix='', headers={}, connections_per_node=10, request_timeout=10.0, http_compress=False, verify_certs=True, ca_certs=None, client_cert=None, client_key=None, ssl_assert_hostname=None, ssl_assert_fingerprint=None, ssl_version=None, ssl_context=None, ssl_show_warn=True, _extras={}) (so far: 0 secs)
s3-connector | [FMWK][06:44:26][INFO] Service started, listening to events from https://XXX.es.asia-northeast1.gcp.cloud.es.io
s3-connector | [FMWK][06:44:26][INFO] Service started, listening to events from https://XXX.es.asia-northeast1.gcp.cloud.es.io
s3-connector | [FMWK][06:44:26][INFO] Successfully started Job cleanup task...
s3-connector | [FMWK][06:44:26][INFO] Start cleaning up orphaned jobs...
s3-connector | [FMWK][06:44:26][INFO] No orphaned jobs found. Skipping...
s3-connector | [FMWK][06:44:26][INFO] Start cleaning up idle jobs...
s3-connector | [FMWK][06:44:26][INFO] No idle jobs found. Skipping...
どのようにデータがインデックスされているか
注意は、Syncできるファイルは10MBまでとされているので、例えば20MBのpptをS3にアップすると...
[FMWK][08:43:29][INFO] Successfully connected to AWS Server.
[FMWK][08:43:29][INFO] Starting doc lookups
[FMWK][08:43:29][WARNING] File size for Elastic Cloud on GCP.pptx is larger than 10485760 bytes. Discarding the file content
[FMWK][08:43:30][INFO] Job reporting task is stopped.
[FMWK][08:43:30][INFO] [XXX] Sync done: 1 indexed, 0 deleted. (1 seconds)
終わり
今回は新しいS3 Connector Clientを使い、オブジェクトをElasticsearchに取り込むところまでやりました。
次回はこのインデックスに対する検索を試していきます。