Created
February 6, 2025 15:23
-
-
Save wizche/b1ce1cba33d91f48a83baab9f816d05b to your computer and use it in GitHub Desktop.
BACnet loves scapy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from scapy.all import * | |
from scapy.layers.inet import IP, UDP | |
from enum import Enum | |
import logging | |
logging.getLogger("scapy").setLevel(logging.DEBUG) | |
BACNET_PORT = 47808 | |
def bindLayers(): | |
bind_layers(UDP, BVLC, sport=BACNET_PORT) | |
bind_layers(UDP, BVLC, dport=BACNET_PORT) | |
bind_layers(BVLC, NPDU) | |
bind_layers(NPDU, APDU) | |
class BVLCFunction(Enum): | |
RESULT = 0 | |
WRITE_BDT = 1 | |
READ_BDT = 2 | |
READ_BDT_ACK = 3 | |
FORWARDED_NPDU = 4 | |
REGISTER_FD = 5 | |
READ_FDT = 6 | |
READ_FDT_ACK= 7 | |
DEL_FDT_ENTRY = 8 | |
DISTRIBUTE_BROADCAST_TO_NETWORK = 9 | |
ORIGINAL_UNICAST_NPDU = 10 | |
ORIGINAL_BROADCAST_NPDU = 11 | |
class BVLC(Packet): | |
name = 'BVLC' | |
fields_desc = [ | |
XByteField('type', 0x81), | |
ByteEnumField('function', 0, BVLCFunction), | |
ShortField('length', None)] | |
# From: `21 FORMAL DESCRIPTION OF APPLICATION PROTOCOL DATA UNITS` | |
_bacnet_apdu_type = {1: "BACnet-Unconfirmed-Request-PDU"} | |
# From: `BACnetUnconfirmedServiceChoice ::= ENUMERATED {` | |
_bacnet_unconfirmed_service_choice = {8: "who-Is", 0: "i-am"} | |
class BACnetTagLen(BitFieldLenField): | |
def getfield(self, pkt, s): | |
if not isinstance(s, tuple): | |
warning("BACnetTagLengetfield.getfield() - not aligned correctly!") | |
return None, None | |
# Check the length encoding | |
s, bn = s | |
v = s[0] & 0b111 | |
#print(v & 0b1000, bin(v), s, s[0] & 0b111) | |
if (v & 0b1000) == 0: | |
# 1 byte value | |
return s[1:], s[0] & 0b111 | |
elif (v & 0b101) == 0b101: | |
# the length of the value is encoded in the next byte (i.e. s[1]) | |
return s[2:], s[1] | |
else: | |
warning("BACnetTagLen.getfield() - length encoding not supported") | |
return None, None | |
def addfield(self, pkt, s, val): | |
fld, fval = pkt.getfield_and_val(self.length_of) | |
raw_fval = fld.i2m(pkt, fval) | |
len_raw_fval = len(raw_fval) | |
if len_raw_fval == 1: | |
_len = orb(s[2] << 5) | |
return s[0] + struct.pack("!B", _len) | |
elif len_raw_fval > 1 and len_raw_fval < 256: | |
_tag_number_class = orb((s[2] << 3) + 0b101) | |
_len = orb(len_raw_fval) | |
return s[0] + struct.pack("!B", _tag_number_class) + struct.pack("!B", _len) | |
warning.warn("BACnetTagLen.addfield() - length encoding not supported") | |
return b"" | |
class BACnetTag(Packet): | |
name = "BACnetTag" | |
fields_desc = [BitField("tag_number", 0, 4), | |
BitField("tag_class", 0, 1), | |
BACnetTagLen("tag_length", 1, 3, length_of="tag_value"), | |
XStrLenField("tag_value", b"", length_from=lambda p: p.tag_length)] | |
def guess_payload_class(self, payload): | |
return _BACnetTagWhoIS_dispatcher(payload) | |
def extract_padding(self, s): | |
return "", s | |
BACnetWhoISLowLimit = type("BACnetWhoISLowLimit", (BACnetTag,), | |
{"name": "BACnetTag - who-IS - Low Limit"}) | |
BACnetWhoISHighLimit = type("BACnetWhoISHighLimit", (BACnetTag,), | |
{"name": "BACnetTag - who-IS - High Limit"}) | |
def _BACnetTagWhoIS_dispatcher(data): | |
if len(data) < 1: | |
return Raw(data) | |
if orb(data[0]) >> 4 == 0: | |
return BACnetWhoISLowLimit(data) | |
elif orb(data[0]) >> 4 == 1: | |
return BACnetWhoISHighLimit(data) | |
return BACnetTag(data) | |
class NPDU(Packet): | |
name = 'NPDU' | |
fields_desc = [ | |
XByteField('version', 0x81), | |
BitField("nsdu", 0, 1), | |
BitField("res1", 0, 1), | |
BitField("dnet_dlen_daddr", 0, 1), | |
BitField("res2", 0, 1), | |
BitField("snet_slen_saddr", 0, 1), | |
BitField("reply", 0, 1), | |
BitField("prio1", 0, 1), | |
BitField("prio0", 0, 1), | |
ConditionalField(ShortField('dest', 0x0), lambda p: p.dnet_dlen_daddr == 1), | |
ConditionalField(ByteField('dest_len', 0x0), lambda p: p.dnet_dlen_daddr == 1), | |
ConditionalField(ShortField('src_net_addr', 0x0), lambda p: p.snet_slen_saddr == 1), | |
ConditionalField(ByteField('src_addr_len', 0x0), lambda p: p.snet_slen_saddr == 1), | |
ConditionalField(ByteField('src_addr', 0x0), lambda p: p.snet_slen_saddr == 1), | |
ByteField('hop_count', 0x0)] | |
I_AM = 0 | |
WHO_IS = 8 | |
class APDU(Packet): | |
name = 'APDU' | |
fields_desc = [ | |
BitEnumField("apdu_type", 1, 4, _bacnet_apdu_type), | |
BitField("_reserved", 0, 4), | |
ByteEnumField("service_choice", 8, | |
_bacnet_unconfirmed_service_choice), | |
] | |
def guess_payload_class(self, payload): | |
if self.service_choice == WHO_IS: | |
#print("BACnetWhoIS") | |
return BACnetWhoIS | |
if self.service_choice == I_AM: | |
#print("BACnetIAm") | |
return BACnetIAm | |
else: | |
return Raw | |
class BACnetWhoIS(Packet): | |
name = "BACnet - who-IS" | |
fields_desc = [ | |
PacketListField("arguments", [], _BACnetTagWhoIS_dispatcher)] | |
class BACnetIAm(Packet): | |
name = "BACnet - I-am" | |
fields_desc = [ | |
PacketField("Object_Identifier", BACnetTag(), BACnetTag), | |
PacketField("Max_APDU_Length_Accepted", BACnetTag(), BACnetTag), | |
PacketField("Segmentation_Supported", BACnetTag(), BACnetTag), | |
PacketField("Vendor_Identifier", BACnetTag(), BACnetTag)] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from scapy.all import * | |
from scapy.layers.all import UDP | |
from bacnet import bindLayers, BVLC, APDU, NPDU, BACnetIAm | |
print("Binding layers") | |
bindLayers() | |
print("reading PCAP") | |
pkts = rdpcap("bacnet-stack-services.cap") | |
whois_pkt = pkts[9] | |
iam_pkt = pkts[12] | |
#hexdump(whois_pkt[BVLC]) | |
print(whois_pkt[NPDU].show()) | |
#p = iam_pkt | |
#print(p.Object_Identifier.tag_value) | |
print(iam_pkt[NPDU].show()) | |
#hexdump(ihave_pkt[BVLC]) | |
#print(ihave_pkt[BVLC].show()) | |
#interact(mydict=globals()) | |
#udp_pkt = whois_pkt[UDP] | |
#print("\nUDP Layer:") | |
#print(udp_pkt[BVLC].show()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment