最近、趣味でプログラミング言語Elixir、IoTプラットフォームNerves、WebフレームワークPhoenixを使用して、モダンで快適なIoT開発を楽しんでいます。
autoracex主催者@torifukukaiouさんの記事を参考に自宅の温度と湿度をリアルタイムで監視できるシステムを作りました。LiveViewについてはThe Pragmatic StudioのLiveViewオンラインコースで勉強しています。
APIサーバーがセンサー測定値を受け入れ、リアルタイムダッシュボードを見ているすべてのユーザーに対して、更新情報をプッシュします。
今日は、Phoenix LiveViewとPhoenix PubSub使用してのリアルタイムにページ更新することについてまとめます。
4/3(土) 00:00〜 4/5(月) 23:59開催のautoracex #21での成果です。
メッセージ購読および配信するため関数を実装
まず、特定のコンテキストモジュールでメッセージを購読および配信するための関数を準備します。トピックとしてinspect(__MODULE__)
を使用しています。そうすることで、トピック名を一意にできますし、トピック名の決定やトピック名の発見にかかる時間を節約できます。
defmodule Example.Environment do
...
+
+ @topic inspect(__MODULE__)
+
+ @doc """
+ Subscribe to this context module's messages.
+ """
+ def subscribe do
+ Phoenix.PubSub.subscribe(Example.PubSub, @topic)
+ end
+
+ @doc """
+ Broadcast a message to the subscribers when something happens.
+ """
+ def broadcast({:ok, record}, event) do
+ Phoenix.PubSub.broadcast(Example.PubSub, @topic, {event, record})
+ {:ok, record}
+ end
+
+ def broadcast({:error, _} = error, _event), do: error
+
必要に応じてメッセージ配信
必要に応じてメッセージを配信します。次の例では、新しく挿入されたレコードをすべての購読者に通知します。
defmodule Example.Environment do
...
@doc """
Creates a measurement.
## Examples
iex> create_measurement(%{field: value})
{:ok, %Measurement{}}
iex> create_measurement(%{field: bad_value})
{:error, %Ecto.Changeset{}}
"""
def create_measurement(attrs \\ %{}) do
%Measurement{}
|> Measurement.changeset(attrs)
|> Repo.insert()
+ |> broadcast(:measurement_inserted)
end
LiveView
接続完了後にメッセージ購読
defmodule ExampleWeb.EnvironmentLive do
use ExampleWeb, :live_view
...
@impl true
def mount(_params, _session, socket) do
+ if connected?(socket) do
+ Environment.subscribe()
+ end
...
{:ok, socket, temporary_assigns: [measurements: []]}
end
必要に応じてメッセージ処理
購読したトピックのメッセージがどんどん送られてきますが、それを処理するにはhandle_info/2
を実装する必要があります。パターンマッチしてイベントごとに実装します。
defmodule ExampleWeb.EnvironmentLive do
use ExampleWeb, :live_view
...
+ def handle_info({:measurement_inserted, new_measurement}, socket) do
+ {:noreply, assign(socket, last_measurement: new_measurement)}
+ end
着信メッセージのスロットリング
着信メッセージによりダッシュボードをリアルタイムに更新するのですが、(すべてのメッセージに対して処理するのではなく)予めLiveView
側で指定した周期で更新したいです。そうすることでリクエストが殺到したときにLiveView
のレンダリングが追いつかなくなることを回避できます。
def handle_info({:measurement_inserted, new_measurement}, socket) do
- {:noreply, assign(socket, last_measurement: new_measurement)}
+ if refresh_interval_elapsed?(socket) do
+ {:noreply, assign(socket, last_measurement: new_measurement)}
+ else
+ {:noreply, socket}
+ end
end
+
+ # Check if the refresh interval has elapsed. (next_refresh >= now)
+ defp refresh_interval_elapsed?(socket) do
+ next_refresh = DateTime.add(socket.assigns.last_measurement.measured_at, socket.assigns.refresh_interval)
+
+ case DateTime.compare(DateTime.utc_now(), next_refresh) do
+ :gt -> true
+ :eq -> true
+ _ -> false
+ end
+ end
(IPごとに)APIエンドポイントに対してgrempe/ex_ratedを用いてレート制限もかけようと考えています。
# enforce a rate limit of no more than 5 calls in 10 seconds
iex> ExRated.check_rate("my-rate-limited-api", 10_000, 5)
{:ok, 1}
iex> ExRated.check_rate("my-rate-limited-api", 10_000, 5)
{:ok, 2}
iex> ExRated.check_rate("my-rate-limited-api", 10_000, 5)
{:ok, 3}
iex> ExRated.check_rate("my-rate-limited-api", 10_000, 5)
{:ok, 4}
iex> ExRated.check_rate("my-rate-limited-api", 10_000, 5)
{:ok, 5}
iex> ExRated.check_rate("my-rate-limited-api", 10_000, 5)
{:error, 5}
おしまい。