watsonx.dataはオープンなデータレイクハウス・アーキテクチャに基づいて構築されており、大容量のデータを安価に保存できるオブジェクトストレージにテーブルを作成したり、他のデータベースを統合することができ、それらのデータベースのデータをwatsonx.dataに組み込まれているオープンソースのPrestoエンジンを使って横断的にSQL文でアクセスできます。
Kafkaは大規模なイベント・ストリーム・データを公開(パブリッシュ)/購読(サブスクライブ)して複数のシステム間でデータを連携することができるオープンソースで、一定期間データを保持したり、複数のサーバーに分散かつ冗長化して処理できるため、高速で耐障害性が高いプラットフォームとなっています。
利用例としては、ホストシステムでトランザクション処理を行い、そのデータをKafkaで別システムに連携し参照系を行うといった、ホストシステムから参照系の負荷を軽減させ、別システムでフレキシブルに参照を行う CQRS (Command Query Responsibility Segregation) で使用されたりしています。
本記事は、watsonx.dataにKafkaを組み込んでPrestoエンジンでアクセスしてみた内容になります。一部、まだ watsonx.data でサポートされていない機能について、オープンソースのPrestoのドキュメントを参照しながら試行錯誤して得られた結果も記載しています。(今後のサポートを期待)
この記事の内容
- 今回試した環境
-
IBM Cloud Pak for Data (CP4D) 4.8.0 上に以下のサービスを導入
- watsonx.data (SW版) 1.1.0
- Event Streams (SaaS) (Kafka 3.3)
-
IBM Cloud Pak for Data (CP4D) 4.8.0 上に以下のサービスを導入
- 今回試した内容
- Kafkaにトピックを作成する
- watsonx.dataに上記Kafkaを組み込む
- PCにKafkaを導入し、上記のKafkaにJSON形式のデータを投入する
- watsonx.dataのPrestoエンジンを使ってSQLでKafkaに投入されたデータを取得する
- 投入されたデータは1つの列(
_message
)にセットされる - 投入された時間も別の列(
_timestamp
)にセットされる
- 投入されたデータは1つの列(
- watsonx.dataのPrestoエンジンに対して、Kafkaから取得したデータの列定義に関する構成情報ファイルを作成・適用し、JSON形式データの各キーをテーブルの別々の列にマッピングする
- SQL文でJSONデータの各キーの値が別々の列にセットされるので、データ処理が容易になる。また、それらの列を使って条件指定や他のテーブルとの結合ができる
- 考慮点
- この機能は、まだ watsonx.data で対応されていない。今回、この対応を行うために、OpenShift上で動作しているPrestoエンジンのPodに直接構成情報ファイルを作成した。また、その後Prestoエンジンを再起動させるために、KafkaのPrestoへの関連を一旦切り離し、再度関連付けを行っている
- PrestoエンジンのPodを削除してPodを再起動させると、作成した構成情報ファイルが削除されてしまう
この記事の流れ
- Kafkaにトピックを作成する
- watsonx.dataにKafkaを組み込む
- PCからKafkaにJSON形式のデータを投入してみる
- watsonx.dataのPrestoエンジンでSQL文を使ってKafkaに投入されたデータを取得してみる
- 追加でPCからKafkaにJSON形式のデータを投入し、そのデータをwatsonx.dataのPrestoエンジンで取得してみる
- watsonx.dataのPrestoエンジンに対して、Kafkaから取得したデータの列定義に関する構成情報ファイルを適用してみる
1. Kafkaにトピックを作成する
Kafka は IBM Cloud のカタログから Event Streams
(Kafkaが組み込まれている) を選択し、作成する。(Kafka
で検索するとEvent Streams
がヒットする)
次にリソース・リストで、統合
のカテゴリに作成されたEvent Streams
をクリックして開く。
左のメニューからトピック
をクリックし、右にあるトピックの作成
をクリックする。
トピック名
にtpch.customer
を入力し、次へ
をクリックする。
今回、パーティション
は1
のままで、次へ
をクリックする。
今回、メッセージ保存
期間は1日
のままとして、トピックの作成
をクリックする。
トピックが作成されると、以下のようにリストされる。
2. watsonx.dataにKafkaを組み込む
Event Streams
の左のメニューからサービス資格情報
をクリックし、右にある新規資格情報
をクリックし、資格情報を作成する。役割
は、管理者
またはライター
を選択する。
資格情報のうち、今回は以下の情報が必要となる。
{
...
"kafka_brokers_sasl": [
"broker-0-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
"broker-1-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
"broker-2-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
"broker-3-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
"broker-4-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093",
"broker-5-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093"
],
...
"password": "xxx",
"user": "token"
}
次に、watsonx.dataのインフラストラクチャー・マネージャー
画面の右にあるコンポーネントの追加
をクリックし、プルダウンされたリストにあるデータベースの追加
をクリックする。
以下の情報を入力して、Register
をクリックする。
項目 | 値 |
---|---|
データベース・タイプ | Apache Kafka |
表示名 | kafka01 |
ホスト名 | サービス資格情報に記載されているホスト名 |
ポート | サービス資格情報に記載されているポート名 |
SASL connection | 有効にする |
ユーザー名 | サービス資格情報に記載されているユーザー名 |
API key/Password | サービス資格情報に記載されているパスワード |
Topics | tpch.customer |
カタログ名 | kafka1 |
結果として、Kafkaの情報が以下の右側に追加される。
下図のように、kafka1
カタログの上にある関連付けの管理
をクリックする。
関連付けの管理
でpresto-01
のチェックボックスをチェックし、保存してエンジンを再始動する
をクリックする。
しばらく待つとPrestoエンジンの再始動が終了し、以下のようになる。
3. PCからKafkaにJSON形式のデータを投入してみる
KafkaをPCにインストールし、Kafkaにメッセージを投入できるようにする。Macの場合、以下のコマンドでKafkaを組み込むことができる。
$ brew install kafka
tpch.customerのデータとして、以下のJSONデータを使用する。
{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
{"rowNumber":2,"customerKey":2,"name":"Customer#000000002","address":"XSTf4,NCwDVaWNe6tEgvwfmRchLXak","nationKey":13,"phone":"23-768-687-3665","accountBalance":121.65,"marketSegment":"AUTOMOBILE","comment":"l accounts. blithely ironic theodolites integrate boldly: caref"}
{"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov"}
{"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAGtn","nationKey":4,"phone":"14-128-190-5944","accountBalance":2866.83,"marketSegment":"MACHINERY","comment":" requests. final, regular ideas sleep final accou"}
{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
最初の3つをKafkaのtpch.customer
トピックに投入してみる。
まずは、以下のような認証情報ファイルを作成する。
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="xxx";
sasl.mechanism=PLAIN
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
以下のコマンドで最初の3つをKafkaのtpch.customer
トピックに投入する。
$ kafka-console-producer --producer.config kafka.properties --topic tpch.customer --bootstrap-server broker-0-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093
>{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
>{"rowNumber":2,"customerKey":2,"name":"Customer#000000002","address":"XSTf4,NCwDVaWNe6tEgvwfmRchLXak","nationKey":13,"phone":"23-768-687-3665","accountBalance":121.65,"marketSegment":"AUTOMOBILE","comment":"l accounts. blithely ironic theodolites integrate boldly: caref"}
>{"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov"}
>^C
4. watsonx.dataのPrestoエンジンでSQL文を使ってKafkaに投入されたデータを取得してみる
まず照会ワークスペース
で、追加したKafkaのトピックに対応するwatsonx.dataのテーブルの列を確認してみる。下図に表示されている列のうち、_message
列にKafkaの該当トピックに投入されたメッセージがセットされ、_timestamp
列に投入した時間がセットされる。
それでは、投入した時間順にJSONデータを取得してみる。
以下のSQL文を実行する。
select
_timestamp,
_message
from kafka1.tpch.customer
order by _timestamp;
(*) KafkaはパーティションごとにFIFOでメッセージが取得できるので、複数のパーティションに跨ったメッセージを、投入した時間順に取得したい場合はorder by _timestamp
を付ける。今回はパーティションを1にしているので、order by _timestamp
と指定しなくても時間順にメッセージが取得できる。
結果として、以下のように3件のJSONデータが取得できる。
5. 追加でPCからKafkaにJSON形式のデータを投入し、そのデータをwatsonx.dataのPrestoエンジンで取得してみる
さらに残りの2件のJSONデータをコマンドから投入してみる。
$ kafka-console-producer --producer.config kafka.properties --topic tpch.customer --bootstrap-server broker-0-34gz6msg5ws2h333.kafka.svc08.us-south.eventstreams.cloud.ibm.com:9093
>{"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAGtn","nationKey":4,"phone":"14-128-190-5944","accountBalance":2866.83,"marketSegment":"MACHINERY","comment":" requests. final, regular ideas sleep final accou"}
>{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
>^C
先ほど実行したSQL文を再度実行すると、以下のように5件のJSONデータが取得できる。
6. watsonx.dataのPrestoエンジンに対して、Kafkaから取得したデータの列定義に関する構成情報ファイルを適用してみる
オープンソースのPrestoのKafka Connectorについての情報が記載されている Kafka Connector Tutorial の "Step 6: Map all the values from the topic message onto columns" を参照すると、etc/kafka/tpch.customer.json
に以下の情報を保存すると、JSONデータの各キーの値を列にマッピングすることができると記載されている。
(この定義は、今回使用したtpch.customerデータに合わせて定義されている)
{
"tableName": "customer",
"schemaName": "tpch",
"topicName": "tpch.customer",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "false"
}
]
},
"message": {
"dataFormat": "json",
"fields": [
{
"name": "row_number",
"mapping": "rowNumber",
"type": "BIGINT"
},
{
"name": "customer_key",
"mapping": "customerKey",
"type": "BIGINT"
},
{
"name": "name",
"mapping": "name",
"type": "VARCHAR"
},
{
"name": "address",
"mapping": "address",
"type": "VARCHAR"
},
{
"name": "nation_key",
"mapping": "nationKey",
"type": "BIGINT"
},
{
"name": "phone",
"mapping": "phone",
"type": "VARCHAR"
},
{
"name": "account_balance",
"mapping": "accountBalance",
"type": "DOUBLE"
},
{
"name": "market_segment",
"mapping": "marketSegment",
"type": "VARCHAR"
},
{
"name": "comment",
"mapping": "comment",
"type": "VARCHAR"
}
]
}
}
これをwatsonx.dataのPrestoエンジンに適用してみる。
手順としては、
- 上記ファイルを用意する
- PrestoエンジンのPodに上記ファイルをコピーする
- Prestoエンジンを再起動するために、一旦、KafkaをPrestoエンジンから切り離し、その後、再度Prestoエンジンに関連付ける
まず、上記ファイルtpch.customer.json
ファイルを用意しておく。
次に、PrestoエンジンのPodに入り、コピーする場所を特定する。
# oc login --token=xxx --server=https://c115-e.jp-tok.containers.cloud.ibm.com:31009
Logged into "https://c115-e.jp-tok.containers.cloud.ibm.com:31009" as "IAM#mohkawa@jp.ibm.com" using the token provided.
...
Using project "cpd-instance".
# oc get pods | grep presto
ibm-lh-lakehouse-presto-01-single-blue-0 1/1 Running 0 9h
# oc exec -it ibm-lh-lakehouse-presto-01-single-blue-0 -- /bin/bash
bash-4.4$ cd /opt/presto/etc
bash-4.4$ ls
access-control.properties cert jmx.properties log4j-events.properties query-governance.properties
catalog config.properties jvm.config node.properties setvendor.zip
catalog-secret.properties event-listener.properties liblts-masking.so password-authenticator.properties system-schema.properties
catalog-table-exclusions.properties jmx-exporter-config.yaml log.properties preload_secrets.env
(*) oc login
コマンドは、OpenShift Web コンソール
画面右にあるユーザー名をクリックして得られるリストからログインコマンドのコピー
をクリック、次にDisplay Token
をクリックすると表示される。
調査した結果、/opt/presto/etc
の配下に上記ファイルをコピーすれば良いことがわかった。etc
ディレクトリの配下にkafka
ディレクトリがないので作成する。
bash-4.4$ pwd
/opt/presto/etc
bash-4.4$ mkdir kafka
bash-4.4$ ls /opt/presto/etc/kafka
bash-4.4$ exit
作成したtpch.customer.json
を上記にコピーする。
# oc cp tpch.customer.json ibm-lh-lakehouse-presto-01-single-blue-0:/opt/presto-server-0.282/etc/kafka/.
念の為、コピーされたことを確認する。
# oc exec -it ibm-lh-lakehouse-presto-01-single-blue-0 -- /bin/bash
bash-4.4$ ls /opt/presto/etc/kafka
tpch.customer.json
bash-4.4$ exit
次に、Prestoエンジンを再起動させて上記の情報を反映させるために、PrestoエンジンからKafkaを一旦切り離し、その後、再度、Kafkaを関連付ける。
Prestoエンジンの再起動が終了したら、照会ワークスペースで、Kafkaのトピックに対応するwatsonx.dataのテーブルの列を確認してみる。
以下のように列情報が変わっている。
ここで、以下のSQL文を発行してみる。
select
_timestamp,
row_number,
customer_key,
name,
address,
nation_key,
phone,
account_balance,
market_segment,
comment
from kafka1.tpch.customer
order by _timestamp;
結果として、以下のように、JSONデータの各キーの値が、別々の列の値として取得できる。
JSONデータの各キーの値が、別々の列の値として扱えるようになったので、それらの列を使って条件指定や他のテーブルとの結合が可能となる。
例えば以下のように、IBM Cloud Object Storage (ICOS) 上に作成したtpch.nation
テーブルと結合し、さらにrow_number
列の条件を付加したSQL文を実行してみる。
select
_timestamp,
row_number,
customer_key,
customer.name,
address,
nation.name nation,
phone,
account_balance,
market_segment,
customer.comment
from kafka1.tpch.customer,icos1.tpch.nation
where customer.nation_key=nation.nationkey and row_number > 3
order by _timestamp;
結果として、以下が得られる。
(nation_key
列がnation
列に置き換わり、国名が表示されている)
補足:
ICOSのicos1.tpch.nation
テーブルは、以下のSQL文で作成した。
(tpch.tiny.nation
テーブルは watsonx.data 導入時に構成されている)
create table icos1.tpch.nation
with (
format='PARQUET'
)
as select * from tpch.tiny.nation;