今回は Pub/Sub 機構を使って入出力デバイスを関連付ける方法にトライしてみます。Pub/Sub には Elixir の GenStage を使います。ハードウェアは BeagleBone Green を使ってみました。
はじめに
以前より、入力デバイスをPublisherに、出力デバイスをSubscriberにして、トピックを共有することで入出力デバイスを結びつけることでIoT装置のプログラミングができるだろうと考えていました。中間的な処理を行う部分は入力デバイスをsubしてデータを処理したあとに出力デバイスにpubするようにすれば、全部がPub/Sub機構の上にのるのではないかと。
これを実現するのにElixir/ErlangのPub/Sub機構を色々と調べてきました。基本的にはプロセスに名前をつけるレジストリライブラリがあれば、入出力デバイスをプロセスにすることでPub/Subが可能になります。レジストリ機構には :global, :gproc, :syn, Registry, Swarm などがあり、調べた限りではどれも帯に短し襷に長しで入出力の Pub/Sub にスポンとハマるのがありません。今回使う GenStage も後で述べるようにオススメではありませんが、大変ユニークなバックプレッシャという原理を使う他にはない Pub/Sub で、GenStage ならではのプログラムができるので、ここに披露する次第です。
GenStage ライブラリ
Elixir の GenStage は Flow を実現するために裏で動いているプロセス間の Pub/Sub の実装です。今年になって version が 1.0 になりました。
GenStage は公式ドキュメントはしっかり書いてあるようですが、機能が豊富でドキュメントの量が多く、その割に例が少ないので理解が難しいです。Qiita には ElixirのGenStageに入門する #1 という記事がありますので、こちらも参照してください。
BeagleBone のボードシリーズ
これまでの「はじめてNerves」ではラズベリーパイ3Bを使ってきました。色々調べてるとBeagleBoneというのも良さそうです。
初代の BeagleBoard から始まって BeagleBone Black そして BeagleBone Green というボードなどが出ています。このなかでも BeagleBoard Green は Grove インタフェースのI/Oセンサを出している Seeed 社による製品で、Grove IF が2つついていたり、余計な HDMI がなくなっていたりとか土佐弁で言うところのボッチリ1な製品です。価格も5000円前後でラズパイと遜色ないです。PocketBeagle というラズパイ0に相当するような小さなボードも出ています。
そして BeagleBone シリーズの何が良いって「オープンソースハードウェア」なことです。元の回路を参考にやろうと思えばオリジナルボードも作れますし、何よりブラックボックスなところがありません。ラズパイだとロックインしてしまうと逃れられなくなる可能性がありますが、BeagleBone だと安全です。いよいよとなったら自分でなんとかできそうな安心が得られます。というところでこのシリーズは暫く BeagleBone で進めていこうと思っています。
GPIO入出力
BeagleBone の入出力ピンはそもそものピン数が多い上に、モードの概念があってやたらといろんな動作に使えます。以下の図は Seeed Studio BeagleBone Green - Seeed Wiki からのピン配置です。
今回の BeagleBone Green の場合は拡張コネクタのP9を使って
- GPIO20: タクトスイッチ
- GPIO49: LED
としました。入力回路は以下に準じましたが、内部にプルアップ・プルダウン抵抗を持つようでしたので(要確認)スイッチしか配線してないです。
出力の LED も Circuits.GPIO の例に準じてて、違っているのは 100Ωとある電流制限抵抗に 330Ωをつかっているところです。
Nerves の準備
BeagleBone の I/O を GenStage を使った Pub/Sub で制御してみましょう。と言っても今回はボタンを押したらLEDが点くだけの(私が言うところの通称)ボタチカです。
まず、Nerves の初期化からやります。Nervesは1.10から nerves-pack が導入されてネットワーク周りの設定が簡便になりました。以下のように mix コマンドで開発用ディレクトリを作成します。
$ mix nerves.new exineris --nerves-pack
$ cd exineris
次に必要なライブラリを指定します。Pub/Subに使う GenStage と GPIO 制御用の Circuits.GPIO ライブラリを指定します。あと、今回はプロセスレジストリに :syn ver 2.1 を使いましたので、それも記述しておきます。ただ今回のように同一ホストの同一ノードで実行するならこれは過剰というか不要です。
defp deps do
[
...
{:syn, "~> 2.1"},
{:gen_stage, "~> 1.0"},
{:circuits_gpio, "~> 0.4"},
...
]
end
ここまでで一旦ファームウェアを作れるか確認してみます。ラズパイの場合と異なるのが以下の先頭行でターゲット名を bbb
で指定します。
$ export MIX_TARGET=bbb
$ mix deps.get
$ mix firmware
ここまでできたらまずは一安心です。
次にネットワークの設定をします。config/target.exs
を編集します。これは各自の環境に合わせてやってください。詳細は nerves_pack(vintage_net含む)を使ってNervesのネットワーク設定をした〜SSHログインまで〜 などを見てくだい。
最初はSDカードに焼いてください。次回からはネットワーク経由でファームウェアを焼きます。
$ mix firmware.burn
焼きたての SDカードを BeagleBone Green に挿して電源を入れてください。電源は母艦PCからのUSBケーブルで行います。そのときに SDカードスロットのすぐ近くにある「USERボタン」というのを押しながら電源を入れてください。そうするとSDカードからブートします。そうしないと eMMC にプリインストールしてある OS でブートしてしまいます。
うまくブートできたら USB シリアル上の Virtual Ethernet で ssh でログインできます。接続して iex が出てくるか確認してください。
$ ssh nerves.local
Warning: Permanently added 'nerves.local,172.31.254.129' (RSA) to the list of known hosts.
Interactive Elixir (1.10.3) - press Ctrl+C to exit (type h() ENTER for help)
Toolshed imported. Run h(Toolshed) for more info.
RingLogger is collecting log messages from Elixir and Linux. To see the
messages, either attach the current IEx session to the logger:
RingLogger.attach
or print the next messages in the log:
RingLogger.next
iex(1)>
SD カードを焼くのは次回からは以下で可能になります。物理的にSDカードを外したり挿したりがなくなるので大変便利です。
$ mix firmware.gen.script
$ ./upload.sh
プログラムの内容
今回は lib/exineris.ex
にプログラムを書いていきます。まず必要なライブラリを引っ張ってきます。
defmodule Exineris do
use GenStage
require Circuits.GPIO
require Logger
入出力デバイスごとにプロセスを作って名前をつけます。外部APIは Exineris.start_link/3
で、デバイス名、GPIO番号、入出力別を記述します。コールバック関数の init/3
は :input と :output で実装を分け、:input の場合は割込処理を宣言しておきます。
def start_link(name, gpio_no, in_out) do
Logger.debug("#{__MODULE__} start_link: #{inspect(name)}, #{gpio_no}, #{in_out}}")
GenStage.start_link(__MODULE__, {name, gpio_no, in_out}, [name: {:via, :syn, name}])
end
@impl GenStage
def init({name, gpio_no, in_out = :input}) do
{:ok, gpioref} = Circuits.GPIO.open(gpio_no, in_out)
Circuits.GPIO.set_interrupts(gpioref, :both)
{:producer, {name, gpioref}}
end
@impl GenStage
def init({name, gpio_no, in_out = :output}) do
{:ok, gpioref} = Circuits.GPIO.open(gpio_no, in_out)
{:consumer, {name, gpioref}}
end
デバイスの書出し・読込みの API は Exineris.write/2
と Exineris.read/1
です。それぞれコールバックの handle_cast/2
と handle_call/3
を実装しておきます。GenStage
の場合は GenServer
と違って返り値が3要素になることに注意してください。
def write(name, :true), do: GenStage.cast(:syn.whereis(name), {:write, 1})
def write(name, :false), do: GenStage.cast(:syn.whereis(name), {:write, 0})
def write(name, val), do: GenStage.cast(:syn.whereis(name), {:write, val})
def read(name), do: GenStage.call(:syn.whereis(name), :read)
@impl GenStage
def handle_cast({:write, val}, {name, gpioref}) do
Logger.debug("#{__MODULE__} handle_call write: #{inspect({val, name, gpioref})}")
Circuits.GPIO.write(gpioref, val)
{:noreply, [], {name, gpioref}}
end
@impl GenStage
def handle_call(:read, _from, {name, gpioref}) do
Logger.debug("#{__MODULE__} handle_call read: #{inspect({name, gpioref})}")
{:reply, {:ok, Circuits.GPIO.read(gpioref)}, [], {name, gpioref}}
end
プロセスを停止させる API Exineris.stop/1
も作っています。これを呼び出すと GPIO の当該インタフェースを閉じてプロセスが終了します。明示的に書いていますが、おそらくなくても停止の際は開いている PID や REF をきれいに閉じてくれると思うので、動作に支障はないでしょう。
def stop(name), do: GenStage.stop(:syn.whereis(name))
@impl GenStage
def terminate(reason, gpioref) do
Circuits.GPIO.close(gpioref)
reason
end
入力で割込みがかかるとコールバック関数 handle_info/2
が呼ばれるのでそれの対処を記述しておきます。今回はデバッグ用に割込みがかかった旨をログ出力するだけです。ここも GenServer
と異なり GenStage
の返り値は3つ組になりますので注意です。
@impl GenStage
def handle_info({:circuits_gpio, gpiono, time, val}, {name, gpioref}) do
Logger.debug("#{__MODULE__} handle_info: :gpio #{inspect({gpiono, time, val})}")
{:noreply, [val], {name, gpioref}}
end
Pub/Sub の実装
大体のお膳立てができたので Pub/Sub の中身を記述します。
handle_demand による Publisher の記述
まず、Publisher を記述します。handle_demand
関数で記述します。これは外部 API から明示的に起動するのではなくて、後で述べる Subscriber プロセスの handle_events
関数から呼び出されます。どのプロセスの handle_events
関数から呼ばれるかはやはり後で述べる API関数 ssusbscribe/2
で明示的に指定します。
@impl GenStage
def handle_demand(demand, {name, gpioref}) do
val = Circuits.GPIO.read(gpioref)
Logger.debug("#{__MODULE__} handle_demand: #{inspect({demand, name, gpioref, val})}")
{:noreply, [val], {name, gpioref}}
end
これだけです。要は GPIO を読みに行くだけです。あえてPublishするという動作の記述が必要ありません。ログを出さないなら以下だけで済みます。
@impl GenStage
def handle_demand(demand, {name, gpioref}) do
{:noreply, [Circuits.GPIO.read(gpioref)], {name, gpioref}}
end
このとき返り値をリストで渡すことに注意してください。
handle_events による Subscriber の記述
次に Subscriber です。handle_events
関数で記述しますが、これは外部 API から明示的に起動するのではなく「プロセス内で勝手に自走」します。
@impl GenStage
def handle_events(event, _from, {name, gpioref}) do
Logger.debug("#{__MODULE__} handle_event: #{inspect({event, name, gpioref})}")
Circuits.GPIO.write(gpioref, hd(event))
Process.sleep(100)
{:noreply, [], {name, gpioref}}
end
これは以下の意味になります。
- Publisher にデータを貰いに行く
- Publisher からのデータは event 変数に入る
- event 変数の内容を GPIO に write 出力する
- 100ms 待つ
もらうデータはリスト型です。後で述べるように要素が1つだけからなるリストにするので、値はリストの先頭を取るようにします。この関数も待ち時間なしでログ出力をしないように書くと以下のように簡便になります。
@impl GenStage
def handle_events(event, _from, {name, gpioref}) do
Circuits.GPIO.write(gpioref, hd(event))
{:noreply, [], {name, gpioref}}
end
sync_subscribe で Pub と Sub とを連結する
さて Pub 側と Sub 側との記述ができたので、両者を結びつける関数 ssubscribe/2
を記述します。第1引数が Sub 側のプロセス名、第2引数が Pub 側のプロセス名です。
def ssubscribe(consumer, producer) do
GenStage.sync_subscribe(:syn.whereis(consumer),
to: :syn.whereis(producer),
max_demand: 1, min_demand: 0)
end
最後のオプション引数の max_demand: 1
と min_demand: 0
とは「一度にやり取りするデータの個数を1つにする」ことを指示します。これをすることで「出力側が入力側にデータをただ1つだけ要求して、それが返ってくるとそのデータを出力する」という動作を行います。また、この指定により、pub側とsub側とでやり取りするリストの要素数が1になります。
なお、よくある Pub/Sub の例と異なり、特定の Pub プロセスと Sub プロセスとを明示的に結びつけるような関数であることに注意してください。通常の Pub/Sub モデルでは「トピック」の概念があって、共通のトピックを持つ Pub と Sub とが互いを知らずにデータ交換しますが、今回はそのような動作にはなっていません。
プログラムの全貌
以上をまとめたプログラムの全体を以下に改めて載せておきます。
defmodule Exineris do
use GenStage
require Circuits.GPIO
require Logger
def start_link(name, gpio_no, in_out) do
Logger.debug("#{__MODULE__} start_link: #{inspect(name)}, #{gpio_no}, #{in_out}}")
GenStage.start_link(__MODULE__, {name, gpio_no, in_out}, [name: {:via, :syn, name}])
end
@impl GenStage
def init({name, gpio_no, in_out = :input}) do
{:ok, gpioref} = Circuits.GPIO.open(gpio_no, in_out)
Circuits.GPIO.set_interrupts(gpioref, :both)
{:producer, {name, gpioref}}
end
@impl GenStage
def init({name, gpio_no, in_out = :output}) do
{:ok, gpioref} = Circuits.GPIO.open(gpio_no, in_out)
{:consumer, {name, gpioref}}
end
def write(name, :true), do: GenStage.cast(:syn.whereis(name), {:write, 1})
def write(name, :false), do: GenStage.cast(:syn.whereis(name), {:write, 0})
def write(name, val), do: GenStage.cast(:syn.whereis(name), {:write, val})
def read(name), do: GenStage.call(:syn.whereis(name), :read)
def stop(name), do: GenStage.stop(:syn.whereis(name))
@impl GenStage
def handle_cast({:write, val}, {name, gpioref}) do
Logger.debug("#{__MODULE__} handle_call write: #{inspect({val, name, gpioref})}")
Circuits.GPIO.write(gpioref, val)
{:noreply, [], {name, gpioref}}
end
@impl GenStage
def handle_call(:read, _from, {name, gpioref}) do
Logger.debug("#{__MODULE__} handle_call read: #{inspect({name, gpioref})}")
{:reply, {:ok, Circuits.GPIO.read(gpioref)}, [], {name, gpioref}}
end
def ssubscribe(consumer, producer) do
GenStage.sync_subscribe(:syn.whereis(consumer), to: :syn.whereis(producer),
max_demand: 1, min_demand: 0)
end
@impl GenStage
def handle_demand(demand, {name, gpioref}) do
val = Circuits.GPIO.read(gpioref)
Logger.debug("#{__MODULE__} handle_demand: #{inspect({demand, name, gpioref, val})}")
{:noreply, [val], {name, gpioref}}
end
@impl GenStage
def handle_events(event, _from, {name, gpioref}) do
Logger.debug("#{__MODULE__} handle_event: #{inspect({event, name, gpioref})}")
Circuits.GPIO.write(gpioref, hd(event))
Process.sleep(100)
# Process.sleep(500)
{:noreply, [], {name, gpioref}}
end
@impl GenStage
def handle_info({:circuits_gpio, gpiono, time, val}, {name, gpioref}) do
Logger.debug("#{__MODULE__} handle_info: :gpio #{inspect({gpiono, time, val})}")
{:noreply, [val], {name, gpioref}}
end
@impl GenStage
def terminate(reason, gpioref) do
Circuits.GPIO.close(gpioref)
reason
end
end
実行してみる
GPIOの20番をボタンに49番をLEDに割当てて、それぞれPublisherとSubscriberとしてプロセスを立ち上げます。両者を結びつけると Pub/Sub が始まり 100ms で入力がポーリングされ、その結果がLEDに出力されます。
iex(1)> Exineris.start_link("button", 20, :input)
{:ok, #PID<0.1168.0>}
iex(2)> Exineris.start_link("led", 49, :output)
{:ok, #PID<0.1170.0>}
iex(3)> Exineris.ssubscribe("led", "button")
{:ok, #Reference<0.2258460604.268697601.204063>}
iex(4)>
まとめ
今回は GenStage
を使って Pub/Sub モデル風に入出力を結びつけてデバイスの動作の表現をするということをやってみました。
GenStage
を使うと出力側からの要求で全体が動作します。入力側からのイベントで動くようにはなっていません。このため デバイスを直接叩くようなプログラムにおいて今回のような手法は向いていない です。ただし、ノードが別だったり、ホストが別だったりするような、そして物理デバイスから離れて多少抽象的な動作をするプロセス間の通信に用いるのであれば、面白い使い方ができそうです。
今回の手法で、入力と出力との間で何らかの処理をするプロセスの記述ができないかも検討してみました。例えば2つの入力をSubscribeして論理積をPublishするようなプロセスが記述できれば、2つのボタンが押されたときだけLEDが点灯するようなプログラムを記述できるはずです。
しかしながら、今回のプログラムを素直に拡張するのは困難でした。handle_events
で受け取るデータは event
変数に格納されますが、データを受け取ったときに「どの Publisher から受け取ったデータなのか」を判別する方法がないのです。sync_subscribe
関数では2つのプロセスを結びつけるときに reference
を作りますが、プロセスと違ってリファレンスには名前がつけられません。ですので、やろうとすると ssubscribe
関数を呼んだときに Pub/Sub 関係を自分で ETS
等のデータベースに保持するか、もしくは handle_demand
関数が値と一緒に自分が誰かをタプルにして渡す… というようなスマートでない手法が必要になります。
謝辞
これはサッポロビームのもくもく会で作成したものです。Sapporo.BEAM のみなさまに感謝します。
参考文献
- Elixir GenStage
- ElixirのGenStageに入門する #1
- ElixirのGenStageに入門する#2 バックプレッシャーを理解する
- nerves_pack(vintage_net含む)を使ってNervesのネットワーク設定をした〜SSHログインまで〜
- はじめてNerves(0)
- はじめてなElixir(0)
- はじめてな Elixir(29) Registry で Pub/Sub する
- はじめてな Elixir(31) Swarm で Pub/Sub する
- はじめてな Elixir(32) Syn (v2.1) で Pub/Sub する
-
ちょうどよい程度である様。 ↩