モチベーション
LiDARのViewer開いてcsvファイルをいちいち作るのが面倒なのでLiDARから取得した点群の情報を一気に解析したい!
前提知識
LiDARから送られるデータについて
LiDARからは取得したある範囲のデータがバイナリデータとして敷き詰められてそれが1パケットとしてUDPでPCに設定されたIPに送られてきます。
このパケットが1フレーム間で何個も送られてくることで全体の点群を構成しています。
基本的な解析方法
- LiDARのパケットデータの構造が書いてあるデータシートを見つけてくる
- データ通りに1パケットの情報を解析したコードを書く
- while分で回してあげる
具体的に考えてみる
実際にパケットファイルからPythonでデータを取得して、点群表示するところまでやってみます。
解析する対象
Livox HAP
座標がそのまま取り出せるため本題であるバイナリを読む処理のみに集中したコードが書けるため
パケット解析
livox texh社が公開しているlvx2ファイルのパケット形式が下のPDFファイルに示されています
https://terra-1-g.djicdn.com/65c028cd298f4669a7f0e40e50ba1131/LVX2%20Specifications.pdf
パケットの構成は
- public header block
- private header block
- device info block
- point data block
今回の目的は点群情報を取得することなので名前的にpoint data blockからデータをとってくる感じっぽいです
では実際にpoint cloud blockを見てみるとN+1個のフレームに分かれていて、このフレームは1つのFrame HeaderかM+1個のPackageに分かれています。そのパッケージの中にPointデータがP+1個入っています。このPointデータにx, y, z(2 Bytes), reflectivity(1 bytes), tag(1bytes)が入っています。(PDF内ではすべてNと表現されていますが、すべて違う値を示しています。)
図にしたものが下の図です(手書きで見ずらいですが)
よってこれらの文字の値L, Nを求める必要があります。これらの数もPDFファイルに書かれています。
L = 最初から29個目のバイナリ
N = NextOffset - Current Offset
M = 不明
P = 96(Livox HAP は 1frame にパッケージ存在とPDFの一番下に記載)
よって処理としては以下のようになります
- 各フレームのCurrent Offset, Next Offsetを取得
- 各フレームのFrame Headerを飛ばして1package分である96個分の点群情報をloop処理して取得
コードを書く
あとは上の処理をコードに書くだけです
import open3d as o3d
import numpy as np
def livoxFileReadHook(ReadFileName):
# Input: Read FileName
# Output: frameInfo[frameBegin, frameEnd]
t = 0
points = []
deg = []
lidar_pos = []
rf = open(ReadFileName, 'rb')
Idx = 29
d = rf.read()
FSIZE = len(d)
DeviceCount = d[Idx]
print("size", FSIZE)
# RETURN
frameInfo = []
for i in range(DeviceCount):
Idx += 63
print("Device", DeviceCount)
print("Fsize", FSIZE)
# end = 0
while (Idx < FSIZE) :
# Next Offset
nxt = int.from_bytes(d[Idx+8:(Idx+16)],'little')
print("Idx", Idx, "cframe", int.from_bytes(d[Idx:(Idx+8)],'little'))
frameInfo.append([int.from_bytes(d[Idx:(Idx+8)],'little'), int.from_bytes(d[Idx+8:(Idx+16)],'little')])
print("next offset", nxt)
Idx = int.from_bytes(d[Idx+8:(Idx+16)],'little')
t += 1
frameInfo = np.array(frameInfo)
return frameInfo
def livoxFileReadPoint(d, frame):
# Input: Read File NameReadFil, eName
# Output: [y,z,x,distance(r),Lidar ID]
points = []
# print("current frame", frame[0])
print("current frame", frame[0])
pointsPerFrame = []
Idx = frame[0] + 24
while Idx < frame[1] :
dtype = d[Idx + 17]
Idx = Idx + 27
if dtype==1:
# print("dtype",1)
for i in range(96):
B2D_X = int.from_bytes(d[Idx:Idx+4],'little', signed=True)
B2D_Y = int.from_bytes(d[Idx+4:Idx+8],'little', signed=True)
B2D_Z = int.from_bytes(d[Idx+8:Idx+12],'little', signed=True)
# print(B2D_X)
point_tmp = ([B2D_X,
B2D_Y,
B2D_Z,
d[Idx+12]
])
# print(B2D_X, B2D_Y, B2D_X)
# print(len(point_tmp))
pointsPerFrame.append(point_tmp)
# print("point_tmp", point_tmp)
Idx = Idx + 14
elif dtype==2:
print("dtype",2)
points.append(pointsPerFrame)
return pointsPerFrame
def main:
frame = livoxFileReadHook("../data/2023-06-12_18-19-16.lvx2")
vis = o3d.visualization.Visualizer()
vis.create_window()
scale = 10
line = np.linspace(0, scale, scale+1)
points = []
for x in line:
for y in line:
for z in line:
points.append([x,y,z])
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(np.array(points))
vis.add_geometry(pcd)
rf = open("../data/2023-06-12_18-19-16.lvx2", 'rb')
d = rf.read()
for f in frame:
# time.sleep(time_num)
pcd.points = o3d.utility.Vector3dVector(
np.array(livoxFileReadPoint(d, f))[:, :3]
)
vis.update_geometry(pcd)
vis.poll_events()
vis.update_renderer()
rf.close
vis.destroy_window()
終わりに
LiDARの生データを取得できるのは快適でよいですね
つたない文章ですが読んでいただきありがとうございました。