Created
June 23, 2025 00:53
-
-
Save 0x77dev/9fa613bb15a3aaccf40f89539abec2ea to your computer and use it in GitHub Desktop.
Livox LiDAR Protocol Wireshark plugin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -- Livox Mid-40 Protocol Dissector | |
| -- Based on Livox SDK Communication Protocol v1.3.0 | |
| -- Compatible with older Wireshark versions | |
| local livox_proto = Proto("livox", "Livox Mid-40 Protocol") | |
| -- Protocol fields | |
| local fields = livox_proto.fields | |
| fields.sof = ProtoField.uint8("livox.sof", "Start of Frame", base.HEX) | |
| fields.version = ProtoField.uint8("livox.version", "Protocol Version", base.DEC) | |
| fields.length = ProtoField.uint16("livox.length", "Frame Length", base.DEC) | |
| fields.cmd_type = ProtoField.uint8("livox.cmd_type", "Command Type", base.HEX, { | |
| [0x00] = "CMD (Request)", | |
| [0x01] = "ACK (Response)", | |
| [0x02] = "MSG (Message)" | |
| }) | |
| fields.seq_num = ProtoField.uint16("livox.seq_num", "Sequence Number", base.DEC) | |
| fields.crc16 = ProtoField.uint16("livox.crc16", "Header CRC16", base.HEX) | |
| fields.data = ProtoField.bytes("livox.data", "Data") | |
| fields.crc32 = ProtoField.uint32("livox.crc32", "Frame CRC32", base.HEX) | |
| -- Command data fields | |
| fields.cmd_set = ProtoField.uint8("livox.cmd_set", "Command Set", base.HEX, { | |
| [0x00] = "General Command Set", | |
| [0x01] = "LiDAR Command Set", | |
| [0x02] = "Hub Command Set" | |
| }) | |
| fields.cmd_id = ProtoField.uint8("livox.cmd_id", "Command ID", base.HEX) | |
| fields.cmd_data = ProtoField.bytes("livox.cmd_data", "Command Data") | |
| -- Point cloud data fields | |
| fields.pc_version = ProtoField.uint8("livox.pc.version", "Packet Version", base.DEC) | |
| fields.slot_id = ProtoField.uint8("livox.pc.slot_id", "Slot ID", base.DEC) | |
| fields.lidar_id = ProtoField.uint8("livox.pc.lidar_id", "LiDAR ID", base.DEC, { | |
| [1] = "Mid-100 Left / Mid-40 / Tele-15 / Horizon", | |
| [2] = "Mid-100 Middle", | |
| [3] = "Mid-100 Right" | |
| }) | |
| fields.reserved = ProtoField.uint8("livox.pc.reserved", "Reserved", base.HEX) | |
| fields.status_code = ProtoField.uint32("livox.pc.status_code", "Status Code", base.HEX) | |
| fields.timestamp_type = ProtoField.uint8("livox.pc.timestamp_type", "Timestamp Type", base.DEC, { | |
| [0] = "No sync source", | |
| [1] = "PTP", | |
| [3] = "GPS", | |
| [4] = "PPS" | |
| }) | |
| fields.data_type = ProtoField.uint8("livox.pc.data_type", "Data Type", base.DEC, { | |
| [0] = "Cartesian Coordinate", | |
| [1] = "Spherical Coordinate", | |
| [6] = "IMU Data" | |
| }) | |
| fields.timestamp = ProtoField.uint64("livox.pc.timestamp", "Timestamp", base.DEC) | |
| fields.point_data = ProtoField.bytes("livox.pc.point_data", "Point Cloud Data") | |
| -- Broadcast message fields | |
| fields.broadcast_code = ProtoField.string("livox.broadcast_code", "Broadcast Code") | |
| fields.dev_type = ProtoField.uint8("livox.dev_type", "Device Type", base.DEC, { | |
| [0] = "Livox Hub", | |
| [1] = "Mid-40", | |
| [2] = "Tele-15", | |
| [3] = "Horizon", | |
| [6] = "Mid-70", | |
| [7] = "Avia" | |
| }) | |
| -- Status code breakdown | |
| fields.temp_status = ProtoField.uint8("livox.status.temp", "Temperature Status", base.DEC, { | |
| [0] = "Normal", | |
| [1] = "High or Low", | |
| [2] = "Extremely High or Low" | |
| }) | |
| fields.volt_status = ProtoField.uint8("livox.status.volt", "Voltage Status", base.DEC, { | |
| [0] = "Normal", | |
| [1] = "High", | |
| [2] = "Extremely High" | |
| }) | |
| fields.motor_status = ProtoField.uint8("livox.status.motor", "Motor Status", base.DEC, { | |
| [0] = "Normal", | |
| [1] = "Warning", | |
| [2] = "Error" | |
| }) | |
| function livox_proto.dissector(buffer, pinfo, tree) | |
| local length = buffer:len() | |
| if length == 0 then return end | |
| pinfo.cols.protocol = livox_proto.name | |
| local subtree = tree:add(livox_proto, buffer(), "Livox Protocol Data") | |
| -- Check if this is a control command frame (starts with 0xAA) | |
| if length >= 11 and buffer(0,1):uint() == 0xAA then | |
| dissect_control_frame(buffer, pinfo, subtree) | |
| -- Check if this is point cloud data (starts with version 5) | |
| elseif length >= 18 and buffer(0,1):uint() == 0x05 then | |
| dissect_point_cloud(buffer, pinfo, subtree) | |
| else | |
| subtree:add(fields.data, buffer()) | |
| pinfo.cols.info = "Unknown Livox Data" | |
| end | |
| end | |
| function dissect_control_frame(buffer, pinfo, subtree) | |
| local length = buffer:len() | |
| -- Header fields | |
| subtree:add_le(fields.sof, buffer(0,1)) | |
| subtree:add_le(fields.version, buffer(1,1)) | |
| local frame_length = buffer(2,2):le_uint() | |
| subtree:add_le(fields.length, buffer(2,2)) | |
| subtree:add_le(fields.cmd_type, buffer(4,1)) | |
| subtree:add_le(fields.seq_num, buffer(5,2)) | |
| subtree:add_le(fields.crc16, buffer(7,2)) | |
| -- Data section | |
| if length > 9 then | |
| local data_length = length - 13 -- Total - header - crc32 | |
| if data_length > 0 then | |
| local data_tree = subtree:add(fields.data, buffer(9, data_length)) | |
| dissect_command_data(buffer(9, data_length), pinfo, data_tree) | |
| end | |
| end | |
| -- CRC32 | |
| if length >= 13 then | |
| subtree:add_le(fields.crc32, buffer(length-4, 4)) | |
| end | |
| local cmd_type = buffer(4,1):uint() | |
| local cmd_type_str = "" | |
| if cmd_type == 0x00 then cmd_type_str = "CMD" | |
| elseif cmd_type == 0x01 then cmd_type_str = "ACK" | |
| elseif cmd_type == 0x02 then cmd_type_str = "MSG" | |
| end | |
| pinfo.cols.info = "Livox Control Frame [" .. cmd_type_str .. "]" | |
| end | |
| function dissect_command_data(buffer, pinfo, tree) | |
| local length = buffer:len() | |
| if length < 2 then return end | |
| local cmd_set = buffer(0,1):uint() | |
| local cmd_id = buffer(1,1):uint() | |
| tree:add_le(fields.cmd_set, buffer(0,1)) | |
| tree:add_le(fields.cmd_id, buffer(1,1)) | |
| if length > 2 then | |
| local cmd_data = buffer(2, length-2) | |
| local data_tree = tree:add(fields.cmd_data, cmd_data) | |
| -- Parse specific commands | |
| if cmd_set == 0x00 and cmd_id == 0x00 then | |
| -- Broadcast message | |
| if cmd_data:len() >= 19 then | |
| data_tree:add(fields.broadcast_code, cmd_data(0,16)) | |
| data_tree:add_le(fields.dev_type, cmd_data(16,1)) | |
| end | |
| pinfo.cols.info = "Livox Broadcast Message" | |
| elseif cmd_set == 0x00 and cmd_id == 0x01 then | |
| pinfo.cols.info = "Livox Handshake" | |
| elseif cmd_set == 0x00 and cmd_id == 0x03 then | |
| pinfo.cols.info = "Livox Heartbeat" | |
| elseif cmd_set == 0x00 and cmd_id == 0x04 then | |
| pinfo.cols.info = "Livox Start/Stop Sampling" | |
| else | |
| pinfo.cols.info = "Livox Command [" .. string.format("0x%02x:0x%02x", cmd_set, cmd_id) .. "]" | |
| end | |
| end | |
| end | |
| function dissect_point_cloud(buffer, pinfo, subtree) | |
| local length = buffer:len() | |
| if length < 18 then return end | |
| subtree:add_le(fields.pc_version, buffer(0,1)) | |
| subtree:add_le(fields.slot_id, buffer(1,1)) | |
| subtree:add_le(fields.lidar_id, buffer(2,1)) | |
| subtree:add_le(fields.reserved, buffer(3,1)) | |
| -- Status code with breakdown (check if bit operations are available) | |
| local status_code = buffer(4,4):le_uint() | |
| local status_tree = subtree:add_le(fields.status_code, buffer(4,4)) | |
| -- Try to break down status code bits (may not work on all Wireshark versions) | |
| local success, temp_status = pcall(function() return bit.band(status_code, 0x03) end) | |
| if success then | |
| local volt_status = bit.band(bit.rshift(status_code, 2), 0x03) | |
| local motor_status = bit.band(bit.rshift(status_code, 4), 0x03) | |
| status_tree:add(fields.temp_status, temp_status) | |
| status_tree:add(fields.volt_status, volt_status) | |
| status_tree:add(fields.motor_status, motor_status) | |
| end | |
| subtree:add_le(fields.timestamp_type, buffer(8,1)) | |
| subtree:add_le(fields.data_type, buffer(9,1)) | |
| subtree:add_le(fields.timestamp, buffer(10,8)) | |
| -- Point cloud data | |
| if length > 18 then | |
| local data_type = buffer(9,1):uint() | |
| local point_tree = subtree:add(fields.point_data, buffer(18, length-18)) | |
| local type_str = "" | |
| if data_type == 0 then type_str = "Cartesian" | |
| elseif data_type == 1 then type_str = "Spherical" | |
| elseif data_type == 6 then type_str = "IMU" | |
| else type_str = "Unknown" | |
| end | |
| pinfo.cols.info = "Livox Point Cloud [" .. type_str .. "]" | |
| end | |
| end | |
| -- Register the protocol on standard ports | |
| local udp_port = DissectorTable.get("udp.port") | |
| udp_port:add(55000, livox_proto) -- Broadcast port | |
| udp_port:add(65000, livox_proto) -- Control port | |
| -- Also register on common data ports (you can add more as needed) | |
| udp_port:add(56000, livox_proto) | |
| udp_port:add(57000, livox_proto) | |
| udp_port:add(58000, livox_proto) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment