はじめに
以下の記事の続きです。
前回の記事では、Raspberry Pi Pico2上でZephyr RTOSとPigweedを組み合わせ、USB経由でのRPC (Remote Procedure Call) を実現しました。
今回はこの環境をさらに拡張し、「RPCによる制御」と「デバイスのログ出力」を同一のUSB接続上で共存させます。さらに、ログ出力には pw_tokenizer を採用し、バイナリ化による通信帯域の節約とフラッシュメモリの効率化を図ります。
主な実装内容は以下の通りです。
-
pw_log / pw_tokenizer: ログを文字列ではなくトークン(ハッシュ値)として送信
-
HDLC Multiplexing: RPCパケットとログパケットを論理アドレスで多重化
-
Host Side: Pythonスクリプト(またはpw_console)で、ログの復元(sdetokenize)とRPC実行を同時に処理
完成物はこちらです。
PigweedのHDLCフォーマットについて
シリアル通信(UART/USB-CDC)は、そのままでは単なるバイトストリームであり、データの区切りが分かりません。そこでPigweedでは、データを構造化して送受信するために HDLC (High-Level Data Link Control) ライクなフレーミングを採用しています。
PigweedのHDLCフレームは以下のような構造になっています。
[Flag (0x7E)] [Address] [Control] [ Payload (Data) ] [CRC-32] [Flag (0x7E)]
この中で最も重要なのが Address(アドレス) フィールドです。 Pigweedはこのアドレスを使って、1つの物理的なシリアル接続の中に「複数の論理チャンネル」を作ることができます。これを利用して、RPC用のデータとログ用のデータを混在させて送受信することが可能になります。
HDLCフォーマットを使ったRPC
前回の記事で実装したRPCも、実はこのHDLCフレームの中に pw_rpc のパケット(Protocol Buffersでシリアライズされたデータ)を包んで送信していました。
RPCにはデフォルトでアドレス82=ord('R')が割り当てられています。
- Address 82: RPCパケット (Request / Response)
ホスト側のツール(pw_console や Pythonクライアント)は、受信したHDLCフレームのアドレスを見て「これはRPCのレスポンスだ」と判断し、Protocol Buffersとしてデコードして関数呼び出しの結果を返します。
HDLCを使ったLogとTokenize
今回は、このパイプラインにもう一つの論理チャンネル「ログ」を追加します。
- Address 1: Logパケット
単純に文字列を流すこともできますが、組み込み機器では「通信帯域」と「バイナリサイズ」が貴重です。そこで Tokenized Loggingを使用します。これは、長いログ文字列(例: "Sensor A value: 123")をデバイスに持たせず、コンパイル時に計算した 32bitのトークン(ハッシュ値) と引数データだけを送信する仕組みです。
情報の復元(Detokenize)は、コンパイル時に生成されたデータベース(.elf ファイルなど)を使ってPC側で行います。
今回の構成では、USBケーブルの中を以下の図のように2種類のパケットが多重化されて流れることになります。
これにより、デバッグポート(UARTピン)を別途接続することなく、USBケーブル1本で「高度な制御」と「詳細なログ確認」の両立が可能になります。
実装
Firmware(デバイス側)とPC(ホスト側)の実装について解説します。
Firmware
Kconfigの設定
ZephyrのログシステムをPigweedにフックします。
ログはTokenizeして出力するようにします。
後述の排他制御のため、ISR中にログが出力されないようにします。
CONFIG_LOG=y
CONFIG_LOG_BACKEND_UART=y
CONFIG_PIGWEED_LOG_ZEPHYR=y
CONFIG_PIGWEED_TOKENIZER=y
CONFIG_PIGWEED_LOG_TOKENIZED=y
CONFIG_LOG_MODE_DEFERRED=y
排他制御用のmutexを作成
ログ出力とRPCに同じ通信経路を使うため、排他制御が必要です。
書き込み競合を防ぐため、k_mutex を定義します。
struct k_mutex usb_write_lock;
...main
k_mutex_init(&usb_write_lock);
ログ出力するglobal関数の実装
下記の関数を実装すれば、Zephyrのログ出力がUSB経由になります。
extern "C" void pw_log_tokenized_HandleLog(uint32_t metadata,
const uint8_t log_buffer[],
size_t size_bytes) {
k_mutex_lock(&usb_write_lock, K_FOREVER);
pw::hdlc::WriteUIFrame(pw::hdlc::kDefaultLogAddress,
pw::as_bytes(pw::span(log_buffer, size_bytes)),
serial_writer);
k_mutex_unlock(&usb_write_lock);
}
ここで、kDefaultLogAddress=1 となっています。
RPCのresponseをハンドルするClassの作成
RPCのresponseもMutexになるようにRpcChannelOutputを継承したクラスを作り、RPCの処理をそちらに差し替えます。
class ThreadSafeHdlcChannelOutput : public pw::hdlc::RpcChannelOutput {
public:
ThreadSafeHdlcChannelOutput(pw::stream::Writer& writer, uint8_t address,
const char* name)
: pw::hdlc::RpcChannelOutput(writer, address, name) {}
// Override Send to make it thread-safe.
pw::Status Send(pw::ConstByteSpan buffer) override {
k_mutex_lock(&usb_write_lock, K_FOREVER);
pw::Status status = pw::hdlc::RpcChannelOutput::Send(buffer);
k_mutex_unlock(&usb_write_lock);
return status;
}
};
...
ThreadSafeHdlcChannelOutput hdlc_channel_output(
serial_writer, pw::hdlc::kDefaultRpcAddress, "HDLC_OUT");
pw::rpc::Channel channels[] = {
pw::rpc::Channel::Create<1>(&hdlc_channel_output)};
ここで、kDefaultRpcAddress = 82('R')です。
この構成により、USBケーブルの中では以下のようにデータが整理されて流れます。
-
アドレス82: RPCパケット(ThreadSafeHdlcChannelOutput経由)
-
アドレス1: ログパケット(pw_log_tokenized_HandleLog経由)
- データ送信は排他となる
PC (Pythonスクリプト)
以下のように、Device Logをdetokenizeしてlog出力する関数を作成し、HdlcRpcClientに登録します。
# create logger
device_log = logging.getLogger("device")
# Create detokenizer
detokenizer = detokenize.Detokenizer(os.path.realpath(elf_path)) # "build/zephyr/zephyr.elf"
# function to process device log
def detoken(data: bytes):
result = detokenizer.detokenize(data)
text = str(result)
msg_match = re.search(r"msg♦(.*?)■", text)
file_match = re.search(r"file♦(.*?)($|■)", text)
message = msg_match.group(1) if msg_match else text
if file_match:
full_path = file_match.group(1)
filename = os.path.basename(full_path)
else:
filename = "unknown"
device_log.info(f"{filename}, {message}")
# attach detoken to RPC/Log client
client = HdlcRpcClient(ser, [service_pb2], default_channels(write), output=detoken)
内部ソースを見たところ、HdlcRpcClientでRPC(Channel=82)とログ出力(Channel=1)を識別して処理を分けているので、これで動きます。
動作確認
単体スクリプト
python tools/client.py
で出力は下記のようになります。
# Echo
Sending Echo...
INFO:device:main.cpp, Echo requested: 14
# LED ON
Turning LED ON...
INFO:device:main.cpp, LED ON requested
LED ON Success
# LED OFF
Turning LED OFF...
INFO:device:main.cpp, LED OFF requested
LED OFF Success
`INFO:device:...'がデバイスのログです。
LED点灯だけでなく、ログからもHostからの指令を受けとれていることが確認できました!
pw_consoleへの組み込み
前回からの差分は、PwConsoleEmbedに渡すloggerの変更です。
ログ出力用のloggerを"Device Logs" として登録します。
loggers = {
"Host Logs": [logging.getLogger(__package__), logging.getLogger(__name__)],
"Device Logs": [logging.getLogger("device")],
}
Consoleのユーザーインターフェースはこのようになりました。
デバイスのログ、Pythonのログ、コマンド結果、コマンドが1画面に表示されます!!
まとめ
本記事では、pw_logとpw_tokenizer を導入し、デバイスのログをトークン化して USB 経由で出力する環境を構築しました。
HDLCフレーミングによる多重化や、排他制御の実装など、導入には仕組みの理解と実装の手間がかかりますが、それに見合う以下のメリットが得られました。
-
通信量の節約: 文字列そのものを送らないため、帯域を圧迫しない
-
単一ポートでの共存: RPCによる操作とログ出力を1つの経路で両立できる
-
プログラムサイズの削減: ログ文字列がバイナリに含まれないため、肥大化を抑えられる
USB一本で開発できて、素晴らしいですね!
