使用したもの
- pycharm
- python3.8
- pyserial
- Wi-SUNモジュール J11
UART IFコマンドとは
モジュールをシリアル通信を使って制御するためのコマンドのことです。
J11との接続
J11とは以下のパラメータで接続します。
項目 | 値 |
---|---|
ボーレート | 115,200bps |
データビット長 | 8bit |
パリティ | なし |
ストップビット | なし |
フロー制御 | Disable(可変) |
今回はpyserialで通信するのでこのように書きます。
import serial
ser = serial.Serial(PORT, BAUD, timeout=0.1)
Pingを送信する
とりあえずpingを送信することが目標ですが、ただそれだけしかできないプログラムを作っても意味がないので、任意のコマンドを発行できるプログラムを作成しました。
一連の流れ
現在Wi-SUNが二台しかないので、今回は中継機を挟まずに、PANコーディネータ、エンドデバイス間でPingを送信します。
PANコーディネータ(親機)をA、エンドデバイス(子機)をCと呼ぶことにします。
A、Cともに流れは共通ですが、各コマンドのパラメータが異なります。
- ハードウェアリセット要求する
- 初期設定要求する
- HAN 動作開始要求する
- HAN PANA開始要求する
- HAN PANA認証情報設定要求する
- HAN 受入れ接続モード切り替え要求する
PANAとは
PANAとは、Protocol for Carrying Authentication for Network Accessの略です。
PANAはUDP/IP上でEAPメッセージを運ぶ、クライアント・サーバ型の認証プロトコルです。
PANAのクライアントはPaC(PANA Client)、サーバはPAA(PANA Authentication Agent)と呼ばれます。
任意のUART IFコマンドを発行するために
PDFの11ページに記載されているように、J11のコマンドはヘッダ部は12バイトで固定長、データ部は最大1361バイトまでの可変長です。
コマンドのフォーマット
こちらも同様に上記PDFの11ページに記載があります。
コマンドの種類
J11のUART IFコマンドには、要求コマンド、応答コマンド、通知コマンドの3つがあります。
基本的にユーザ側からモジュールに送信するのは(当然ですが)要求コマンドのみになります。
また、各コマンド間には一部の例外を除いて規則性があります。
例えば、モジュールのステータスを取得するコマンド(0x0001
)の応答コマンドは0x2001
といったように、
ある要求コマンドに対する応答コマンドは、要求コマンドの値に16進数で2000を足したものになります。
同様に、ある要求コマンドに対応する通知コマンドが存在する場合は、要求コマンドの値に16進数で6000を足したものになります。
メッセージ長の計算
ヘッダ部チェックサムを計算するためにはメッセージ長を求める必要があります。
メッセージ長はヘッダ部チェックサム、データ部チェックサム、データの合計「長」です。
つまりコマンド以外に何もデータを送信しない場合のメッセージ長は4となります(つまり0x0004
)。
def msg_len(data):
data_len = len(data)
msg = 4 + data_len
result = msg.to_bytes(2, byteorder='big')
return result
ヘッダ部チェックサムの計算
ヘッダ部チェックサムはユニークコード、コマンドコード、メッセージ長の値をすべて足し合わせたものです。
この関数で引数として与えているものはすべてbytearrayです。
sum()で配列の中身をすべて足し、足し合わせていく際に0xFFFF
(10進数で65535)を超えた場合は
桁あふれを無視する必要があるので、
3行目でチェックサムの値が0xFFFF
を超えなくなるまで引くようにしています。
def head_checksum(unique, cmd, msg):
head_sum = sum(unique) + sum(cmd) + sum(msg)
while head_sum >= 65535:
head_sum = head_sum - 65535
result = head_sum.to_bytes(2, byteorder='big')
return result
データ部チェックサムの計算
データ部チェックサムはデータをすべて足し合わせたものです。
データは送信しない場合があるのでデータがない場合は0x0000
を返すようにしています。
ヘッダ部チェックサム同様桁あふれを無視するようにしています。
この関数もヘッダ部チェックサム同様引数はbytearrayです。
def data_checksum(data):
if data is not None:
data_sum = sum(data)
while data_sum >= 65535:
data_sum = data_sum - 65535
result = data_sum.to_bytes(2, byteorder='big')
else:
result = b'\x00\x00'
return result
パケットの構築
ヘッダ部チェックサム、メッセージ長、データ部チェックサムが揃ったのでコマンドのパケットを構築します。
この関数の引数は、unq_typeがJ11のどの種類のコマンドを作るのか(Requestしかありませんが)を指定するもので、
cmdはコマンドコード、dataはデータをそれぞれbytearrayで表現したものです。
def build_packet(unq_type, cmd, data):
unique = bytearray()
if unq_type == 'Request':
unique.extend([0xD0, 0xEA, 0x83, 0xFC])
elif unq_type == 'Response':
unique.extend([0xD0, 0xF9, 0xEE, 0x5D])
elif unq_type == 'Notify':
unique.extend([0xD0, 0xF9, 0xEE, 0x5D])
if data is not None:
packet = unique + cmd + msg_len(data) + head_checksum(unique, cmd, msg_len(data)) + data_checksum(data) + data
else:
packet = unique + cmd + msg_len('') + head_checksum(unique, cmd, msg_len('')) + data_checksum(None)
print('Sending Packet :' + ' '.join(pkt_parser(packet)).upper())
return packet
もしペイロードがなければ、ユニークコード、コマンドコード、メッセージ長、ヘッダ部チェックサム、
データ部チェックサムの順番で結合したものをパケットとし、
ペイロードがある場合はその後ろにデータを結合したものをパケットとして返します。
コマンドの送信
上の関数でパケットを構築したのでモジュールにコマンドを送信します。
この関数の引数は、cmdが辞書にあるコマンドの名前、dataはペイロードをbytarrayで表現したものです。
def send_cmd(cmd, data):
ser.write(build_packet('Request', Request_cmd[cmd], data))
receive = pkt_parser(ser.readline())
rcv_cmd = receive[4] + receive[5]
if rcv_cmd == '{:04x}'.format(8192 + sum(Request_cmd[cmd])):
print('Response Cmd :' + rcv_cmd)
if len(receive) >= 12:
print('Receive Data :' + ' '.join(receive[12:]).upper())
elif rcv_cmd == '{:04x}'.format(24576 + sum(Request_cmd[cmd])):
print('Notify Cmd :' + rcv_cmd)
if len(receive) >= 12:
print('Receive Data :' + ' '.join(receive[12:]).upper())
return 0
モジュールにコマンドを書き込んだあと、受信したパケットを後述する解析関数に渡しています。
上のコマンドの種類で説明したように、要求コマンドに対しての応答もしくは通知は一意に定まるので、
モジュールから正しいコマンドが返ってきているか確認し、正しければ返ってきたコマンドと、
データがある場合はそのデータを表示します。
(判定部分はもっといい方法がありそうです)
パケットを解析する
モジュールから返ってくるコマンドはすべてバイナリで返ってくるので若干可読性に欠けます。
なので返ってきたコマンドを読みやすくします。
def pkt_parser(rcv_data):
result = re.split('(..)', rcv_data.hex())[1::2]
return result
もっとコマンドを送信しやすくする
send_cmdでも手動でパケットを組み立てるよりはだいぶ使いやすくなっていますが、
ペイロードをbytearrayで渡す必要があります。
それだと長いペイロードを送信する際に大変なので、ペイロードもバイナリの文字列で渡せるようにする(b'\x00\x01'
を0001
と書ける)ために次の関数を作りました。
def user_cmd(cmd, data):
byte_data = bytearray()
array_data = re.split('(..)', data)[1::2]
for i in range(len(array_data)):
byte_data.append(int(array_data[i], 16))
send_cmd(cmd, byte_data)
文字列で受け取ったペイロードを2文字づつに分割し、それをバイナリに変換しています。
モジュールのセットアップ
とても長くなってしまいましたが、ようやくモジュールのセットアップをします。
def device_setup(mode):
print('Device Setup Start')
hard_reset()
# 機器のモード設定
print('Starting Initialization...')
if mode == 'a':
send_cmd('InitSetting', b'\x01\x00\x10\x00')
elif mode == 'b':
send_cmd('InitSetting', b'\x02\x00\x10\x00')
elif mode == 'c':
send_cmd('InitSetting', b'\x03\x00\x10\x00')
print('Initialization Complete')
# HAN 動作開始
print('Starting HAN Action...')
if mode == 'a':
han_action(b'\xCA\xFE')
elif mode == 'b':
han_action(b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
elif mode == 'c':
han_action(b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
# PANA認証開始
if mode == 'a':
# 機器Aでは、PANA認証を開始してから認証情報を入力する
print('Starting HAN PANA...')
send_cmd('PANAStart', None)
print('HAN PANA Started')
print('Starting HAN PANA AUTH...')
mac = input('Enter MAC Address:')
pana_pass = input('Enter PANA Password:')
pana_auth(mac, pana_pass)
else:
# 機器B,Cでは、認証情報を入力してからPANA認証を開始する
# また、PANコーディネータ以外はパスワードのみをWi-SUNに送信する
print('Starting HAN PANA AUTH...')
# pana_pass = input('Enter PANA Password:')
pana_auth('', '0123456789abcdef')
print('Starting HAN PANA...')
send_cmd('PANAStart', None)
time.sleep(1)
print('Request PANA Auth to PAN Coordinator...')
rcv = notify_cmd()
if rcv[12] == '01':
print('Auth Dest :' + str(rcv[13:]))
else:
print('HAN PANA Start Failed :' + rcv[12])
# エンドデバイスではモード切替はできないのでスキップする
if mode == 'a' or mode == 'b':
print('HAN Connect mode changing...')
mode = input('Select han mode(init mode: 01 norm mode: 02):')
raw_mode = int(mode, 16)
send_cmd('ChangeAcceptanceConnectMode', raw_mode.to_bytes(1, byteorder='big'))
print('-------------------------')
print('Receive Data: 01 HAN Changed Selected Mode')
print('Receive Data: 20 HAN Already Selected Mode')
print('-------------------------')
print('Device Setup Completed!!')
else:
print('Device Setup Completed!!')
まだ紹介していない関数が多数登場しています...
その紹介に移ります
ハードウェアリセット
def hard_reset():
print('Hardware Resetting...')
ser.write(build_packet('Request', Request_cmd['HardReset'], None))
time.sleep(1)
check = pkt_parser(ser.readline())
if check[4] + check[5] == '6019':
print('Hardware Reset Complete')
else:
print('Got Error... Error Code :' + check[12])
HAN 動作開始
def han_action(parameter):
ser.write(build_packet('Request', Request_cmd['ActionStart'], parameter))
time.sleep(3)
receive1 = pkt_parser(ser.readline())
receive2 = pkt_parser(ser.readline())
receive = receive1 + receive2
print(receive)
if receive[4] + receive[5] == '200a' and receive[12] == '01':
print('HAN Action Started')
print('------------------')
print('Channel :' + str(int(receive[13], 16)))
print('------------------')
else:
print('Got Error... Error Code :' + receive[12])
return 1
return 0
PANA 認証情報設定
def pana_auth(mac, pana_pass):
byte_mac = bytearray()
byte_pass = bytearray()
array_mac = re.split('(..)', mac)[1::2]
array_pass = re.split('(.)', pana_pass)[1::2]
for i in range(len(array_mac)):
byte_mac.append(int(array_mac[i], 16))
for i in range(len(array_pass)):
byte_pass.append(ord(array_pass[i]))
payload = byte_mac + byte_pass
ser.write(build_packet('Request', Request_cmd['PANAAuthInfoSetting'], payload))
receive = pkt_parser(ser.readline())
if receive[4] + receive[5] == '202c' and receive[12] == '01':
print('PANA Auth Setting Complete')
else:
print('Got Error... Error Code :' + receive[12])
return 1
return 0
HAN 受入れ接続モード切り替え
親機の受入れモードは、子機と接続するために最初は初期接続モードである必要があります。
受入れ接続モードの切り替えに関してはsend_cmdで、モジュールに対して切り替え要求を行っています。
print('HAN Connect mode changing...')
mode = input('Select han mode(init mode: 01 norm mode: 02):')
raw_mode = int(mode, 16)
send_cmd('ChangeAcceptanceConnectMode', raw_mode.to_bytes(1, byteorder='big'))
print('-------------------------')
print('Receive Data: 01 HAN Changed Selected Mode')
print('Receive Data: 20 HAN Already Selected Mode')
print('-------------------------')
送受信する
def main():
print('Wi-SUN Port is :' + PORT + '\n')
mode = input('Enter Machine Mode :')
device_setup(mode)
while True:
send = input('Enter Cmd Name :')
data = input('If you have a payload. pls enter :')
user_cmd(send, data)
time.sleep(2)
while ser.in_waiting:
notify_cmd()
デバイスのセットアップが完了すると、任意のコマンドを入力できるようになります。
通知コマンドが返ってくる場合があるので、2秒待ってからシリアルのバッファにまだ読んでいないデータがある場合はnotify_cmdを呼び、通知コマンドの中身を表示します。
Pingの送信
前置きが恐ろしいほど長くなってしまいましたが、これでようやくpingを送信する準備が整いました。
シリアルポートを指定し、A側のプログラム、C側のプログラムに分けて実行します。
Device Setup Completed!!
Enter Cmd Name :
と表示されたら、コマンドの名前を入力します。
今回はPingを飛ばしたいのでSendPing
と入力します。
すると
Enter Cmd Name :SendPing
If you have a payload. pls enter :
と出るのでペイロードを入力します。
Ping送信のペイロードは、送信先のipv6アドレス、送信するデータのサイズ、送信するデータの形式、送信するデータです。
送信先のipv6アドレスは、FE80000000000000XXXXXXXXXXXXXXXX
(Xは送信先のMACアドレス、MACアドレスの最初の1バイトの下位2bit目は反転する)
例えば、001D129F00000111
というMACアドレスだった場合、送信先のipv6アドレスはFE80000000000000021D129F00000111
となります。
送信するデータのサイズは、文字通り送るデータの大きさを指定します。今回は0010
(16byte)にしました。
送信するデータの形式ですが、このモジュールでは、最初からasciiコードの'a'から'z'を繰り返すデータパターンと、asciiコードの0001
からインクリメントするデータパターンが用意されています。
今回は一番目のパターンを使用するので、0001
となります。
送信するデータですが、この部分はユーザが任意のデータを送信する際に使用するので今回は必要ありません。
つまり
If you have a payload. pls enter :
に入力するのは
FE80000000000000021S129F00000111001001
となります。
これを入力すると、まず送信元のモジュールへシリアル通信で送信したパケットが表示されます。
その後、pingを送信したことを示す"20d1"が返ってきます。
そして送信先から応答があると、通知コマンドとして結果が返ってきます。
参考文献
- J11_UART_IFコマンド仕様書_第1.1版
- TeraTermサンプルマクロ md5sum:2552cc1eb976c21de419fe3a305835a3