LoginSignup
0
0

More than 1 year has passed since last update.

AGL の can-low-level を読んでみる②(ソケット通信 / subscribe)

Last updated at Posted at 2023-03-21

AGL とは?

Automotive Grade Linux (以下、AGL) という コネクテッドカーに活用するオープンプラットフォームを開発する OSSのプロジェクト。
コネクテッドカーに関する 知見がつまっています。

can-low-level とは?

私の記事

を参考にしてもらえれば分かりますが、CAN 通信を、コマンドで解析できるようにするツール(リンクはこちら)になります。
前回は、この can-low-level のデコード・エンコードを見てみましたので、今回は そのソケット通信の部分を読んでみたいと思います。

概要

前回は Can のフレームからどのように車の情報を取得する方法(デコード)と、車の情報から Can のフレームを作る方法(エンコード)を見てみました。
今回は 実際に Can の情報が取得されて、デコードされる様子を見てみます。

では始めましょう!
※ 私自身も C, C++ 初心者なので、不明瞭な部分がまだあります。間違っている部分があったら、ご指摘ください。

low-can コマンドが叩かれてから、preinit されるまで

まず、can-low-level では 下のようなコマンドが叩いて、該当の Can メッセージを受信するのですが、

low-can subscribe { "event": "doors.driver.open" }

その定義は、low-can-apidef.json から afb が自動生成する hファイル low-can-apidef.h に書いてあります。

low-can-apidef.h
 static const struct afb_verb_v2 _afb_verbs_v2_low_can[] = {
     {
         .verb = "subscribe",
         .callback = subscribe,
         .auth = NULL,
         .info = "Subscribe to CAN signals events",
         .session = AFB_SESSION_NONE_V2
     },
     {
         .verb = "unsubscribe",
         .callback = unsubscribe,
         .auth = NULL,
         .info = "Unsubscribe previously suscribed signals.",
         .session = AFB_SESSION_NONE_V2
     },
// 以下 省略

ここでは subscribe コマンドを見ていますが、他にも unsubcribe , get , list , auth , write コマンドがあります。それぞれの内容の定義は low-can-cb.cpp にあります。
ですが ここで定義された subscirbe などを実行する前には 初期化作業を行います。それが [low-can-apidef.h] の preinit 部分 と init の部分 です。

low-can-apidef.h
 const struct afb_binding_v2 afbBindingV2 = {
     .api = "low-can",
     .specification = _afb_description_v2_low_can,
     .info = "",
     .verbs = _afb_verbs_v2_low_can,
+    .preinit = NULL,
+    .init = initv2,
     .onevent = NULL,
     .noconcurrency = 0
 };

このファイルでは preinit は 何も呼ばれず、init では initv2 が呼ばれますが、master ブランチでは low-can-apidef.json ファイルの /info/x-binding-c-generator/preinitload_config/info/x-binding-c-generator/initinit_binding が呼ばれます。

この load_config が 呼ばれる時に、おそらく (すみません確証がないのですが...) example配下のc++ファイル が呼ばれ、singleton の application_tadd_message_set で デコードするための車のデータが登録されます(「0x21 番地 から bit_position 52 の bit_length 1 の場所に steering_wheel.cruise.cancel の boolean 情報を登録してある」など)。
この add_message_set で登録された application_t の message_set_ を使って socketCan から受信した情報をデコードします。

ここまでをまとめると、

low-can subscribe { "event": "doors.driver.open" }

を叩くと、preinitで登録された関数initで登録された関数subscribe の順番で処理が走るのですが、まずは preinit の部分で Can メッセージを各車の情報にデコードするための情報が登録されます。

次に、init_binding の部分を読んでみましょう。

init_binding

この init_binding (コードはこちら) では、主に start_threads (コードはこちらこちら) という関数を呼び出していて、それ以後の処理は 最初にコマンドを叩いた段階ではあまり関係ありません。

can-bus.cpp
void can_bus_t::start_threads()
{
	is_decoding_ = true;
+	th_decoding_ = std::thread(&can_bus_t::can_decode_message, this);
	th_decoding_.detach();

	is_pushing_ = true;
+	th_pushing_ = std::thread(&can_bus_t::can_event_push, this);
	th_pushing_.detach();
}

ここでは、condition_variable で wait される can_decode_messagecan_event_push の 2つの関数が呼ばれます。この can_decode_message の wait は、後ほど出てきますが low-can-hat.cpp (コードはここ) で notify_one され、この can_decode_message の中で can_event_pushnotify_one されます。

ここで出てきた can_decode_messagecan_event_pushwait は後ほど出るので忘れないようにしましょう。
次に subscribe を見てみます。

subscribe

subscribe の内容 を見てみると、do_subscribe_unsubscribe をしているだけですね。この do_subscribe_unsubscribe を見てみましょう。

まず do_subscribe_unsubscribe を見ると、request が配列かそうでないかで分岐していますが、ここでは簡略化のために配列でないと仮定して見てみます。
この request は

low-can subscribe { "event": "doors.driver.open" }

の event 部分になります。
ここも読んだら process_one_subscribe_args を呼び出しているだけですね (コードはここ)。

この process_one_subscribe_args も event を取り出して、one_subscribe_unsubscribe_events を呼び出しているだけに見えますね (コードはここ)。

one_subscribe_unsubscribe_events では singleton の signals_manager_t から、 application_t::instance().get_all_signals() の 先ほどの _message_set (車の情報をデコードするための情報) から該当する デコードするための情報を取得する find_signals を呼び出しています。

今回の low-can subscribe { "event": "doors.driver.open" } のコマンドで言えば、doors.driver.opensignals.json に登録されているので、それが sf に入る形になります。
ここで出てくる event_filter は frequency や min (デコードされた値がそれ以上でないとフィルターする値) max (デコードされた値がそれ以下でないとフィルターする値) など、値をデコードされた値をフィルターする機能もありますが、今回の low-can subscribe { "event": "doors.driver.open" } では取り扱っていないので一旦無視します。

なので、次の関数は subscribe_unsubscribe_signals (リンクはここ) になります。

low-can-cb.cpp
static int subscribe_unsubscribe_signals(afb_req_t request,
					 bool subscribe,
					 const struct utils::signals_found& signals,
					 struct event_filter_t& event_filter)
{
	int rets = 0;
	utils::signals_manager_t& sm = utils::signals_manager_t::instance();

	std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
	map_subscription& s = sm.get_subscribed_signals();

+	rets += subscribe_unsubscribe_diagnostic_messages(request, subscribe, signals.diagnostic_messages, event_filter, s, false);
+	rets += subscribe_unsubscribe_signals(request, subscribe, signals.signals, event_filter, s);

	return rets;
}

ここでは主に subscribe_unsubscribe_diagnostic_messagessubscribe_unsubscribe_signals の2つの関数を呼び出しています。
が、今回は subscribe_unsubscribe_diagnostic_messages を次回の説明に回して、subscribe_unsubscribe_signals (コードはここ) を説明したいと思います。

まず、map_subscriptions ですが、初期値には何も入っていないので、it はあまり見なくても大丈夫です。
初期値なので、呼び出されるのが create_rx_filteradd_to_event_loop の2つの関数になります。

low-can-cb.cpp
			can_subscription = std::make_shared<low_can_subscription_t>(low_can_subscription_t(event_filter));
+			if(can_subscription->create_rx_filter(sig) < 0)
				return -1;
+			if(add_to_event_loop(can_subscription) < 0)
				return -1;

この、create_rx_filtersocketCan からのデータを sendTo で 送信し、add_to_event_loop のイベントループの中で recvFrom を使って受信を行います。

sendTo と recvFrom の使い方は、

などを参考にしてみてください。

では まず、create_rx_filter の中身を見てみましょう!

create_rx_filter

low-can-subscription にある create_rx_filter を見ると (コードはこちら)、まずは low_can_subscription_t::create_rx_filter_can(*this, sig); を呼び出しているだけですね (j1939, isotp は今回ははしょるので)。

そうすると、create_rx_filter_can に前回出てきた encoder_t::encode_data が出てきていますね(コードはこちら)。

low-can-subscription.cpp
	encoder_t::encode_data(subscription.signal_, data, true, false, true);
// 省略
	cm = can_message_t( max_dlen,
			    sig->get_message()->get_id(),
			    length_msg,
			    false,
			    sig->get_message()->get_flags(),
			    data,
			    0);

ここで、subscription.signal_ のデータを使ってエンコードした結果を data に挿入します。
その後のコードは bcm( The Broadcast Manager, 詳細はこちら)という Can フレームの送信する機能 と 受信した Can フレームの変化を Can ID から認識する機能を使うためのコードになっています。

low-can-subscription.cpp
	struct bcm_msg bcm_msg = subscription.make_bcm_head(RX_SETUP, subscription.signal_->get_message()->get_id(), flags_bcm, timeout, freq);

	std::vector<canfd_frame> cfd_vect = cm.convert_to_canfd_frame_vector();

	if(cfd_vect.size() > 1) //multi
	{
		AFB_ERROR("Not implemented yet");
		return -1;
	}
	else if(cfd_vect.size() == 1)
	{
		subscription.add_one_bcm_frame(cfd_vect[0], bcm_msg);
	}
	else
	{
		AFB_ERROR("No data available");
		return -1;
	}

	return create_rx_filter_bcm(subscription, bcm_msg);

ここでの bcm_msgadd_one_bcm_frame で、データを受け渡しています。
では、ここの最後に出てきた create_rx_filter_bcm を見てみましょう。

create_rx_filter_bcmここ にあって、ここで bcm を通じて socketCan 通信をしていきます。
まず、

low-can-subscription.cpp
	// Make sure that socket is opened.
	if(subscription.open_socket(subscription, "", CAN_PROTOCOL) < 0)
		return -1;

で、ソケット通信を始めています。ここの中身を見てみましょう。
open_socket を見ると、

low-can-subscription.cpp
	int ret = -1;
	if(! subscription.socket_)
	{
		if(flags & CAN_PROTOCOL)
		{
+			subscription.socket_ = std::make_shared<utils::socketcan_bcm_t>();
			if( subscription.signal_ )
+				ret = subscription.socket_->open(subscription.signal_->get_message()->get_bus_device_name());
			else if(! subscription.diagnostic_message_.empty())
				ret = subscription.socket_->open(application_t::instance().get_diagnostic_manager().get_bus_device_name());
			else if(! bus_name.empty())
				ret = subscription.socket_->open(bus_name);

			subscription.index_ = (int)subscription.socket_->socket();
		}

subscription.signal_ はあるので、socketcan_bcm_topen(device_name) が呼び出されます(コードはこちら)。

open(device_name) の内容は、下のようになっています。

socketcan-bcm.cpp
	int socketcan_bcm_t::open(std::string device_name)
	{
		close();

		struct ifreq ifr;
+		socket_ = socketcan_t::open(PF_CAN, SOCK_DGRAM, CAN_BCM);

		// Attempts to open a socket to CAN bus
+		tx_address_.can_family = AF_CAN;
+		::strcpy(ifr.ifr_name, device_name.c_str());
		AFB_DEBUG("BCM socket ifr_name is : %s", ifr.ifr_name);
+		if(::ioctl(socket_, SIOCGIFINDEX, &ifr) < 0)
		{
			AFB_ERROR("ioctl failed. Error was : %s", strerror(errno));
			close();
			return -1;
		}

+		tx_address_.can_ifindex = ifr.ifr_ifindex;
+		if(connect((struct sockaddr *)&tx_address_, sizeof(tx_address_)) < 0)
		{
			AFB_ERROR("Connect failed. %s", strerror(errno));
			close();
			return -1;
		}
		// Needed because of using systemD event loop. See sd_event_add_io manual.
+		fcntl(socketcan_t::socket_, F_SETFL, O_NONBLOCK);

		return socket_;
	}

socketcan::open(コードはこちら)を見れば分かるんですが、ここらへんで Can の ソケット通信をしています。
PF_CANAF_CAN のことで、socket.h で アドレスが振られています(こちらのコードを参照)。
fcntl でファイルディスクリプターを作っているのは、sd_event_add_io というイベントループを呼び出す関数で必要になるからです。

open_socket に話を戻しましょう(コードはここ)。
とは言っても 残りは、subscription.index_ に 先ほど _socket を登録しているだけです。

では、もう一度 create_rx_filter_bcm に戻りましょう(コードはここ)。
subscription.socket_ に socket が登録されたので、write_message を呼び出すことができます。
write_message(コードはこちら)では、

		if (::sendto(socket(), &obj, size, 0, (const struct sockaddr*)&get_tx_address(), sizeof(get_tx_address())) < 0)

で、sendtobcm_msgobjtx_address に送信します。

ここまでで、create_rx_filter の中身を見てみましたので、次は add_to_event_loop を見てみましょう。

add_to_event_loop

復習ですが、今まで見てきた create_rx_filter の部分は

low-can-cb.cpp
			can_subscription = std::make_shared<low_can_subscription_t>(low_can_subscription_t(event_filter));
+			if(can_subscription->create_rx_filter(sig) < 0)
				return -1;
+			if(add_to_event_loop(can_subscription) < 0)
				return -1;

になります(コードはこちら)。

この create_rx_filter の次に add_to_event_loop が呼ばれます。
では、この add_to_event_loop を見てみましょう。

add_to_event_loop の内容は、下のようなもののみになります。

low-can-cb.cpp
static int add_to_event_loop(std::shared_ptr<low_can_subscription_t>& can_subscription)
{
		struct sd_event_source* event_source = nullptr;
		return ( sd_event_add_io(afb_daemon_get_event_loop(),
			&event_source,
			can_subscription->get_socket()->socket(),
			EPOLLIN,
			read_message,
			can_subscription.get()));
}

sd_event_add_io の使い方(こちら)を見ないといけないのですが、
簡単に言えば、第二引数が null の場合は 第五引数の関数が第二引数・第三引数のファイルディスクリプターやsocketなど・第四引数・第六引数を取って、第一引数のループを回すイメージになると思います。
今回は、read_message(&event_source, can_subscription->get_socket()->socket(), EPOLLIN, can_subscription.get()) のループを回しています。

では、この read_message を見てみましょう(コードはこちら)。
まずは、

low-can-hat.cpp
				std::shared_ptr<message_t> message = s->read_message();

の部分で、socketcan_bcm_t::read_message() を呼び出しています。
この socketcan_bcm_t::read_message()こちらにあるんですが、内容は、

socketcan-bcm.cpp
		ssize_t nbytes = ::recvfrom(socket(), &msg, sizeof(msg), 0, (struct sockaddr*)&addr, &addrlen);

で、先ほど sendTo されたデータを msg に格納しています。
その後で、msg.fd_frames のデータの型が canfd_frame なので、それを can_message_t に変える関数・can_message_t::convert_from_frame も呼び、can_message_t の型で返しています。

さて、元の read_message(sd_event_source *event_source, int fd, uint32_t revents, void *userdata) に戻る(コードはこちら)と、

low-can-hat.cpp
					push_n_notify(message);

で、push_n_notify が呼び出されます。
push_n_notify では、

low-can-hat.cpp
static void push_n_notify(std::shared_ptr<message_t> m)
{
	can_bus_t& cbm = application_t::instance().get_can_bus_manager();
	{
		std::lock_guard<std::mutex> can_message_lock(cbm.get_can_message_mutex());
	 	cbm.push_new_can_message(m);
	}
	cbm.get_new_can_message_cv().notify_one();
}

ここで、push_new_can_messagecan_message_q_msg を push して、前に出てきた can_decode_message(コードはこちら)で wait された部分が再開します。

can-bus.cpp
		new_can_message_cv_.wait(can_message_lock);
		while(!can_message_q_.empty())
		{
+			std::shared_ptr<message_t>  message = next_can_message();
			can_message_lock.unlock();

			{
				std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
				map_subscription& s = sm.get_subscribed_signals();
				if(application_t::instance().get_diagnostic_manager().is_diagnostic_response(message))
					process_diagnostic_signals(application_t::instance().get_diagnostic_manager(), message, s);
				else
					process_signals(message, s);
			}
			can_message_lock.lock();
		}
+		new_decoded_can_message_.notify_one();
		can_message_lock.unlock();

message で 先ほど追加した can_message_q_ からデータを取得します(process_signalsmap_subscription& s は初期化されてデータがセットされていないので無視していい)。
最後に、2つ目の wait の new_decoded_can_message_ を再開します。

can-bus.cpp
		new_decoded_can_message_.wait(decoded_can_message_lock);
		while(!vehicle_message_q_.empty())
		{
			std::pair<int, openxc_VehicleMessage> v_message = next_vehicle_message();
			decoded_can_message_lock.unlock();
			{
				std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
				map_subscription& s = sm.get_subscribed_signals();
				if(s.find(v_message.first) != s.end() && afb_event_is_valid(s[v_message.first]->get_event()))
				{
					jo = json_object_new_object();
					jsonify_vehicle(v_message.second, s[v_message.first]->get_signal(), jo);
					if(afb_event_push(s[v_message.first]->get_event(), jo) == 0)
					{
						if(v_message.second.has_diagnostic_response)
							on_no_clients(s[v_message.first], v_message.second.diagnostic_response.pid, s);
						else
							on_no_clients(s[v_message.first], s);
					}
				}
			}
			decoded_can_message_lock.lock();
		}
		decoded_can_message_lock.unlock();

が、vehicle_message_q_ に値はないので、中身のループは再現されません。

2回目の add_to_event_loop まで

なので、一旦 add_to_event_loop のループを抜けて、次の処理に移ります(コードはこちら)。
次は、subscribe_unsubscribe_signal になりますが、ここで can_subscription に afb のイベントを作成して subscribe し、signals_manager_tsubscribed_signals_ (型は map_subscription) に socket, can_subscription を代入します。

low-can-cb.cpp
	// Susbcription part
	if(subscribe)
	{
		/* There is no valid request to subscribe so this must be an
		 * internal permanent diagnostic request. Skip the subscription
		 * part and don't register it into the current "low-can"
		 * subsciptions.
		 */
		if(! request)
			return 0;

		// Event doesn't exist , so let's create it
+		if ((ret = can_subscription->subscribe(request)) < 0)
			return ret;

		if(! subscription_exists)
+				s[sub_index] = can_subscription;

		return ret;
	}

これで、前回のループでは new_decoded_can_message_ の中の vehicle_message_q_ に値はなく、動かなかったループが動くようになります。

ここで、もう一度 can_decode_message (ここ) と can_event_push (ここ) の While ループが再開して、wait で止まっています (ここ) が、add_to_event_loop のループ で read_message が呼ばれ、その中の push_n_notify の中で cbm.get_new_can_message_cv().notify_one(); され、wait されていた while ループが再度動きます。

まず can_decode_message ですが、流れは前回と同じですが、process_signals (ここ) の中の s.find(subscription_id)s.end() にならないので、中身の処理が動きます (ここ)。

まず、process_signals の中身ですが、

can-bus.cpp
		// messages
		if(subscription->get_message_definition() != nullptr)
		{
			openxc_DynamicField dynamicField_tmp;
			json_object *signal_json_tmp;
			decoded_message = build_DynamicField_json(json_object_new_array());
			for(std::shared_ptr<signal_t> sig : subscription->get_message_definition()->get_signals())
			{
				signal_json_tmp = json_object_new_object();
+				dynamicField_tmp = decoder_t::translate_signal(*sig, message, &send);
				json_object_object_add(signal_json_tmp,"name", json_object_new_string(sig->get_name().c_str()));
				jsonify_DynamicField(dynamicField_tmp,signal_json_tmp);
				if(sig != nullptr && sig->get_unit() != "")
					json_object_object_add(signal_json_tmp, "unit", json_object_new_string(sig->get_unit().c_str()));
				json_object_array_add(decoded_message.json_value,signal_json_tmp);
			}
		}
		else // signal
		{
+			decoded_message = decoder_t::translate_signal(*subscription->get_signal(), message, &send);
		}

subscription の signal が複数だと for文で・1つだと 単純に decoder_t::translate_signal を呼び出しています。
これも前回の「デコード・エンコード」で出てきた「デコード」、つまり データフレーム から必要な情報を抽出する作業になりますね。

次の build_SimpleMessagebuild_VehicleMessage (ここ) は、Vehicle_Message という形に変えているのですが、decoded_message を特に触れていないので一旦はしょります。
最後に、ここvehicle_message_q_socket_idVehicle_messagedecoded_message を代入しています。

そして次は2回目の can_event_push になりますが、ここでは vehicle_message_q_ に内容が入っているので、ループの中が再現されます。

can-bus.cpp
		while(!vehicle_message_q_.empty())
		{
+			std::pair<int, openxc_VehicleMessage> v_message = next_vehicle_message();
			decoded_can_message_lock.unlock();
			{
				std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
				map_subscription& s = sm.get_subscribed_signals();
+				if(s.find(v_message.first) != s.end() && afb_event_is_valid(s[v_message.first]->get_event()))
				{
					jo = json_object_new_object();
					jsonify_vehicle(v_message.second, s[v_message.first]->get_signal(), jo);
					if(afb_event_push(s[v_message.first]->get_event(), jo) == 0)
					{
+						if(v_message.second.has_diagnostic_response)
							on_no_clients(s[v_message.first], v_message.second.diagnostic_response.pid, s);
						else
							on_no_clients(s[v_message.first], s);
					}
				}
			}
			decoded_can_message_lock.lock();
		}
		decoded_can_message_lock.unlock();

先ほどの vehicle_message_q_next_vehicle_message を元に抽出され、まずは vehicle_message_q_ の1つ目の要素 socket_id が s にあるかで条件分岐します。今回はあるので、条件を進むと、afb_event_push (ここの Sample tuto 2 の login成功時・logout時で呼び出されています) しています。

この afb_event_push

ON-EVENT low-can/messages.doors.driver.open({"event":"low-can\/messages.doors.driver.open","data":{"name":"messages.doors.driver.open","value":true, "timestamp": 1505812906020023},"jtype":"afb-event"})

の結果が表示されます。
ここまでが、subscribe の概要でした。

概要まとめ

ざっくりと概要をまとめると、

  1. init_binding で メッセージのどこに何の情報があるかの情報を取得する。
  2. can_decode_message, can_event_push の while が飛ぶ。
  3. low-can-cb.cppsubscribe_unsubscribe_signalscreate_rx_filteradd_to_event_loop が呼ばれる。
  4. まず create_rx_filter で データを encode して、bcm を通じて encode したデータを sendTo で送信。
  5. add_to_event_loop で recvFrom で受信。
  6. 5で 受信したデータを、1回目の add_to_event_loop では特に subscribe することはないが、2回目の add_to_event_loop で subscribe し、結果を表示。

のようになると思います。
ここではあくまでも subscribe のループ(add_to_event_loop から can_decode_message, can_event_push の while につなげる流れ)を作っているだけなので、初期化の時の sendTo しか呼ばれていません。もし Can パケットを送りたい場合は send_message などを投げる必要があります。これに関して言えば、また別の機会に見れたらいいと思います。

C, C++ 初心者の備忘録でしたが、ここまで長い中お読みいただき、ありがとうございました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0