2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Windows環境でUDS通信をシミュレートする

Posted at

Windows環境でUDS通信をシミュレートしました。

:v: アピールポイント :v:

  • USBシリアルデバイス(CANable)をWSLのDockerコンテナから利用
  • 過去のQiita記事のPythonスクリプトを流用

ラズパイとは異なり、クライアントとサーバを同時にシミュレートできないため、CANバス上に対向機器が必要です。

  • 「Windows環境で」といいつつ、WSL(Ubuntu)で実行しています
  • CANableはLinuxではSocketCANインタフェース(can0など)ではなくシリアルデバイス(/dev/ttyACM0)と認識されるため、candumpコマンドやループバック設定が使えません(変更にはFirmware書きかえが必要)

使用機器

手順

1. WSL2(Ubuntu)+Docker環境を構築

WindowsでWSL2(Ubuntu)+Docker環境を準備します。Docker Desktopは使用しません。設定手順は割愛します。

2. WSL2(Ubuntu)からUSBシリアルデバイスを利用可能にする

CANableをPCに接続するとUSBシリアルデバイスとして認識されます。これをWSL2(Ubuntu)から使用できるように設定します。

2.1 usbipd-winをインストール

以下のリンクからMSIインストーラをダウンロードし、実行後にPCを再起動します。

2.2 USBシリアルデバイスをWSLに接続

  1. 管理者権限のコマンドプロンプトで、USBデバイスのBUSIDを確認
    usbipd list
    
    出力例:
    Connected:
    BUSID  VID:PID    DEVICE                                   STATE
    1-1    16d0:117e  USB シリアル デバイス (COM4)            Not shared
    
  2. USBデバイスを共有
    usbipd bind --busid 1-1
    
  3. WSL2(Ubuntu)を起動
  4. USBデバイスをWSL2に接続
    usbipd attach --busid 1-1 --wsl
    
  5. WSL2内でデバイスが認識されていることを確認
    ls -l /dev/ttyACM0
    
    出力例:
    crw-rw---- 1 root dialout 166, 0 Mar  9 12:47 /dev/ttyACM0
    

3. 必要なPythonパッケージをインストールしたDockerコンテナを起動

作業ディレクトリに移動し、以下の手順を実行します。

3.1 Dockerfileの作成

FROM    python:3
WORKDIR /mnt
RUN apt-get update && \
    apt-get install -y --no-install-recommends can-utils && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
RUN     pip3 install --upgrade pip
RUN     pip3 install python-can can-isotp udsoncan pyserial

使用するPythonパッケージ:

can-utilsは必須ではない

3.2 Dockerイメージのビルド

docker image build -t myudsoncan:latest -f Dockerfile .

3.3 コンテナの起動

docker container run --name myudsoncan --network host --rm --device=/dev/ttyACM0:/dev/ttyACM0 -v .:/mnt -it myudsoncan:latest bash

/dev/ttyACM0 をコンテナと連携

4. UDS Clientをシミュレート

4.1 UDS Client のPythonスクリプトを作成

ECUReset(0x11)サービスのリクエストを送信し、レスポンスを受信するスクリプトを作業用ディレクトリに保存します。

uds_client.py
import can
from udsoncan.connections import PythonIsoTpConnection
from udsoncan.client import Client
import udsoncan.configs
import isotp

# Refer to isotp documentation for full details about parameters
isotp_params = {
 'stmin': 32,                            # Will request the sender to wait 32ms between consecutive frame. 0-127ms or 100-900ns with values from 0xF1-0xF9
 'blocksize': 8,                         # Request the sender to send 8 consecutives frames before sending a new flow control message
 'wftmax': 0,                            # Number of wait frame allowed before triggering an error
 'tx_data_length': 8,                    # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
 'tx_data_min_length': None,
 'tx_padding': 0,                        # Will pad all transmitted CAN messages with byte 0x00.
 'rx_flowcontrol_timeout': 1000,         # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
 'rx_consecutive_frame_timeout': 1000,   # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
 'max_frame_size': 4095,                 # Limit the size of receive frame.
 'can_fd': False,                        # Does not set the can_fd flag on the output CAN messages
 'bitrate_switch': False,                # Does not set the bitrate_switch flag on the output CAN messages
 'rate_limit_enable': False,             # Disable the rate limiter
 'rate_limit_max_bitrate': 1000000,      # Ignored when rate_limit_enable=False. Sets the max bitrate when rate_limit_enable=True
 'rate_limit_window_size': 0.2,          # Ignored when rate_limit_enable=False. Sets the averaging window size for bitrate calculation when rate_limit_enable=True
 'listen_mode': False,                   # Does not use the listen_mode which prevent transmission.
}

uds_config = udsoncan.configs.default_client_config.copy()

# bus = can.Bus(channel='can0', interface='socketcan')
bus = can.Bus(interface='slcan', channel='/dev/ttyACM0', bitrate=500000)

notifier = can.Notifier(bus, [can.Printer()])                                       # Add a debug listener that print all messages
tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x123, rxid=0x456) # Network layer addressing scheme
stack = isotp.NotifierBasedCanStack(bus=bus, notifier=notifier, address=tp_addr, params=isotp_params)  # Network/Transport layer (IsoTP protocol). Register a new listenenr
conn = PythonIsoTpConnection(stack)                                                 # interface between Application and Transport layer
with Client(conn, config=uds_config) as client:                                     # Application layer (UDS protocol)
    try:
        client.ecu_reset(reset_type=1)
        print('Success!')
    except:
        print('Reset failed')

ラズパイ用スクリプトからの変更点は以下のみ

- bus = can.Bus(channel='can0', interface='socketcan')
+ bus = can.Bus(interface='slcan', channel='/dev/ttyACM0', bitrate=500000)

4.2 UDS Client の実行

docker exec -it myudsoncan python uds_client.py

ECUReset(0x11) サービスのリクエストを送信して、レスポンスを待ちます。1秒でタイムアウト。

5. UDS Serverをシミュレート

5.1 UDS Server のPythonスクリプトを作成

uds_server.py
import can
from udsoncan.connections import PythonIsoTpConnection
from udsoncan.client import Client
import udsoncan.configs
import isotp
from udsoncan import Request,Response
from udsoncan.services import ECUReset

# Refer to isotp documentation for full details about parameters
isotp_params = {
 'stmin': 32,                            # Will request the sender to wait 32ms between consecutive frame. 0-127ms or 100-900ns with values from 0xF1-0xF9
 'blocksize': 8,                         # Request the sender to send 8 consecutives frames before sending a new flow control message
 'wftmax': 0,                            # Number of wait frame allowed before triggering an error
 'tx_data_length': 8,                    # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
 'tx_data_min_length': None,
 'tx_padding': 0,                        # Will pad all transmitted CAN messages with byte 0x00.
 'rx_flowcontrol_timeout': 1000,         # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
 'rx_consecutive_frame_timeout': 1000,   # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
 'max_frame_size': 4095,                 # Limit the size of receive frame.
 'can_fd': False,                        # Does not set the can_fd flag on the output CAN messages
 'bitrate_switch': False,                # Does not set the bitrate_switch flag on the output CAN messages
 'rate_limit_enable': False,             # Disable the rate limiter
 'rate_limit_max_bitrate': 1000000,      # Ignored when rate_limit_enable=False. Sets the max bitrate when rate_limit_enable=True
 'rate_limit_window_size': 0.2,          # Ignored when rate_limit_enable=False. Sets the averaging window size for bitrate calculation when rate_limit_enable=True
 'listen_mode': False,                   # Does not use the listen_mode which prevent transmission.
}

uds_config = udsoncan.configs.default_client_config.copy()

# bus = can.Bus(channel='can0', interface='socketcan')
bus = can.Bus(interface='slcan', channel='/dev/ttyACM0', bitrate=500000)

notifier = can.Notifier(bus, [can.Printer()])                                       # Add a debug listener that print all messages
# tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x123, rxid=0x456) # Network layer addressing scheme
tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x456, rxid=0x123) # txid,rxidを入れ換え
stack = isotp.NotifierBasedCanStack(bus=bus, notifier=notifier, address=tp_addr, params=isotp_params)  # Network/Transport layer (IsoTP protocol). Register a new listenenr
conn = PythonIsoTpConnection(stack)                                                 # interface between Application and Transport layer

# 受信待ち
conn.open()
payload = conn.wait_frame(timeout=None)

# リクエスト解析
request = Request.from_payload(payload)
if request.service and hasattr(request.service, "_sid"):
    sid = request.service._sid  # SID を取得

    # Reset要求にポジティブレスポンスを返す
    if sid == 0x11:
        response = Response(service=ECUReset, code=Response.Code.PositiveResponse, data=b'\x01')
        payload = response.get_payload()
        conn.send(payload)
    else:
        print('未対応サービスのリクエスト')
else:
    print('無効なリクエスト')

conn.close()

ラズパイ用スクリプトからの変更点は以下のみ

- bus = can.Bus(channel='can0', interface='socketcan')
+ bus = can.Bus(interface='slcan', channel='/dev/ttyACM0', bitrate=500000)

5.2 UDS Server の実行

docker exec -it myudsoncan python uds_server.py

CANバスに接続した別の機器からの ECUReset(0x11) サービスのリクエストを受信したら、ポジティブレスポンスを返します。

以上です

2
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?