AGL とは?
Automotive Grade Linux (以下、AGL) という コネクテッドカーに活用するオープンプラットフォームを開発する OSSのプロジェクト。
コネクテッドカーに関する 知見がつまっています。
can-low-level とは?
私の記事
を参考にしてもらえれば分かりますが、CAN 通信を、コマンドで解析できるようにするツール(リンクはこちら)になります。
前回は、この can-low-level のデコード・エンコードを見てみましたので、今回は そのソケット通信の部分を読んでみたいと思います。
概要
前回は Can のフレームからどのように車の情報を取得する方法(デコード)と、車の情報から Can のフレームを作る方法(エンコード)を見てみました。
今回は 実際に Can の情報が取得されて、デコードされる様子を見てみます。
では始めましょう!
※ 私自身も C, C++ 初心者なので、不明瞭な部分がまだあります。間違っている部分があったら、ご指摘ください。
low-can コマンドが叩かれてから、preinit されるまで
まず、can-low-level では 下のようなコマンドが叩いて、該当の Can メッセージを受信するのですが、
low-can subscribe { "event": "doors.driver.open" }
その定義は、low-can-apidef.json から afb が自動生成する hファイル low-can-apidef.h に書いてあります。
static const struct afb_verb_v2 _afb_verbs_v2_low_can[] = {
{
.verb = "subscribe",
.callback = subscribe,
.auth = NULL,
.info = "Subscribe to CAN signals events",
.session = AFB_SESSION_NONE_V2
},
{
.verb = "unsubscribe",
.callback = unsubscribe,
.auth = NULL,
.info = "Unsubscribe previously suscribed signals.",
.session = AFB_SESSION_NONE_V2
},
// 以下 省略
ここでは subscribe
コマンドを見ていますが、他にも unsubcribe
, get
, list
, auth
, write
コマンドがあります。それぞれの内容の定義は low-can-cb.cpp にあります。
ですが ここで定義された subscirbe
などを実行する前には 初期化作業を行います。それが [low-can-apidef.h] の preinit 部分 と init の部分 です。
const struct afb_binding_v2 afbBindingV2 = {
.api = "low-can",
.specification = _afb_description_v2_low_can,
.info = "",
.verbs = _afb_verbs_v2_low_can,
+ .preinit = NULL,
+ .init = initv2,
.onevent = NULL,
.noconcurrency = 0
};
このファイルでは preinit は 何も呼ばれず、init では initv2 が呼ばれますが、master ブランチでは low-can-apidef.json ファイルの /info/x-binding-c-generator/preinit
の load_config と /info/x-binding-c-generator/init
の init_binding が呼ばれます。
この load_config
が 呼ばれる時に、おそらく (すみません確証がないのですが...) example配下のc++ファイル が呼ばれ、singleton の application_t の add_message_set
で デコードするための車のデータが登録されます(「0x21 番地 から bit_position 52 の bit_length 1 の場所に steering_wheel.cruise.cancel の boolean 情報を登録してある
」など)。
この add_message_set
で登録された application_t の message_set_
を使って socketCan から受信した情報をデコードします。
ここまでをまとめると、
low-can subscribe { "event": "doors.driver.open" }
を叩くと、preinitで登録された関数
→ initで登録された関数
→ subscribe
の順番で処理が走るのですが、まずは preinit
の部分で Can メッセージを各車の情報にデコードするための情報が登録されます。
次に、init_binding
の部分を読んでみましょう。
init_binding
この init_binding
(コードはこちら) では、主に start_threads
(コードはこちら と こちら) という関数を呼び出していて、それ以後の処理は 最初にコマンドを叩いた段階ではあまり関係ありません。
void can_bus_t::start_threads()
{
is_decoding_ = true;
+ th_decoding_ = std::thread(&can_bus_t::can_decode_message, this);
th_decoding_.detach();
is_pushing_ = true;
+ th_pushing_ = std::thread(&can_bus_t::can_event_push, this);
th_pushing_.detach();
}
ここでは、condition_variable で wait される can_decode_message
と can_event_push
の 2つの関数が呼ばれます。この can_decode_message
の wait は、後ほど出てきますが low-can-hat.cpp
(コードはここ) で notify_one
され、この can_decode_message
の中で can_event_push
が notify_one
されます。
ここで出てきた can_decode_message
と can_event_push
の wait
は後ほど出るので忘れないようにしましょう。
次に subscribe
を見てみます。
subscribe
subscribe の内容 を見てみると、do_subscribe_unsubscribe
をしているだけですね。この do_subscribe_unsubscribe
を見てみましょう。
まず do_subscribe_unsubscribe
を見ると、request が配列かそうでないかで分岐していますが、ここでは簡略化のために配列でないと仮定して見てみます。
この request は
low-can subscribe { "event": "doors.driver.open" }
の event 部分になります。
ここも読んだら process_one_subscribe_args
を呼び出しているだけですね (コードはここ)。
この process_one_subscribe_args
も event を取り出して、one_subscribe_unsubscribe_events
を呼び出しているだけに見えますね (コードはここ)。
one_subscribe_unsubscribe_events
では singleton の signals_manager_t
から、 application_t::instance().get_all_signals()
の 先ほどの _message_set
(車の情報をデコードするための情報) から該当する デコードするための情報を取得する find_signals を呼び出しています。
今回の low-can subscribe { "event": "doors.driver.open" }
のコマンドで言えば、doors.driver.open
が signals.json に登録されているので、それが sf に入る形になります。
ここで出てくる event_filter
は frequency や min (デコードされた値がそれ以上でないとフィルターする値) max (デコードされた値がそれ以下でないとフィルターする値) など、値をデコードされた値をフィルターする機能もありますが、今回の low-can subscribe { "event": "doors.driver.open" }
では取り扱っていないので一旦無視します。
なので、次の関数は subscribe_unsubscribe_signals
(リンクはここ) になります。
static int subscribe_unsubscribe_signals(afb_req_t request,
bool subscribe,
const struct utils::signals_found& signals,
struct event_filter_t& event_filter)
{
int rets = 0;
utils::signals_manager_t& sm = utils::signals_manager_t::instance();
std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
map_subscription& s = sm.get_subscribed_signals();
+ rets += subscribe_unsubscribe_diagnostic_messages(request, subscribe, signals.diagnostic_messages, event_filter, s, false);
+ rets += subscribe_unsubscribe_signals(request, subscribe, signals.signals, event_filter, s);
return rets;
}
ここでは主に subscribe_unsubscribe_diagnostic_messages
と subscribe_unsubscribe_signals
の2つの関数を呼び出しています。
が、今回は subscribe_unsubscribe_diagnostic_messages
を次回の説明に回して、subscribe_unsubscribe_signals
(コードはここ) を説明したいと思います。
まず、map_subscription
の s
ですが、初期値には何も入っていないので、it
はあまり見なくても大丈夫です。
初期値なので、呼び出されるのが create_rx_filter
と add_to_event_loop
の2つの関数になります。
can_subscription = std::make_shared<low_can_subscription_t>(low_can_subscription_t(event_filter));
+ if(can_subscription->create_rx_filter(sig) < 0)
return -1;
+ if(add_to_event_loop(can_subscription) < 0)
return -1;
この、create_rx_filter
で socketCan
からのデータを sendTo
で 送信し、add_to_event_loop
のイベントループの中で recvFrom
を使って受信を行います。
sendTo と recvFrom の使い方は、
などを参考にしてみてください。
では まず、create_rx_filter
の中身を見てみましょう!
create_rx_filter
low-can-subscription
にある create_rx_filter
を見ると (コードはこちら)、まずは low_can_subscription_t::create_rx_filter_can(*this, sig);
を呼び出しているだけですね (j1939, isotp は今回ははしょるので)。
そうすると、create_rx_filter_can
に前回出てきた encoder_t::encode_data
が出てきていますね(コードはこちら)。
encoder_t::encode_data(subscription.signal_, data, true, false, true);
// 省略
cm = can_message_t( max_dlen,
sig->get_message()->get_id(),
length_msg,
false,
sig->get_message()->get_flags(),
data,
0);
ここで、subscription.signal_
のデータを使ってエンコードした結果を data
に挿入します。
その後のコードは bcm
( The Broadcast Manager, 詳細はこちら)という Can フレームの送信する機能 と 受信した Can フレームの変化を Can ID から認識する機能を使うためのコードになっています。
struct bcm_msg bcm_msg = subscription.make_bcm_head(RX_SETUP, subscription.signal_->get_message()->get_id(), flags_bcm, timeout, freq);
std::vector<canfd_frame> cfd_vect = cm.convert_to_canfd_frame_vector();
if(cfd_vect.size() > 1) //multi
{
AFB_ERROR("Not implemented yet");
return -1;
}
else if(cfd_vect.size() == 1)
{
subscription.add_one_bcm_frame(cfd_vect[0], bcm_msg);
}
else
{
AFB_ERROR("No data available");
return -1;
}
return create_rx_filter_bcm(subscription, bcm_msg);
ここでの bcm_msg
に add_one_bcm_frame
で、データを受け渡しています。
では、ここの最後に出てきた create_rx_filter_bcm
を見てみましょう。
create_rx_filter_bcm
は ここ にあって、ここで bcm
を通じて socketCan 通信をしていきます。
まず、
// Make sure that socket is opened.
if(subscription.open_socket(subscription, "", CAN_PROTOCOL) < 0)
return -1;
で、ソケット通信を始めています。ここの中身を見てみましょう。
open_socket
を見ると、
int ret = -1;
if(! subscription.socket_)
{
if(flags & CAN_PROTOCOL)
{
+ subscription.socket_ = std::make_shared<utils::socketcan_bcm_t>();
if( subscription.signal_ )
+ ret = subscription.socket_->open(subscription.signal_->get_message()->get_bus_device_name());
else if(! subscription.diagnostic_message_.empty())
ret = subscription.socket_->open(application_t::instance().get_diagnostic_manager().get_bus_device_name());
else if(! bus_name.empty())
ret = subscription.socket_->open(bus_name);
subscription.index_ = (int)subscription.socket_->socket();
}
subscription.signal_
はあるので、socketcan_bcm_t
の open(device_name)
が呼び出されます(コードはこちら)。
open(device_name)
の内容は、下のようになっています。
int socketcan_bcm_t::open(std::string device_name)
{
close();
struct ifreq ifr;
+ socket_ = socketcan_t::open(PF_CAN, SOCK_DGRAM, CAN_BCM);
// Attempts to open a socket to CAN bus
+ tx_address_.can_family = AF_CAN;
+ ::strcpy(ifr.ifr_name, device_name.c_str());
AFB_DEBUG("BCM socket ifr_name is : %s", ifr.ifr_name);
+ if(::ioctl(socket_, SIOCGIFINDEX, &ifr) < 0)
{
AFB_ERROR("ioctl failed. Error was : %s", strerror(errno));
close();
return -1;
}
+ tx_address_.can_ifindex = ifr.ifr_ifindex;
+ if(connect((struct sockaddr *)&tx_address_, sizeof(tx_address_)) < 0)
{
AFB_ERROR("Connect failed. %s", strerror(errno));
close();
return -1;
}
// Needed because of using systemD event loop. See sd_event_add_io manual.
+ fcntl(socketcan_t::socket_, F_SETFL, O_NONBLOCK);
return socket_;
}
socketcan::open
(コードはこちら)を見れば分かるんですが、ここらへんで Can の ソケット通信をしています。
PF_CAN
は AF_CAN
のことで、socket.h で アドレスが振られています(こちらのコードを参照)。
fcntl
でファイルディスクリプターを作っているのは、sd_event_add_io
というイベントループを呼び出す関数で必要になるからです。
open_socket
に話を戻しましょう(コードはここ)。
とは言っても 残りは、subscription.index_
に 先ほど _socket
を登録しているだけです。
では、もう一度 create_rx_filter_bcm
に戻りましょう(コードはここ)。
subscription.socket_
に socket が登録されたので、write_message
を呼び出すことができます。
write_message
(コードはこちら)では、
if (::sendto(socket(), &obj, size, 0, (const struct sockaddr*)&get_tx_address(), sizeof(get_tx_address())) < 0)
で、sendto
で bcm_msg
の obj
を tx_address
に送信します。
ここまでで、create_rx_filter
の中身を見てみましたので、次は add_to_event_loop
を見てみましょう。
add_to_event_loop
復習ですが、今まで見てきた create_rx_filter の部分は
can_subscription = std::make_shared<low_can_subscription_t>(low_can_subscription_t(event_filter));
+ if(can_subscription->create_rx_filter(sig) < 0)
return -1;
+ if(add_to_event_loop(can_subscription) < 0)
return -1;
になります(コードはこちら)。
この create_rx_filter
の次に add_to_event_loop
が呼ばれます。
では、この add_to_event_loop
を見てみましょう。
add_to_event_loop
の内容は、下のようなもののみになります。
static int add_to_event_loop(std::shared_ptr<low_can_subscription_t>& can_subscription)
{
struct sd_event_source* event_source = nullptr;
return ( sd_event_add_io(afb_daemon_get_event_loop(),
&event_source,
can_subscription->get_socket()->socket(),
EPOLLIN,
read_message,
can_subscription.get()));
}
sd_event_add_io の使い方(こちら)を見ないといけないのですが、
簡単に言えば、第二引数が null の場合は 第五引数の関数が第二引数・第三引数のファイルディスクリプターやsocketなど・第四引数・第六引数を取って、第一引数のループを回すイメージになると思います。
今回は、read_message(&event_source, can_subscription->get_socket()->socket(), EPOLLIN, can_subscription.get())
のループを回しています。
では、この read_message
を見てみましょう(コードはこちら)。
まずは、
std::shared_ptr<message_t> message = s->read_message();
の部分で、socketcan_bcm_t::read_message()
を呼び出しています。
この socketcan_bcm_t::read_message()
はこちらにあるんですが、内容は、
ssize_t nbytes = ::recvfrom(socket(), &msg, sizeof(msg), 0, (struct sockaddr*)&addr, &addrlen);
で、先ほど sendTo
されたデータを msg
に格納しています。
その後で、msg.fd_frames
のデータの型が canfd_frame
なので、それを can_message_t
に変える関数・can_message_t::convert_from_frame
も呼び、can_message_t
の型で返しています。
さて、元の read_message(sd_event_source *event_source, int fd, uint32_t revents, void *userdata)
に戻る(コードはこちら)と、
push_n_notify(message);
で、push_n_notify
が呼び出されます。
push_n_notify
では、
static void push_n_notify(std::shared_ptr<message_t> m)
{
can_bus_t& cbm = application_t::instance().get_can_bus_manager();
{
std::lock_guard<std::mutex> can_message_lock(cbm.get_can_message_mutex());
cbm.push_new_can_message(m);
}
cbm.get_new_can_message_cv().notify_one();
}
ここで、push_new_can_message
が can_message_q_
に msg
を push して、前に出てきた can_decode_message
(コードはこちら)で wait された部分が再開します。
new_can_message_cv_.wait(can_message_lock);
while(!can_message_q_.empty())
{
+ std::shared_ptr<message_t> message = next_can_message();
can_message_lock.unlock();
{
std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
map_subscription& s = sm.get_subscribed_signals();
if(application_t::instance().get_diagnostic_manager().is_diagnostic_response(message))
process_diagnostic_signals(application_t::instance().get_diagnostic_manager(), message, s);
else
process_signals(message, s);
}
can_message_lock.lock();
}
+ new_decoded_can_message_.notify_one();
can_message_lock.unlock();
message で 先ほど追加した can_message_q_
からデータを取得します(process_signals
は map_subscription& s
は初期化されてデータがセットされていないので無視していい)。
最後に、2つ目の wait の new_decoded_can_message_
を再開します。
new_decoded_can_message_.wait(decoded_can_message_lock);
while(!vehicle_message_q_.empty())
{
std::pair<int, openxc_VehicleMessage> v_message = next_vehicle_message();
decoded_can_message_lock.unlock();
{
std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
map_subscription& s = sm.get_subscribed_signals();
if(s.find(v_message.first) != s.end() && afb_event_is_valid(s[v_message.first]->get_event()))
{
jo = json_object_new_object();
jsonify_vehicle(v_message.second, s[v_message.first]->get_signal(), jo);
if(afb_event_push(s[v_message.first]->get_event(), jo) == 0)
{
if(v_message.second.has_diagnostic_response)
on_no_clients(s[v_message.first], v_message.second.diagnostic_response.pid, s);
else
on_no_clients(s[v_message.first], s);
}
}
}
decoded_can_message_lock.lock();
}
decoded_can_message_lock.unlock();
が、vehicle_message_q_
に値はないので、中身のループは再現されません。
2回目の add_to_event_loop まで
なので、一旦 add_to_event_loop
のループを抜けて、次の処理に移ります(コードはこちら)。
次は、subscribe_unsubscribe_signal
になりますが、ここで can_subscription に afb のイベントを作成して subscribe し、signals_manager_t
の subscribed_signals_
(型は map_subscription) に socket
, can_subscription
を代入します。
// Susbcription part
if(subscribe)
{
/* There is no valid request to subscribe so this must be an
* internal permanent diagnostic request. Skip the subscription
* part and don't register it into the current "low-can"
* subsciptions.
*/
if(! request)
return 0;
// Event doesn't exist , so let's create it
+ if ((ret = can_subscription->subscribe(request)) < 0)
return ret;
if(! subscription_exists)
+ s[sub_index] = can_subscription;
return ret;
}
これで、前回のループでは new_decoded_can_message_
の中の vehicle_message_q_
に値はなく、動かなかったループが動くようになります。
ここで、もう一度 can_decode_message
(ここ) と can_event_push
(ここ) の While ループが再開して、wait で止まっています (ここ) が、add_to_event_loop
のループ で read_message
が呼ばれ、その中の push_n_notify
の中で cbm.get_new_can_message_cv().notify_one();
され、wait されていた while ループが再度動きます。
まず can_decode_message
ですが、流れは前回と同じですが、process_signals
(ここ) の中の s.find(subscription_id)
が s.end()
にならないので、中身の処理が動きます (ここ)。
まず、process_signals
の中身ですが、
// messages
if(subscription->get_message_definition() != nullptr)
{
openxc_DynamicField dynamicField_tmp;
json_object *signal_json_tmp;
decoded_message = build_DynamicField_json(json_object_new_array());
for(std::shared_ptr<signal_t> sig : subscription->get_message_definition()->get_signals())
{
signal_json_tmp = json_object_new_object();
+ dynamicField_tmp = decoder_t::translate_signal(*sig, message, &send);
json_object_object_add(signal_json_tmp,"name", json_object_new_string(sig->get_name().c_str()));
jsonify_DynamicField(dynamicField_tmp,signal_json_tmp);
if(sig != nullptr && sig->get_unit() != "")
json_object_object_add(signal_json_tmp, "unit", json_object_new_string(sig->get_unit().c_str()));
json_object_array_add(decoded_message.json_value,signal_json_tmp);
}
}
else // signal
{
+ decoded_message = decoder_t::translate_signal(*subscription->get_signal(), message, &send);
}
subscription の signal が複数だと for文で・1つだと 単純に decoder_t::translate_signal
を呼び出しています。
これも前回の「デコード・エンコード」で出てきた「デコード」、つまり データフレーム から必要な情報を抽出する作業になりますね。
次の build_SimpleMessage
と build_VehicleMessage
(ここ) は、Vehicle_Message という形に変えているのですが、decoded_message
を特に触れていないので一旦はしょります。
最後に、ここ で vehicle_message_q_
に socket_id
と Vehicle_message
の decoded_message
を代入しています。
そして次は2回目の can_event_push
になりますが、ここでは vehicle_message_q_
に内容が入っているので、ループの中が再現されます。
while(!vehicle_message_q_.empty())
{
+ std::pair<int, openxc_VehicleMessage> v_message = next_vehicle_message();
decoded_can_message_lock.unlock();
{
std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
map_subscription& s = sm.get_subscribed_signals();
+ if(s.find(v_message.first) != s.end() && afb_event_is_valid(s[v_message.first]->get_event()))
{
jo = json_object_new_object();
jsonify_vehicle(v_message.second, s[v_message.first]->get_signal(), jo);
if(afb_event_push(s[v_message.first]->get_event(), jo) == 0)
{
+ if(v_message.second.has_diagnostic_response)
on_no_clients(s[v_message.first], v_message.second.diagnostic_response.pid, s);
else
on_no_clients(s[v_message.first], s);
}
}
}
decoded_can_message_lock.lock();
}
decoded_can_message_lock.unlock();
先ほどの vehicle_message_q_
が next_vehicle_message
を元に抽出され、まずは vehicle_message_q_ の1つ目の要素 socket_id
が s にあるかで条件分岐します。今回はあるので、条件を進むと、afb_event_push
(ここの Sample tuto 2 の login成功時・logout時で呼び出されています) しています。
この afb_event_push
で
ON-EVENT low-can/messages.doors.driver.open({"event":"low-can\/messages.doors.driver.open","data":{"name":"messages.doors.driver.open","value":true, "timestamp": 1505812906020023},"jtype":"afb-event"})
の結果が表示されます。
ここまでが、subscribe の概要でした。
概要まとめ
ざっくりと概要をまとめると、
-
init_binding
で メッセージのどこに何の情報があるかの情報を取得する。 -
can_decode_message
,can_event_push
の while が飛ぶ。 -
low-can-cb.cpp
のsubscribe_unsubscribe_signals
でcreate_rx_filter
とadd_to_event_loop
が呼ばれる。 - まず
create_rx_filter
で データを encode して、bcm を通じて encode したデータを sendTo で送信。 -
add_to_event_loop
で recvFrom で受信。 - 5で 受信したデータを、1回目の
add_to_event_loop
では特に subscribe することはないが、2回目のadd_to_event_loop
で subscribe し、結果を表示。
のようになると思います。
ここではあくまでも subscribe のループ(add_to_event_loop
から can_decode_message
, can_event_push
の while につなげる流れ)を作っているだけなので、初期化の時の sendTo しか呼ばれていません。もし Can パケットを送りたい場合は send_message
などを投げる必要があります。これに関して言えば、また別の機会に見れたらいいと思います。
C, C++ 初心者の備忘録でしたが、ここまで長い中お読みいただき、ありがとうございました。