#はじめに
IoTという言葉が流行し始めて以後、M2M用プロトコルとして、世間のMQTTに対する意識は急上昇の最中にあります。自然と日の目を見るのはMQTTブローカーの存在でしょう。枯れたところではJMS実装のActiveMQから、最近ではerlang実装のemqttやvernemqなど、実に様々なブローカーが世間を賑わせております。
しかしながら、いざ実際にサービスとしてIoTなアプリケーションの開発を考えたとき、ネックとなるのはMQTTブローカーとは別のところにあります。ブローカーは基本的にパッケージとなっていて、簡単にインストールできます。管理UIが付属する製品も少なくありません。SaaSサービスを利用するという手もあります。
Publish(端末)側もデータを投げるだけなので、こちらも実装は比較的簡単です。
問題なのは端末を集約するSubscriber(データベース)側です。gem文化に甘やかされて育った底辺プログラマの私には、自前で大量のコネクションやスレッドを管理するような真似は無理ゲーです。キモオタがアキバでJKをナンパするようなものです。心に深い傷を負い、シクシクと泣きながら女装山脈をプレイする未来が容易に窺えます。
これに対する一つのアプローチとして、elixir(erlang/OTP)が良いのではないかと、昨今、そこはかとなく考えております。ということで、gen_emqttを用いたSubscriberのサンプルをPhoenix上に作成してみようと思います。言語仕様としてプロセスのマネージメントが規定されている同言語だからこその領分であります。
尚、以降の作業に関しては、以下の記事でelixirとphoenixをインストールしていることが前提となります。動作の確認はcentos6で行っておりますが、centos7でも動くと思います。デビアンマンは適宜読み替えて下さい。
#実装
##Postgresqlのインストール
PhoenixのデフォルトのデータベースはPostgresqlです。newの際にオプションを指定してMySQLにスイッチすることも可能です。今回はPhoenixの流儀に習ってPostgresqlで実装してゆきます。
$ sudo yum install postgresql-server
$ sudo /etc/init.d/postgresql initdb
$ sudo vi /var/lib/pgsql/data/pg_hba.conf
$ sudo /etc/init.d/postgresql start
Postgresqlはデフォルトでident認証が入っています。その設定は今回の主題とは離れますので、全てをtrustとして接続を許可してしまいます。
# TYPE DATABASE USER CIDR-ADDRESS METHOD
# "local" is for Unix domain socket connections only
local all all trust
# IPv4 local connections:
host all all 127.0.0.1/32 trust
# IPv6 local connections:
host all all ::1/128 trust
##Workerモデルの追加
$ mix phoenix.gen.html Worker workers name:string status:string
$ mix ecto.create
$ mix ecto.migrate
##今回追加する処理全体を管理するSupervisorの追加
$ vi lib/hello_world/mqtt_supervisor.ex
defmodule HelloWorld.MqttSupervisor do
use Supervisor
require Logger
alias HelloWorld.MqttWorkerSupervisor
alias HelloWorld.MqttWorkerManager
def start_link do
Supervisor.start_link(__MODULE__, [], name: MqttSupervisor)
end
def init(args) do
children = [
supervisor(MqttWorkerSupervisor, []),
worker(MqttWorkerManager, [[]])
]
options = [
strategy: :one_for_one
]
supervise(children, options)
end
end
##MqttWorkerを管理するMqttWorkerSupervisorの追加
$ vi lib/hello_world/mqtt_worker_supervisor.ex
defmodule HelloWorld.MqttWorkerSupervisor do
use Supervisor
require Logger
alias HelloWorld.MqttWorkerSupervisor
alias HelloWorld.MqttWorker
def start_link do
Supervisor.start_link(__MODULE__, [], name: MqttWorkerSupervisor)
end
def init(args) do
children = [
worker(MqttWorker, [], restart: :transient)
]
options = [
strategy: :simple_one_for_one
]
Logger.info("[mqtt_worker_supervisor: #{inspect self}] supervisor start")
supervise(children, options)
end
def start_worker(model) do
Supervisor.start_child(MqttWorkerSupervisor, [model])
end
def stop_worker(worker_pid) do
Supervisor.terminate_child(MqttWorkerSupervisor, worker_pid)
end
def count_workers do
Supervisor.count_children(MqttWorkerSupervisor)
end
end
##MqttWorkerを管理するMqttWorkerManagerの追加
$ vi lib/hello_world/mqtt_worker_manager.ex
defmodule HelloWorld.MqttWorkerManager do
use GenServer
require Logger
import Ecto.Query
alias HelloWorld.Repo
alias HelloWorld.Worker
alias HelloWorld.MqttWorker
alias HelloWorld.MqttWorkerSupervisor
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: MqttWorkerManager)
end
def init(state) do
Logger.info("[MqttWorkkerManager] manager start")
workers = :ets.new(:workers, [:named_table, read_concurrency: true])
query = from w in Worker, where: w.status == "running"
Repo.all(query) |> Enum.each fn(model) -> MqttWorkerSupervisor.start_worker(model) end
{:ok, state}
end
def handle_call({:start_worker, model}, from, state) do
MqttWorkerSupervisor.start_worker(model)
{:reply, model, state}
end
def handle_call({:stop_worker, model}, from, state) do
case :ets.lookup(:workers, model.id) do
[{id, worker_pid}] ->
MqttWorkerSupervisor.stop_worker(worker_pid)
:ets.delete(:workers, id)
[] ->
Logger.warn("[MqttWorkerManager] worker #{inspect model.id} not stoped")
end
{:reply, model, state}
end
def handle_cast({:started_worker, model, worker_pid}, state) do
:ets.insert(:workers, {model.id, worker_pid})
Logger.info("[MqttWorkkerManager] worker #{inspect worker_pid} started")
{:noreply, state}
end
def start_worker(model) do
:gen_server.call(MqttWorkerManager, {:start_worker, model})
end
def stop_worker(model) do
:gen_server.call(MqttWorkerManager, {:stop_worker, model})
end
def worker_started(model, worker_pid) do
:gen_server.cast(MqttWorkerManager, {:started_worker, model, worker_pid})
end
end
##MqttWorkerの追加
$ vi lib/hello_world/mqtt_worker.ex
defmodule HelloWorld.MqttWorker do
require Logger
alias HelloWorld.MqttWorkerSupervisor
alias HelloWorld.MqttWorkerManager
alias HelloWorld.Worker
def start_link(model) do
state = %{
subscribe_qos: 1,
model: model
}
options = [
host: 'localhost',
port: 1883,
username: "admin",
password: "password",
client: UUID.uuid4(:hex) |> String.slice(0..12)
]
:gen_emqtt.start_link(__MODULE__, state, options)
end
def init(state) do
MqttWorkerManager.worker_started(state.model, self)
{:ok, state}
end
#
# emqtt callbacks
#
def on_connect(state) do
Logger.info("[worker: #{inspect self}] MQTT connection established")
:gen_emqtt.subscribe(self, "#", state.subscribe_qos)
{:ok, state}
end
def on_connect_error(reason, state) do
Logger.warn("[worker: #{inspect self}] MQTT connection error: #{inspect reason}")
{:ok, state}
end
def on_disconnect(state) do
Logger.info("[worker: #{inspect self}] MQTT server disconnected")
{:ok, state}
end
def on_subscribe([{topic, _qos}]=subscription, state) do
Logger.debug("[worker: #{inspect self}] MQTT server subscribed: #{inspect subscription}")
{:ok, state}
end
def on_unsubscribe([topic], state) do
Logger.debug("[worker: #{inspect self}] MQTT server unsubscribed: #{inspect topic}")
{:ok, state}
end
def on_publish(topic, msg, state) do
Logger.debug("[worker: #{inspect self}] MQTT server published #{inspect {topic, msg}}")
{:ok, state}
end
def handle_call(msg, _from, state) do
{:stop, {:error, {:unexpected_msg, msg}}, state}
end
def handle_cast(msg, state) do
{:stop, {:error, {:unexpected_msg, msg}}, state}
end
def handle_info(msg, state) do
Logger.warn("[worker: #{inspect self}] #{inspect msg}")
{:stop, {:error, {:unexpected_msg, msg}}, state}
end
def terminate(reason, _state) do
Logger.info("[worker: #{inspect self}] terminate worker: #{inspect reason}")
end
def code_change(_oldvsn, state, _extra), do: {:ok, state}
end
##MqttWorker管理Controllerの追加
$ vi web/controllers/worker_controller.ex
defmodule HelloWorld.WorkerController do
use HelloWorld.Web, :controller
alias HelloWorld.Worker
alias HelloWorld.MqttWorker
alias HelloWorld.MqttWorkerManager
plug :scrub_params, "worker" when action in [:create, :update]
def index(conn, _params) do
workers = Repo.all(Worker)
render(conn, "index.html", workers: workers)
end
def new(conn, _params) do
changeset = Worker.changeset(%Worker{})
render(conn, "new.html", changeset: changeset)
end
def create(conn, %{"worker" => worker_params}) do
changeset = Worker.changeset(%Worker{}, worker_params)
if changeset.valid? do
Repo.insert!(changeset)
conn
|> put_flash(:info, "Worker created successfully.")
|> redirect(to: worker_path(conn, :index))
else
render(conn, "new.html", changeset: changeset)
end
end
def show(conn, %{"id" => id}) do
worker = Repo.get!(Worker, id)
render(conn, "show.html", worker: worker)
end
def edit(conn, %{"id" => id}) do
worker = Repo.get!(Worker, id)
changeset = Worker.changeset(worker)
render(conn, "edit.html", worker: worker, changeset: changeset)
end
def update(conn, %{"id" => id, "worker" => worker_params}) do
worker = Repo.get!(Worker, id)
changeset = Worker.changeset(worker, worker_params)
if changeset.valid? do
Repo.update!(changeset)
conn
|> put_flash(:info, "Worker updated successfully.")
|> redirect(to: worker_path(conn, :index))
else
render(conn, "edit.html", worker: worker, changeset: changeset)
end
end
def delete(conn, %{"id" => id}) do
worker = Repo.get!(Worker, id)
Repo.delete!(worker)
conn
|> put_flash(:info, "Worker deleted successfully.")
|> redirect(to: worker_path(conn, :index))
end
def start(conn, %{"worker_id" => worker_id}) do
worker = Repo.get!(Worker, worker_id)
changeset = Worker.changeset(worker, %{"status" => "running"})
if changeset.valid? do
Repo.update!(changeset)
HelloWorld.MqttWorkerManager.start_worker(worker)
conn
|> put_flash(:info, "Worker updated successfully.")
|> redirect(to: worker_path(conn, :index))
else
render(conn, "edit.html", worker: worker, changeset: changeset)
end
end
def stop(conn, %{"worker_id" => worker_id}) do
worker = Repo.get!(Worker, worker_id)
changeset = Worker.changeset(worker, %{"status" => "stop"})
if changeset.valid? do
Repo.update!(changeset)
HelloWorld.MqttWorkerManager.stop_worker(worker)
conn
|> put_flash(:info, "Worker updated successfully.")
|> redirect(to: worker_path(conn, :index))
else
render(conn, "edit.html", worker: worker, changeset: changeset)
end
end
end
##MqttWorker管理Viewの追加
$ vi web/templates/worker/index.html.eex
<h2>Listing workers</h2>
<h3>ActiveWorkers: <%= inspect HelloWorld.MqttWorkerSupervisor.count_workers[:active] %></dd></h3>
<table class="table">
<thead>
<tr>
<th>Name</th>
<th>Status</th>
<th></th>
</tr>
</thead>
<tbody>
<%= for worker <- @workers do %>
<tr>
<td><%= worker.name %></td>
<td><%= worker.status %></td>
<td class="text-right">
<%= if worker.status === "running", do: link("Stop", to: worker_worker_path(@conn, :stop, worker), method: :put, class: "btn btn-default btn-xs") %>
<%= if worker.status === "stop", do: link("Start", to: worker_worker_path(@conn, :start, worker), method: :put, class: "btn btn-default btn-xs") %>
<%= link "Show", to: worker_path(@conn, :show, worker), class: "btn btn-default btn-xs" %>
<%= link "Edit", to: worker_path(@conn, :edit, worker), class: "btn btn-default btn-xs" %>
<%= link "Delete", to: worker_path(@conn, :delete, worker), method: :delete, class: "btn btn-danger btn-xs" %>
</td>
</tr>
<% end %>
</tbody>
</table>
<%= link "New worker", to: worker_path(@conn, :new) %>
##ルーティングの追加
$ vi web/router.ex
defmodule HelloWorld.Router do
use HelloWorld.Web, :router
pipeline :browser do
plug :accepts, ["html"]
plug :fetch_session
plug :fetch_flash
plug :protect_from_forgery
end
pipeline :api do
plug :accepts, ["json"]
end
scope "/", HelloWorld do
pipe_through :browser # Use the default browser stack
get "/", PageController, :index
resources "/workers", WorkerController do
put "/start", WorkerController, :start
put "/stop", WorkerController, :stop
end
end
# Other scopes may use custom stacks.
# scope "/api", HelloWorld do
# pipe_through :api
# end
end
#確認
コードを書いたら、サーバを起動してみます。以下のように定義したプロセスの立ち上がる様子が確認できると思います。
$ mix phoenix.server
[info] MQTT connection established
[info] Running HelloWorld.Endpoint with Cowboy on http://localhost:4000
[info] [mqtt_worker_supervisor: #PID<0.282.0>] supervisor start
[info] [MqttWorkkerManager] manager start
[debug] SELECT w0."id", w0."name", w0."status", w0."inserted_at", w0."updated_at
" FROM "workers" AS w0 WHERE (w0."status" = 'running') [] OK query=71.7ms queue=
2.4ms
10 Oct 15:19:04 - info: compiled 3 files into 2 files, copied 3 in 1910ms
この状態でウェブ画面の方から、viewに定義したとおり、MqttWorkerを立ち上げるリンク(start/stop)をクリックしてみます。
すると以下の通り、起動したワーカー上でMQTTのメッセージのやり取りを確認できました。ちなみに$SYS/VerneMQ@127.0.0.1/~といったメッセージは、時雨堂さまの開発ブログで確認させて頂いたところ、MQTTの利用に関係する統計情報だそうです。
[info] POST /workers/2/start
[debug] Processing by HelloWorld.WorkerController.start/2
Parameters: %{"_csrf_token" => "Iw1VCScwUX1/BxUcYgQiBiZ9BgUnJgAAVZ/akW2ELKdW8u
RPN5jBLg==", "_method" => "put", "format" => "html", "worker_id" => "2"}
Pipelines: [:browser]
[debug] SELECT w0."id", w0."name", w0."status", w0."inserted_at", w0."updated_at
" FROM "workers" AS w0 WHERE (w0."id" = $1) [2] OK query=0.7ms
[debug] BEGIN [] OK query=0.3ms queue=0.1ms
[debug] UPDATE "workers" SET "status" = $1, "updated_at" = $2 WHERE "id" = $3 ["
running", {{2015, 10, 10}, {6, 19, 4, 0}}, 2] OK query=0.6ms
[debug] COMMIT [] OK query=1.0ms
[info] [worker: #PID<0.289.0>] initing with %HelloWorld.Worker{__meta__: %Ecto.Schema.Metadata{source: "workers", state: :loaded}, id: 2, inserted_at: #Ecto.DateTime<2015-10-04T05:24:53Z>, name: "bar", status: "stop", updated_at: #Ecto.DateTime<2015-10-10T03:50:54Z>}
[info] [MqttWorkkerManager] worker #PID<0.289.0> started
[info] [worker: #PID<0.289.0>] MQTT connection established
[info] Sent 302 in 172ms
[debug] [worker: #PID<0.289.0>] MQTT server subscribed: [{"#", 1}]
[info] GET /workers
[debug] Processing by HelloWorld.WorkerController.index/2
Parameters: %{"format" => "html"}
Pipelines: [:browser]
[debug] SELECT w0."id", w0."name", w0."status", w0."inserted_at", w0."updated_at" FROM "workers" AS w0 [] OK query=0.7ms
[info] Sent 200 in 82ms
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/memory/processes', "79689824"}
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/connects/received/mean', "1"}
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/memory/total', "187362080"}
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/bytes/received/min', "8"}
[debug] MQTT server subscribed: [{'phoenix:live_reload', 0}]
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/publishes/dropped/median', "0"}
[debug] [worker: #PID<0.289.0>] MQTT server published {'$SYS/VerneMQ@127.0.0.1/bytes/received/median', "32"}
同記事では#指定によっては取得できないとの記載がありましたが、vernemq/gen_emqttの組み合わせでは、#指定により取得できてしまうみたいですね。MqttWorkerを起動後、放っておくと延々とログが出力され続けます。
この辺りがMQTT界隈のサイヤ人であるVの方曰く、ちゃんと仕様に沿って作られたブローカーが少ないと呟かれるが所以なのかもしれません。