Help us understand the problem. What is going on with this article?

Phoenix上でgen_emqttを用いたMQTTのsubscriberを作る

More than 3 years have passed since last update.

はじめに

 IoTという言葉が流行し始めて以後、M2M用プロトコルとして、世間のMQTTに対する意識は急上昇の最中にあります。自然と日の目を見るのはMQTTブローカーの存在でしょう。枯れたところではJMS実装のActiveMQから、最近ではerlang実装のemqttやvernemqなど、実に様々なブローカーが世間を賑わせております。

 しかしながら、いざ実際にサービスとしてIoTなアプリケーションの開発を考えたとき、ネックとなるのはMQTTブローカーとは別のところにあります。ブローカーは基本的にパッケージとなっていて、簡単にインストールできます。管理UIが付属する製品も少なくありません。SaaSサービスを利用するという手もあります。

 Publish(端末)側もデータを投げるだけなので、こちらも実装は比較的簡単です。

 問題なのは端末を集約するSubscriber(データベース)側です。gem文化に甘やかされて育った底辺プログラマの私には、自前で大量のコネクションやスレッドを管理するような真似は無理ゲーです。キモオタがアキバでJKをナンパするようなものです。心に深い傷を負い、シクシクと泣きながら女装山脈をプレイする未来が容易に窺えます。

 これに対する一つのアプローチとして、elixir(erlang/OTP)が良いのではないかと、昨今、そこはかとなく考えております。ということで、gen_emqttを用いたSubscriberのサンプルをPhoenix上に作成してみようと思います。言語仕様としてプロセスのマネージメントが規定されている同言語だからこその領分であります。

 尚、以降の作業に関しては、以下の記事でelixirとphoenixをインストールしていることが前提となります。動作の確認はcentos6で行っておりますが、centos7でも動くと思います。デビアンマンは適宜読み替えて下さい。

 Phoenixにオレオレプロセスを追加する

実装

Postgresqlのインストール

 PhoenixのデフォルトのデータベースはPostgresqlです。newの際にオプションを指定してMySQLにスイッチすることも可能です。今回はPhoenixの流儀に習ってPostgresqlで実装してゆきます。

$ sudo yum install postgresql-server
$ sudo /etc/init.d/postgresql initdb
$ sudo vi /var/lib/pgsql/data/pg_hba.conf
$ sudo /etc/init.d/postgresql start

 Postgresqlはデフォルトでident認証が入っています。その設定は今回の主題とは離れますので、全てをtrustとして接続を許可してしまいます。

pg_hba.conf
# TYPE  DATABASE    USER        CIDR-ADDRESS          METHOD

# "local" is for Unix domain socket connections only
local   all         all                               trust
# IPv4 local connections:
host    all         all         127.0.0.1/32          trust
# IPv6 local connections:
host    all         all         ::1/128               trust

Workerモデルの追加

$ mix phoenix.gen.html Worker workers name:string status:string
$ mix ecto.create
$ mix ecto.migrate

今回追加する処理全体を管理するSupervisorの追加

$ vi lib/hello_world/mqtt_supervisor.ex
mqtt_supervisor.ex
defmodule HelloWorld.MqttSupervisor do
  use Supervisor

  require Logger

  alias HelloWorld.MqttWorkerSupervisor
  alias HelloWorld.MqttWorkerManager

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: MqttSupervisor)
  end

  def init(args) do
    children = [
      supervisor(MqttWorkerSupervisor, []),
      worker(MqttWorkerManager, [[]])
    ]
    options = [
      strategy: :one_for_one
    ]
    supervise(children, options)
  end
end

MqttWorkerを管理するMqttWorkerSupervisorの追加

$ vi lib/hello_world/mqtt_worker_supervisor.ex
worker_supervisor.ex
defmodule HelloWorld.MqttWorkerSupervisor do
  use Supervisor

  require Logger

  alias HelloWorld.MqttWorkerSupervisor
  alias HelloWorld.MqttWorker

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: MqttWorkerSupervisor)
  end

  def init(args) do
    children = [
      worker(MqttWorker, [], restart: :transient)
    ]
    options = [
      strategy: :simple_one_for_one
    ]
    Logger.info("[mqtt_worker_supervisor: #{inspect self}] supervisor start")
    supervise(children, options)
  end

  def start_worker(model) do
    Supervisor.start_child(MqttWorkerSupervisor, [model])
  end

  def stop_worker(worker_pid) do
    Supervisor.terminate_child(MqttWorkerSupervisor, worker_pid)
  end

  def count_workers do
    Supervisor.count_children(MqttWorkerSupervisor)
  end

end

MqttWorkerを管理するMqttWorkerManagerの追加

$ vi lib/hello_world/mqtt_worker_manager.ex
worker_manager.ex
defmodule HelloWorld.MqttWorkerManager do

  use GenServer
  require Logger
  import Ecto.Query

  alias HelloWorld.Repo
  alias HelloWorld.Worker
  alias HelloWorld.MqttWorker
  alias HelloWorld.MqttWorkerSupervisor

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: MqttWorkerManager)
  end

  def init(state) do
    Logger.info("[MqttWorkkerManager] manager start")
    workers = :ets.new(:workers, [:named_table, read_concurrency: true])

    query = from w in Worker, where: w.status == "running"
    Repo.all(query) |> Enum.each fn(model) -> MqttWorkerSupervisor.start_worker(model) end
    {:ok, state}
  end

  def handle_call({:start_worker, model}, from, state) do
    MqttWorkerSupervisor.start_worker(model)
    {:reply, model, state}
  end

  def handle_call({:stop_worker, model}, from, state) do
    case :ets.lookup(:workers, model.id) do
      [{id, worker_pid}] ->
        MqttWorkerSupervisor.stop_worker(worker_pid)
        :ets.delete(:workers, id)
      [] ->
        Logger.warn("[MqttWorkerManager] worker #{inspect model.id} not stoped")
    end
    {:reply, model, state}
  end

  def handle_cast({:started_worker, model, worker_pid}, state) do
    :ets.insert(:workers, {model.id, worker_pid})
    Logger.info("[MqttWorkkerManager] worker #{inspect worker_pid} started")
    {:noreply, state}
  end

  def start_worker(model) do
    :gen_server.call(MqttWorkerManager, {:start_worker, model})
  end

  def stop_worker(model) do
    :gen_server.call(MqttWorkerManager, {:stop_worker, model})
  end

  def worker_started(model, worker_pid) do
    :gen_server.cast(MqttWorkerManager, {:started_worker, model, worker_pid})
  end

end

MqttWorkerの追加

$ vi lib/hello_world/mqtt_worker.ex
mqtt_worker.ex
defmodule HelloWorld.MqttWorker do

  require Logger

  alias HelloWorld.MqttWorkerSupervisor
  alias HelloWorld.MqttWorkerManager
  alias HelloWorld.Worker

  def start_link(model) do
    state = %{
      subscribe_qos: 1,
      model: model
    }
    options = [
      host: 'localhost',
      port: 1883,
      username: "admin",
      password: "password",
      client: UUID.uuid4(:hex) |> String.slice(0..12)
    ]
    :gen_emqtt.start_link(__MODULE__, state, options)
  end

  def init(state) do
    MqttWorkerManager.worker_started(state.model, self)
    {:ok, state}
  end


  #
  # emqtt callbacks
  #

  def on_connect(state) do
    Logger.info("[worker: #{inspect self}] MQTT connection established")
    :gen_emqtt.subscribe(self, "#", state.subscribe_qos)
    {:ok, state}
  end

  def on_connect_error(reason, state) do
    Logger.warn("[worker: #{inspect self}] MQTT connection error: #{inspect reason}")
    {:ok, state}
  end

  def on_disconnect(state) do
    Logger.info("[worker: #{inspect self}] MQTT server disconnected")
    {:ok, state}
  end

  def on_subscribe([{topic, _qos}]=subscription, state) do
    Logger.debug("[worker: #{inspect self}] MQTT server subscribed: #{inspect subscription}")
    {:ok, state}
  end

  def on_unsubscribe([topic], state) do
    Logger.debug("[worker: #{inspect self}] MQTT server unsubscribed: #{inspect topic}")
    {:ok, state}
  end

  def on_publish(topic, msg, state) do
    Logger.debug("[worker: #{inspect self}] MQTT server published #{inspect {topic, msg}}")
    {:ok, state}
  end

  def handle_call(msg, _from, state) do
    {:stop, {:error, {:unexpected_msg, msg}}, state}
  end

  def handle_cast(msg, state) do
    {:stop, {:error, {:unexpected_msg, msg}}, state}
  end

  def handle_info(msg, state) do
    Logger.warn("[worker: #{inspect self}] #{inspect msg}")
    {:stop, {:error, {:unexpected_msg, msg}}, state}
  end


  def terminate(reason, _state) do
    Logger.info("[worker: #{inspect self}] terminate worker: #{inspect reason}")
  end

  def code_change(_oldvsn, state, _extra), do: {:ok, state}

end

MqttWorker管理Controllerの追加

$ vi web/controllers/worker_controller.ex
defmodule HelloWorld.WorkerController do
  use HelloWorld.Web, :controller

  alias HelloWorld.Worker
  alias HelloWorld.MqttWorker
  alias HelloWorld.MqttWorkerManager

  plug :scrub_params, "worker" when action in [:create, :update]

  def index(conn, _params) do
    workers = Repo.all(Worker)
    render(conn, "index.html", workers: workers)
  end

  def new(conn, _params) do
    changeset = Worker.changeset(%Worker{})
    render(conn, "new.html", changeset: changeset)
  end

  def create(conn, %{"worker" => worker_params}) do
    changeset = Worker.changeset(%Worker{}, worker_params)

    if changeset.valid? do
      Repo.insert!(changeset)

      conn
      |> put_flash(:info, "Worker created successfully.")
      |> redirect(to: worker_path(conn, :index))
    else
      render(conn, "new.html", changeset: changeset)
    end
  end

  def show(conn, %{"id" => id}) do
    worker = Repo.get!(Worker, id)
    render(conn, "show.html", worker: worker)
  end

  def edit(conn, %{"id" => id}) do
    worker = Repo.get!(Worker, id)
    changeset = Worker.changeset(worker)
    render(conn, "edit.html", worker: worker, changeset: changeset)
  end

  def update(conn, %{"id" => id, "worker" => worker_params}) do
    worker = Repo.get!(Worker, id)
    changeset = Worker.changeset(worker, worker_params)

    if changeset.valid? do
      Repo.update!(changeset)

      conn
      |> put_flash(:info, "Worker updated successfully.")
      |> redirect(to: worker_path(conn, :index))
    else
      render(conn, "edit.html", worker: worker, changeset: changeset)
    end
  end

  def delete(conn, %{"id" => id}) do
    worker = Repo.get!(Worker, id)
    Repo.delete!(worker)

    conn
    |> put_flash(:info, "Worker deleted successfully.")
    |> redirect(to: worker_path(conn, :index))
  end

  def start(conn, %{"worker_id" => worker_id}) do
    worker = Repo.get!(Worker, worker_id)
    changeset = Worker.changeset(worker, %{"status" => "running"})

    if changeset.valid? do
      Repo.update!(changeset)
      HelloWorld.MqttWorkerManager.start_worker(worker)

      conn
      |> put_flash(:info, "Worker updated successfully.")
      |> redirect(to: worker_path(conn, :index))
    else
      render(conn, "edit.html", worker: worker, changeset: changeset)
    end
  end

  def stop(conn, %{"worker_id" => worker_id}) do
    worker = Repo.get!(Worker, worker_id)
    changeset = Worker.changeset(worker, %{"status" => "stop"})

    if changeset.valid? do
      Repo.update!(changeset)
      HelloWorld.MqttWorkerManager.stop_worker(worker)

      conn
      |> put_flash(:info, "Worker updated successfully.")
      |> redirect(to: worker_path(conn, :index))
    else
      render(conn, "edit.html", worker: worker, changeset: changeset)
    end
  end

end

MqttWorker管理Viewの追加

$ vi web/templates/worker/index.html.eex
index.html.eex
<h2>Listing workers</h2>

<h3>ActiveWorkers: <%= inspect HelloWorld.MqttWorkerSupervisor.count_workers[:active] %></dd></h3>
<table class="table">
  <thead>
    <tr>
      <th>Name</th>
      <th>Status</th>

      <th></th>
    </tr>
  </thead>
  <tbody>
<%= for worker <- @workers do %>
    <tr>
      <td><%= worker.name %></td>
      <td><%= worker.status %></td>

      <td class="text-right">
        <%= if worker.status === "running", do: link("Stop", to: worker_worker_path(@conn, :stop, worker), method: :put, class: "btn btn-default btn-xs") %>
        <%= if worker.status === "stop", do: link("Start", to: worker_worker_path(@conn, :start, worker), method: :put, class: "btn btn-default btn-xs") %>
        <%= link "Show", to: worker_path(@conn, :show, worker), class: "btn btn-default btn-xs" %>
        <%= link "Edit", to: worker_path(@conn, :edit, worker), class: "btn btn-default btn-xs" %>
        <%= link "Delete", to: worker_path(@conn, :delete, worker), method: :delete, class: "btn btn-danger btn-xs" %>
      </td>
    </tr>
<% end %>
  </tbody>
</table>

<%= link "New worker", to: worker_path(@conn, :new) %>

ルーティングの追加

$ vi web/router.ex
web/router.ex
defmodule HelloWorld.Router do
  use HelloWorld.Web, :router

  pipeline :browser do
    plug :accepts, ["html"]
    plug :fetch_session
    plug :fetch_flash
    plug :protect_from_forgery
  end

  pipeline :api do
    plug :accepts, ["json"]
  end

  scope "/", HelloWorld do
    pipe_through :browser # Use the default browser stack

    get "/", PageController, :index
    resources "/workers", WorkerController do
      put "/start", WorkerController, :start
      put "/stop", WorkerController, :stop
    end
  end

  # Other scopes may use custom stacks.
  # scope "/api", HelloWorld do
  #   pipe_through :api
  # end
end

確認

 コードを書いたら、サーバを起動してみます。以下のように定義したプロセスの立ち上がる様子が確認できると思います。

$ mix phoenix.server
[info] MQTT connection established
[info] Running HelloWorld.Endpoint with Cowboy on http://localhost:4000
[info] [mqtt_worker_supervisor: #PID<0.282.0>] supervisor start
[info] [MqttWorkkerManager] manager start
[debug] SELECT w0."id", w0."name", w0."status", w0."inserted_at", w0."updated_at
" FROM "workers" AS w0 WHERE (w0."status" = 'running') [] OK query=71.7ms queue=
2.4ms
10 Oct 15:19:04 - info: compiled 3 files into 2 files, copied 3 in 1910ms

 この状態でウェブ画面の方から、viewに定義したとおり、MqttWorkerを立ち上げるリンク(start/stop)をクリックしてみます。

workers.png

 すると以下の通り、起動したワーカー上でMQTTのメッセージのやり取りを確認できました。ちなみに$SYS/VerneMQ@127.0.0.1/~といったメッセージは、時雨堂さまの開発ブログで確認させて頂いたところ、MQTTの利用に関係する統計情報だそうです。

[info] POST /workers/2/start
[debug] Processing by HelloWorld.WorkerController.start/2
  Parameters: %{"_csrf_token" => "Iw1VCScwUX1/BxUcYgQiBiZ9BgUnJgAAVZ/akW2ELKdW8u
RPN5jBLg==", "_method" => "put", "format" => "html", "worker_id" => "2"}
  Pipelines: [:browser]
[debug] SELECT w0."id", w0."name", w0."status", w0."inserted_at", w0."updated_at
" FROM "workers" AS w0 WHERE (w0."id" = $1) [2] OK query=0.7ms
[debug] BEGIN [] OK query=0.3ms queue=0.1ms
[debug] UPDATE "workers" SET "status" = $1, "updated_at" = $2 WHERE "id" = $3 ["
running", {{2015, 10, 10}, {6, 19, 4, 0}}, 2] OK query=0.6ms
[debug] COMMIT [] OK query=1.0ms
[info] [worker: #PID<0.289.0>] initing with %HelloWorld.Worker{__meta__: %Ecto.Schema.Metadata{source: "workers", state: :loaded}, id: 2, inserted_at: #Ecto.DateTime<2015-10-04T05:24:53Z>, name: "bar", status: "stop", updated_at: #Ecto.DateTime<2015-10-10T03:50:54Z>}
[info] [MqttWorkkerManager] worker #PID<0.289.0> started
[info] [worker: #PID<0.289.0>] MQTT connection established
[info] Sent 302 in 172ms
[debug] [worker: #PID<0.289.0>] MQTT server subscribed: [{"#", 1}]
[info] GET /workers
[debug] Processing by HelloWorld.WorkerController.index/2
  Parameters: %{"format" => "html"}
  Pipelines: [:browser]
[debug] SELECT w0."id", w0."name", w0."status", w0."inserted_at", w0."updated_at" FROM "workers" AS w0 [] OK query=0.7ms
[info] Sent 200 in 82ms
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/memory/processes', "79689824"}
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/connects/received/mean', "1"}
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/memory/total', "187362080"}
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/bytes/received/min', "8"}
[debug] MQTT server subscribed: [{'phoenix:live_reload', 0}]
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/publishes/dropped/median', "0"}
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/bytes/received/median', "32"}

 同記事では#指定によっては取得できないとの記載がありましたが、vernemq/gen_emqttの組み合わせでは、#指定により取得できてしまうみたいですね。MqttWorkerを起動後、放っておくと延々とログが出力され続けます。

 この辺りがMQTT界隈のサイヤ人であるVの方曰く、ちゃんと仕様に沿って作られたブローカーが少ないと呟かれるが所以なのかもしれません。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away