はじめに
この記事では、IBM提供のKafkaソリューションであるEvent Streamsを使用してAWS S3との連携を実現する方法を紹介します。Event Streamsでは様々なKafka Connectorを使用することができ、こちらにリストがまとまっています。今回はAmazon S3のSink Connectorを使用していきます。
基本的な流れは、以前の記事 - Event StreamsとMQ連携と同様になります。
1. 事前準備
1.1 Amazon S3環境
メッセージの連携先となるAmazon S3環境を設定します。
AWSの無料利用枠で検証しています。
事前にバケットを作成しておきます。
また、Kafkaが連携で使用するAWSのセキュリティ認証情報を設定します。
今回は検証目的のため、ルートユーザーのアクセスキーを使用します。
1.2 Event Streams環境
メッセージの連携元となるEvent Streams環境を設定します。
今回は、IBM CloudにCloud Pak for Integration 2023.2.1を導入して、Event Streams v11.2.0インスタンスをデプロイしています。
Event Streamsのコンソールに接続します。
1.2.1 トピック作成
ホーム画面の「トピックの作成」からデータ収集に使用するトピックを作成します。
今回は「test-topic」というトピックを作成しています。
1.2.2 クラスター接続設定
ホーム画面の「このクラスターに接続」から接続に使用する資格情報を確認・登録します。
Kafkaリスナーおよび資格情報欄で外部に対する「SCRAM資格情報の設定」を実施します。
今回は「kafka-user」という名前で設定し、以降はガイドに沿って権限を設定していきます。
2. コネクター環境セットアップ
「ツールボックス」を選択すると、Event Streamsに接続するための様々なツールが示されています。
今回は、コネクターを使用していきます。以下の順序で実施していきます。
2.1 Kafka Connect環境のセットアップ
2.2 Kafka Connect環境へのコネクターの追加
2.3 Kafka Connectとコネクターを開始
2.1 Kafka Connect環境のセットアップ
ここで示されている手順に沿って準備していきます。
Kafka Connect ZIPをダウンロードすると以下のような中身になっています。
kafkaconnect:
Dockerfile
kafka-connect.yaml
my-plugins
まずは、kafka-connect.yamlに今回の環境用の認証情報を設定していきます。
環境に合わせて以下の値を設定していきます。証明書の詳細はOCPコンソールから確認ができます。
・bootstrapServers
・image
・tls
・authentication
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
eventstreams.ibm.com/use-connector-resources: "true"
spec:
replicas: 1
bootstrapServers: es-demo-kafka-bootstrap-cp4i.***.cloud:443
image: image-registry.openshift-image-registry.svc:5000/cp4i/my-connect-cluster-image
template:
pod:
imagePullSecrets: []
metadata:
annotations:
eventstreams.production.type: CloudPakForIntegrationNonProduction
productID: 2a79e49111f44ec3acd89608e56138f5
productName: IBM Event Streams for Non Production
productVersion: 11.2.0
productMetric: VIRTUAL_PROCESSOR_CORE
productChargedContainers: my-connect-cluster-connect
cloudpakId: c8b82d189e7545f0892db9ef2731b90d
cloudpakName: IBM Cloud Pak for Integration
productCloudpakRatio: "2:1"
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
tls:
trustedCertificates:
- secretName: es-demo-cluster-ca-cert
certificate: ca.crt
authentication:
type: scram-sha-512
username: kafka-user
passwordSecret:
secretName: kafka-user
password: password
2.2 Kafka Connect環境へのコネクター追加
次にmy-pluginsフォルダーに今回使用したいConnector用のプラグインファイルを配置します。
今回は、s3 sink connectorを使用しますので、こちらのGithubで提供される情報をもとにjarファイルを取得し、配置します。リリースv2.13.0での一通りのjarファイルを含めて以降のような配置となります。jarファイルが複数ある場合は、my-pluginsフォルダー下にディレクトリーを作成して配置する必要があります。
./kafkaconnect
|--Dockerfile
|--kafka-connect.yaml
|--my-plugins
| |--s3
| | |--audience-annotations-0.11.0.jar
| | |--avro-1.11.0.jar
| | |--aws-java-sdk-core-1.12.479.jar
| | |--aws-java-sdk-kms-1.12.461.jar
| | |--aws-java-sdk-s3-1.12.461.jar
| | |--aws-java-sdk-sts-1.12.479.jar
| | |--checker-qual-3.12.0.jar
| | |--common-utils-7.2.2.jar
| | |--commons-beanutils-1.9.4.jar
| | |--commons-codec-1.15.jar
| | |--commons-collections-3.2.2.jar
| | |--commons-compress-1.21.jar
| | |--commons-configuration2-2.8.0.jar
| | |--commons-for-apache-kafka-connect-0.9.0.jar
| | |--commons-lang3-3.12.0.jar
| | |--commons-logging-1.2.jar
| | |--commons-pool-1.6.jar
| | |--commons-text-1.10.0.jar
| | |--dnsjava-2.1.7.jar
| | |--error_prone_annotations-2.11.0.jar
| | |--failureaccess-1.0.1.jar
| | |--guava-31.1-jre.jar
| | |--hadoop-annotations-3.3.5.jar
| | |--hadoop-common-3.3.5.jar
| | |--hadoop-shaded-guava-1.1.1.jar
| | |--httpclient-4.5.13.jar
| | |--httpcore-4.4.13.jar
| | |--ion-java-1.0.2.jar
| | |--j2objc-annotations-1.3.jar
| | |--jackson-annotations-2.13.2.jar
| | |--jackson-core-2.13.2.jar
| | |--jackson-databind-2.13.2.2.jar
| | |--jackson-dataformat-cbor-2.13.2.jar
| | |--jakarta.activation-api-1.2.1.jar
| | |--javax.annotation-api-1.3.2.jar
| | |--jaxb-api-2.2.2.jar
| | |--jaxb-impl-2.2.3-1.jar
| | |--jersey-json-1.20.jar
| | |--jettison-1.1.jar
| | |--jmespath-java-1.12.479.jar
| | |--joda-time-2.8.1.jar
| | |--jsr305-3.0.2.jar
| | |--kafka-avro-serializer-7.2.2.jar
| | |--kafka-connect-avro-converter-7.2.2.jar
| | |--kafka-connect-avro-data-7.2.2.jar
| | |--kafka-schema-converter-7.2.2.jar
| | |--kafka-schema-registry-client-7.2.2.jar
| | |--kafka-schema-serializer-7.2.2.jar
| | |--listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
| | |--logredactor-1.0.10.jar
| | |--logredactor-metrics-1.0.10.jar
| | |--minimal-json-0.9.5.jar
| | |--parquet-avro-1.11.1.jar
| | |--parquet-column-1.11.1.jar
| | |--parquet-common-1.11.1.jar
| | |--parquet-encoding-1.11.1.jar
| | |--parquet-format-structures-1.11.1.jar
| | |--parquet-hadoop-1.11.1.jar
| | |--parquet-jackson-1.11.1.jar
| | |--re2j-1.6.jar
| | |--reload4j-1.2.22.jar
| | |--s3-connector-for-apache-kafka-2.13.0.jar
| | |--slf4j-api-1.7.36.jar
| | |--slf4j-reload4j-1.7.36.jar
| | |--snappy-java-1.1.9.1.jar
| | |--stax-api-1.0-2.jar
| | |--stax2-api-4.2.1.jar
| | |--swagger-annotations-2.1.10.jar
| | |--woodstox-core-5.4.0.jar
| | |--zstd-jni-1.5.5-2.jar
これで準備が終わりましたので、これらのファイルをDockerコンテナーとしてビルドし、OCP環境にPushしていきます。
>docker login cp.icr.io -u cp -p ***
WARNING! Using --password via the CLI is insecure. Use --password-stdin.
Login Succeeded
# dockerイメージビルド
>docker build -t my-connect-cluster-image:latest kafkaconnect/
[+] Building 6.3s (8/8) FINISHED
=> [internal] load build definition from Dockerfile 0.0s
=> => transferring dockerfile: 134B 0.0s
=> [internal] load .dockerignore 0.0s
=> => transferring context: 2B 0.0s
=> [internal] load metadata for cp.icr.io/cp/ibm-eventstreams-kafka:11.2.0 5.9s
=> [auth] cp/ibm-eventstreams-kafka:pull token for cp.icr.io 0.0s
=> [internal] load build context 0.2s
=> => transferring context: 13.54MB 0.1s
=> CACHED [1/2] FROM cp.icr.io/cp/ibm-eventstreams-kafka:11.2.0@sha256:*** 0.0s
=> [2/2] COPY ./my-plugins/ /opt/kafka/plugins/ 0.1s
=> exporting to image 0.1s
=> => exporting layers 0.1s
=> => writing image sha256:c09376a99a08679869c2a24524bd9fd20a08730e88655868b202cd3b9c38029c 0.0s
=> => naming to docker.io/library/my-connect-cluster-image:latest 0.0s
# OCP環境レジストリーパスでタグ付け
> docker tag my-connect-cluster-image:latest default-route-openshift-image-registry.***.cloud/cp4i/my-connect-cluster-image:latest
# OCP環境にログイン
> docker login -u *** -p sha256~*** default-route-openshift-image-registry.***.cloudWARNING! Using --password via the CLI is insecure. Use --password-stdin.
Login Succeeded
# OCP環境のイメージレジストリーにプッシュ
> docker push default-route-openshift-image-registry.***.cloud/cp4i/my-connect-cluster-image:latest
The push refers to repository [default-route-openshift-image-registry.***.us-east.containers.appdomain.cloud/cp4i/my-connect-cluster-image]
9bb3680d1518: Pushed
afbaf216b515: Pushed
604ea310d3b8: Pushed
c885a267df03: Pushed
d41c9d214c08: Pushed
1d4211b8047a: Pushed
b6b5e54a9d61: Pushed
5b0e52ab72d6: Pushed
b9c8807c07f0: Pushed
abd5331ad63d: Pushed
1d36f4b724f1: Pushed
28647eedfe80: Pushed
28d152ef0c4c: Pushed
a5a3a5497180: Pushed
b642d9aa2f45: Pushed
8b86f8fd3a7d: Pushed
99c6ecde084d: Pushed
28a458282a1e: Pushed
b66994f69e49: Pushed
f536fae8f746: Pushed
cd1cd3ddb935: Pushed
61094c2c6010: Pushed
1ff1dbf9158b: Pushed
f922a018877b: Pushed
52cbfc36b72b: Pushed
latest: digest: sha256:***7f632e size: 5555
2.3 Kafka Connectとコネクターを開始
2.3.1 kafka connect起動
以下の手順でKafkaConnectを開始します。
> oc apply -f kafka-connect.yaml
kafkaconnect.eventstreams.ibm.com/my-connect-cluster created
OCP環境からPodが開始され、エラーが出ていないことを確認します。
2.3.2 Kafka Connectorの開始
次にKafkaConnectorのリソースファイルを作成します。GitHubで提供しているS3 Connectorの情報や事前準備で確認した内容をもとに、編集しデプロイします。
> cat s3-sink.yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
name: s3-sink-connector
labels:
eventstreams.ibm.com/cluster: my-connect-cluster
spec:
class: io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
format.output.type: jsonl
topics: test-topic
aws.access.key.id: ***
aws.secret.access.key: ***
aws.s3.region: us-east-1
file.name.template: dir1/dir2/{{topic}}-{{partition:padding=true}}-{{start_offset:padding=true}}.gz
aws.s3.bucket.name: showroom-s3
> oc apply -f s3-sink.yaml
kafkaconnector.eventstreams.ibm.com/s3-sink-connector created
OCPコンソールからkafkaconnectorのリソースが作成されていることを確認できます。
特にエラーが出ていないことを確認します。
3. 動作確認
以上でEvent Streams-Amazon S3の連携が実現できました。
Event Streamsのtest-topicにメッセージが到着すると、即時、Amazon S3に連携されることを確認していきます。
Event Streamsへのメッセージ登録には、ツールボックスのスターター・アプリケーションを使用します。
ローカル端末上で起動することができ、以下がテストツールの表示画面になります。左側がProducer用のテスト機能です。
ここでは、「{"test":"test message"}」というメッセージを生成しました。
このツールにより、Event Streamsのtest-topicにメッセージが到着していることが確認できます。
Amazon S3にも、Kafka Connectorの設定に基づいた形式でファイルが作成されていることが確認できます。
以上です。