10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

KafkaEx小技集

Last updated at Posted at 2018-12-23

はじめに

本記事は NTTコミュニケーションズ Advent Calendar 2018 の23日目です。

今月あるサービスをリリースしましたが、そのバックエンドで使われているノウハウの一部を一般化してご紹介します。

具体的には、Elixir から Apache Kafka を利用する場合、通常は KafkaEx を利用すると思われますが、そのTipsのご紹介です。

前提

Kafkaについて

本記事では、Kafka自体の解説はしません。また、登場するKafkaのキーワードは以下の通りです。本記事では解説をしませんので、必要に応じて調べて下さい。

  • Broker
  • Producer
  • Consumer
  • Topic
  • Partition

想定環境

本記事が想定する実行環境は以下の通りです。

  • Ubuntu 16.04 LTS
  • Erlang/OTP 20.3
    • anyenv, erlenv でインストール
    • configureオプションは以下の通り。
$ ./configure \
--prefix=$HOME/.anyenv/envs/erlenv/releases/20.3 \
--enable-dynamic-ssl-lib \
--with-ssl=/usr/lib/ssl \
--enable-shared-zlib \
--enable-threads \
--enable-dirty-schedulers \
--enable-smp-support \
--enable-kernel-poll \
--enable-sctp \
--enable-hipe \
--enable-m64-build \
--with-termcap \
--enable-sharing-preserving \
--with-javac
  • Elixir 1.7.4
    • anyenv, exenv でインストール

想定ユースケース

本記事が想定するKafkaのユースケースは以下の通りです。

  • Phoenix Framework で開発されたWebアプリケーションがProducerとしてKafkaへメッセージを送信
  • Kafkaはクラスタを組んでおり、Brokerは複数存在
  • 並行性を高めるため、Partitionは複数作成されている
  • 他の言語で開発されたプログラムがConsumerとしてメッセージを受信して処理

ConsumerとしてのKafkaExの利用は、個人的にメリットを感じないため、本記事の対象外とさせて頂きます。

KafkaExについて

Elixir製のKafkaクライアントライブラリです。ProducerとしてもConsumerとしても利用できます。

内部的にはGenServerによりプロセス化されています。これにより、 mix phx.server を実行したらすぐにBrokerへ接続しセッションを張るため、コネクション周りの処理を意識しなくて良いのが嬉しいところです。

なお、このために必要な設定があります。mixではおなじみの設定ですが、 mix.exs に以下のように atom :kafka_ex を追加しましょう。

mix.exs
  def application do
    [
      mod: {MyApp, []},
      extra_applications: [
        :logger,
        :kafka_ex,
      ]
    ]
  end

一方、他の言語で利用できるKafkaクライアントライブラリと比較すると、 ユーザ側がPartitonを意識した実装をしなければならないのが不便 であると個人的には感じます。本記事では、そこも含めたTipsをご紹介します。

小技集

コンフィグのパラメタを環境変数で定義する小技

Phoenix Framework で KafkaEx のコンフィグを行う場合、基本的に deps/kafka_ex/config/config.exs の内容を、Phoenix Framework の config/{config,dev,prod,test}.exs にコピーして、編集するだけです。

ここでは、環境変数を使ってパラメタを定義する設定例を示します。

config.exs
config :kafka_ex,
  brokers: Enum.zip(
    ~r/([A-Za-z0-9._-]+):([0-9]{1,5})/
      |> Regex.scan("KAFKA_BROKERS" |> System.get_env)
      |> Enum.map(fn x -> List.pop_at(x, 1) |> elem(0) end),
    ~r/([A-Za-z0-9._-]+):([0-9]{1,5})/
      |> Regex.scan("KAFKA_BROKERS" |> System.get_env)
      |> Enum.map(fn x -> List.pop_at(x, 2) |> elem(0) |> String.to_integer end)
  ),
  consumer_group: "KAFKA_CONSUMER_GROUP"
    |> System.get_env
    || :inet.gethostname() |> elem(1) |> List.to_string,
  disable_default_worker: ("KAFKA_DISABLE_DEFAULT_WORKER" |> System.get_env || "false")
    |> Code.eval_string
    |> elem(0),
  sync_timeout: ("KAFKA_SYNC_TIMEOUT" |> System.get_env || "3000")
    |> String.to_integer,
  max_restarts: ("KAFKA_MAX_RESTARTS" |> System.get_env || "10")
    |> String.to_integer,
  max_seconds: ("KAFKA_MAX_SECONDS" |> System.get_env || "60")
    |> String.to_integer,
  commit_interval: ("KAFKA_COMMIT_INTERVAL" |> System.get_env || "5000")
    |> String.to_integer,
  commit_threshold: ("KAFKA_COMMIT_THRESHOLD" |> System.get_env || "100")
    |> String.to_integer,
  use_ssl: ("KAFKA_USE_SSL" |> System.get_env || "false")
    |> Code.eval_string
    |> elem(0),
  ssl_options: [
    cacertfile: "KAFKA_SSL_OPTIONS_CACERTFILE"
      |> System.get_env
      || System.cwd <> "/ssl/ca-cert",
    certfile: "KAFKA_SSL_OPTIONS_CERTFILE"
      |> System.get_env
      || System.cwd <> "/ssl/cert.pem",
    keyfile: "KAFKA_SSL_OPTIONS_KEYFILE"
      |> System.get_env
      || System.cwd <> "/ssl/key.pem",
  ],
  kafka_version: "KAFKA_VERSION"
    |> System.get_env
    || "0.9.0"
  • KafkaEx側でデフォルト値を持っているパラメタについては、環境変数での指定が無ければ、デフォルト値を設定するようにしました。

  • System.get_env/1 で取得できる値は全てBinaryであることに留意が必要です。

  • 例えば、Brokerに関して環境変数を以下の通り指定した場合、

export KAFKA_BROKERS="10.0.10.2:9092,10.0.11.2:9092,10.0.12.2:9092"

ElixirではTupleのListとして保持できるようにしました。

[{"10.0.10.2", 9092}, {"10.0.11.2", 9092}, {"10.0.12.2", 9092}]

ただ、KafkaExではmetadataとして各種情報を保持しており、Brokerの情報もこちらにあるため、以後各コード内では基本的にmetadataを利用します。(metadataのフォーマットについては KafkaExのREADME を参照。)

Brokerの正常性をチェックする小技

Brokerを指定してチェック

def check_broker_health(broker) when is_map(broker) do
  host = broker |> Map.get(:host) |> String.to_charlist
  port = broker |> Map.get(:port)
  {:ok, _} = :gen_tcp.connect(host, port, [])
end

BrokerのTCPポートへの接続を試行し、 :gen_tcp.connect/3:ok を返すことを確認しています。

Topicを指定してチェック

内部で先述の check_broker_health/1 を利用します。

def check_broker_health(topic) when is_binary(topic) do
  KafkaEx.metadata(topic: topic)
  |> Map.get(:brokers)
  |> Enum.map(fn x -> check_broker_health(x) end)
end

任意のTopicを指定するだけで、Topicに割り当てられている全Brokerに対して正常性確認を実行します。

Topicの正常性をチェックする小技

Topic全体のステータスを取得

def get_topic_status(topic) when is_binary(topic) do
  KafkaEx.metadata(topic: topic)
  |> Map.get(:topic_metadatas)
  |> List.first
  |> Map.get(:error_code)
end

Topicごとのmetadataには、Listの先頭に %KafkaEx.Protocol.Metadata.TopicMetadata という構造体があるので、そこにあるエラーコードを取得しています。エラーが無ければ :no_error が返りますので、チェックを行う場合は左辺をこれにすればOKです。

Topic全体の正常性をチェック

内部で先述の get_topic_status/1 を利用します。

def check_topic_health(topic) when is_binary(topic) do
  case get_topic_status(topic) do
    :no_error -> :ok
    _ -> :error
  end
end

:ok:error を返すだけですので、呼び出し元でパターンマッチングする形です。

Topicの正常性をPartitionごとにチェックする小技

任意のPartition番号の正常性を取得

def get_topic_status(topic, partition_number)
  when is_binary(topic)
  and is_integer(partition_number) do
  KafkaEx.metadata(topic: topic)
  |> Map.get(:topic_metadatas)
  |> List.first
  |> Map.get(:partition_metadatas)
  |> Enum.map(fn x ->
       if x |> Map.get(:partition_id) == partition_number do
         x |> Map.get(:error_code)
       end
     end)
  |> Enum.filter(fn x -> x == :no_error end)
end

Topicごとのmetadataの内部には、さらにPartitionごとのmetadataがあり、そのステータスを取得しています。

任意のPartition番号の正常性をチェック

内部で先述の get_topic_status/2 を利用します。

def check_topic_health(topic, partition_number)
  when is_binary(topic)
  and is_integer(partition_number) do
  case get_topic_status(topic, partition_number) do
    [:no_error] -> :ok
    _ -> :error
  end
end

Topicにおける全Partitionのステータスを取得

def get_all_topic_status(topic) when is_binary(topic) do
  KafkaEx.metadata(topic: topic)
  |> Map.get(:topic_metadatas)
  |> List.first
  |> Map.get(:partition_metadatas)
  |> Enum.map(fn x -> x |> Map.get(:error_code) == :no_error end)
end

上記と異なり、全Partitionを対象としています。例えばPartitionが4つある場合 [true, true, true, true] といった値が返ります。

Topicにおける全Partitionのステータスを一括チェック

内部で先述の get_all_topic_status/1 を利用します。

def check_all_topic_health(topic) when is_binary(topic) do
  result = get_all_topic_status(topic)
  |> Enum.find(fn x -> x == false end)
  case result do
    nil -> :ok
    _ -> :error
  end
end

全てPartitionのステータスが正常なら :ok 、一つでもエラーがあれば :error を返します。

Produceするのに便利な小技

Partition番号の最大値を取得する

def get_greatest_partition_id(topic) when is_binary(topic) do
  KafkaEx.metadata(topic: topic)
  |> Map.get(:topic_metadatas)
  |> List.first
  |> Map.get(:partition_metadatas)
  |> Enum.map_reduce(0, fn x, acc -> {0, max(Map.get(x, :partition_id), acc)} end)
  |> elem(1)
end

Topicが分かれば、metadataを利用してPartitionがいくつあるかを調べることができます。
KafkaExにおいて、Produceする際にはPartitionを指定する必要があるので、そういう場面で使用します。

Partition番号をランダムに決める

内部で先述の get_greatest_partition_id/1 を利用します。

def get_partition_id(topic) when is_binary(topic) do
  id = topic |> get_greatest_partition_id
  Range.new(0, id) |> Enum.random
end

本当は rand モジュールを使用したかったのですが、パーティション番号 0 の扱いで困ったので、上記の方式となりました。

JSON形式でメッセージをProduceする

先述の各種関数を利用しながら、JSON形式のメッセージをProduceします。まずは必要な情報を揃えるところから。なお、Topicは環境変数 KAFKA_TOPIC から取得しています。

my_app.ex
defmodule MyApp.KafkaMessage do
  def generate(module, function, my_arg)
    when module == MyApp.Foo
    and function == {:foo, 1} do
    topic = "KAFKA_TOPIC" |> System.get_env
    :ok = topic |> check_topic_health
    :ok = topic |> check_all_topic_health
    partition = topic |> get_partition_id
    :ok = topic |> check_topic_health(partition)
    msg = %{"my_arg" => my_arg} |> Poison.encode!
    {:ok, topic, partition, msg}
  end
end

元々ErlangやElixirは引数の数で多重ディスパッチ的なことが可能で、 MyApp.KafkaMessage.generate_with_my_arg といった個別的な名前の関数を作らず、 MyApp.KafkaMessage.generate という関数名を使い回すことができますが、さらにwhenガードで呼び出し元のmoduleとfunctionを指定することで、よりきめ細かな制御が可能になります。

これを使って生成したメッセージ等を、KafkaExでProduceします。

foo.ex
defmodule MyApp.Foo do
  def foo(my_arg) do
    # ...
    {:ok, topic, partition, msg} = KafkaMessage.generate(__MODULE__, __ENV__.function, my_arg)
    :ok = KafkaEx.produce(topic, partition, msg)
  end
end

まとめ

KafkaExを使ってPhoenix Framework アプリケーションからメッセージをProduceするにあたっての各種小技をご紹介しました。
Kafkaはそれ自身が分散システムであるというだけでなく、ConsumerがpullするタイプのPubSubですので、Consumer側のマシンリソースに合わせて柔軟でスケーラブルなシステムを組むことができます。
そのフロントエンドとして、並行処理が得意なElixir 及び Phoenix Framework はまさに適任で、非常に強力な組み合わせであると言えるでしょう。
KafkaExは他の言語のKafkaクライアントライブラリに慣れていると不便なこともありますが、本記事のTipsを参考にすることで、サクッと解決することができたら幸いです。

See also

10
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?