6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Event Streams と MQ連携 ~kafka connectorを使用してMQメッセージをKafkaに収集~

Last updated at Posted at 2023-05-09

はじめに

IBM Event StreamsApache Kafkaをベースにしたイベント・ストリームプラットフォームです。Event Streamsでは様々なサービスとデータ連携するためのConnectorを提供されています。今回は、MQ source connectorを使用して、MQのキューにメッセージが到着したことをイベントとして検知し、Event Streamsのtopicに収集する方法を紹介します。

1. 事前準備

1.1 MQ環境

メッセージの収集元となるMQを設定します。
今回は、IBM Cloudのフリー環境を使用しています。

image.png

接続用の資格情報も登録しておきます。

image.png

1.2 Event Streams環境

メッセージの収集先となるEvent Streamsを設定します。
今回は、IBM CloudにCloud Pak for Integration 2022.4.1を導入して使用しています。

Event Streamsインスタンスに接続すると、以下のようなコンソールが表示されます。

image.png

1.2.1 トピック作成

ホーム画面の「トピックの作成」からデータ収集に使用するトピックを作成します。
今回は「user」というトピックを作成しています。

image.png

1.2.2 クラスター接続設定

ホーム画面の「このクラスターに接続」から接続に使用する資格情報を確認・登録します。
Kafkaリスナーおよび資格情報欄で内部に対する「TLS資格情報の生成」を実施します。

image.png

以下のような流れで設定していきます。

image.png

image.png

image.png

image.png

ここで設定した証明書は、OCPコンソールからも確認ができます。

image.png

2. コネクター環境セットアップ

「ツールボックス」を選択すると、Event Streamsに接続するための様々なツールが示されています。
今回は、コネクターを使用していきます。以下の順序で実施していきます。

2.1 Kafka Connect環境のセットアップ
2.2 Kafka Connect環境へのコネクターの追加
2.3 Kafka Connectとコネクターを開始

image.png

2.1. Kafka Connect環境のセットアップ

ここで示されている手順に沿って準備していきます。

image.png

Kafka Connect ZIPをダウンロードすると以下のような中身になっています。

kafkaconnect: 
 Dockerfile
 kafka-connect.yaml
 my-plugins

まずは、kafka-connect.yamlに今回の環境用の認証情報を設定していきます。
環境に合わせて以下の値を設定していきます。証明書の詳細はOCPコンソールから確認ができます。
・bootstrapServers
・image
・tls
・authentication

kafka-connect.yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: es-demo-kafka-bootstrap.cp4i.svc:9093
  image: image-registry.openshift-image-registry.svc:5000/cp4i/my-connect-cluster-image:latest
  template:
    pod:
      imagePullSecrets: []
      metadata:
        annotations:
          eventstreams.production.type: CloudPakForIntegrationNonProduction
          productID: 2a79e49111f44ec3acd89608e56138f5
          productName: IBM Event Streams for Non Production
          productVersion: 11.1.3
          productMetric: VIRTUAL_PROCESSOR_CORE
          productChargedContainers: my-connect-cluster-connect
          cloudpakId: c8b82d189e7545f0892db9ef2731b90d
          cloudpakName: IBM Cloud Pak for Integration
          cloudpakVersion: 2022.4.1
          productCloudpakRatio: "2:1"
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  tls:
    trustedCertificates:
      - secretName: es-demo-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
        certificate: user.crt
        key: user.key
        secretName: kafka-user

2.2 Kafka Connect環境へのコネクターの追加

次にmy-pluginsフォルダーに今回使用したいConnector用のプラグインファイルを配置します。
今回は、mq-source connectorを使用しますので、こちらのGithubで提供される情報をもとにjarファイルを作成し、配置します。

これで準備が終わりましたので、これらのファイルをDockerコンテナーとしてビルドし、OCP環境にPushしていきます。

>docker login cp.icr.io -u cp -p ***
WARNING! Using --password via the CLI is insecure. Use --password-stdin.
Login Succeeded

# dockerイメージビルド
>docker build -t my-connect-cluster-image:latest C:\kafkaconnect
[+] Building 6.3s (8/8) FINISHED
 => [internal] load build definition from Dockerfile                                                0.0s
 => => transferring dockerfile: 134B                                                                0.0s
 => [internal] load .dockerignore                                                                   0.0s
 => => transferring context: 2B                                                                     0.0s
 => [internal] load metadata for cp.icr.io/cp/ibm-eventstreams-kafka:11.1.3                         5.9s
 => [auth] cp/ibm-eventstreams-kafka:pull token for cp.icr.io                                       0.0s
 => [internal] load build context                                                                   0.2s
 => => transferring context: 13.54MB                                                                0.1s
 => CACHED [1/2] FROM cp.icr.io/cp/ibm-eventstreams-kafka:11.1.3@sha256:c914ec93fd1131a429f08b538e  0.0s
 => [2/2] COPY ./my-plugins/ /opt/kafka/plugins/                                                    0.1s
 => exporting to image                                                                              0.1s
 => => exporting layers                                                                             0.1s
 => => writing image sha256:c09376a99a08679869c2a24524bd9fd20a08730e88655868b202cd3b9c38029c        0.0s
 => => naming to docker.io/library/my-connect-cluster-image:latest                                  0.0s

# OCP環境レジストリーパスでタグ付け
> docker tag my-connect-cluster-image:latest default-route-openshift-image-registry.***.us-east.containers.appdomain.cloud/cp4i/my-connect-cluster-image:latest

# OCP環境にログイン
> docker login -u *** -p sha256~*** default-route-openshift-image-registry.***.us-east.containers.appdomain.cloudWARNING! Using --password via the CLI is insecure. Use --password-stdin.
Login Succeeded

# OCP環境のイメージレジストリーにプッシュ
> docker push default-route-openshift-image-registry.***.us-east.containers.appdomain.cloud/cp4i/my-connect-cluster-image:latest
The push refers to repository [default-route-openshift-image-registry.***.us-east.containers.appdomain.cloud/cp4i/my-connect-cluster-image]
9bb3680d1518: Pushed
afbaf216b515: Pushed
604ea310d3b8: Pushed
c885a267df03: Pushed
d41c9d214c08: Pushed
1d4211b8047a: Pushed
b6b5e54a9d61: Pushed
5b0e52ab72d6: Pushed
b9c8807c07f0: Pushed
abd5331ad63d: Pushed
1d36f4b724f1: Pushed
28647eedfe80: Pushed
28d152ef0c4c: Pushed
a5a3a5497180: Pushed
b642d9aa2f45: Pushed
8b86f8fd3a7d: Pushed
99c6ecde084d: Pushed
28a458282a1e: Pushed
b66994f69e49: Pushed
f536fae8f746: Pushed
cd1cd3ddb935: Pushed
61094c2c6010: Pushed
1ff1dbf9158b: Pushed
f922a018877b: Pushed
52cbfc36b72b: Pushed
latest: digest: sha256:ba6fd8d172c8e379c873d6dc9a811f8d390a8f0047923099ba211d06bd7f632e size: 5555

2.3 Kafka Connectとコネクターを開始

2.3.1 kafka connectビルド

以下の手順でKafkaConnectを開始します。

> oc apply -f kafka-connect.yaml
kafkaconnect.eventstreams.ibm.com/my-connect-cluster created

OCP環境からPodが開始され、エラーが出ていないことを確認します。

image.png

image.png

2.3.2 Kafka Connectorの開始

GitHubで提供しているConnectorのリソースファイルを接続先の環境に合わせて編集し、開始します。

> cat mq-source.yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: mq-source
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  tasksMax: 1
  config:
    mq.queue.manager: "USER.MGMT"
    mq.connection.name.list: "user***.appdomain.cloud(30684)"
    mq.channel.name: "CLOUD.APP.SVRCONN"
    mq.queue: "DEV.QUEUE.1"
    mq.user.name: "showroomapp"
    mq.password: "***"
    topic: "user"
    mq.connection.mode: client
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

> oc apply -f mq-source.yaml
kafkaconnector.eventstreams.ibm.com/mq-source created

OCPコンソールからkafkaconnectorのリソースが作成されていることを確認できます。
特にエラーが出ていないことを確認します。

image.png

image.png

また、Kafka ConnectのPodログにMQとの接続が確立さえたメッセージが表示されることも確認できます。

image.png

3. 動作確認

以上でMQメッセージをKafkaに連携する準備が整いました。

MQ Consoleから今回接続しているキューに対してメッセージを登録します。

image.png

Event StreamsのConsoleから対象のトピックを確認すると、メッセージを受信できていることが確認できます。

image.png

以上です。

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?