1.はじめに
Elixirの環境下で、ROS - (Robot Operating System) のようにPublisherとSubscriberを作って、ノード間(Elixirでいうプロセス間)通信する仕組みを試行してみました。
- ROSにおける概念
- 本記事ではこんな感じに置き換えて考える↓
(1)トピックの発行と購読の考え方
トピックごとに発行&購読範囲を区切って、メッセージをやり取りします。
- ノード「yu」は、2つのトピック「TopicA」「TopicB」に対してメッセージを発行します
- ノード「pom」は、トピック「TopicA」を購読します。「TopicB」は受け取りません
- ノード「setsu」は、トピック「TopicB」を購読します。「TopicA」は受け取りません
ノード名 (プロセス名) |
TopicA | TopicB |
---|---|---|
yu | 発行 | 発行 |
pom | 購読 | × |
setsu | × | 購読 |
(2)メッセージの考え方
メッセージは、Elixirのdefstruct
を使って「MessageStruct」という構造体に複数の変数を格納して、やり取りします。
今回は下記の2つの変数です。
MessageStruct | 変数名 |
---|---|
(bitstring用) | itemstring |
(integer用) | itemint |
(3)Publish&Subscribeを提供するライブラリ
kikuyutaさまの記事から着想を得て、今回は「SYN」というライブラリを活用します。
SYNを使うときの考え方として、Publish&Subscribeのほかに、「Scope」という概念があります。
Publish&Subscribeしたいノードを、同じScopeに所属させることで、ノード間通信の範囲を限定することができます。
今回は「:sample」というScopeの中でやり取りをしています。
Elixirで、ROS2のトピックやメッセージのやり取りを実現するには、Rclexが大変便利ですので、ご参考ください。
2.環境
$ uname -a
Linux dicom-sv11 5.15.0-56-generic #62-Ubuntu SMP Tue Nov 22 19:54:14 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux
$ elixir --version
Erlang/OTP 25 [erts-13.0.4] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [jit:ns]
Elixir 1.13.4 (compiled with Erlang/OTP 25)
3.ソースコード
(1)下準備
プロジェクトを作成
$ cd <your working directory>
# プロジェクトを作ります
$ mix new pubsub
$ cd pubsub
# 空のファイルを用意します(あとで中身を書きます)
$ touch lib/pub.ex lib/sub.ex lib/message_struct.ex
設定
ライブラリ「SYN」を登録します。
defmodule Pubsub.MixProject do
use Mix.Project
def project do
[
app: :pubsub,
version: "0.1.0",
elixir: "~> 1.13",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger]
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
# ↓追記
{:syn, "~> 3.3"}
]
end
end
(2)コード
メッセージを格納する構造体
defmodule MessageStruct do
@moduledoc """
メッセージの構造体
"""
defstruct [
# 文字列
itemstring: nil,
# 数値
iteminteger: nil
]
end
受信側
defmodule Sub do
@moduledoc """
Subscribeサンプル
"""
use GenServer
require Logger
### 定数 ########################################
# SYNのスコープを定義
@syn_scope :sample
# デフォルトで購読するトピックのリスト
@sub_topics_list ["TopicA"]
# ステート(サンプル中ではとくになし)
@initial_state []
### クライアント側API / ヘルパー関数 #############
@doc """
起動
"""
def start_link(opts \\ [topic: @sub_topics_list]) do
GenServer.start_link(__MODULE__, {@initial_state, opts}, name: __MODULE__)
end
@doc """
停止
"""
def stop(reason \\ :normal, timeout \\ :infinity) do
# 起動中であることを確認してから、呼び出し
case GenServer.whereis(__MODULE__) do
nil -> Logger.warning("not started")
_ -> GenServer.stop(__MODULE__, reason, timeout)
end
end
### GenServer API #############################
@doc """
GenServer API・初期化
"""
@impl GenServer
def init({state, opts}) do
# optに必須のキーが含まれているか確認
case Keyword.has_key?(opts, :topic) do
true ->
# 必須のキーが含まれていたら
# スコープの登録
:ok = :syn.add_node_to_scopes([@syn_scope])
# synのSub登録
sub_topics_list = opts[:topic]
Enum.map(sub_topics_list, &:syn.join(@syn_scope, &1, self()))
# 登録結果を表示
|> (&Logger.info("syn Subscribe init: #{inspect(sub_topics_list)} -> #{inspect(&1)}")).()
{:ok, state}
_ ->
# キーが含まれてなければ起動しない
Logger.error("Required key not found for argument, [topic: [\"topicname\", ...]]")
:ignore
end
end
@doc """
GenServer API・停止
"""
@impl GenServer
def terminate(reason, _gpioref) do
# synのSub破棄
Enum.map(@sub_topics_list, &:syn.leave(@syn_scope, &1, self()))
# 結果表示
|> (&Logger.info("syn Subscribe term: #{inspect(@sub_topics_list)} -> #{inspect(&1)}")).()
reason
end
@doc """
GenServer.handle_info/2コールバック
"""
@impl GenServer
def handle_info({:hoge, _, message}, state) when is_bitstring(message) do
# トピックを受信したとき
# dest = :hoge & 文字列型
Logger.debug("syn Subscribe: [:hoge / bitstring] #{inspect(message)}")
{:noreply, state}
end
@impl GenServer
def handle_info({:fuga, _, message}, state) when is_integer(message) do
# トピックを受信したとき
# dest = :fuga & 数値型
Logger.debug("syn Subscribe: [:fuga / integer] #{inspect(message)}")
{:noreply, state}
end
@impl GenServer
def handle_info({_, _, message}, state) when is_struct(message, MessageStruct) do
# トピックを受信したとき
# MessageStruct型
Logger.debug(
"syn Subscribe: [MessageStruct] str: #{inspect(message.itemstring)}, int #{inspect(message.iteminteger)}"
)
{:noreply, state}
end
@impl GenServer
def handle_info(dest, state) do
# トピックを受信したとき
# 前述のhandle_infoに引っかからなかった
Logger.warn("syn Subscribe: not implemented... #{inspect(dest)}")
{:noreply, state}
end
end
送信側
defmodule Pub do
@moduledoc """
Publishサンプル
"""
require Logger
### 定数 ########################################
# SYNのスコープを定義
@syn_scope :sample
# ppidごとにつける一意な名前
@syn_registor_name :pulishsample
# 発行するトピックのリスト
@pub_topics_list ["TopicA", "TopicB"]
### 関数 ########################################
@doc """
発行
## Parameters
- message : 送信データ
- dest : Sub側のhandle_infoでマッチさせるdestワード
## Examples
iex>
"""
def publish(dest, message) when is_atom(dest) do
# 初期化、発行、解放まで
with :ok <- initpub(),
:ok <- pub(dest, message),
:ok <- termpub(),
do: :ok
end
def publish(arg1, arg2)
when is_bitstring(arg1) and is_integer(arg2) do
# 初期化、発行、解放まで
with :ok <- initpub(),
:ok <- pub(:struct, %MessageStruct{itemstring: arg1, iteminteger: arg2}),
:ok <- termpub(),
do: :ok
end
defp initpub() do
# スコープの登録
:ok = :syn.add_node_to_scopes([@syn_scope])
# synのPub登録
case :syn.register(@syn_scope, @syn_registor_name, self(), @pub_topics_list) do
:ok ->
:ok
{_, term} ->
# 異常の時
Logger.error(inspect(term))
:error
end
end
defp pub(dest, message) when is_atom(dest) do
# 先に登録した@syn_registor_nameに含まれるトピック一覧を取得
case :syn.lookup(@syn_scope, @syn_registor_name) do
{_pid, topics_list} ->
Enum.map(
# トピック一覧の順に、各トピックごとにPublish
topics_list,
&:syn.publish(@syn_scope, &1, {dest, @syn_registor_name, message})
)
Logger.info(
"syn Publish: #{inspect(topics_list)}, #{inspect(dest)} / #{inspect(message)}"
)
# 返り値
:ok
ret ->
# 異常の時
Logger.error(inspect(ret))
:noop
end
end
defp termpub() do
# synのPub破棄
case :syn.unregister(@syn_scope, @syn_registor_name) do
:ok ->
:ok
{_, term} ->
# 異常の時
Logger.error(inspect(term))
:error
end
end
end
4.実行結果
以下のサンプルは、一つの同じホストの中で、複数のターミナルを立ち上げて実験しました。
ターミナルを3つ立ち上げて、それぞれで下記の操作をします。
ターミナル1:受信側その1
# 「pom」という名前でiexを起動
$ iex --sname pom --cookie odaiba -S mix
# 受信用のGenServerを起動して待機
iex> Sub.start_link()
# ターミナル3:送信側で Node.connect([:"pom@nijigaku", :"setsu@nijigaku"]) まで進める
# ノードの接続状況を確認
iex> Node.list
[:"yu@nijigaku", :"setsu@nijigaku"]
# メッセージを受け取ると、下記が表示される
22:10:06.175 [debug] syn Subscribe: [MessageStruct] str: "kawaiiyo", int 301
# 受信用のGenServerを止める
iex> Sub.stop()
# [Ctrl-\]でiex終了
ターミナル2:受信側その2
# 「setsu」という名前でiexを起動
$ iex --sname setsu --cookie odaiba -S mix
# 受信用のGenServerを起動して待機
iex> Sub.start_link()
# ターミナル3:送信側で Node.connect([:"pom@nijigaku", :"setsu@nijigaku"]) まで進める
# ノードの接続状況を確認
iex> Node.list
[:"yu@nijigaku", :"pom@nijigaku"]
# メッセージを受け取ると、下記が表示される
22:10:06.175 [debug] syn Subscribe: [MessageStruct] str: "kawaiiyo", int 301
# 受信用のGenServerを止める
iex> Sub.stop()
# [Ctrl-\]でiex終了
ターミナル3:送信側
# 「yu」という名前でiexを起動
$ iex --sname yu --cookie odaiba -S mix
# ノード「pom」と「setsu」に接続
iex> Node.connect([:"pom@nijigaku", :"setsu@nijigaku"])
true
# ノードの接続状況を確認
iex> Node.list
[:"pom@nijigaku", :"setsu@nijigaku"]
# メッセージを送信
iex> Pub.publish("kawaiiyo", 301)
22:10:06.175 [info] syn Publish: [["TopicA", "TopicB"]], :struct / %MessageStruct{iteminteger: 301, itemstring: "kawaii"}
:ok
# [Ctrl-\]でiex終了
iex
を起動するときに、各iexのノード名と、ノード間通信に必要な合言葉も併せて指定することがポイントです。
$ --sname <node name> --cookie <key word> -S mix
異なったホスト間で通信する
別のホストの間で通信する場合は、IPアドレスの指定も必要になります。
先ほどの3つのコンソールでの実行例を、以下のように読み替えます。
ホストのIPアドレスの例 | ノード名 | 発行 | 購読 | iexの起動 | Node.connectの引数 |
---|---|---|---|---|---|
192.168.3.2 | yu | ○ | $ iex --name yu@192.168.3.2 --cookie odaiba -S mix |
Node.connect([:"pom@192.168.3.1", :"setsu@192.168.3.3"]) |
|
192.168.3.1 | pom | ○ | $ iex --name pom@192.168.3.1 --cookie odaiba -S mix |
- | |
192.168.3.3 | setsu | ○ | $ iex --name setsu@192.168.3.3 --cookie odaiba -S mix |
- |
ポイント
-
iex
コマンドの--sname
のところが、--name <node name>@<ip address>
になります -
Node.connect
するときのリストの内容の書き方が、:"<node name>@<ip address>"
になります。- セミコロンをつけることで、IPアドレスの区切りのピリオドを、アトムとして渡せるようになります
5.まとめ
ROSのノード間通信に使われる、Publish&Subscribeの仕組みは大変便利です。
Elixirの環境下でも、同じようにPublisherとSubscriberを作って、ノード間通信をすることができました。
これを活用する場面は限定的かとは思いますが、ご参考になる方があれば幸いです。
6.参考資料
- SYNを使った通信
- ROS2における通信
- ROS2とElixirの連携