Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

はじめてNerves(9) GenStage での Pub/Sub でボタチカしてみる

今回は Pub/Sub 機構を使って入出力デバイスを関連付ける方法にトライしてみます。Pub/Sub には Elixir の GenStage を使います。ハードウェアは BeagleBone Green を使ってみました。

はじめに

以前より、入力デバイスをPublisherに、出力デバイスをSubscriberにして、トピックを共有することで入出力デバイスを結びつけることでIoT装置のプログラミングができるだろうと考えていました。中間的な処理を行う部分は入力デバイスをsubしてデータを処理したあとに出力デバイスにpubするようにすれば、全部がPub/Sub機構の上にのるのではないかと。

これを実現するのにElixir/ErlangのPub/Sub機構を色々と調べてきました。基本的にはプロセスに名前をつけるレジストリライブラリがあれば、入出力デバイスをプロセスにすることでPub/Subが可能になります。レジストリ機構には :global, :gproc, :syn, Registry, Swarm などがあり、調べた限りではどれも帯に短し襷に長しで入出力の Pub/Sub にスポンとハマるのがありません。今回使う GenStage も後で述べるようにオススメではありませんが、大変ユニークなバックプレッシャという原理を使う他にはない Pub/Sub で、GenStage ならではのプログラムができるので、ここに披露する次第です。

GenStage ライブラリ

Elixir の GenStageFlow を実現するために裏で動いているプロセス間の Pub/Sub の実装です。今年になって version が 1.0 になりました。

GenStage は公式ドキュメントはしっかり書いてあるようですが、機能が豊富でドキュメントの量が多く、その割に例が少ないので理解が難しいです。Qiita には ElixirのGenStageに入門する #1 という記事がありますので、こちらも参照してください。

BeagleBone のボードシリーズ

これまでの「はじめてNerves」ではラズベリーパイ3Bを使ってきました。色々調べてるとBeagleBoneというのも良さそうです。

初代の BeagleBoard から始まって BeagleBone Black そして BeagleBone Green というボードなどが出ています。このなかでも BeagleBoard Green は Grove インタフェースのI/Oセンサを出している Seeed 社による製品で、Grove IF が2つついていたり、余計な HDMI がなくなっていたりとか土佐弁で言うところのボッチリ1な製品です。価格も5000円前後でラズパイと遜色ないです。PocketBeagle というラズパイ0に相当するような小さなボードも出ています。

alt

そして BeagleBone シリーズの何が良いって「オープンソースハードウェア」なことです。元の回路を参考にやろうと思えばオリジナルボードも作れますし、何よりブラックボックスなところがありません。ラズパイだとロックインしてしまうと逃れられなくなる可能性がありますが、BeagleBone だと安全です。いよいよとなったら自分でなんとかできそうな安心が得られます。というところでこのシリーズは暫く BeagleBone で進めていこうと思っています。

GPIO入出力

BeagleBone の入出力ピンはそもそものピン数が多い上に、モードの概念があってやたらといろんな動作に使えます。以下の図は Seeed Studio BeagleBone Green - Seeed Wiki からのピン配置です。

alt

今回の BeagleBone Green の場合は拡張コネクタのP9を使って

  • GPIO20: タクトスイッチ
  • GPIO49: LED

としました。入力回路は以下に準じましたが、内部にプルアップ・プルダウン抵抗を持つようでしたので(要確認)スイッチしか配線してないです。

alt

出力の LED も Circuits.GPIO の例に準じてて、違っているのは 100Ωとある電流制限抵抗に 330Ωをつかっているところです。
alt

Nerves の準備

BeagleBone の I/O を GenStage を使った Pub/Sub で制御してみましょう。と言っても今回はボタンを押したらLEDが点くだけの(私が言うところの通称)ボタチカです。

まず、Nerves の初期化からやります。Nervesは1.10から nerves-pack が導入されてネットワーク周りの設定が簡便になりました。以下のように mix コマンドで開発用ディレクトリを作成します。

$ mix nerves.new exineris --nerves-pack
$ cd exineris

次に必要なライブラリを指定します。Pub/Subに使う GenStage と GPIO 制御用の Circuits.GPIO ライブラリを指定します。あと、今回はプロセスレジストリに :syn ver 2.1 を使いましたので、それも記述しておきます。ただ今回のように同一ホストの同一ノードで実行するならこれは過剰というか不要です。

mix.exs
  defp deps do
    [
      ...
      {:syn, "~> 2.1"},
      {:gen_stage, "~> 1.0"},
      {:circuits_gpio, "~> 0.4"},
      ...
    ]
  end

ここまでで一旦ファームウェアを作れるか確認してみます。ラズパイの場合と異なるのが以下の先頭行でターゲット名を bbb で指定します。

$ export MIX_TARGET=bbb
$ mix deps.get
$ mix firmware

ここまでできたらまずは一安心です。

次にネットワークの設定をします。config/target.exs を編集します。これは各自の環境に合わせてやってください。詳細は nerves_pack(vintage_net含む)を使ってNervesのネットワーク設定をした〜SSHログインまで〜 などを見てくだい。

最初はSDカードに焼いてください。次回からはネットワーク経由でファームウェアを焼きます。

$ mix firmware.burn

焼きたての SDカードを BeagleBone Green に挿して電源を入れてください。電源は母艦PCからのUSBケーブルで行います。そのときに SDカードスロットのすぐ近くにある「USERボタン」というのを押しながら電源を入れてください。そうするとSDカードからブートします。そうしないと eMMC にプリインストールしてある OS でブートしてしまいます。

うまくブートできたら USB シリアル上の Virtual Ethernet で ssh でログインできます。接続して iex が出てくるか確認してください。

$ ssh nerves.local
Warning: Permanently added 'nerves.local,172.31.254.129' (RSA) to the list of known hosts.
Interactive Elixir (1.10.3) - press Ctrl+C to exit (type h() ENTER for help)
Toolshed imported. Run h(Toolshed) for more info.
RingLogger is collecting log messages from Elixir and Linux. To see the
messages, either attach the current IEx session to the logger:

  RingLogger.attach

or print the next messages in the log:

  RingLogger.next

iex(1)> 

SD カードを焼くのは次回からは以下で可能になります。物理的にSDカードを外したり挿したりがなくなるので大変便利です。

$ mix firmware.gen.script
$ ./upload.sh

プログラムの内容

今回は lib/exineris.ex にプログラムを書いていきます。まず必要なライブラリを引っ張ってきます。

defmodule Exineris do
  use GenStage
  require Circuits.GPIO
  require Logger

入出力デバイスごとにプロセスを作って名前をつけます。外部APIは Exineris.start_link/3 で、デバイス名、GPIO番号、入出力別を記述します。コールバック関数の init/3 は :input と :output で実装を分け、:input の場合は割込処理を宣言しておきます。

  def start_link(name, gpio_no, in_out) do
    Logger.debug("#{__MODULE__} start_link: #{inspect(name)}, #{gpio_no}, #{in_out}}")
    GenStage.start_link(__MODULE__, {name, gpio_no, in_out}, [name: {:via, :syn, name}])
  end

  @impl GenStage
  def init({name, gpio_no, in_out = :input}) do
    {:ok, gpioref} = Circuits.GPIO.open(gpio_no, in_out)
    Circuits.GPIO.set_interrupts(gpioref, :both)
    {:producer, {name, gpioref}}
  end

  @impl GenStage
  def init({name, gpio_no, in_out = :output}) do
    {:ok, gpioref} = Circuits.GPIO.open(gpio_no, in_out)
    {:consumer, {name, gpioref}}
  end

デバイスの書出し・読込みの API は Exineris.write/2Exineris.read/1 です。それぞれコールバックの handle_cast/2handle_call/3 を実装しておきます。GenStage の場合は GenServer と違って返り値が3要素になることに注意してください。

  def write(name, :true), do: GenStage.cast(:syn.whereis(name), {:write, 1})
  def write(name, :false), do: GenStage.cast(:syn.whereis(name), {:write, 0})
  def write(name, val), do: GenStage.cast(:syn.whereis(name), {:write, val})
  def read(name), do: GenStage.call(:syn.whereis(name), :read)

  @impl GenStage
  def handle_cast({:write, val}, {name, gpioref}) do
    Logger.debug("#{__MODULE__} handle_call write: #{inspect({val, name, gpioref})}")
    Circuits.GPIO.write(gpioref, val)
    {:noreply, [], {name, gpioref}}
  end

  @impl GenStage
  def handle_call(:read, _from, {name, gpioref}) do
    Logger.debug("#{__MODULE__} handle_call read: #{inspect({name, gpioref})}")
    {:reply, {:ok, Circuits.GPIO.read(gpioref)}, [], {name, gpioref}}
  end

プロセスを停止させる API Exineris.stop/1 も作っています。これを呼び出すと GPIO の当該インタフェースを閉じてプロセスが終了します。明示的に書いていますが、おそらくなくても停止の際は開いている PID や REF をきれいに閉じてくれると思うので、動作に支障はないでしょう。

  def stop(name), do: GenStage.stop(:syn.whereis(name))

  @impl GenStage
  def terminate(reason, gpioref) do
    Circuits.GPIO.close(gpioref)
    reason
  end

入力で割込みがかかるとコールバック関数 handle_info/2 が呼ばれるのでそれの対処を記述しておきます。今回はデバッグ用に割込みがかかった旨をログ出力するだけです。ここも GenServer と異なり GenStage の返り値は3つ組になりますので注意です。

  @impl GenStage
  def handle_info({:circuits_gpio, gpiono, time, val}, {name, gpioref}) do
    Logger.debug("#{__MODULE__} handle_info: :gpio #{inspect({gpiono, time, val})}")
    {:noreply, [val], {name, gpioref}}
  end

Pub/Sub の実装

大体のお膳立てができたので Pub/Sub の中身を記述します。

handle_demand による Publisher の記述

まず、Publisher を記述します。handle_demand 関数で記述します。これは外部 API から明示的に起動するのではなくて、後で述べる Subscriber プロセスの handle_events 関数から呼び出されます。どのプロセスの handle_events 関数から呼ばれるかはやはり後で述べる API関数 ssusbscribe/2 で明示的に指定します。

  @impl GenStage
  def handle_demand(demand, {name, gpioref}) do
    val = Circuits.GPIO.read(gpioref)
    Logger.debug("#{__MODULE__} handle_demand: #{inspect({demand, name, gpioref, val})}")
    {:noreply, [val], {name, gpioref}}
  end

これだけです。要は GPIO を読みに行くだけです。あえてPublishするという動作の記述が必要ありません。ログを出さないなら以下だけで済みます。

  @impl GenStage
  def handle_demand(demand, {name, gpioref}) do
    {:noreply, [Circuits.GPIO.read(gpioref)], {name, gpioref}}
  end

このとき返り値をリストで渡すことに注意してください。

handle_events による Subscriber の記述

次に Subscriber です。handle_events 関数で記述しますが、これは外部 API から明示的に起動するのではなく「プロセス内で勝手に自走」します。

  @impl GenStage
  def handle_events(event, _from, {name, gpioref}) do
    Logger.debug("#{__MODULE__} handle_event: #{inspect({event, name, gpioref})}")
    Circuits.GPIO.write(gpioref, hd(event))
    Process.sleep(100)
    {:noreply, [], {name, gpioref}}
  end

これは以下の意味になります。

  • Publisher にデータを貰いに行く
    • Publisher からのデータは event 変数に入る
  • event 変数の内容を GPIO に write 出力する
  • 100ms 待つ

もらうデータはリスト型です。後で述べるように要素が1つだけからなるリストにするので、値はリストの先頭を取るようにします。この関数も待ち時間なしでログ出力をしないように書くと以下のように簡便になります。

  @impl GenStage
  def handle_events(event, _from, {name, gpioref}) do
    Circuits.GPIO.write(gpioref, hd(event))
    {:noreply, [], {name, gpioref}}
  end

sync_subscribe で Pub と Sub とを連結する

さて Pub 側と Sub 側との記述ができたので、両者を結びつける関数 ssubscribe/2 を記述します。第1引数が Sub 側のプロセス名、第2引数が Pub 側のプロセス名です。

  def ssubscribe(consumer, producer) do
    GenStage.sync_subscribe(:syn.whereis(consumer),
      to: :syn.whereis(producer),
      max_demand: 1, min_demand: 0)
  end

最後のオプション引数の max_demand: 1min_demand: 0 とは「一度にやり取りするデータの個数を1つにする」ことを指示します。これをすることで「出力側が入力側にデータをただ1つだけ要求して、それが返ってくるとそのデータを出力する」という動作を行います。また、この指定により、pub側とsub側とでやり取りするリストの要素数が1になります。

なお、よくある Pub/Sub の例と異なり、特定の Pub プロセスと Sub プロセスとを明示的に結びつけるような関数であることに注意してください。通常の Pub/Sub モデルでは「トピック」の概念があって、共通のトピックを持つ Pub と Sub とが互いを知らずにデータ交換しますが、今回はそのような動作にはなっていません。

プログラムの全貌

以上をまとめたプログラムの全体を以下に改めて載せておきます。

lib/exineris.ex
defmodule Exineris do
  use GenStage
  require Circuits.GPIO
  require Logger

  def start_link(name, gpio_no, in_out) do
    Logger.debug("#{__MODULE__} start_link: #{inspect(name)}, #{gpio_no}, #{in_out}}")
    GenStage.start_link(__MODULE__, {name, gpio_no, in_out}, [name: {:via, :syn, name}])
  end

  @impl GenStage
  def init({name, gpio_no, in_out = :input}) do
    {:ok, gpioref} = Circuits.GPIO.open(gpio_no, in_out)
    Circuits.GPIO.set_interrupts(gpioref, :both)
    {:producer, {name, gpioref}}
  end

  @impl GenStage
  def init({name, gpio_no, in_out = :output}) do
    {:ok, gpioref} = Circuits.GPIO.open(gpio_no, in_out)
    {:consumer, {name, gpioref}}
  end

  def write(name, :true), do: GenStage.cast(:syn.whereis(name), {:write, 1})
  def write(name, :false), do: GenStage.cast(:syn.whereis(name), {:write, 0})
  def write(name, val), do: GenStage.cast(:syn.whereis(name), {:write, val})
  def read(name), do: GenStage.call(:syn.whereis(name), :read)
  def stop(name), do: GenStage.stop(:syn.whereis(name))

  @impl GenStage
  def handle_cast({:write, val}, {name, gpioref}) do
    Logger.debug("#{__MODULE__} handle_call write: #{inspect({val, name, gpioref})}")
    Circuits.GPIO.write(gpioref, val)
    {:noreply, [], {name, gpioref}}
  end

  @impl GenStage
  def handle_call(:read, _from, {name, gpioref}) do
    Logger.debug("#{__MODULE__} handle_call read: #{inspect({name, gpioref})}")
    {:reply, {:ok, Circuits.GPIO.read(gpioref)}, [], {name, gpioref}}
  end

  def ssubscribe(consumer, producer) do
    GenStage.sync_subscribe(:syn.whereis(consumer), to: :syn.whereis(producer),
      max_demand: 1, min_demand: 0)
  end

  @impl GenStage
  def handle_demand(demand, {name, gpioref}) do
    val = Circuits.GPIO.read(gpioref)
    Logger.debug("#{__MODULE__} handle_demand: #{inspect({demand, name, gpioref, val})}")
    {:noreply, [val], {name, gpioref}}
  end

  @impl GenStage
  def handle_events(event, _from, {name, gpioref}) do
    Logger.debug("#{__MODULE__} handle_event: #{inspect({event, name, gpioref})}")
    Circuits.GPIO.write(gpioref, hd(event))
    Process.sleep(100)
#    Process.sleep(500)
    {:noreply, [], {name, gpioref}}
  end

  @impl GenStage
  def handle_info({:circuits_gpio, gpiono, time, val}, {name, gpioref}) do
    Logger.debug("#{__MODULE__} handle_info: :gpio #{inspect({gpiono, time, val})}")
    {:noreply, [val], {name, gpioref}}
  end

  @impl GenStage
  def terminate(reason, gpioref) do
    Circuits.GPIO.close(gpioref)
    reason
  end
end

実行してみる

GPIOの20番をボタンに49番をLEDに割当てて、それぞれPublisherとSubscriberとしてプロセスを立ち上げます。両者を結びつけると Pub/Sub が始まり 100ms で入力がポーリングされ、その結果がLEDに出力されます。

iex(1)> Exineris.start_link("button", 20, :input)
{:ok, #PID<0.1168.0>}
iex(2)> Exineris.start_link("led", 49, :output)
{:ok, #PID<0.1170.0>}
iex(3)> Exineris.ssubscribe("led", "button")
{:ok, #Reference<0.2258460604.268697601.204063>}
iex(4)> 

out.gif

まとめ

今回は GenStage を使って Pub/Sub モデル風に入出力を結びつけてデバイスの動作の表現をするということをやってみました。

GenStage を使うと出力側からの要求で全体が動作します。入力側からのイベントで動くようにはなっていません。このため デバイスを直接叩くようなプログラムにおいて今回のような手法は向いていない です。ただし、ノードが別だったり、ホストが別だったりするような、そして物理デバイスから離れて多少抽象的な動作をするプロセス間の通信に用いるのであれば、面白い使い方ができそうです。

今回の手法で、入力と出力との間で何らかの処理をするプロセスの記述ができないかも検討してみました。例えば2つの入力をSubscribeして論理積をPublishするようなプロセスが記述できれば、2つのボタンが押されたときだけLEDが点灯するようなプログラムを記述できるはずです。

しかしながら、今回のプログラムを素直に拡張するのは困難でした。handle_events で受け取るデータは event 変数に格納されますが、データを受け取ったときに「どの Publisher から受け取ったデータなのか」を判別する方法がないのです。sync_subscribe 関数では2つのプロセスを結びつけるときに reference を作りますが、プロセスと違ってリファレンスには名前がつけられません。ですので、やろうとすると ssubscribe 関数を呼んだときに Pub/Sub 関係を自分で ETS 等のデータベースに保持するか、もしくは handle_demand 関数が値と一緒に自分が誰かをタプルにして渡す… というようなスマートでない手法が必要になります。

謝辞

これはサッポロビームのもくもく会で作成したものです。Sapporo.BEAM のみなさまに感謝します。

参考文献


  1. ちょうどよい程度である様。 

kikuyuta
FA・PLC・リレーと言った制御のゾーンと、クラウド・インターネットなゾーンとを融合して、シームレスなIoT空間を構築できるようにしたいです。現在 Elixir 修行中。
kochi-ex
Elixir / Nerves / Phoenix / Erlang を中心とした研究・開発・勉強を楽しむ非営利の団体です。高知県を中心に活動してます。オフ会を不定期に開催しています。
https://kochi-ex.connpass.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away