LoginSignup
0
0

ゼロコピーでプロセス間通信が可能なiceoryxの使い方:(その3)同期通信

Posted at

テストコード

前記事ゼロコピーでプロセス間通信が可能なiceoryxの使い方:(その2)定周期通信では、publisherもsubscriberもそれぞれが周期的に発行・購読を行うだけで、同期はしていませんでした。Publish-Subscribeモデルはそもそも非同期通信を想定するものなので、本来の使われ方としてはこれで十分だとも言えますが、iceoryxは同期通信、つまりpublisherがトピック発行したタイミングでsubscriberに即座に購読させるための仕組みも提供しています。

本記事で示すコードは、本家に含まれるiceoryx_examples/callbacksを元にしています。

トピック定義用ヘッダファイルmytopic.h、publisherソースファイルsync_publisher.cpp、subscriberソースファイルsync_subscriber.cppの三つです。先に全体を示します。

mytopic.h

#ifndef MYTOPIC_H
#define MYTOPIC_H

struct MyTopic{
  int data;
};

#define SERVICE_NAME      "MyTopic"
#define SERVICE_INSTANCE1 "Instance1"
#define SERVICE_INSTANCE2 "Instance2"
#define SERVICE_EVENT     "Event"

#endif // MYTOPIC_H

sync_publisher.cpp

#include <iceoryx_posh/popo/publisher.hpp>
#include <iceoryx_posh/runtime/posh_runtime.hpp>
#include <iox/signal_watcher.hpp>

#include <iostream>

#include "mytopic.h"

int main()
{
  constexpr char app_name[] = "sync-publisher";
  iox::runtime::PoshRuntime::initRuntime( app_name );

  iox::popo::Publisher<MyTopic> publisher1( { SERVICE_NAME, SERVICE_INSTANCE1, SERVICE_EVENT } );
  iox::popo::Publisher<MyTopic> publisher2( { SERVICE_NAME, SERVICE_INSTANCE2, SERVICE_EVENT } );
  iox::popo::Publisher<MyTopic> *publisher[] = { &publisher1, &publisher2 };

  while( !iox::hasTerminationRequested() ){
    int instance_id;
    std::cout << "Enter ID of service instance (1/2)> ";
    std::cin >> instance_id;
    instance_id = instance_id <= 1 ? 0 : 1;

    MyTopic topic;
    std::cout << "Enter a number> ";
    std::cin >> topic.data;

    std::cout << "sending " << topic.data << " to Instance " << instance_id + 1 << std::endl;
    publisher[instance_id]->publishCopyOf( topic )
      .or_else( [](auto){ std::cerr << " Failed." << std::endl; } );
  }
  return EXIT_SUCCESS;
}

sync_subscriber.cpp

#include <iceoryx_posh/popo/listener.hpp>
#include <iceoryx_posh/popo/subscriber.hpp>
#include <iceoryx_posh/popo/user_trigger.hpp>
#include <iceoryx_posh/runtime/posh_runtime.hpp>
#include <iox/optional.hpp>
#include <iox/signal_watcher.hpp>

#include <iostream>

#include "mytopic.h"

iox::optional<MyTopic> cache1, cache2;

void receive_sample(iox::popo::Subscriber<MyTopic> *subscriber)
{
  while( subscriber->take().and_then( [subscriber](auto &sample){
    iox::optional<MyTopic> *cache = NULL;
    auto instance_name = subscriber->getServiceDescription().getInstanceIDString();
    if( instance_name == iox::capro::IdString_t( SERVICE_INSTANCE1 ) ){
      cache = &cache1;
    } else
    if( instance_name == iox::capro::IdString_t( SERVICE_INSTANCE2 ) ){
      cache = &cache2;
    }
    if( cache ){
      cache->emplace( *sample );
      std::cout << instance_name << " received a sample " << sample->data << "." << std::endl;
    }
  } ) );
  if( cache1 && cache2 ){
    std::cout << "Sum of Samples is " << cache1->data << " + " << cache2->data
              << " = " << cache1->data + cache2->data << std::endl;
    cache1.reset();
    cache2.reset();
  }
}

void abort_subscriber(int id)
{
  std::cerr << "unable to attach subscriber " << id << "." << std::endl;
  std::exit( EXIT_FAILURE );
}

int main()
{
  constexpr char app_name[] = "sync-subscriber";
  iox::runtime::PoshRuntime::initRuntime( app_name );

  iox::popo::Subscriber<MyTopic> subscriber1( { SERVICE_NAME, SERVICE_INSTANCE1, SERVICE_EVENT } );
  iox::popo::Subscriber<MyTopic> subscriber2( { SERVICE_NAME, SERVICE_INSTANCE2, SERVICE_EVENT } );

  iox::popo::Listener listener;
  listener
    .attachEvent( subscriber1,
      iox::popo::SubscriberEvent::DATA_RECEIVED,
      iox::popo::createNotificationCallback( receive_sample ) )
    .or_else( [](auto){ abort_subscriber( 1 ); } );
  listener
    .attachEvent( subscriber2,
      iox::popo::SubscriberEvent::DATA_RECEIVED,
      iox::popo::createNotificationCallback( receive_sample ) )
    .or_else( [](auto){ abort_subscriber( 2 ); } );

  iox::waitForTerminationRequest();
  // termination
  listener.detachEvent( subscriber1, iox::popo::SubscriberEvent::DATA_RECEIVED );
  listener.detachEvent( subscriber2, iox::popo::SubscriberEvent::DATA_RECEIVED );
  return EXIT_SUCCESS;
}

前回同様に、順を追って中身を説明していきます。

トピック定義用ヘッダファイルmytopic.h

今回のMyTopicは凝ったことをしておらず、int型の値を一つ持つだけにしています。これ以上の説明は不要と思います。

通信プログラム内で二つのpublisher/subscriberを走らせるので、SERVICE_INSTANCE1SERVICE_INSTANCE2の二つを定義しています。

#define SERVICE_NAME      "MyTopic"
#define SERVICE_INSTANCE1 "Instance1"
#define SERVICE_INSTANCE2 "Instance2"
#define SERVICE_EVENT     "Event"

publisherソースファイルsync_publisher.cpp

publisherプログラムです。

まず、例によって通信用ランタイムを初期化します。

  constexpr char app_name[] = "sync-publisher";
  iox::runtime::PoshRuntime::initRuntime( app_name );

publisherを二つ作り、さらにそれらへのポインタを配列にしておきます。

  iox::popo::Publisher<MyTopic> publisher1( { SERVICE_NAME, SERVICE_INSTANCE1, SERVICE_EVENT } );
  iox::popo::Publisher<MyTopic> publisher2( { SERVICE_NAME, SERVICE_INSTANCE2, SERVICE_EVENT } );
  iox::popo::Publisher<MyTopic> *publisher[] = { &publisher1, &publisher2 };

ループの中身は対話的処理にしています。まず、二つあるpublisherのうちどちらにトピック発行させるかを選択します。

    int instance_id;
    std::cout << "Enter ID of service instance (1/2)> ";
    std::cin >> instance_id;
    instance_id = instance_id <= 1 ? 0 : 1;

最後の1行は単なる安全策です。

次にトピック内の値を入力させます。

    MyTopic topic;
    std::cout << "Enter a number> ";
    std::cin >> topic.data;

発行はpublishCopyOf()メソッドで行います。

    publisher[instance_id]->publishCopyOf( topic )
      .or_else( [](auto){ std::cerr << " Failed." << std::endl; } );

中身を分かっていませんが、メソッド名からはゼロコピーというライブラリの主旨に沿っていないのでは?という引っかかりを覚えます。まあ、気にしないことにしましょう。気にする場合は、前回用いたloan()メソッドを使ってトピック取得すれば良いだろうとは思います。

ところで、例外処理に使われているメソッドor_else()は筆者は初めてみました。C++23の機能のようですが、面白いですね。

subscriberソースファイルsync_subscriber.cpp

subscriberプログラムは、publisherに比べると少し長めです。

main()関数から見ていきましょう。こちらも例によって通信用ランタイムを初期化します。

  constexpr char app_name[] = "sync-subscriber";
  iox::runtime::PoshRuntime::initRuntime( app_name );

subscriberも二つ作ります。

  iox::popo::Subscriber<MyTopic> subscriber1( { SERVICE_NAME, SERVICE_INSTANCE1, SERVICE_EVENT } );
  iox::popo::Subscriber<MyTopic> subscriber2( { SERVICE_NAME, SERVICE_INSTANCE2, SERVICE_EVENT } );

イベントを拾うlistenerインスタンスを作り、上記の二つのsubscriberをアタッチします。

  iox::popo::Listener listener;
  listener
    .attachEvent( subscriber1,
      iox::popo::SubscriberEvent::DATA_RECEIVED,
      iox::popo::createNotificationCallback( receive_sample ) )
    .or_else( [](auto){ abort_subscriber( 1 ); } );
  listener
    .attachEvent( subscriber2,
      iox::popo::SubscriberEvent::DATA_RECEIVED,
      iox::popo::createNotificationCallback( receive_sample ) )
    .or_else( [](auto){ abort_subscriber( 2 ); } );

attachEvent()iox::popo::SubscriberEvent::DATA_RECEIVEDイベントに反応して、subscriber1またはsubscriber1を引数としたreceive_sample()が呼ばれるようにしている、と読み取れます。receive_sample()については後述します。例外処理はpublisherと同じくor_else()を使っています。スマートですね。

listenerにイベントが割り当てられたので、あとはCtrl-Cを待つだけにします。

  iox::waitForTerminationRequest();

Ctrl-Cが発行されたら、リソース回収して終了です。

  listener.detachEvent( subscriber1, iox::popo::SubscriberEvent::DATA_RECEIVED );
  listener.detachEvent( subscriber2, iox::popo::SubscriberEvent::DATA_RECEIVED );

receive_sample()の中身に入りましょう。まずはMyTopicを継承する形でcache1cache2を用意します。

iox::optional<MyTopic> cache1, cache2;

subscriberのtake()メソッドを使ってトピック取得します。

  while( subscriber->take().and_then( [subscriber](auto &sample){

本プログラムでは、publisher/subscriberの判別はサービスインスタンスの文字列を用いてやることにしたのでした。getServiceDescription().getInstanceIDString()メソッドでこれを取得し、どちらのsubscriberでトピックを読めば良いか決めます。文字列なので、if〜elseで選択するのが野暮ったいですが、仕方ありません。

    iox::optional<MyTopic> *cache = NULL;
    auto instance_name = subscriber->getServiceDescription().getInstanceIDString();
    if( instance_name == iox::capro::IdString_t( SERVICE_INSTANCE1 ) ){
      cache = &cache1;
    } else
    if( instance_name == iox::capro::IdString_t( SERVICE_INSTANCE2 ) ){
      cache = &cache2;
    }

どちらかのsubscriberが選ばれていれば、サンプルされたトピックをcacheに保持させます。これもコピーじゃないのか?と思わないでもないですが、気にしないことにします。

    if( cache ){
      cache->emplace( *sample );
      std::cout << instance_name << " received a sample " << sample->data << "." << std::endl;
    }

subscriber1subscriber2、二つ分のトピックが揃ったタイミングで、それぞれから取得した値の和を表示(深い意味はありません)した後にcacheをリセットします。

  if( cache1 && cache2 ){
    std::cout << "Sum of Samples is " << cache1->data << " + " << cache2->data
              << " = " << cache1->data + cache2->data << std::endl;
    cache1.reset();
    cache2.reset();
  }
}

どちらかのsubscriberが続けて呼び出された場合は、cacheは最新のものに更新されます。

makefileのTARGET

TARGET=sync_publisher sync_subscriber

としてプログラムをコンパイルし、動作確認してみて下さい。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0