14
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Elixir製MQTTクライアントtortoiseを使ってみた

Last updated at Posted at 2019-12-05

この記事は、fukuoka.ex Elixir/Phoenix Advent Calendar 2019 の5日目です。

昨日は @hisaway さんの「Elixirでコード変換してみよう」でした。

今日は、最近触ってみたMQTTクライアントtortoiseについてです。

触ってみた感じ、tortoise自体は非常に簡単で使いやすいかったです。
MQTTクライアント、サーバーの実装候補としてElixir+tortoiseは強力な候補になるのではないでしょうか?

MQTTとは

AWS IoTとかに繋いでPub/Subできるプロトコルですね。
センサーデバイスとかやってる人はきっと使ってるはず!
以上

tortoise

MQTTのクライアントライブラリと言えばpahoが比較的有名な様なのですが、Elixirでやりたかったので探したらありました。

tortoise

tortoise の設定例

まずはいつものdepsから

defp deps do
    [
      {:phoenix, "~> 1.3.2"},
      {:phoenix_pubsub, "~> 1.0"},
      {:phoenix_ecto, "~> 3.2"},
      {:plug, "1.5.1"},
      {:phoenix_html, "~> 2.11"},
      {:phoenix_live_reload, "~> 1.0", only: :dev},
      {:gettext, "~> 0.11"},
      {:jason, "~> 1.0"},
      {:cowboy, "~> 1.0"},
      {:cors_plug, "~> 1.5"},
      {:tortoise, "~> 0.9"}, // これ
      {:certifi, "~> 2.4"},
      {:ssl_verify_fun, "~> 1.1"},
      {:poison, "3.1.0", override: true},
      {:idna, "~> 6.0"},
      {:mimerl, "~> 1.2"},
      {:metrics, "~> 1.0"},
      {:hackney, "~> 1.15"},
    ]

いつも通り簡単です。
certifiとかssl_verify_funとか色々追加されているのはAWSIoTに繋ぐのにSSLの接続設定が必要になったからです。
tortoiseの本筋とは関係ありません。

続いてapplication.exの設定例

defmodule SampleMqtt.Application do

  @moduledoc false

  use Application

〜中略〜

  def start(_type, _args) do
   {:ok ,client} = Tortoise.Supervisor.start_child(
    client_id: "your app client id",
    handler: {SampleMqtt.Handler.Messagehandler, []},
    server: {
      // エンドポイントとSSLの設定
      Tortoise.Transport.SSL,
      host: '{Bloker(AWS Iot Coreなどのエンドポイント)}',
      port: {Bloker(AWS Iot Coreなどのポート) eg: 8883},
      keyfile: '{秘密鍵のローカルパス}',
      certfile: '{Certファイルのローカルパス}',
      cacerts: :certifi.cacerts(),
      depth: 99,
      versions: [:'tlsv1.2'],
      customize_hostname_check: [match_fun: 
      :public_key.pkix_verify_hostname_match_fun(:https)]
    },
    //subscribeするtopictopicQos
    subscriptions: [
      {"test/topic1", 0},  //
      {"test/topic1", 1},  //
      ])
  end

Tortoise.Supervisorの起動設定です。
設定自体は意外と少ないですが、AWS IoT Coreに直接繋ぐとSSLの設定が必要となるため、
その辺りの知識が要求されます。

MQTT固有の注意点としてはclient_idはBlokerに接続する全Clientで一意である必要があります。

Subscriber側(Tortoise.Handler)とPublisher側(Tortoise.publish)の実装例

まずは、use Tortoise.Handlerで定義された各種イベントハンドラーを実装します。
例ではログ出力のみで特に何もしていません。


defmodule SampleMqtt.Handler.Messagehandler do

  use Tortoise.Handler

def init(_opts) do
    Logger.info("Initializing handler")
    {:ok, %State{}}
  end

  def connection(:up, state) do
    Logger.info("Connection has been established")
    {:ok, state}
  end

  def connection(:down, state) do
    Logger.warn("Connection has been dropped")
    {:ok, state}
  end

  def connection(:terminating, state) do
    Logger.warn("Connection is terminating")
    {:ok, state}
  end

  def subscription(:up, topic, state) do
    Logger.info("Subscribed to #{topic}")
    {:ok, state}
  end

  def subscription({:warn, [requested: req, accepted: qos]}, topic, state) do
    Logger.warn("Subscribed to #{topic}; requested #{req} but got accepted with QoS #{qos}")
    {:ok, state}
  end

  def subscription({:error, reason}, topic, state) do
    Logger.error("Error subscribing to #{topic}; #{inspect(reason)}")
    {:ok, state}
  end

  def subscription(:down, topic, state) do
    Logger.info("Unsubscribed from #{topic}")
    {:ok, state}
  end

〜以下略〜

続いてメインとなるtopic受信時のハンドラ実装です。


def handle_message(topic, publish, state) do

    topic_path = Enum.join(topic, "/")

    state = handle(topic_path, topic, publish, state)

    {:ok, state}
  end

def handle("test/topic1" , topic, publish, state) do

{:ok, payload} = Poison.encode({ message: "hogehoge"})
  case Tortoise.publish_sync({your app client id}, "test/topic_resp", payload, qos: 1, timeout: 200) do
      :ok ->
        {:ok, state}

      {:error, :timeout} ->
        {:error, state}
      {:error, :timeout} ->
        {:error, state}
    end

end

def handle("test/topic2" , topic, publish, state) do

   〜なんかいい感じの処理〜

    {:ok, state}

  end

handle_message/3関数を実装することになりますが、
topicには受信したtopicの階層がリストで入ってくるので
(test/topic1 なら topicには["test", "topic1"]と入ってくる)ので
受けたtopicに応じてパターンマッチなどで処理を分岐することができます。

publishにはMQTT Topicのpayloadがそのまま入ってきます。
なので通常のJsonフォーマットであれば

{:ok, json} = Poison.decode(publish)

としてデコードすればあとは自由にできます。

例ではわかりやすい様に一旦通常のtopicの表記(/区切り)に戻して内部でパターンマッチでtopic毎に処理を分けています。
共通の前処理などが不要の場合はhandle_messageでいきなりパターンマッチもありだと思います。
test/topic1の例では、topicを受けた処理内部でTortoise.publish_sync関数を使って別のTopicをPublishしています。
(なのでこの例でクライアント側の実装例も兼ねていますw)

まとめ

ご覧の通り、Supervisorを起動してしまえば、

  • handle_message
  • Tortoise.publish(or Tortoise.publish_sync)

を使って縦横無尽にPub/Subすることができます!

MQTTに関してはAWS Iot Core ではRetainやQoS=2がサポートされていないなど、
クライアントより寧ろBloker側の制約に気を使う必要がありますが、
サクッとElixirでPub/Subしてみる場合にはtortoiseを使うと比較的簡単(SSL周り以外)に実装できることがわかりました。


明日12/6は @sym_num さんの「ElixirでPrologコンパイラを作ったお話」です.お楽しみに!

14
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?