この記事は?
お仕事でCOBSエンコード・デコードを扱うことになったので、そのお勉強した記録です。
pythonで書いたサンプルコードがあります。
COBSの詳しい解説はしてないです。
COBSって?
主にRS232Cの通信で使われます。
RS232Cの通信はデータ垂れ流しなので、受信側はどこがデータの区切りか分かりません。
COBSでは、送信データに区切り値を付加することで、データの区切りが分かるようにしています。
「でも、その区切り値と同じ値が、送信データ内にもあった場合どするの?」と疑問が発生しますが、COBSエンコードでは、
「区切り値と同値のデータを、次に同値のデータが存在する位置に置き換える」
という符号化で解決しています。
復号は、その置き換えられた位置情報を元に行います。
考えた人頭いいですよね。
詳細な方式はこちらを見ると良いです。
https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
データ区切り方法は他にもあるのでは?
一定時間、受信データの途絶で区切りを付ける方法もあると思いますが、お客さんから時間区切りは安定性に不安が残ると言われCOBSを使うことになりました。
pythonでの実装
上記のWikiのC言語のコードを参考にpythonでエンコード・デコードの処理を作成しました。
実行すれば、参考ページの「Encoding examples」と同じ結果が出力されると思います。
# 参考:
# https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
def cobs_encode(raw_data):
overhead_and_tail = 1 + 1
# 通常、オーバーヘッドバイトはデータの先頭にのみ挿入される。
# しかし、254バイト連続で0x00が出現しない場合、新しくオーバーヘッドバイトが挿入される。
# このmarginは、データ内に一度も0x00が出現しなかった場合に増加するバイト数を示す。
# ※:オーバヘッドバイトには、次の0x00に置き換える位置、または、次のオーバヘッドバイトの位置が入る。
margin = int(len(raw_data) / 254)
# エンコードしたデータ格納先
enc_data = bytearray(len(raw_data) + overhead_and_tail + margin)
raw_idx = 0
length = len(raw_data)
enc_idx = 1 # エンコード配列の参照先(enc_data[0]は、オーバーヘッドバイトの格納先なので1から開始)
code_idx = 0 # code(次に0x00が出現する場所)を入れる位置を管理する用の変数
code = 1 # 次に0x00が出現する場所
while (length > 0):
if raw_data[raw_idx] != 0x00:
# データが0x00でなければ、値をそのままエンコード配列に入れる。
enc_data[enc_idx] = raw_data[raw_idx]
enc_idx += 1
# 次に0x00が出現する場所インクリメント
code += 1
if code == 0xff:
# 254バイト連続して0x00でなければ、0x00出現時と同じ処理をする。
# (codeの意味が0x00出現時とは異なるので注意)
enc_data[code_idx] = code # この場合のcodeは次のオーバヘッドバイトの位置を示す。
code = 1
code_idx = enc_idx #ここのインデックスの位置が次のオーバヘッドバイトになる。
if length > 1:
enc_idx += 1
else:
#最後のデータの場合、enc_idxをインクリメントするとデータ量が1多くなる。
pass
else:
# データが0x00であれば、
# code(次に0x00が出現する場所)を入れる位置(code_index)に、codeを入れて、
# code_indexを更新、及びcodeのリセットを行う。
enc_data[code_idx] = code
code = 1
code_idx = enc_idx
enc_idx += 1
raw_idx += 1
length -= 1
enc_data[code_idx] = code
enc_data[enc_idx] = 0x00
# +1 は終端バイト分
return enc_data[:enc_idx + 1]
def cobs_decode(enc_data):
dec_data = bytearray(len(enc_data)) # デコード結果格納先配列
enc_idx = 0
dec_idx = 0
next_0x00 = 0 # 次に0x00に置き換える位置
next_is_overhead = True # 次がオーバヘッドバイト
while enc_idx < len(enc_data):
if next_0x00 != 0:
# エンコード前が0x00ではないものは、
# そのままデコード結果格納先配列に格納する。
dec_data[dec_idx] = enc_data[enc_idx]
dec_idx += 1
enc_idx += 1
else:
if enc_data[enc_idx] == 0x00:
# 終端コード(0x00)発見時は処理を終了する。
# Note:
# 参考URLに記載されているコードでは、0x00に置き換え処理の後に、
# この処理をしていた。
# その場合、最後のループでdec_idxがインクリメントされてからループを抜ける
# ことになるので、置き換え処理で格納された0x00を含む、1バイト多い配列が返される。
break
if next_is_overhead == True:
# オーバヘッドバイトは置き換えをしない。
# (オーバヘッドバイトには置き換え前の値がない)
pass
else:
# エンコード前が0x00だったバイトを0x00に置き換える。
dec_data[dec_idx] = 0
dec_idx += 1
# 次の0x00に置き換える場所を取得する。
next_0x00 = enc_data[enc_idx]
enc_idx += 1
if next_0x00 == 0xff:
next_is_overhead = True
else:
next_is_overhead = False
next_0x00 -= 1
return dec_data[:dec_idx]
def test_enc_and_dec(case_num, src):
print('test_enc_and_dec:case_num', case_num)
enc = cobs_encode(src)
dec = cobs_decode(enc)
if len(src) < 10:
print('src:', [f'{i:02x}' for i in src])
print('enc:', [f'{i:02x}' for i in enc])
print('dec:', [f'{i:02x}' for i in dec])
else:
print('src:', [f'{i:02x}' for i in src[:5]] , '...', [f'{i:02x}' for i in src[-5:]])
print('enc:', [f'{i:02x}' for i in enc[:5]] , '...', [f'{i:02x}' for i in enc[-5:]])
print('dec:', [f'{i:02x}' for i in dec[:5]] , '...', [f'{i:02x}' for i in dec[-5:]])
print('src == enc:', bytearray(src) == dec)
print('')
test_enc_and_dec(1, [0x00])
test_enc_and_dec(2, [0x00, 0x00])
test_enc_and_dec(3, [0x00, 0x11, 0x00])
test_enc_and_dec(4, [0x11, 0x22, 0x00, 0x33])
test_enc_and_dec(5, [0x11, 0x22, 0x33, 0x44])
test_enc_and_dec(6, [0x11, 0x00, 0x00, 0x00])
test_enc_and_dec(7, [i + 1 for i in range(254)])
test_enc_and_dec(8, [i + 0 for i in range(255)])
test_enc_and_dec(9, [i + 1 for i in range(255)])
test_enc_and_dec(10, [i + 2 for i in range(254)] + [0])
test_enc_and_dec(11, [i + 3 for i in range(253)] + [0, 1])
あとがき
参考ページのコード、多項式や条件式の中で後置インクリメントが多発していたので解読に苦労しました。
昔は処理の高速化と、プログラム容量削減のために。こういうコードが主流だったんでしょうか。
式中のインクリメントはバグの温床なので、やめて欲しいですなー(´・ω・`)
以上