LoginSignup
19
3

More than 1 year has passed since last update.

ElixirでROSっぽい通信を試す~SynでPub&Sub

Last updated at Posted at 2022-12-18

1.はじめに

Elixirの環境下で、ROS - (Robot Operating System) のようにPublisherとSubscriberを作って、ノード間(Elixirでいうプロセス間)通信する仕組みを試行してみました。

(1)トピックの発行と購読の考え方

トピックごとに発行&購読範囲を区切って、メッセージをやり取りします。

  • ノード「yu」は、2つのトピック「TopicA」「TopicB」に対してメッセージを発行します
  • ノード「pom」は、トピック「TopicA」を購読します。「TopicB」は受け取りません
  • ノード「setsu」は、トピック「TopicB」を購読します。「TopicA」は受け取りません
ノード名
(プロセス名)
TopicA TopicB
yu 発行 発行
pom 購読 ×
setsu × 購読

(2)メッセージの考え方

メッセージは、Elixirのdefstructを使って「MessageStruct」という構造体に複数の変数を格納して、やり取りします。
今回は下記の2つの変数です。

MessageStruct 変数名
(bitstring用) itemstring
(integer用) itemint

(3)Publish&Subscribeを提供するライブラリ

kikuyutaさまの記事から着想を得て、今回は「SYN」というライブラリを活用します。

SYNを使うときの考え方として、Publish&Subscribeのほかに、「Scope」という概念があります。
Publish&Subscribeしたいノードを、同じScopeに所属させることで、ノード間通信の範囲を限定することができます。
今回は「:sample」というScopeの中でやり取りをしています。

Elixirで、ROS2のトピックやメッセージのやり取りを実現するには、Rclexが大変便利ですので、ご参考ください。

2.環境

$ uname -a
Linux dicom-sv11 5.15.0-56-generic #62-Ubuntu SMP Tue Nov 22 19:54:14 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

$ elixir --version
Erlang/OTP 25 [erts-13.0.4] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [jit:ns]
Elixir 1.13.4 (compiled with Erlang/OTP 25)

3.ソースコード

(1)下準備

プロジェクトを作成

$ cd <your working directory>

# プロジェクトを作ります
$ mix new pubsub
$ cd pubsub

# 空のファイルを用意します(あとで中身を書きます)
$ touch lib/pub.ex lib/sub.ex lib/message_struct.ex

設定

ライブラリ「SYN」を登録します。

mix.exs
defmodule Pubsub.MixProject do
  use Mix.Project

  def project do
    [
      app: :pubsub,
      version: "0.1.0",
      elixir: "~> 1.13",
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger]
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
      # ↓追記
      {:syn, "~> 3.3"}
    ]
  end
end

(2)コード

メッセージを格納する構造体

lib/message_struct.ex
defmodule MessageStruct do
  @moduledoc """
  メッセージの構造体
  """
  defstruct [
    # 文字列
    itemstring: nil,
    # 数値
    iteminteger: nil
  ]
end

受信側

lib/sub.ex
defmodule Sub do
  @moduledoc """
  Subscribeサンプル
  """
  use GenServer
  require Logger

  ### 定数 ########################################

  # SYNのスコープを定義
  @syn_scope :sample
  # デフォルトで購読するトピックのリスト
  @sub_topics_list ["TopicA"]
  # ステート(サンプル中ではとくになし)
  @initial_state []

  ### クライアント側API / ヘルパー関数 #############

  @doc """
  起動
  """
  def start_link(opts \\ [topic: @sub_topics_list]) do
    GenServer.start_link(__MODULE__, {@initial_state, opts}, name: __MODULE__)
  end

  @doc """
  停止
  """
  def stop(reason \\ :normal, timeout \\ :infinity) do
    # 起動中であることを確認してから、呼び出し
    case GenServer.whereis(__MODULE__) do
      nil -> Logger.warning("not started")
      _ -> GenServer.stop(__MODULE__, reason, timeout)
    end
  end

  ### GenServer API #############################

  @doc """
  GenServer API・初期化
  """
  @impl GenServer
  def init({state, opts}) do
    # optに必須のキーが含まれているか確認
    case Keyword.has_key?(opts, :topic) do
      true ->
        # 必須のキーが含まれていたら
        # スコープの登録
        :ok = :syn.add_node_to_scopes([@syn_scope])

        # synのSub登録
        sub_topics_list = opts[:topic]

        Enum.map(sub_topics_list, &:syn.join(@syn_scope, &1, self()))
        # 登録結果を表示
        |> (&Logger.info("syn Subscribe init: #{inspect(sub_topics_list)} -> #{inspect(&1)}")).()

        {:ok, state}

      _ ->
        # キーが含まれてなければ起動しない
        Logger.error("Required key not found for argument, [topic: [\"topicname\", ...]]")
        :ignore
    end
  end

  @doc """
  GenServer API・停止
  """
  @impl GenServer
  def terminate(reason, _gpioref) do
    # synのSub破棄
    Enum.map(@sub_topics_list, &:syn.leave(@syn_scope, &1, self()))
    # 結果表示
    |> (&Logger.info("syn Subscribe term: #{inspect(@sub_topics_list)} -> #{inspect(&1)}")).()

    reason
  end

  @doc """
  GenServer.handle_info/2コールバック
  """
  @impl GenServer
  def handle_info({:hoge, _, message}, state) when is_bitstring(message) do
    # トピックを受信したとき
    # dest = :hoge & 文字列型
    Logger.debug("syn Subscribe: [:hoge / bitstring] #{inspect(message)}")
    {:noreply, state}
  end

  @impl GenServer
  def handle_info({:fuga, _, message}, state) when is_integer(message) do
    # トピックを受信したとき
    # dest = :fuga & 数値型
    Logger.debug("syn Subscribe: [:fuga / integer] #{inspect(message)}")
    {:noreply, state}
  end

  @impl GenServer
  def handle_info({_, _, message}, state) when is_struct(message, MessageStruct) do
    # トピックを受信したとき
    # MessageStruct型
    Logger.debug(
      "syn Subscribe: [MessageStruct] str: #{inspect(message.itemstring)}, int #{inspect(message.iteminteger)}"
    )

    {:noreply, state}
  end

  @impl GenServer
  def handle_info(dest, state) do
    # トピックを受信したとき
    # 前述のhandle_infoに引っかからなかった
    Logger.warn("syn Subscribe: not implemented... #{inspect(dest)}")
    {:noreply, state}
  end
end

送信側

lib/pub.ex
defmodule Pub do
  @moduledoc """
  Publishサンプル
  """
  require Logger

  ### 定数 ########################################

  # SYNのスコープを定義
  @syn_scope :sample
  # ppidごとにつける一意な名前
  @syn_registor_name :pulishsample
  # 発行するトピックのリスト
  @pub_topics_list ["TopicA", "TopicB"]

  ### 関数 ########################################

  @doc """
  発行

  ## Parameters
    - message : 送信データ
    - dest    : Sub側のhandle_infoでマッチさせるdestワード

  ## Examples
    iex>
  """
  def publish(dest, message) when is_atom(dest) do
    # 初期化、発行、解放まで
    with :ok <- initpub(),
         :ok <- pub(dest, message),
         :ok <- termpub(),
         do: :ok
  end

  def publish(arg1, arg2)
      when is_bitstring(arg1) and is_integer(arg2) do
    # 初期化、発行、解放まで
    with :ok <- initpub(),
         :ok <- pub(:struct, %MessageStruct{itemstring: arg1, iteminteger: arg2}),
         :ok <- termpub(),
         do: :ok
  end

  defp initpub() do
    # スコープの登録
    :ok = :syn.add_node_to_scopes([@syn_scope])
    # synのPub登録
    case :syn.register(@syn_scope, @syn_registor_name, self(), @pub_topics_list) do
      :ok ->
        :ok

      {_, term} ->
        # 異常の時
        Logger.error(inspect(term))
        :error
    end
  end

  defp pub(dest, message) when is_atom(dest) do
    # 先に登録した@syn_registor_nameに含まれるトピック一覧を取得
    case :syn.lookup(@syn_scope, @syn_registor_name) do
      {_pid, topics_list} ->
        Enum.map(
          # トピック一覧の順に、各トピックごとにPublish
          topics_list,
          &:syn.publish(@syn_scope, &1, {dest, @syn_registor_name, message})
        )

        Logger.info(
          "syn Publish: #{inspect(topics_list)}, #{inspect(dest)} / #{inspect(message)}"
        )

        # 返り値
        :ok

      ret ->
        # 異常の時
        Logger.error(inspect(ret))
        :noop
    end
  end

  defp termpub() do
    # synのPub破棄
    case :syn.unregister(@syn_scope, @syn_registor_name) do
      :ok ->
        :ok

      {_, term} ->
        # 異常の時
        Logger.error(inspect(term))
        :error
    end
  end
end

4.実行結果

以下のサンプルは、一つの同じホストの中で、複数のターミナルを立ち上げて実験しました。

ターミナルを3つ立ち上げて、それぞれで下記の操作をします。

ターミナル1:受信側その1

# 「pom」という名前でiexを起動
$ iex --sname pom --cookie odaiba -S mix

# 受信用のGenServerを起動して待機
iex> Sub.start_link()

# ターミナル3:送信側で Node.connect([:"pom@nijigaku", :"setsu@nijigaku"]) まで進める

# ノードの接続状況を確認
iex> Node.list
[:"yu@nijigaku", :"setsu@nijigaku"]

# メッセージを受け取ると、下記が表示される
22:10:06.175 [debug] syn Subscribe: [MessageStruct] str: "kawaiiyo", int 301

# 受信用のGenServerを止める
iex> Sub.stop()

# [Ctrl-\]でiex終了

ターミナル2:受信側その2

# 「setsu」という名前でiexを起動
$ iex --sname setsu --cookie odaiba -S mix

# 受信用のGenServerを起動して待機
iex> Sub.start_link()

# ターミナル3:送信側で Node.connect([:"pom@nijigaku", :"setsu@nijigaku"]) まで進める

# ノードの接続状況を確認
iex> Node.list
[:"yu@nijigaku", :"pom@nijigaku"]

# メッセージを受け取ると、下記が表示される
22:10:06.175 [debug] syn Subscribe: [MessageStruct] str: "kawaiiyo", int 301

# 受信用のGenServerを止める
iex> Sub.stop()

# [Ctrl-\]でiex終了

ターミナル3:送信側

# 「yu」という名前でiexを起動
$ iex --sname yu --cookie odaiba -S mix

# ノード「pom」と「setsu」に接続
iex> Node.connect([:"pom@nijigaku", :"setsu@nijigaku"])
true

# ノードの接続状況を確認
iex> Node.list
[:"pom@nijigaku", :"setsu@nijigaku"]

# メッセージを送信
iex> Pub.publish("kawaiiyo", 301)
22:10:06.175 [info]  syn Publish: [["TopicA", "TopicB"]], :struct / %MessageStruct{iteminteger: 301, itemstring: "kawaii"}
:ok

# [Ctrl-\]でiex終了

iexを起動するときに、各iexのノード名と、ノード間通信に必要な合言葉も併せて指定することがポイントです。

$ --sname <node name> --cookie <key word> -S mix

異なったホスト間で通信する

別のホストの間で通信する場合は、IPアドレスの指定も必要になります。
先ほどの3つのコンソールでの実行例を、以下のように読み替えます。

ホストのIPアドレスの例 ノード名 発行 購読 iexの起動 Node.connectの引数
192.168.3.2 yu $ iex --name yu@192.168.3.2 --cookie odaiba -S mix Node.connect([:"pom@192.168.3.1", :"setsu@192.168.3.3"])
192.168.3.1 pom $ iex --name pom@192.168.3.1 --cookie odaiba -S mix -
192.168.3.3 setsu $ iex --name setsu@192.168.3.3 --cookie odaiba -S mix -

ポイント

  • iexコマンドの--snameのところが、--name <node name>@<ip address>になります
  • Node.connectするときのリストの内容の書き方が、:"<node name>@<ip address>"になります。
    • セミコロンをつけることで、IPアドレスの区切りのピリオドを、アトムとして渡せるようになります

5.まとめ

ROSのノード間通信に使われる、Publish&Subscribeの仕組みは大変便利です。
Elixirの環境下でも、同じようにPublisherとSubscriberを作って、ノード間通信をすることができました。
これを活用する場面は限定的かとは思いますが、ご参考になる方があれば幸いです。

6.参考資料

19
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
3