2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Confluent Platform メモ - (3)Schema Registry簡易テスト

Last updated at Posted at 2021-08-17

はじめに

Confluent Platformを使ってみた時のメモ書きです。
今回はSchema Registryを使用したメッセージ送受信を試してみます。

関連記事

Confluent Platform メモ - (1)環境構築
Confluent Platform メモ - (2)メッセージ送受信簡易テスト
Confluent Platform メモ - (3)Schema Registry簡易テスト

参考情報

Schema Registry Overview
Avro Schema Serializer and Deserializer
Apache Avro™ 1.10.2 Specification
Avro,SchemaRegistryことはじめ

Schema Registryについて

なんだかんだでSchema Registryの動作イメージについては公式ドキュメントの以下の図が分かりやすいと思います。
Schema Registry Overviewからの抜粋
image.png

こんな感じの流れになると思います。

メッセージ送信時:
メッセージフォーマットを示すSchema(Avro, JSONなど)を準備します。
ProducerはSchemaに応じてデータをシリアライズします。
使用したSchemaをSchema Registryに登録すると、使用したSchemaを識別するidが返されるので、idとデータをセットでKafkaのTopicに送信します。
つまり、"このSchemaが使われていますよ"という情報(id)と、シリアライズされたデータがメッセージとして送信されることになります。

メッセージ受信時:
Schema idとシリアライズされたデータがセットで受信されるので、Consumer側ではidに紐づくSchemaをSchem Registryから取得してデータをデシリアライズします。

これで、Producer/Consumer両者共に意図したSchema(メッセージフォーマット)でメッセージを取り扱うことができます。

ちなみ、上の図だとSchema RegistryがSchemaを保持しているような概念図になっていますが、実はSchema Registryは裏ではKafka上のTopicをSchema情報の保存先として使用しているようです。
参考:Schema Registry Overview - Kafka Backend

Kafka is used as Schema Registry storage backend. The special Kafka topic (default _schemas), with a single partition, is used as a highly available write ahead log. All schemas, subject/version and ID metadata, and compatibility settings are appended as messages to this log. A Schema Registry instance therefore both produces and consumes messages under the _schemas topic.

Avro Schemaについて

Avroというのはデータフォーマットについての規約でAvro Schemaでデータの型が定義できるようです。JSON と JSON Schemaの関係と同じようにAvro と Avro Schemaの関係性を捉えるのがよいと思います。
(Avro SchemaはJSON形式で記述されるのでちょっとややこしいですが、AvroデータフォーマットとJSONデータフォーマットは別モノです。)

Schema Registryで使用できるデータタイプ(Schemaのタイプ)はいくつかあり、そのうちの1つとしてAvroがあります。Avroについては Producer, Consumer用にそれぞれ kafka-avro-console-producerkafka-avro-console-consumerというコマンドが提供されているので、これらを使用するとシェルから簡易的にメッセージ送受信の確認ができます。また、これらのコマンドはAvro<=>JSON変換をしてくれるので、実際にデータの投入や確認を行う際はJSON形式で扱うことができます(内部的にはAvroに変換される)。

kafka-avro-console-producer --help
This tool helps to read data from standard input and publish it to Kafka.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a single
                                           batch if they are not being sent
                                           synchronously. (default: 200)
--bootstrap-server <String: server to    REQUIRED unless --broker-list
  connect to>                              (deprecated) is specified. The server
                                           (s) to connect to. The broker list
                                           string in the form HOST1:PORT1,HOST2:
                                           PORT2.
--broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server
                                           instead; ignored if --bootstrap-
                                           server is specified.  The broker
                                           list string in the form HOST1:PORT1,
                                           HOST2:PORT2.
--compression-codec [String:             The compression codec: either 'none',
  compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.
                                           If specified without value, then it
                                           defaults to 'gzip'
--help                                   Print usage information.
--line-reader <String: reader_class>     The class name of the class to use for
                                           reading lines from standard in. By
                                           default each line is read as a
                                           separate message. (default: kafka.
                                           tools.
                                           ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on       The max time that the producer will
  send>                                    block for during a send request
                                           (default: 60000)
--max-memory-bytes <Long: total memory   The total memory used by the producer
  in bytes>                                to buffer records waiting to be sent
                                           to the server. (default: 33554432)
--max-partition-memory-bytes <Long:      The buffer size allocated for a
  memory in bytes per partition>           partition. When records are received
                                           which are smaller than this size the
                                           producer will attempt to
                                           optimistically group them together
                                           until this size is reached.
                                           (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message
                                           for multiple reasons, and being
                                           unavailable transiently is just one
                                           of them. This property specifies the
                                           number of retries before the
                                           producer give up and drop this
                                           message. (default: 3)
--metadata-expiry-ms <Long: metadata     The period of time in milliseconds
  expiration interval>                     after which we force a refresh of
                                           metadata even if we haven't seen any
                                           leadership changes. (default: 300000)
--producer-property <String:             A mechanism to pass user-defined
  producer_prop>                           properties in the form key=value to
                                           the producer.
--producer.config <String: config file>  Producer config properties file. Note
                                           that [producer-property] takes
                                           precedence over this config.
--property <String: prop>                A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the message reader. This allows
                                           custom configuration for a user-
                                           defined message reader. Default
                                           properties include:
                                                parse.key=true|false
                                                key.separator=<key.separator>
                                                ignore.error=true|false
--request-required-acks <String:         The required acks of the producer
  request required acks>                   requests (default: 1)
--request-timeout-ms <Integer: request   The ack timeout of the producer
  timeout ms>                              requests. Value must be non-negative
                                           and non-zero (default: 1500)
--retry-backoff-ms <Integer>             Before each retry, the producer
                                           refreshes the metadata of relevant
                                           topics. Since leader election takes
                                           a bit of time, this property
                                           specifies the amount of time that
                                           the producer waits before refreshing
                                           the metadata. (default: 100)
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.
                                           (default: 102400)
--sync                                   If set message send requests to the
                                           brokers are synchronously, one at a
                                           time as they arrive.
--timeout <Integer: timeout_ms>          If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum amount of time a message
                                           will queue awaiting sufficient batch
                                           size. The value is given in ms.
                                           (default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to produce
                                           messages to.
--version                                Display Kafka version.
kafka-avro-console-consumer --help
This tool helps to read data from Kafka topics and outputs it to standard output.
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to.
  connect to>
--consumer-property <String:             A mechanism to pass user-defined
  consumer_prop>                           properties in the form key=value to
                                           the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--group <String: consumer group id>      The consumer group id of the consumer.
--help                                   Print usage information.
--isolation-level <String>               Set to read_committed in order to
                                           filter out transactional messages
                                           which are not committed. Set to
                                           read_uncommitted to read all
                                           messages. (default: read_uncommitted)
--key-deserializer <String:
  deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--offset <String: consume offset>        The offset id to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)
--partition <Integer: partition>         The partition to consume from.
                                           Consumption starts from the end of
                                           the partition unless '--offset' is
                                           specified.
--property <String: prop>                The properties to initialize the
                                           message formatter. Default
                                           properties include:
                                          print.timestamp=true|false
                                          print.key=true|false
                                          print.offset=true|false
                                          print.partition=true|false
                                          print.headers=true|false
                                          print.value=true|false
                                          key.separator=<key.separator>
                                          line.separator=<line.separator>
                                          headers.separator=<line.separator>
                                          null.literal=<null.literal>
                                          key.deserializer=<key.deserializer>
                                          value.deserializer=<value.
                                           deserializer>
                                          header.deserializer=<header.
                                           deserializer>
                                         Users can also pass in customized
                                           properties for their formatter; more
                                           specifically, users can pass in
                                           properties keyed with 'key.
                                           deserializer.', 'value.
                                           deserializer.' and 'headers.
                                           deserializer.' prefixes to configure
                                           their deserializers.
--skip-message-on-error                  If there is an error when processing a
                                           message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is
                                           available for consumption for the
                                           specified interval.
--topic <String: topic>                  The topic id to consume on.
--value-deserializer <String:
  deserializer for values>
--version                                Display Kafka version.
--whitelist <String: whitelist>          Regular expression specifying
                                           whitelist of topics to include for
                                           consumption.

また、Schema Registryは管理用のREST APIが提供されているのでcurlなどを使用してアクセスすることができるようです。
Schema Registry API Reference

操作例

以下に示されている例に倣って試してみます。
Avro Schema Serializer and Deserializer - Test Drive Avro Schema

スキーマ・タイプの確認

REST APIを使ってSchema Registryに登録されているスキーマ・タイプを確認します。

[root@test12 ~]# curl http://localhost:8081/schemas/types
["JSON","PROTOBUF","AVRO"]

AVROが扱えることが確認できます。
(デフォルトではSchema Registryに上の3つのタイプのプラグインが導入済みのようです。)

Schema指定でのメッセージ送受信例1 / シンプルケース

シンプルなSchemaを使用してメッセージを送受信してみます。

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {
      "name": "f1",
      "type": "string"
    }
  ]
}

上の例はメッセージ全体はrecord型(Complex typeの一種)として定義されています。"fileds"部分に具体的なフィールドの定義がされます。上の例は"f1"というString型の項目1つだけのメッセージを意味していま。

このschemaに従ったメッセージを t1-a というTopicに対して送信するProducerを起動するには、以下のコマンドを実行します。(Topicが作成されていなければ暗黙的に作成してくれるようです。)

kafka-avro-console-producer --broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic t1-a \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

kafka-console-producerコマンドと違ってプロンプトが出ないので、続けて送信したいメッセージを入力してEnterを押します。

{"f1": "value1-a"}

Ctrl+Cで抜けます。
次にConsumerを起動してみます。以下のようにTopic名とSchema Registryを指定します。

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
--from-beginning --topic t1-a \
--property schema.registry.url=http://localhost:8081

すると、以下のメッセージが返されます。

{"f1":"value1-a"}

送信したメッセージがそのまま返されているのであまり面白みが無いですが、裏ではシリアライズ/デシリアライズが行われているはずです。

Consumer実行時に、kafka-avro-console-consumerコマンドで--property print.schema.ids=trueを指定してみます。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t1-a --property schema.registry.url=http://localhost:8081 --property print.schema.ids=true
{"f1":"value1-a"}       1

上のメッセージはSchema idとして1が紐づいていることが分かります。
curlでSchema Registryからschema id: 1 の情報を取得してみると、以下のように返されました。

[root@test12 ~]# curl http://localhost:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"}

ちなみに、このTopicをkafka-console-consumerコマンドでデシリアライズせずに見てみるとどうなるかという言うと...

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t1-a
value1-a

こうなりました。

Schema指定でのメッセージ送受信例2 / キー指定

KafkaのメッセージにはKeyを持たせることもできます。KeyとValueそれぞれSchemaを使用したメッセージを送受信してみます。

Key用のスキーマ

{"type":"string"}

Value用のスキーマ(先の例と同じ)

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {
      "name": "f1",
      "type": "string"
    }
  ]
}

上のスキーマに従ったKey, Valueを持つメッセージをt2-aというTopicに送信するProducerを起動するには、以下のコマンドを実行します。

kafka-avro-console-producer --broker-list localhost:9092 \
  --topic t2-a \
  --property parse.key=true \
  --property "key.separator= "\
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
  --property schema.registry.url=http://localhost:8081

送信メッセージとして以下を入力

"key1" {"f1": "value2-a"}
"AAAAA" {"f1": "TTTTT"}
"TEST01" {"f1": "XXXXXX"}

別シェルでConsumerを実行します。

kafka-avro-console-consumer --from-beginning --topic t2-a \
   --bootstrap-server localhost:9092 \
   --property print.key=true \
   --property print.schema.ids=true \
   --property schema.id.separator=: \
   --property schema.registry.url=http://localhost:8081

すると、以下の結果が返されます。

"key1":2        {"f1":"value2-a"}:1
"AAAAA":2       {"f1":"TTTTT"}:1
"TEST01":2      {"f1":"XXXXXX"}:1

curlでSchema id: 2の情報を見てみます。

[root@test12 ~]# curl http://localhost:8081/schemas/ids/2
{"schema":"\"string\""}

ちなみにこちらもkafka-console-consumerコマンドでデシリアライズせずに見てみると、こんな感じになりました。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t2-a
value2-a

TTTTT

XXXXXX

Schema指定でのメッセージ送受信例3 / 複雑なSchema

もう少し複雑なスキーマを使用してみます。
Avro Schemaの定義方法はこちらを参考に。
Apache Avro™ 1.10.2 Specification

Key用のスキーマ

{"type":"string"}

Value用のスキーマ

avroSchema3.json
{
  "type": "record",
  "name": "userInfo",
    "fields" : [
      {"name": "username", "type": "string"},
      {"name": "age", "type": "int"},
      {"name": "phone", "type": "string"},
      {"name": "address", "type": {
        "type": "record",
        "name" : "user_address",
          "fields" : [
            {"name": "street", "type": "string"},
            {"name": "city", "type": "string"},
            {"name": "country", "type": "string"},
            {"name": "zip", "type": "string"}
          ]
        }
      }
    ]
}  

上のスキーマを使って`test3'というTopicに対してメッセージ送受信を行います。
スキーマが長いので、上のように整形した状態のSchemaをavroSchema3.jsonというファイルで作成しておいて、それをjqコマンドで1行に変換して環境変数にセットしておきます。

[root@test12 ~/Kafka]# avroSchema3=$(jq --compact-output . avroSchema3.json)

[root@test12 ~/Kafka]# echo ${avroSchema3}
{"type":"record","name":"userInfo","fields":[{"name":"username","type":"string"},{"name":"age","type":"int"},{"name":"phone","type":"string"},{"name":"address","type":{"type":"record","name":"user_address","fields":[{"name":"street","type":"string"},{"name":"city","type":"string"},{"name":"country","type":"string"},{"name":"zip","type":"string"}]}}]}

以下コマンドでProducerを起動します。

kafka-avro-console-producer --broker-list localhost:9092 --topic test3 \
  --property parse.key=true \
  --property "key.separator= "\
  --property key.schema='{"type":"string"}' \
  --property value.schema=${avroSchema3} \
  --property schema.registry.url=http://localhost:8081

別シェルでConsumerを起動します。

kafka-avro-console-consumer --from-beginning --topic test3 \
   --bootstrap-server localhost:9092 \
   --property print.key=true \
   --property print.schema.ids=true \
   --property schema.id.separator=: \
   --property schema.registry.url=http://localhost:8081

以下のProducerから以下のデータを送信してみます。

"A001" {"username": "Yamada Ichiro", "age": 31, "phone": "1234-5678", "address":{"street": "1-1", "city": "chiba", "country": "Japan", "zip": "111-2222"}}
"A002" {"username": "Tanaka Jiro", "age": 32, "phone": "2222-2222", "address":{"street": "2-2", "city": "tokyo", "country": "Japan", "zip": "111-2222"}}

Consumerでは以下のデータが受信されました。

"A001":2        {"username":"Yamada Ichiro","age":31,"phone":"1234-5678","address":{"street":"1-1","city":"chiba","country":"Japan","zip":"111-2222"}}:3
"A002":2        {"username":"Tanaka Jiro","age":32,"phone":"2222-2222","address":{"street":"2-2","city":"tokyo","country":"Japan","zip":"111-2222"}}:3

kafka-console-consumerコマンドを使ってデシリアライズを行わずにTopictest3のメッセージを受信してみるとこんな感じになりました。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test3
Yamada Ichiro>1234-56781-1
chiba
Japan111-2222
Tanaka Jiro@2222-22222-2
tokyo
Japan111-2222

恐らくAvroはバイナリでデータ保持していると思われ、改行は正しく表示されていないのではないかと思います。これで、内部的にはきちんとSchemaが使われてシリアライズ/デシリアライズが行われているであろうということが分かります。

ちなみに、Avro用のProducer/Consumerに戻ってSchemaでの定義に対してフィールドが足りないデータを投入するとProducerでエラーになりました。

送信

"A003" {"username": "Suzuki Saburo"}

Producerがエラーで終了

org.apache.kafka.common.errors.SerializationException: Error deserializing json {"username": "Suzuki Saburo"} to Avro of schema {"type":"record","name":"userInfo","fields":[{"name":"username","type":"string"},{"name":"age","type":"int"},{"name":"phone","type":"string"},{"name":"address","type":{"type":"record","name":"user_address","fields":[{"name":"street","type":"string"},{"name":"city","type":"string"},{"name":"country","type":"string"},{"name":"zip","type":"string"}]}}]}
Caused by: org.apache.avro.AvroTypeException: Expected int. Got END_OBJECT
        at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:511)
        at org.apache.avro.io.JsonDecoder.readInt(JsonDecoder.java:165)
        at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
        at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:551)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:195)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.toObject(AvroSchemaUtils.java:190)
        at io.confluent.kafka.formatter.AvroMessageReader.readFrom(AvroMessageReader.java:121)
        at io.confluent.kafka.formatter.SchemaMessageReader.readMessage(SchemaMessageReader.java:344)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:51)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

スキーマ情報の整形

先にも示したように、スキーマの情報はSchema RegistryからRESTで取得することができます。
例えば、以下のようにスキーマ情報を取得したとします。(Schema id: 5 の情報をGETしている例)

[root@test12 ~]# curl http://localhost:8081/schemas/ids/5
{"schema":"{\"type\":\"record\",\"name\":\"FILEA\",\"namespace\":\"value.SOURCEDB.VSAM.TEST01\",\"fields\":[{\"name\":\"STAT\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"STAT\",\"length\":1},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"NUMB\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"NUMB\",\"length\":6},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"NAME\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"NAME\",\"length\":20},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"ADDRX\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"ADDRX\",\"length\":20},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"PHONE\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"PHONE\",\"length\":8},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"DATEX\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"DATEX\",\"length\":8},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"AMOUNT\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"AMOUNT\",\"length\":8},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"COMMENT\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"COMMENT\",\"length\":9},\"null\"],\"doc\":\"\",\"default\":\"\"}]}"}

このように、結果は JSONフォーマットで"schema" という要素に1行の文字列でスキーマ情報が返されます。こんな感じで=> {"schema":"xxx"}。 このxxx部分がJSON形式で表された求めるスキーマなのですが文字列としてJSONデータが1要素の値として返されるので、ダブルクォーテーションがエスケープされています(バックスラッシュ付き)。この例のように少し複雑なスキーマが返されると非常に見にくいですね。
そこで、以下の様にコマンドを発行すると、スキーマ情報だけをきれいに整形して出力することができます。

[root@test12 ~]# curl -s  http://localhost:8081/schemas/ids/5 | jq -r .schema | jq .
{
  "type": "record",
  "name": "FILEA",
  "namespace": "value.SOURCEDB.VSAM.TEST01",
  "fields": [
    {
      "name": "STAT",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "STAT",
          "length": 1
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "NUMB",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "NUMB",
          "length": 6
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "NAME",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "NAME",
          "length": 20
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "ADDRX",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "ADDRX",
          "length": 20
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "PHONE",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "PHONE",
          "length": 8
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "DATEX",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "DATEX",
          "length": 8
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "AMOUNT",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "AMOUNT",
          "length": 8
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "COMMENT",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "COMMENT",
          "length": 9
        },
        "null"
      ],
      "doc": "",
      "default": ""
    }
  ]
}

簡単に上のコマンド補足しておきます。
curlで取ってきた情報をjqコマンドで必要な部分を抽出しています。やっているのは以下の通りです。

  • curlで情報取得
  • jqでschema要素の値のみ抽出
  • schema要素の値を改めてjqで整形して表示

参考

スクリプト

listTopic.sh
#!/bin/bash

sudo /opt/confluent-6.2.0/bin/kafka-topics --list --bootstrap-server localhost:9092
getMessage.sh
#!/bin/bash

if [ $# -lt 1 ] ; then
        echo "Usage: "
        echo "  " $0 " <topicName>"
        exit 0
fi

topicName=$1

sudo /opt/confluent-6.2.0/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic ${topicName} --property schema.registry.url=http://localhost:8081

#sudo /opt/confluent-6.2.0/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic ${topicName} --property schema.registry.url=http://localhost:8081 --property print.schema.ids=true
getSchema.sh
#!/bin/bash

if [ $# -lt 1 ] ; then
        echo "Usage: "
        echo "  " $0 " <schemaID>"
        exit 0
fi

schemaID=$1

curl --silent  http://localhost:8081/schemas/ids/${schemaID} | jq -r .schema | jq .

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?