0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

車両診断通信プロトコル:UDSをラズパイでシミュレート

Posted at

車両診断通信プロトコルのUDSをラズパイでシミュレートしました

:v:アピールポイント:v:

  • Client と Server の両方をシミュレート
  • マルチフレーム対応

ここで説明しているのはシングルフレームだけですが、通信データが大きい場合はマルチフレーム分割、組み立て、フロー制御が自動で行われます

使用機器

手順

1. ラズパイにCANインタフェースボードを追加してCAN通信できるようにする

以下のページを参考にラズパイでCAN通信できるようにする

以下を実行し、can0インタフェースが有効な状態にしておく

$ sudo ip link set can0 up type can bitrate 500000 loopback on

ラズパイ1台でClientとServerの両方をシミュレートするので、自分が送ったデータを自分で受信できるようにするためループバックをONにする

2. ラズパイにDockerをインストール

Docker公式の Install using the convenience scriptのぺージで紹介されている以下コマンドでDockerをインストール

curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh

また、sudoなしでDockerコマンド実行できるようにするため、以下ページを参考にしてDockerグループにユーザを追加

3. 必要なPythonパッケージをインストールしたDockerコンテナを起動

作業用ディレクトリに移動して以下を実施

3.1 Dockerfileを作成

Dockerfile
FROM    python:3
WORKDIR /mnt
RUN apt-get update && \
    apt-get install -y --no-install-recommends can-utils && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
RUN     pip3 install --upgrade pip
RUN     pip3 install python-can can-isotp udsoncan

使用するPythonパッケージ

確認用にcandumpコマンドを使用するのでcan-utilsもインストール

3.2 イメージをビルド

docker image build -t myudsoncan:latest -f Dockerfile .

3.3 コンテナを起動

docker container run --name myudsoncan --network host --rm -v .:/mnt -it myudsoncan:latest bash

以降、このコンソール画面を コンソール① と呼ぶ

3.4 candumpを実行

CAN通信をモニタするためにコンソール①でcandumpを実行しておく。
CAN通信前なので、まだ何も表示されない。

# candump can0

4. UDS Clientをシミュレート

4.1 UDS Client のpythonスクリプトを作成

以下のpythonスクリプトを作業用ディレクトリに格納する。
ECUReset(0x11) サービスのリクエストを送信して、レスポンスを受信するスクリプトとなっている。

uds_client.py
uds_client.py
import can
from udsoncan.connections import PythonIsoTpConnection
from udsoncan.client import Client
import udsoncan.configs
import isotp

# Refer to isotp documentation for full details about parameters
isotp_params = {
 'stmin': 32,                            # Will request the sender to wait 32ms between consecutive frame. 0-127ms or 100-900ns with values from 0xF1-0xF9
 'blocksize': 8,                         # Request the sender to send 8 consecutives frames before sending a new flow control message
 'wftmax': 0,                            # Number of wait frame allowed before triggering an error
 'tx_data_length': 8,                    # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
 'tx_data_min_length': None,
 'tx_padding': 0,                        # Will pad all transmitted CAN messages with byte 0x00.
 'rx_flowcontrol_timeout': 1000,         # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
 'rx_consecutive_frame_timeout': 1000,   # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
 'max_frame_size': 4095,                 # Limit the size of receive frame.
 'can_fd': False,                        # Does not set the can_fd flag on the output CAN messages
 'bitrate_switch': False,                # Does not set the bitrate_switch flag on the output CAN messages
 'rate_limit_enable': False,             # Disable the rate limiter
 'rate_limit_max_bitrate': 1000000,      # Ignored when rate_limit_enable=False. Sets the max bitrate when rate_limit_enable=True
 'rate_limit_window_size': 0.2,          # Ignored when rate_limit_enable=False. Sets the averaging window size for bitrate calculation when rate_limit_enable=True
 'listen_mode': False,                   # Does not use the listen_mode which prevent transmission.
}

uds_config = udsoncan.configs.default_client_config.copy()

bus = can.Bus(channel='can0', interface='socketcan')

notifier = can.Notifier(bus, [can.Printer()])                                       # Add a debug listener that print all messages
tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x123, rxid=0x456) # Network layer addressing scheme
stack = isotp.NotifierBasedCanStack(bus=bus, notifier=notifier, address=tp_addr, params=isotp_params)  # Network/Transport layer (IsoTP protocol). Register a new listenenr
conn = PythonIsoTpConnection(stack)                                                 # interface between Application and Transport layer
with Client(conn, config=uds_config) as client:                                     # Application layer (UDS protocol)
    try:
        client.ecu_reset(reset_type=1)
        print('Success!')
    except:
        print('Reset failed')

python-udsoncanのドキュメントで示されている以下2つのExampleを元にしている

4.2 UDS Client の実行

  1. candump実行中のコンソール①とは別にコンソール画面(コンソール②と呼ぶ)を起動
  2. コンソール②から以下コマンドでuds_client.pyを実行
    docker exec -it myudsoncan python uds_client.py
  3. クライアントからのリクエストが送信されるが、サーバからの応答が無いのでタイムアウトエラーとなる
コンソール①:candumpの結果
  can0  123   [8]  02 11 01 00 00 00 00 00

実際はloopbackしているので二重に表示される

コンソール②:uds_client.pyの結果
Timestamp: 1741431963.046044    ID:      123    S Rx                DL:  8    02 11 01 00 00 00 00 00     Channel: can0
[TimeoutException] : Did not receive response in time. P2 timeout time has expired (timeout=1.000 sec)
Reset failed

5. UDS Serverをシミュレート

5.1 UDS Server のpythonスクリプトを作成

以下のpythonスクリプトを作業用ディレクトリに格納する。
ECUReset(0x11) サービスのリクエストを受信したら、ポジティブレスポンスを送信するスクリプトとなっている。

uds_server.py
uds_server.py
import can
from udsoncan.connections import PythonIsoTpConnection
from udsoncan.client import Client
import udsoncan.configs
import isotp
from udsoncan import Request,Response
from udsoncan.services import ECUReset

# Refer to isotp documentation for full details about parameters
isotp_params = {
 'stmin': 32,                            # Will request the sender to wait 32ms between consecutive frame. 0-127ms or 100-900ns with values from 0xF1-0xF9
 'blocksize': 8,                         # Request the sender to send 8 consecutives frames before sending a new flow control message
 'wftmax': 0,                            # Number of wait frame allowed before triggering an error
 'tx_data_length': 8,                    # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
 'tx_data_min_length': None,
 'tx_padding': 0,                        # Will pad all transmitted CAN messages with byte 0x00.
 'rx_flowcontrol_timeout': 1000,         # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
 'rx_consecutive_frame_timeout': 1000,   # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
 'max_frame_size': 4095,                 # Limit the size of receive frame.
 'can_fd': False,                        # Does not set the can_fd flag on the output CAN messages
 'bitrate_switch': False,                # Does not set the bitrate_switch flag on the output CAN messages
 'rate_limit_enable': False,             # Disable the rate limiter
 'rate_limit_max_bitrate': 1000000,      # Ignored when rate_limit_enable=False. Sets the max bitrate when rate_limit_enable=True
 'rate_limit_window_size': 0.2,          # Ignored when rate_limit_enable=False. Sets the averaging window size for bitrate calculation when rate_limit_enable=True
 'listen_mode': False,                   # Does not use the listen_mode which prevent transmission.
}

uds_config = udsoncan.configs.default_client_config.copy()

bus = can.Bus(channel='can0', interface='socketcan')

notifier = can.Notifier(bus, [can.Printer()])                                       # Add a debug listener that print all messages
# tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x123, rxid=0x456) # Network layer addressing scheme
tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x456, rxid=0x123) # txid,rxidを入れ換え
stack = isotp.NotifierBasedCanStack(bus=bus, notifier=notifier, address=tp_addr, params=isotp_params)  # Network/Transport layer (IsoTP protocol). Register a new listenenr
conn = PythonIsoTpConnection(stack)                                                 # interface between Application and Transport layer

# 受信待ち
conn.open()
payload = conn.wait_frame(timeout=None)

# リクエスト解析
request = Request.from_payload(payload)
if request.service and hasattr(request.service, "_sid"):
    sid = request.service._sid  # SID を取得

    # Reset要求にポジティブレスポンスを返す
    if sid == 0x11:
        response = Response(service=ECUReset, code=Response.Code.PositiveResponse, data=b'\x01')
        payload = response.get_payload()
        conn.send(payload)
    else:
        print('未対応サービスのリクエスト')
else:
    print('無効なリクエスト')

conn.close()

txid、rxidをクライアントの反対にしています

5.2 UDS Server の実行

  1. さらに別のコンソール画面(コンソール③と呼ぶ)を起動
  2. コンソール③から以下コマンドでuds_server.pyを実行
    docker exec -it myudsoncan python uds_server.py
  3. コンソール②から以下コマンドでuds_client.pyを実行
    docker exec -it myudsoncan python uds_client.py
  4. クライアントからリクエストが送信され、サーバからレスポンスが返る
コンソール①:candumpの結果
  can0  123   [8]  02 11 01 00 00 00 00 00
  can0  456   [8]  02 51 01 00 00 00 00 00

実際はloopbackしているので二重に表示される

コンソール②:uds_client.pyの結果
Timestamp: 1741437827.288421    ID:      123    S Rx                DL:  8    02 11 01 00 00 00 00 00     Channel: can0
Timestamp: 1741437827.293298    ID:      456    S Rx                DL:  8    02 51 01 00 00 00 00 00     Channel: can0
Timestamp: 1741437827.293432    ID:      456    S Tx                DL:  8    02 51 01 00 00 00 00 00     Channel: can0
Success!
コンソール③:uds_server.pyの結果
Timestamp: 1741437827.288421    ID:      123    S Rx                DL:  8    02 11 01 00 00 00 00 00     Channel: can0
Timestamp: 1741437827.288548    ID:      123    S Tx                DL:  8    02 11 01 00 00 00 00 00     Channel: can0
Timestamp: 1741437827.293298    ID:      456    S Rx                DL:  8    02 51 01 00 00 00 00 00     Channel: can0

以上です

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?