この記事は、fukuoka.ex Elixir/Phoenix Advent Calendar 2019 の5日目です。
昨日は @hisaway さんの「Elixirでコード変換してみよう」でした。
今日は、最近触ってみたMQTTクライアントtortoiseについてです。
触ってみた感じ、tortoise自体は非常に簡単で使いやすいかったです。
MQTTクライアント、サーバーの実装候補としてElixir+tortoiseは強力な候補になるのではないでしょうか?
MQTTとは
AWS IoTとかに繋いでPub/Subできるプロトコルですね。
センサーデバイスとかやってる人はきっと使ってるはず!
以上
tortoise
MQTTのクライアントライブラリと言えばpahoが比較的有名な様なのですが、Elixirでやりたかったので探したらありました。
tortoise の設定例
まずはいつものdepsから
defp deps do
[
{:phoenix, "~> 1.3.2"},
{:phoenix_pubsub, "~> 1.0"},
{:phoenix_ecto, "~> 3.2"},
{:plug, "1.5.1"},
{:phoenix_html, "~> 2.11"},
{:phoenix_live_reload, "~> 1.0", only: :dev},
{:gettext, "~> 0.11"},
{:jason, "~> 1.0"},
{:cowboy, "~> 1.0"},
{:cors_plug, "~> 1.5"},
{:tortoise, "~> 0.9"}, // これ
{:certifi, "~> 2.4"},
{:ssl_verify_fun, "~> 1.1"},
{:poison, "3.1.0", override: true},
{:idna, "~> 6.0"},
{:mimerl, "~> 1.2"},
{:metrics, "~> 1.0"},
{:hackney, "~> 1.15"},
]
いつも通り簡単です。
certifiとかssl_verify_funとか色々追加されているのはAWSIoTに繋ぐのにSSLの接続設定が必要になったからです。
tortoiseの本筋とは関係ありません。
続いてapplication.exの設定例
defmodule SampleMqtt.Application do
@moduledoc false
use Application
〜中略〜
def start(_type, _args) do
{:ok ,client} = Tortoise.Supervisor.start_child(
client_id: "your app client id",
handler: {SampleMqtt.Handler.Messagehandler, []},
server: {
// エンドポイントとSSLの設定
Tortoise.Transport.SSL,
host: '{Bloker(AWS Iot Coreなどのエンドポイント)}',
port: {Bloker(AWS Iot Coreなどのポート) eg: 8883},
keyfile: '{秘密鍵のローカルパス}',
certfile: '{Certファイルのローカルパス}',
cacerts: :certifi.cacerts(),
depth: 99,
versions: [:'tlsv1.2'],
customize_hostname_check: [match_fun:
:public_key.pkix_verify_hostname_match_fun(:https)]
},
//subscribeするtopicのtopicとQos
subscriptions: [
{"test/topic1", 0}, //
{"test/topic1", 1}, //
])
end
Tortoise.Supervisorの起動設定です。
設定自体は意外と少ないですが、AWS IoT Coreに直接繋ぐとSSLの設定が必要となるため、
その辺りの知識が要求されます。
MQTT固有の注意点としてはclient_idはBlokerに接続する全Clientで一意である必要があります。
Subscriber側(Tortoise.Handler)とPublisher側(Tortoise.publish)の実装例
まずは、use Tortoise.Handlerで定義された各種イベントハンドラーを実装します。
例ではログ出力のみで特に何もしていません。
defmodule SampleMqtt.Handler.Messagehandler do
use Tortoise.Handler
def init(_opts) do
Logger.info("Initializing handler")
{:ok, %State{}}
end
def connection(:up, state) do
Logger.info("Connection has been established")
{:ok, state}
end
def connection(:down, state) do
Logger.warn("Connection has been dropped")
{:ok, state}
end
def connection(:terminating, state) do
Logger.warn("Connection is terminating")
{:ok, state}
end
def subscription(:up, topic, state) do
Logger.info("Subscribed to #{topic}")
{:ok, state}
end
def subscription({:warn, [requested: req, accepted: qos]}, topic, state) do
Logger.warn("Subscribed to #{topic}; requested #{req} but got accepted with QoS #{qos}")
{:ok, state}
end
def subscription({:error, reason}, topic, state) do
Logger.error("Error subscribing to #{topic}; #{inspect(reason)}")
{:ok, state}
end
def subscription(:down, topic, state) do
Logger.info("Unsubscribed from #{topic}")
{:ok, state}
end
〜以下略〜
続いてメインとなるtopic受信時のハンドラ実装です。
def handle_message(topic, publish, state) do
topic_path = Enum.join(topic, "/")
state = handle(topic_path, topic, publish, state)
{:ok, state}
end
def handle("test/topic1" , topic, publish, state) do
{:ok, payload} = Poison.encode({ message: "hogehoge"})
case Tortoise.publish_sync(”{your app client id}”, "test/topic_resp", payload, qos: 1, timeout: 200) do
:ok ->
{:ok, state}
{:error, :timeout} ->
{:error, state}
{:error, :timeout} ->
{:error, state}
end
end
def handle("test/topic2" , topic, publish, state) do
〜なんかいい感じの処理〜
{:ok, state}
end
handle_message/3関数を実装することになりますが、
topicには受信したtopicの階層がリストで入ってくるので
(test/topic1 なら topicには["test", "topic1"]と入ってくる)ので
受けたtopicに応じてパターンマッチなどで処理を分岐することができます。
publishにはMQTT Topicのpayloadがそのまま入ってきます。
なので通常のJsonフォーマットであれば
{:ok, json} = Poison.decode(publish)
としてデコードすればあとは自由にできます。
例ではわかりやすい様に一旦通常のtopicの表記(/区切り)に戻して内部でパターンマッチでtopic毎に処理を分けています。
共通の前処理などが不要の場合はhandle_messageでいきなりパターンマッチもありだと思います。
test/topic1の例では、topicを受けた処理内部でTortoise.publish_sync関数を使って別のTopicをPublishしています。
(なのでこの例でクライアント側の実装例も兼ねていますw)
まとめ
ご覧の通り、Supervisorを起動してしまえば、
- handle_message
- Tortoise.publish(or Tortoise.publish_sync)
を使って縦横無尽にPub/Subすることができます!
MQTTに関してはAWS Iot Core ではRetainやQoS=2がサポートされていないなど、
クライアントより寧ろBloker側の制約に気を使う必要がありますが、
サクッとElixirでPub/Subしてみる場合にはtortoiseを使うと比較的簡単(SSL周り以外)に実装できることがわかりました。
明日12/6は @sym_num さんの「ElixirでPrologコンパイラを作ったお話」です.お楽しみに!