1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

watsonx.dataにKafkaを組み込んでPrestoエンジンでアクセスしてみた

Last updated at Posted at 2023-12-21

watsonx.dataはオープンなデータレイクハウス・アーキテクチャに基づいて構築されており、大容量のデータを安価に保存できるオブジェクトストレージにテーブルを作成したり、他のデータベースを統合することができ、それらのデータベースのデータをwatsonx.dataに組み込まれているオープンソースのPrestoエンジンを使って横断的にSQL文でアクセスできます。

Kafkaは大規模なイベント・ストリーム・データを公開(パブリッシュ)/購読(サブスクライブ)して複数のシステム間でデータを連携することができるオープンソースで、一定期間データを保持したり、複数のサーバーに分散かつ冗長化して処理できるため、高速で耐障害性が高いプラットフォームとなっています。

利用例としては、ホストシステムでトランザクション処理を行い、そのデータをKafkaで別システムに連携し参照系を行うといった、ホストシステムから参照系の負荷を軽減させ、別システムでフレキシブルに参照を行う CQRS (Command Query Responsibility Segregation) で使用されたりしています。

本記事は、watsonx.dataにKafkaを組み込んでPrestoエンジンでアクセスしてみた内容になります。一部、まだ watsonx.data でサポートされていない機能について、オープンソースのPrestoのドキュメントを参照しながら試行錯誤して得られた結果も記載しています。(今後のサポートを期待)

この記事の内容

  • 今回試した環境
    • IBM Cloud Pak for Data (CP4D) 4.8.0 上に以下のサービスを導入
      • watsonx.data (SW版) 1.1.0
      • Event Streams (SaaS) (Kafka 3.3)
  • 今回試した内容
    • Kafkaにトピックを作成する
    • watsonx.dataに上記Kafkaを組み込む
    • PCにKafkaを導入し、上記のKafkaにJSON形式のデータを投入する
    • watsonx.dataのPrestoエンジンを使ってSQLでKafkaに投入されたデータを取得する
      • 投入されたデータは1つの列(_message)にセットされる
      • 投入された時間も別の列(_timestamp)にセットされる
    • watsonx.dataのPrestoエンジンに対して、Kafkaから取得したデータの列定義に関する構成情報ファイルを作成・適用し、JSON形式データの各キーをテーブルの別々の列にマッピングする
      • SQL文でJSONデータの各キーの値が別々の列にセットされるので、データ処理が容易になる。また、それらの列を使って条件指定や他のテーブルとの結合ができる
      • 考慮点
        • この機能は、まだ watsonx.data で対応されていない。今回、この対応を行うために、OpenShift上で動作しているPrestoエンジンのPodに直接構成情報ファイルを作成した。また、その後Prestoエンジンを再起動させるために、KafkaのPrestoへの関連を一旦切り離し、再度関連付けを行っている
        • PrestoエンジンのPodを削除してPodを再起動させると、作成した構成情報ファイルが削除されてしまう

この記事の流れ

  1. Kafkaにトピックを作成する
  2. watsonx.dataにKafkaを組み込む
  3. PCからKafkaにJSON形式のデータを投入してみる
  4. watsonx.dataのPrestoエンジンでSQL文を使ってKafkaに投入されたデータを取得してみる
  5. 追加でPCからKafkaにJSON形式のデータを投入し、そのデータをwatsonx.dataのPrestoエンジンで取得してみる
  6. watsonx.dataのPrestoエンジンに対して、Kafkaから取得したデータの列定義に関する構成情報ファイルを適用してみる

1. Kafkaにトピックを作成する

Kafka は IBM Cloud のカタログから Event Streams(Kafkaが組み込まれている) を選択し、作成する。(Kafkaで検索するとEvent Streamsがヒットする)

次にリソース・リストで、統合のカテゴリに作成されたEvent Streamsをクリックして開く。

左のメニューからトピックをクリックし、右にあるトピックの作成をクリックする。

kafka_add_topic_1.png

トピック名tpch.customerを入力し、次へをクリックする。

kafka_add_topic_2.png

今回、パーティション1のままで、次へをクリックする。

kafka_add_topic_3.png

今回、メッセージ保存期間は1日のままとして、トピックの作成をクリックする。

kafka_add_topic_4.png

トピックが作成されると、以下のようにリストされる。

kafka_add_topic_5.png

2. watsonx.dataにKafkaを組み込む

Event Streamsの左のメニューからサービス資格情報をクリックし、右にある新規資格情報をクリックし、資格情報を作成する。役割は、管理者またはライターを選択する。

kafka_create_service_credential.png

資格情報のうち、今回は以下の情報が必要となる。

{
  ...
  "kafka_brokers_sasl": [
    "broker-0-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
    "broker-1-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
    "broker-2-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
    "broker-3-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
    "broker-4-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
    "broker-5-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093"
  ],
  ...
  "password": "xxx",
  "user": "token"
}

次に、watsonx.dataのインフラストラクチャー・マネージャー画面の右にあるコンポーネントの追加をクリックし、プルダウンされたリストにあるデータベースの追加をクリックする。

watsonx_data_add_kafka_1.png

以下の情報を入力して、Registerをクリックする。

項目
データベース・タイプ Apache Kafka
表示名 kafka01
ホスト名 サービス資格情報に記載されているホスト名
ポート サービス資格情報に記載されているポート名
SASL connection 有効にする
ユーザー名 サービス資格情報に記載されているユーザー名
API key/Password サービス資格情報に記載されているパスワード
Topics tpch.customer
カタログ名 kafka1

watsonx_data_add_kafka_2.png

結果として、Kafkaの情報が以下の右側に追加される。

watsonx_data_add_kafka_3.png

下図のように、kafka1カタログの上にある関連付けの管理をクリックする。

watsonx_data_add_kafka_4.png

関連付けの管理presto-01のチェックボックスをチェックし、保存してエンジンを再始動するをクリックする。

watsonx_data_add_kafka_5.png

しばらく待つとPrestoエンジンの再始動が終了し、以下のようになる。

watsonx_data_add_kafka_6.png

3. PCからKafkaにJSON形式のデータを投入してみる

KafkaをPCにインストールし、Kafkaにメッセージを投入できるようにする。Macの場合、以下のコマンドでKafkaを組み込むことができる。

$ brew install kafka

tpch.customerのデータとして、以下のJSONデータを使用する。

{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
{"rowNumber":2,"customerKey":2,"name":"Customer#000000002","address":"XSTf4,NCwDVaWNe6tEgvwfmRchLXak","nationKey":13,"phone":"23-768-687-3665","accountBalance":121.65,"marketSegment":"AUTOMOBILE","comment":"l accounts. blithely ironic theodolites integrate boldly: caref"}
{"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov"}
{"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAGtn","nationKey":4,"phone":"14-128-190-5944","accountBalance":2866.83,"marketSegment":"MACHINERY","comment":" requests. final, regular ideas sleep final accou"}
{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}

最初の3つをKafkaのtpch.customerトピックに投入してみる。
まずは、以下のような認証情報ファイルを作成する。

kafka.properties
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="xxx";
sasl.mechanism=PLAIN
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

以下のコマンドで最初の3つをKafkaのtpch.customerトピックに投入する。

$ kafka-console-producer --producer.config kafka.properties --topic tpch.customer --bootstrap-server broker-0-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093
>{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
>{"rowNumber":2,"customerKey":2,"name":"Customer#000000002","address":"XSTf4,NCwDVaWNe6tEgvwfmRchLXak","nationKey":13,"phone":"23-768-687-3665","accountBalance":121.65,"marketSegment":"AUTOMOBILE","comment":"l accounts. blithely ironic theodolites integrate boldly: caref"}
>{"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov"}
>^C

4. watsonx.dataのPrestoエンジンでSQL文を使ってKafkaに投入されたデータを取得してみる

まず照会ワークスペースで、追加したKafkaのトピックに対応するwatsonx.dataのテーブルの列を確認してみる。下図に表示されている列のうち、_message列にKafkaの該当トピックに投入されたメッセージがセットされ、_timestamp列に投入した時間がセットされる。

watsonx_data_kafka_table_1.png

それでは、投入した時間順にJSONデータを取得してみる。
以下のSQL文を実行する。

select
  _timestamp,
  _message
from kafka1.tpch.customer
order by _timestamp;

(*) KafkaはパーティションごとにFIFOでメッセージが取得できるので、複数のパーティションに跨ったメッセージを、投入した時間順に取得したい場合はorder by _timestampを付ける。今回はパーティションを1にしているので、order by _timestampと指定しなくても時間順にメッセージが取得できる。

結果として、以下のように3件のJSONデータが取得できる。

watsonx_data_kafka_query_1.png

5. 追加でPCからKafkaにJSON形式のデータを投入し、そのデータをwatsonx.dataのPrestoエンジンで取得してみる

さらに残りの2件のJSONデータをコマンドから投入してみる。

$ kafka-console-producer --producer.config kafka.properties --topic tpch.customer --bootstrap-server broker-0-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093
>{"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAGtn","nationKey":4,"phone":"14-128-190-5944","accountBalance":2866.83,"marketSegment":"MACHINERY","comment":" requests. final, regular ideas sleep final accou"}
>{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
>^C

先ほど実行したSQL文を再度実行すると、以下のように5件のJSONデータが取得できる。

watsonx_data_kafka_query_2.png

6. watsonx.dataのPrestoエンジンに対して、Kafkaから取得したデータの列定義に関する構成情報ファイルを適用してみる

オープンソースのPrestoのKafka Connectorについての情報が記載されている Kafka Connector Tutorial の "Step 6: Map all the values from the topic message onto columns" を参照すると、etc/kafka/tpch.customer.jsonに以下の情報を保存すると、JSONデータの各キーの値を列にマッピングすることができると記載されている。
(この定義は、今回使用したtpch.customerデータに合わせて定義されている)

etc/kafka/tpch.customer.json
{
  "tableName": "customer",
  "schemaName": "tpch",
  "topicName": "tpch.customer",
  "key": {
    "dataFormat": "raw",
    "fields": [
      {
        "name": "kafka_key",
        "dataFormat": "LONG",
        "type": "BIGINT",
        "hidden": "false"
      }
    ]
  },
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "row_number",
        "mapping": "rowNumber",
        "type": "BIGINT"
      },
      {
        "name": "customer_key",
        "mapping": "customerKey",
        "type": "BIGINT"
      },
      {
        "name": "name",
        "mapping": "name",
        "type": "VARCHAR"
      },
      {
        "name": "address",
        "mapping": "address",
        "type": "VARCHAR"
      },
      {
        "name": "nation_key",
        "mapping": "nationKey",
        "type": "BIGINT"
      },
      {
        "name": "phone",
        "mapping": "phone",
        "type": "VARCHAR"
      },
      {
        "name": "account_balance",
        "mapping": "accountBalance",
        "type": "DOUBLE"
      },
      {
        "name": "market_segment",
        "mapping": "marketSegment",
        "type": "VARCHAR"
      },
      {
        "name": "comment",
        "mapping": "comment",
        "type": "VARCHAR"
      }
    ]
  }
}

これをwatsonx.dataのPrestoエンジンに適用してみる。

手順としては、

  1. 上記ファイルを用意する
  2. PrestoエンジンのPodに上記ファイルをコピーする
  3. Prestoエンジンを再起動するために、一旦、KafkaをPrestoエンジンから切り離し、その後、再度Prestoエンジンに関連付ける

まず、上記ファイルtpch.customer.jsonファイルを用意しておく。
次に、PrestoエンジンのPodに入り、コピーする場所を特定する。

# oc login --token=xxx --server=https://c115-e.jp-tok.containers.cloud.ibm.com:31009
Logged into "https://c115-e.jp-tok.containers.cloud.ibm.com:31009" as "IAM#mohkawa@jp.ibm.com" using the token provided.
...
Using project "cpd-instance".
# oc get pods | grep presto
ibm-lh-lakehouse-presto-01-single-blue-0                          1/1     Running     0             9h
# oc exec -it ibm-lh-lakehouse-presto-01-single-blue-0 -- /bin/bash
bash-4.4$ cd /opt/presto/etc
bash-4.4$ ls
access-control.properties	     cert			jmx.properties	   log4j-events.properties	      query-governance.properties
catalog				     config.properties		jvm.config	   node.properties		      setvendor.zip
catalog-secret.properties	     event-listener.properties	liblts-masking.so  password-authenticator.properties  system-schema.properties
catalog-table-exclusions.properties  jmx-exporter-config.yaml	log.properties	   preload_secrets.env

(*) oc loginコマンドは、OpenShift Web コンソール画面右にあるユーザー名をクリックして得られるリストからログインコマンドのコピーをクリック、次にDisplay Tokenをクリックすると表示される。

redhat_oc_login.png

調査した結果、/opt/presto/etcの配下に上記ファイルをコピーすれば良いことがわかった。etcディレクトリの配下にkafkaディレクトリがないので作成する。

bash-4.4$ pwd
/opt/presto/etc
bash-4.4$ mkdir kafka 
bash-4.4$ ls /opt/presto/etc/kafka
bash-4.4$ exit

作成したtpch.customer.jsonを上記にコピーする。

# oc cp tpch.customer.json ibm-lh-lakehouse-presto-01-single-blue-0:/opt/presto-server-0.282/etc/kafka/.

念の為、コピーされたことを確認する。

# oc exec -it ibm-lh-lakehouse-presto-01-single-blue-0 -- /bin/bash
bash-4.4$ ls /opt/presto/etc/kafka
tpch.customer.json
bash-4.4$ exit

次に、Prestoエンジンを再起動させて上記の情報を反映させるために、PrestoエンジンからKafkaを一旦切り離し、その後、再度、Kafkaを関連付ける。

watsonx_data_detach_kafka_1.png

Prestoエンジンの再起動が終了したら、照会ワークスペースで、Kafkaのトピックに対応するwatsonx.dataのテーブルの列を確認してみる。
以下のように列情報が変わっている。

watsonx_data_kafka_table_2.png

ここで、以下のSQL文を発行してみる。

select 
  _timestamp,
  row_number,
  customer_key,
  name,
  address,
  nation_key,
  phone,
  account_balance,
  market_segment,
  comment
from kafka1.tpch.customer
order by _timestamp;

結果として、以下のように、JSONデータの各キーの値が、別々の列の値として取得できる。

watsonx_data_kafka_query_3.png

JSONデータの各キーの値が、別々の列の値として扱えるようになったので、それらの列を使って条件指定や他のテーブルとの結合が可能となる。

例えば以下のように、IBM Cloud Object Storage (ICOS) 上に作成したtpch.nationテーブルと結合し、さらにrow_number列の条件を付加したSQL文を実行してみる。

select
  _timestamp,
  row_number,
  customer_key,
  customer.name,
  address,
  nation.name nation,
  phone,
  account_balance,
  market_segment,
  customer.comment
from kafka1.tpch.customer,icos1.tpch.nation
where customer.nation_key=nation.nationkey and row_number > 3
order by _timestamp;

結果として、以下が得られる。
(nation_key列がnation列に置き換わり、国名が表示されている)

watsonx_data_kafka_query_4.png

補足:
ICOSのicos1.tpch.nationテーブルは、以下のSQL文で作成した。
(tpch.tiny.nationテーブルは watsonx.data 導入時に構成されている)

create table icos1.tpch.nation
  with (
    format='PARQUET'
  )
  as select * from tpch.tiny.nation;
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?