kafka-docker で作った kafka クラスタに Elixir クライアントから接続する - 駄文型より。
kafka-docker でローカルに kafka クラスタを構築する - Qiita の続き。kafkaex/kafka_exという Kafka クライアントで Producer を作ってみます。
ライブラリの取得
mix.exs
を編集して mix deps.get
するだけ。
参考: Introduction to Mix - Elixir
defmodule ProducerSampleEx.Mixfile do
# ...
def application do
[
applications: [
:kafka_ex,
:snappy
]
]
end
defp deps do
[
{:kafka_ex, "~> 0.6.5"},
{:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"}
]
end
end
$ mix deps.get
設定
まず、 https://idcf-developers.qiita.com/k_kimuraidcf/items/3bcf973f47439efceaf8 で作った Kafka コンテナのポートを確認。今回は 32776
32778
。
$ docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------------
66f23109d78a_kafkadocker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:32777->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
kafkadocker_kafka_1 start-kafka.sh Up 0.0.0.0:32778->9092/tcp
kafkadocker_kafka_2 start-kafka.sh Up 0.0.0.0:32776->9092/tcp
次にKafkaEx.Config – kafka_ex を参考に config.exs
を書く。以下は最低限の設定。
config :kafka_ex,
brokers: [
{"192.168.145.65", 32776}, # {"hostname", port}
{"192.168.145.65", 32778}
],
consumer_group: :no_consumer_group,
use_ssl: false
iex で確認
iex -S mix
で立ち上げる。設定がおかしいとここでエラーがでる。
$ iex -S mix
Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
11:53:23.055 [debug] Succesfully connected to broker "192.168.145.65":32776
11:53:23.056 [debug] Succesfully connected to broker "192.168.145.65":32778
11:53:23.075 [debug] Establishing connection to broker 1009: "192.168.145.65" on port 32778
11:53:23.076 [debug] Succesfully connected to broker "192.168.145.65":32778
11:53:23.076 [debug] Establishing connection to broker 1010: "192.168.145.65" on port 32776
11:53:23.077 [debug] Succesfully connected to broker "192.168.145.65":32776
Interactive Elixir (1.4.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
つながった! metadata/1
で確認できる。
iex(1)> KafkaEx.metadata(topic: "topic")
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65",
node_id: 1009, port: 32778, socket: nil},
%KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65", node_id: 1010,
port: 32776, socket: nil}],
topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :leader_not_available,
isrs: [], leader: -1, partition_id: 0, replicas: []},
%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [1010], leader: 1010, partition_id: 3, replicas: [1010]},
%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [1010], leader: 1010, partition_id: 1, replicas: [1010]},
%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [1009], leader: 1009, partition_id: 2, replicas: [1009]}],
topic: "topic"}]}
あとは produce/4
で送るだけ!トピック名、パーティション番号、メッセージを渡す。
iex(2)> KafkaEx.produce("topic", 0, "msg") # opt は省略
:leader_not_available
11:56:41.715 [error] Leader for topic topic is not available
あれ?どうやら、トピック名が topic
だとだめそう。別のトピックを指定すると通る。
iex(3)> KafkaEx.produce("new_topic", 0, "msg")
:ok
(追記) 使えなかったのはトピックではなくパーティションでした。リーダーが使用不可になっています。それがなぜかは不明ですが。。。
kafka-console-consumer.sh
で確認
前回同様、 Kafka Shell を起動して
$ start-kafka-shell.sh 192.168.145.65 192.168.145.65:32777 # Kafka Shell を起動
kafka-console-consumer.sh
を叩く。 KafkaEx.produce/4
でもう一度送り、メッセージが表示されればOK!!!
$ kafka-console-consumer.sh --topic new_topic --zookeeper $ZK
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by pas
sing [bootstrap-server] instead of [zookeeper].
msg
ハマったところ
-
config.exs
の設定-
brokers
を{"192.168.145.65", "32776"}
と設定してしまう凡ミスを犯していた。
-
- トピック名が
topic
だとだめそう。 -
mix.exs
の設定- kafka_ex の README.md には
mod: {MyApp, []},
という行があったので入れていた。 - これはモジュールのコールバックの設定を行うためのもの。
- 入れておくと
MyApp.start/2
(今回の場合ProducerSampleEx.start/2
)を実行しようとしてしまう。 - 今回は不要なので削除した。
- kafka_ex の README.md には