はじめに
本記事は NTTコミュニケーションズ Advent Calendar 2018 の23日目です。
今月あるサービスをリリースしましたが、そのバックエンドで使われているノウハウの一部を一般化してご紹介します。
具体的には、Elixir から Apache Kafka を利用する場合、通常は KafkaEx を利用すると思われますが、そのTipsのご紹介です。
前提
Kafkaについて
本記事では、Kafka自体の解説はしません。また、登場するKafkaのキーワードは以下の通りです。本記事では解説をしませんので、必要に応じて調べて下さい。
- Broker
- Producer
- Consumer
- Topic
- Partition
想定環境
本記事が想定する実行環境は以下の通りです。
- Ubuntu 16.04 LTS
- Erlang/OTP 20.3
- anyenv, erlenv でインストール
- configureオプションは以下の通り。
$ ./configure \
--prefix=$HOME/.anyenv/envs/erlenv/releases/20.3 \
--enable-dynamic-ssl-lib \
--with-ssl=/usr/lib/ssl \
--enable-shared-zlib \
--enable-threads \
--enable-dirty-schedulers \
--enable-smp-support \
--enable-kernel-poll \
--enable-sctp \
--enable-hipe \
--enable-m64-build \
--with-termcap \
--enable-sharing-preserving \
--with-javac
- Elixir 1.7.4
- anyenv, exenv でインストール
想定ユースケース
本記事が想定するKafkaのユースケースは以下の通りです。
- Phoenix Framework で開発されたWebアプリケーションがProducerとしてKafkaへメッセージを送信
- Kafkaはクラスタを組んでおり、Brokerは複数存在
- 並行性を高めるため、Partitionは複数作成されている
- 他の言語で開発されたプログラムがConsumerとしてメッセージを受信して処理
ConsumerとしてのKafkaExの利用は、個人的にメリットを感じないため、本記事の対象外とさせて頂きます。
KafkaExについて
Elixir製のKafkaクライアントライブラリです。ProducerとしてもConsumerとしても利用できます。
内部的にはGenServerによりプロセス化されています。これにより、 mix phx.server
を実行したらすぐにBrokerへ接続しセッションを張るため、コネクション周りの処理を意識しなくて良いのが嬉しいところです。
なお、このために必要な設定があります。mixではおなじみの設定ですが、 mix.exs
に以下のように atom :kafka_ex
を追加しましょう。
def application do
[
mod: {MyApp, []},
extra_applications: [
:logger,
:kafka_ex,
]
]
end
一方、他の言語で利用できるKafkaクライアントライブラリと比較すると、 ユーザ側がPartitonを意識した実装をしなければならないのが不便 であると個人的には感じます。本記事では、そこも含めたTipsをご紹介します。
小技集
コンフィグのパラメタを環境変数で定義する小技
Phoenix Framework で KafkaEx のコンフィグを行う場合、基本的に deps/kafka_ex/config/config.exs
の内容を、Phoenix Framework の config/{config,dev,prod,test}.exs
にコピーして、編集するだけです。
ここでは、環境変数を使ってパラメタを定義する設定例を示します。
config :kafka_ex,
brokers: Enum.zip(
~r/([A-Za-z0-9._-]+):([0-9]{1,5})/
|> Regex.scan("KAFKA_BROKERS" |> System.get_env)
|> Enum.map(fn x -> List.pop_at(x, 1) |> elem(0) end),
~r/([A-Za-z0-9._-]+):([0-9]{1,5})/
|> Regex.scan("KAFKA_BROKERS" |> System.get_env)
|> Enum.map(fn x -> List.pop_at(x, 2) |> elem(0) |> String.to_integer end)
),
consumer_group: "KAFKA_CONSUMER_GROUP"
|> System.get_env
|| :inet.gethostname() |> elem(1) |> List.to_string,
disable_default_worker: ("KAFKA_DISABLE_DEFAULT_WORKER" |> System.get_env || "false")
|> Code.eval_string
|> elem(0),
sync_timeout: ("KAFKA_SYNC_TIMEOUT" |> System.get_env || "3000")
|> String.to_integer,
max_restarts: ("KAFKA_MAX_RESTARTS" |> System.get_env || "10")
|> String.to_integer,
max_seconds: ("KAFKA_MAX_SECONDS" |> System.get_env || "60")
|> String.to_integer,
commit_interval: ("KAFKA_COMMIT_INTERVAL" |> System.get_env || "5000")
|> String.to_integer,
commit_threshold: ("KAFKA_COMMIT_THRESHOLD" |> System.get_env || "100")
|> String.to_integer,
use_ssl: ("KAFKA_USE_SSL" |> System.get_env || "false")
|> Code.eval_string
|> elem(0),
ssl_options: [
cacertfile: "KAFKA_SSL_OPTIONS_CACERTFILE"
|> System.get_env
|| System.cwd <> "/ssl/ca-cert",
certfile: "KAFKA_SSL_OPTIONS_CERTFILE"
|> System.get_env
|| System.cwd <> "/ssl/cert.pem",
keyfile: "KAFKA_SSL_OPTIONS_KEYFILE"
|> System.get_env
|| System.cwd <> "/ssl/key.pem",
],
kafka_version: "KAFKA_VERSION"
|> System.get_env
|| "0.9.0"
-
KafkaEx側でデフォルト値を持っているパラメタについては、環境変数での指定が無ければ、デフォルト値を設定するようにしました。
-
System.get_env/1
で取得できる値は全てBinaryであることに留意が必要です。 -
例えば、Brokerに関して環境変数を以下の通り指定した場合、
export KAFKA_BROKERS="10.0.10.2:9092,10.0.11.2:9092,10.0.12.2:9092"
ElixirではTupleのListとして保持できるようにしました。
[{"10.0.10.2", 9092}, {"10.0.11.2", 9092}, {"10.0.12.2", 9092}]
ただ、KafkaExではmetadataとして各種情報を保持しており、Brokerの情報もこちらにあるため、以後各コード内では基本的にmetadataを利用します。(metadataのフォーマットについては KafkaExのREADME を参照。)
Brokerの正常性をチェックする小技
Brokerを指定してチェック
def check_broker_health(broker) when is_map(broker) do
host = broker |> Map.get(:host) |> String.to_charlist
port = broker |> Map.get(:port)
{:ok, _} = :gen_tcp.connect(host, port, [])
end
BrokerのTCPポートへの接続を試行し、 :gen_tcp.connect/3
が :ok
を返すことを確認しています。
Topicを指定してチェック
内部で先述の check_broker_health/1
を利用します。
def check_broker_health(topic) when is_binary(topic) do
KafkaEx.metadata(topic: topic)
|> Map.get(:brokers)
|> Enum.map(fn x -> check_broker_health(x) end)
end
任意のTopicを指定するだけで、Topicに割り当てられている全Brokerに対して正常性確認を実行します。
Topicの正常性をチェックする小技
Topic全体のステータスを取得
def get_topic_status(topic) when is_binary(topic) do
KafkaEx.metadata(topic: topic)
|> Map.get(:topic_metadatas)
|> List.first
|> Map.get(:error_code)
end
Topicごとのmetadataには、Listの先頭に %KafkaEx.Protocol.Metadata.TopicMetadata
という構造体があるので、そこにあるエラーコードを取得しています。エラーが無ければ :no_error
が返りますので、チェックを行う場合は左辺をこれにすればOKです。
Topic全体の正常性をチェック
内部で先述の get_topic_status/1
を利用します。
def check_topic_health(topic) when is_binary(topic) do
case get_topic_status(topic) do
:no_error -> :ok
_ -> :error
end
end
:ok
か :error
を返すだけですので、呼び出し元でパターンマッチングする形です。
Topicの正常性をPartitionごとにチェックする小技
任意のPartition番号の正常性を取得
def get_topic_status(topic, partition_number)
when is_binary(topic)
and is_integer(partition_number) do
KafkaEx.metadata(topic: topic)
|> Map.get(:topic_metadatas)
|> List.first
|> Map.get(:partition_metadatas)
|> Enum.map(fn x ->
if x |> Map.get(:partition_id) == partition_number do
x |> Map.get(:error_code)
end
end)
|> Enum.filter(fn x -> x == :no_error end)
end
Topicごとのmetadataの内部には、さらにPartitionごとのmetadataがあり、そのステータスを取得しています。
任意のPartition番号の正常性をチェック
内部で先述の get_topic_status/2
を利用します。
def check_topic_health(topic, partition_number)
when is_binary(topic)
and is_integer(partition_number) do
case get_topic_status(topic, partition_number) do
[:no_error] -> :ok
_ -> :error
end
end
Topicにおける全Partitionのステータスを取得
def get_all_topic_status(topic) when is_binary(topic) do
KafkaEx.metadata(topic: topic)
|> Map.get(:topic_metadatas)
|> List.first
|> Map.get(:partition_metadatas)
|> Enum.map(fn x -> x |> Map.get(:error_code) == :no_error end)
end
上記と異なり、全Partitionを対象としています。例えばPartitionが4つある場合 [true, true, true, true]
といった値が返ります。
Topicにおける全Partitionのステータスを一括チェック
内部で先述の get_all_topic_status/1
を利用します。
def check_all_topic_health(topic) when is_binary(topic) do
result = get_all_topic_status(topic)
|> Enum.find(fn x -> x == false end)
case result do
nil -> :ok
_ -> :error
end
end
全てPartitionのステータスが正常なら :ok
、一つでもエラーがあれば :error
を返します。
Produceするのに便利な小技
Partition番号の最大値を取得する
def get_greatest_partition_id(topic) when is_binary(topic) do
KafkaEx.metadata(topic: topic)
|> Map.get(:topic_metadatas)
|> List.first
|> Map.get(:partition_metadatas)
|> Enum.map_reduce(0, fn x, acc -> {0, max(Map.get(x, :partition_id), acc)} end)
|> elem(1)
end
Topicが分かれば、metadataを利用してPartitionがいくつあるかを調べることができます。
KafkaExにおいて、Produceする際にはPartitionを指定する必要があるので、そういう場面で使用します。
Partition番号をランダムに決める
内部で先述の get_greatest_partition_id/1
を利用します。
def get_partition_id(topic) when is_binary(topic) do
id = topic |> get_greatest_partition_id
Range.new(0, id) |> Enum.random
end
本当は rand
モジュールを使用したかったのですが、パーティション番号 0
の扱いで困ったので、上記の方式となりました。
JSON形式でメッセージをProduceする
先述の各種関数を利用しながら、JSON形式のメッセージをProduceします。まずは必要な情報を揃えるところから。なお、Topicは環境変数 KAFKA_TOPIC
から取得しています。
defmodule MyApp.KafkaMessage do
def generate(module, function, my_arg)
when module == MyApp.Foo
and function == {:foo, 1} do
topic = "KAFKA_TOPIC" |> System.get_env
:ok = topic |> check_topic_health
:ok = topic |> check_all_topic_health
partition = topic |> get_partition_id
:ok = topic |> check_topic_health(partition)
msg = %{"my_arg" => my_arg} |> Poison.encode!
{:ok, topic, partition, msg}
end
end
元々ErlangやElixirは引数の数で多重ディスパッチ的なことが可能で、 MyApp.KafkaMessage.generate_with_my_arg
といった個別的な名前の関数を作らず、 MyApp.KafkaMessage.generate
という関数名を使い回すことができますが、さらにwhenガードで呼び出し元のmoduleとfunctionを指定することで、よりきめ細かな制御が可能になります。
これを使って生成したメッセージ等を、KafkaExでProduceします。
defmodule MyApp.Foo do
def foo(my_arg) do
# ...
{:ok, topic, partition, msg} = KafkaMessage.generate(__MODULE__, __ENV__.function, my_arg)
:ok = KafkaEx.produce(topic, partition, msg)
end
end
まとめ
KafkaExを使ってPhoenix Framework アプリケーションからメッセージをProduceするにあたっての各種小技をご紹介しました。
Kafkaはそれ自身が分散システムであるというだけでなく、ConsumerがpullするタイプのPubSubですので、Consumer側のマシンリソースに合わせて柔軟でスケーラブルなシステムを組むことができます。
そのフロントエンドとして、並行処理が得意なElixir 及び Phoenix Framework はまさに適任で、非常に強力な組み合わせであると言えるでしょう。
KafkaExは他の言語のKafkaクライアントライブラリに慣れていると不便なこともありますが、本記事のTipsを参考にすることで、サクッと解決することができたら幸いです。