Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
8
Help us understand the problem. What is going on with this article?
@tuchiro

Elixir製MQTTクライアントtortoiseを使ってみた

More than 1 year has passed since last update.

この記事は、fukuoka.ex Elixir/Phoenix Advent Calendar 2019 の5日目です。

昨日は @hisaway さんの「Elixirでコード変換してみよう」でした。

今日は、最近触ってみたMQTTクライアントtortoiseについてです。

触ってみた感じ、tortoise自体は非常に簡単で使いやすいかったです。
MQTTクライアント、サーバーの実装候補としてElixir+tortoiseは強力な候補になるのではないでしょうか?

MQTTとは

AWS IoTとかに繋いでPub/Subできるプロトコルですね。
センサーデバイスとかやってる人はきっと使ってるはず!
以上

tortoise

MQTTのクライアントライブラリと言えばpahoが比較的有名な様なのですが、Elixirでやりたかったので探したらありました。

tortoise

tortoise の設定例

まずはいつものdepsから

defp deps do
    [
      {:phoenix, "~> 1.3.2"},
      {:phoenix_pubsub, "~> 1.0"},
      {:phoenix_ecto, "~> 3.2"},
      {:plug, "1.5.1"},
      {:phoenix_html, "~> 2.11"},
      {:phoenix_live_reload, "~> 1.0", only: :dev},
      {:gettext, "~> 0.11"},
      {:jason, "~> 1.0"},
      {:cowboy, "~> 1.0"},
      {:cors_plug, "~> 1.5"},
      {:tortoise, "~> 0.9"}, // これ
      {:certifi, "~> 2.4"},
      {:ssl_verify_fun, "~> 1.1"},
      {:poison, "3.1.0", override: true},
      {:idna, "~> 6.0"},
      {:mimerl, "~> 1.2"},
      {:metrics, "~> 1.0"},
      {:hackney, "~> 1.15"},
    ]

いつも通り簡単です。
certifiとかssl_verify_funとか色々追加されているのはAWSIoTに繋ぐのにSSLの接続設定が必要になったからです。
tortoiseの本筋とは関係ありません。

続いてapplication.exの設定例

defmodule SampleMqtt.Application do

  @moduledoc false

  use Application

〜中略〜

  def start(_type, _args) do
   {:ok ,client} = Tortoise.Supervisor.start_child(
    client_id: "your app client id",
    handler: {SampleMqtt.Handler.Messagehandler, []},
    server: {
      // エンドポイントとSSLの設定
      Tortoise.Transport.SSL,
      host: '{Bloker(AWS Iot Coreなどのエンドポイント)}',
      port: {Bloker(AWS Iot Coreなどのポート) eg: 8883},
      keyfile: '{秘密鍵のローカルパス}',
      certfile: '{Certファイルのローカルパス}',
      cacerts: :certifi.cacerts(),
      depth: 99,
      versions: [:'tlsv1.2'],
      customize_hostname_check: [match_fun: 
      :public_key.pkix_verify_hostname_match_fun(:https)]
    },
    //subscribeするtopictopicQos
    subscriptions: [
      {"test/topic1", 0},  //
      {"test/topic1", 1},  //
      ])
  end

Tortoise.Supervisorの起動設定です。
設定自体は意外と少ないですが、AWS IoT Coreに直接繋ぐとSSLの設定が必要となるため、
その辺りの知識が要求されます。

MQTT固有の注意点としてはclient_idはBlokerに接続する全Clientで一意である必要があります。

Subscriber側(Tortoise.Handler)とPublisher側(Tortoise.publish)の実装例

まずは、use Tortoise.Handlerで定義された各種イベントハンドラーを実装します。
例ではログ出力のみで特に何もしていません。


defmodule SampleMqtt.Handler.Messagehandler do

  use Tortoise.Handler

def init(_opts) do
    Logger.info("Initializing handler")
    {:ok, %State{}}
  end

  def connection(:up, state) do
    Logger.info("Connection has been established")
    {:ok, state}
  end

  def connection(:down, state) do
    Logger.warn("Connection has been dropped")
    {:ok, state}
  end

  def connection(:terminating, state) do
    Logger.warn("Connection is terminating")
    {:ok, state}
  end

  def subscription(:up, topic, state) do
    Logger.info("Subscribed to #{topic}")
    {:ok, state}
  end

  def subscription({:warn, [requested: req, accepted: qos]}, topic, state) do
    Logger.warn("Subscribed to #{topic}; requested #{req} but got accepted with QoS #{qos}")
    {:ok, state}
  end

  def subscription({:error, reason}, topic, state) do
    Logger.error("Error subscribing to #{topic}; #{inspect(reason)}")
    {:ok, state}
  end

  def subscription(:down, topic, state) do
    Logger.info("Unsubscribed from #{topic}")
    {:ok, state}
  end

〜以下略〜

続いてメインとなるtopic受信時のハンドラ実装です。


def handle_message(topic, publish, state) do

    topic_path = Enum.join(topic, "/")

    state = handle(topic_path, topic, publish, state)

    {:ok, state}
  end

def handle("test/topic1" , topic, publish, state) do

{:ok, payload} = Poison.encode({ message: "hogehoge"})
  case Tortoise.publish_sync({your app client id}, "test/topic_resp", payload, qos: 1, timeout: 200) do
      :ok ->
        {:ok, state}

      {:error, :timeout} ->
        {:error, state}
      {:error, :timeout} ->
        {:error, state}
    end

end

def handle("test/topic2" , topic, publish, state) do

   〜なんかいい感じの処理〜

    {:ok, state}

  end

handle_message/3関数を実装することになりますが、
topicには受信したtopicの階層がリストで入ってくるので
(test/topic1 なら topicには["test", "topic1"]と入ってくる)ので
受けたtopicに応じてパターンマッチなどで処理を分岐することができます。

publishにはMQTT Topicのpayloadがそのまま入ってきます。
なので通常のJsonフォーマットであれば

{:ok, json} = Poison.decode(publish)

としてデコードすればあとは自由にできます。

例ではわかりやすい様に一旦通常のtopicの表記(/区切り)に戻して内部でパターンマッチでtopic毎に処理を分けています。
共通の前処理などが不要の場合はhandle_messageでいきなりパターンマッチもありだと思います。
test/topic1の例では、topicを受けた処理内部でTortoise.publish_sync関数を使って別のTopicをPublishしています。
(なのでこの例でクライアント側の実装例も兼ねていますw)

まとめ

ご覧の通り、Supervisorを起動してしまえば、

  • handle_message
  • Tortoise.publish(or Tortoise.publish_sync)

を使って縦横無尽にPub/Subすることができます!

MQTTに関してはAWS Iot Core ではRetainやQoS=2がサポートされていないなど、
クライアントより寧ろBloker側の制約に気を使う必要がありますが、
サクッとElixirでPub/Subしてみる場合にはtortoiseを使うと比較的簡単(SSL周り以外)に実装できることがわかりました。


明日12/6は @sym_num さんの「ElixirでPrologコンパイラを作ったお話」です.お楽しみに!

8
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
tuchiro
株式会社カラビナテクノロジーでElixirを用いたシステム開発を実施 主にElixir&Phoenixでのバックエンド実装を担当
fukuokaex
エンジニア/企業向けにElixirプロダクト開発・SI案件開発を支援する福岡のコミュニティ

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
8
Help us understand the problem. What is going on with this article?