LoginSignup
3
1

More than 5 years have passed since last update.

kafka-docker で作った kafka クラスタに Elixir クライアントから接続する

Posted at

kafka-docker で作った kafka クラスタに Elixir クライアントから接続する - 駄文型より。

kafka-docker でローカルに kafka クラスタを構築する - Qiita の続き。kafkaex/kafka_exという Kafka クライアントで Producer を作ってみます。

ライブラリの取得

mix.exs を編集して mix deps.get するだけ。

参考: Introduction to Mix - Elixir

mix.exs
defmodule ProducerSampleEx.Mixfile do
  # ...

  def application do
    [
      applications: [
        :kafka_ex,
        :snappy
      ]
    ]
  end

  defp deps do
    [
      {:kafka_ex, "~> 0.6.5"},
      {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"}
    ]
  end
end
shell
$ mix deps.get

設定

まず、 https://idcf-developers.qiita.com/k_kimuraidcf/items/3bcf973f47439efceaf8 で作った Kafka コンテナのポートを確認。今回は 32776 32778

shell
$ docker-compose ps
                Name                              Command               State                          Ports
-----------------------------------------------------------------------------------------------------------------------------------
66f23109d78a_kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32777->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
kafkadocker_kafka_1                    start-kafka.sh                   Up      0.0.0.0:32778->9092/tcp
kafkadocker_kafka_2                    start-kafka.sh                   Up      0.0.0.0:32776->9092/tcp

次にKafkaEx.Config – kafka_ex を参考に config.exs を書く。以下は最低限の設定。

config.exs
config :kafka_ex,
  brokers: [
    {"192.168.145.65", 32776}, # {"hostname", port}
    {"192.168.145.65", 32778}
  ],
  consumer_group: :no_consumer_group,
  use_ssl: false

iex で確認

iex -S mix で立ち上げる。設定がおかしいとここでエラーがでる。

shell
$ iex -S mix
Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]


11:53:23.055 [debug] Succesfully connected to broker "192.168.145.65":32776

11:53:23.056 [debug] Succesfully connected to broker "192.168.145.65":32778

11:53:23.075 [debug] Establishing connection to broker 1009: "192.168.145.65" on port 32778

11:53:23.076 [debug] Succesfully connected to broker "192.168.145.65":32778

11:53:23.076 [debug] Establishing connection to broker 1010: "192.168.145.65" on port 32776

11:53:23.077 [debug] Succesfully connected to broker "192.168.145.65":32776
Interactive Elixir (1.4.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>

つながった! metadata/1 で確認できる。

iex
iex(1)> KafkaEx.metadata(topic: "topic")
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65",
   node_id: 1009, port: 32778, socket: nil},
  %KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65", node_id: 1010,
   port: 32776, socket: nil}],
 topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :leader_not_available,
     isrs: [], leader: -1, partition_id: 0, replicas: []},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1010], leader: 1010, partition_id: 3, replicas: [1010]},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1010], leader: 1010, partition_id: 1, replicas: [1010]},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1009], leader: 1009, partition_id: 2, replicas: [1009]}],
   topic: "topic"}]}

あとは produce/4 で送るだけ!トピック名、パーティション番号、メッセージを渡す。

iex
iex(2)> KafkaEx.produce("topic", 0, "msg") # opt は省略
:leader_not_available

11:56:41.715 [error] Leader for topic topic is not available

あれ?どうやら、トピック名が topic だとだめそう。別のトピックを指定すると通る。

iex
iex(3)> KafkaEx.produce("new_topic", 0, "msg")
:ok

(追記) 使えなかったのはトピックではなくパーティションでした。リーダーが使用不可になっています。それがなぜかは不明ですが。。。

kafka-console-consumer.sh で確認

前回同様、 Kafka Shell を起動して

shell
$ start-kafka-shell.sh 192.168.145.65 192.168.145.65:32777 # Kafka Shell を起動

kafka-console-consumer.sh を叩く。 KafkaEx.produce/4 でもう一度送り、メッセージが表示されればOK!!!

kafka-shell
$ kafka-console-consumer.sh --topic new_topic --zookeeper $ZK
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by pas
sing [bootstrap-server] instead of [zookeeper].
msg

ハマったところ

  • config.exs の設定
    • brokers{"192.168.145.65", "32776"} と設定してしまう凡ミスを犯していた。
  • トピック名が topic だとだめそう。
  • mix.exs の設定
    • kafka_ex の README.md には mod: {MyApp, []}, という行があったので入れていた。
    • これはモジュールのコールバックの設定を行うためのもの。
    • 入れておくと MyApp.start/2 (今回の場合 ProducerSampleEx.start/2 )を実行しようとしてしまう。
    • 今回は不要なので削除した。
3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1