テストコード
前記事ゼロコピーでプロセス間通信が可能なiceoryxの使い方:(その2)定周期通信では、publisherもsubscriberもそれぞれが周期的に発行・購読を行うだけで、同期はしていませんでした。Publish-Subscribeモデルはそもそも非同期通信を想定するものなので、本来の使われ方としてはこれで十分だとも言えますが、iceoryxは同期通信、つまりpublisherがトピック発行したタイミングでsubscriberに即座に購読させるための仕組みも提供しています。
本記事で示すコードは、本家に含まれるiceoryx_examples/callbacksを元にしています。
トピック定義用ヘッダファイルmytopic.h、publisherソースファイルsync_publisher.cpp、subscriberソースファイルsync_subscriber.cppの三つです。先に全体を示します。
mytopic.h
#ifndef MYTOPIC_H
#define MYTOPIC_H
struct MyTopic{
int data;
};
#define SERVICE_NAME "MyTopic"
#define SERVICE_INSTANCE1 "Instance1"
#define SERVICE_INSTANCE2 "Instance2"
#define SERVICE_EVENT "Event"
#endif // MYTOPIC_H
sync_publisher.cpp
#include <iceoryx_posh/popo/publisher.hpp>
#include <iceoryx_posh/runtime/posh_runtime.hpp>
#include <iox/signal_watcher.hpp>
#include <iostream>
#include "mytopic.h"
int main()
{
constexpr char app_name[] = "sync-publisher";
iox::runtime::PoshRuntime::initRuntime( app_name );
iox::popo::Publisher<MyTopic> publisher1( { SERVICE_NAME, SERVICE_INSTANCE1, SERVICE_EVENT } );
iox::popo::Publisher<MyTopic> publisher2( { SERVICE_NAME, SERVICE_INSTANCE2, SERVICE_EVENT } );
iox::popo::Publisher<MyTopic> *publisher[] = { &publisher1, &publisher2 };
while( !iox::hasTerminationRequested() ){
int instance_id;
std::cout << "Enter ID of service instance (1/2)> ";
std::cin >> instance_id;
instance_id = instance_id <= 1 ? 0 : 1;
MyTopic topic;
std::cout << "Enter a number> ";
std::cin >> topic.data;
std::cout << "sending " << topic.data << " to Instance " << instance_id + 1 << std::endl;
publisher[instance_id]->publishCopyOf( topic )
.or_else( [](auto){ std::cerr << " Failed." << std::endl; } );
}
return EXIT_SUCCESS;
}
sync_subscriber.cpp
#include <iceoryx_posh/popo/listener.hpp>
#include <iceoryx_posh/popo/subscriber.hpp>
#include <iceoryx_posh/popo/user_trigger.hpp>
#include <iceoryx_posh/runtime/posh_runtime.hpp>
#include <iox/optional.hpp>
#include <iox/signal_watcher.hpp>
#include <iostream>
#include "mytopic.h"
iox::optional<MyTopic> cache1, cache2;
void receive_sample(iox::popo::Subscriber<MyTopic> *subscriber)
{
while( subscriber->take().and_then( [subscriber](auto &sample){
iox::optional<MyTopic> *cache = NULL;
auto instance_name = subscriber->getServiceDescription().getInstanceIDString();
if( instance_name == iox::capro::IdString_t( SERVICE_INSTANCE1 ) ){
cache = &cache1;
} else
if( instance_name == iox::capro::IdString_t( SERVICE_INSTANCE2 ) ){
cache = &cache2;
}
if( cache ){
cache->emplace( *sample );
std::cout << instance_name << " received a sample " << sample->data << "." << std::endl;
}
} ) );
if( cache1 && cache2 ){
std::cout << "Sum of Samples is " << cache1->data << " + " << cache2->data
<< " = " << cache1->data + cache2->data << std::endl;
cache1.reset();
cache2.reset();
}
}
void abort_subscriber(int id)
{
std::cerr << "unable to attach subscriber " << id << "." << std::endl;
std::exit( EXIT_FAILURE );
}
int main()
{
constexpr char app_name[] = "sync-subscriber";
iox::runtime::PoshRuntime::initRuntime( app_name );
iox::popo::Subscriber<MyTopic> subscriber1( { SERVICE_NAME, SERVICE_INSTANCE1, SERVICE_EVENT } );
iox::popo::Subscriber<MyTopic> subscriber2( { SERVICE_NAME, SERVICE_INSTANCE2, SERVICE_EVENT } );
iox::popo::Listener listener;
listener
.attachEvent( subscriber1,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback( receive_sample ) )
.or_else( [](auto){ abort_subscriber( 1 ); } );
listener
.attachEvent( subscriber2,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback( receive_sample ) )
.or_else( [](auto){ abort_subscriber( 2 ); } );
iox::waitForTerminationRequest();
// termination
listener.detachEvent( subscriber1, iox::popo::SubscriberEvent::DATA_RECEIVED );
listener.detachEvent( subscriber2, iox::popo::SubscriberEvent::DATA_RECEIVED );
return EXIT_SUCCESS;
}
前回同様に、順を追って中身を説明していきます。
トピック定義用ヘッダファイルmytopic.h
今回のMyTopic
は凝ったことをしておらず、int型の値を一つ持つだけにしています。これ以上の説明は不要と思います。
通信プログラム内で二つのpublisher/subscriberを走らせるので、SERVICE_INSTANCE1
とSERVICE_INSTANCE2
の二つを定義しています。
#define SERVICE_NAME "MyTopic"
#define SERVICE_INSTANCE1 "Instance1"
#define SERVICE_INSTANCE2 "Instance2"
#define SERVICE_EVENT "Event"
publisherソースファイルsync_publisher.cpp
publisherプログラムです。
まず、例によって通信用ランタイムを初期化します。
constexpr char app_name[] = "sync-publisher";
iox::runtime::PoshRuntime::initRuntime( app_name );
publisherを二つ作り、さらにそれらへのポインタを配列にしておきます。
iox::popo::Publisher<MyTopic> publisher1( { SERVICE_NAME, SERVICE_INSTANCE1, SERVICE_EVENT } );
iox::popo::Publisher<MyTopic> publisher2( { SERVICE_NAME, SERVICE_INSTANCE2, SERVICE_EVENT } );
iox::popo::Publisher<MyTopic> *publisher[] = { &publisher1, &publisher2 };
ループの中身は対話的処理にしています。まず、二つあるpublisherのうちどちらにトピック発行させるかを選択します。
int instance_id;
std::cout << "Enter ID of service instance (1/2)> ";
std::cin >> instance_id;
instance_id = instance_id <= 1 ? 0 : 1;
最後の1行は単なる安全策です。
次にトピック内の値を入力させます。
MyTopic topic;
std::cout << "Enter a number> ";
std::cin >> topic.data;
発行はpublishCopyOf()
メソッドで行います。
publisher[instance_id]->publishCopyOf( topic )
.or_else( [](auto){ std::cerr << " Failed." << std::endl; } );
中身を分かっていませんが、メソッド名からはゼロコピーというライブラリの主旨に沿っていないのでは?という引っかかりを覚えます。まあ、気にしないことにしましょう。気にする場合は、前回用いたloan()
メソッドを使ってトピック取得すれば良いだろうとは思います。
ところで、例外処理に使われているメソッドor_else()
は筆者は初めてみました。C++23の機能のようですが、面白いですね。
subscriberソースファイルsync_subscriber.cpp
subscriberプログラムは、publisherに比べると少し長めです。
main()
関数から見ていきましょう。こちらも例によって通信用ランタイムを初期化します。
constexpr char app_name[] = "sync-subscriber";
iox::runtime::PoshRuntime::initRuntime( app_name );
subscriberも二つ作ります。
iox::popo::Subscriber<MyTopic> subscriber1( { SERVICE_NAME, SERVICE_INSTANCE1, SERVICE_EVENT } );
iox::popo::Subscriber<MyTopic> subscriber2( { SERVICE_NAME, SERVICE_INSTANCE2, SERVICE_EVENT } );
イベントを拾うlistenerインスタンスを作り、上記の二つのsubscriberをアタッチします。
iox::popo::Listener listener;
listener
.attachEvent( subscriber1,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback( receive_sample ) )
.or_else( [](auto){ abort_subscriber( 1 ); } );
listener
.attachEvent( subscriber2,
iox::popo::SubscriberEvent::DATA_RECEIVED,
iox::popo::createNotificationCallback( receive_sample ) )
.or_else( [](auto){ abort_subscriber( 2 ); } );
attachEvent()
はiox::popo::SubscriberEvent::DATA_RECEIVED
イベントに反応して、subscriber1
またはsubscriber1
を引数としたreceive_sample()
が呼ばれるようにしている、と読み取れます。receive_sample()
については後述します。例外処理はpublisherと同じくor_else()
を使っています。スマートですね。
listenerにイベントが割り当てられたので、あとはCtrl-Cを待つだけにします。
iox::waitForTerminationRequest();
Ctrl-Cが発行されたら、リソース回収して終了です。
listener.detachEvent( subscriber1, iox::popo::SubscriberEvent::DATA_RECEIVED );
listener.detachEvent( subscriber2, iox::popo::SubscriberEvent::DATA_RECEIVED );
receive_sample()
の中身に入りましょう。まずはMyTopic
を継承する形でcache1
、cache2
を用意します。
iox::optional<MyTopic> cache1, cache2;
subscriberのtake()
メソッドを使ってトピック取得します。
while( subscriber->take().and_then( [subscriber](auto &sample){
本プログラムでは、publisher/subscriberの判別はサービスインスタンスの文字列を用いてやることにしたのでした。getServiceDescription().getInstanceIDString()
メソッドでこれを取得し、どちらのsubscriberでトピックを読めば良いか決めます。文字列なので、if〜elseで選択するのが野暮ったいですが、仕方ありません。
iox::optional<MyTopic> *cache = NULL;
auto instance_name = subscriber->getServiceDescription().getInstanceIDString();
if( instance_name == iox::capro::IdString_t( SERVICE_INSTANCE1 ) ){
cache = &cache1;
} else
if( instance_name == iox::capro::IdString_t( SERVICE_INSTANCE2 ) ){
cache = &cache2;
}
どちらかのsubscriberが選ばれていれば、サンプルされたトピックをcache
に保持させます。これもコピーじゃないのか?と思わないでもないですが、気にしないことにします。
if( cache ){
cache->emplace( *sample );
std::cout << instance_name << " received a sample " << sample->data << "." << std::endl;
}
subscriber1
とsubscriber2
、二つ分のトピックが揃ったタイミングで、それぞれから取得した値の和を表示(深い意味はありません)した後にcache
をリセットします。
if( cache1 && cache2 ){
std::cout << "Sum of Samples is " << cache1->data << " + " << cache2->data
<< " = " << cache1->data + cache2->data << std::endl;
cache1.reset();
cache2.reset();
}
}
どちらかのsubscriberが続けて呼び出された場合は、cache
は最新のものに更新されます。
makefileのTARGET
を
TARGET=sync_publisher sync_subscriber
としてプログラムをコンパイルし、動作確認してみて下さい。