Skip to content

Instantly share code, notes, and snippets.

@wizche
Created February 6, 2025 15:23
Show Gist options
  • Save wizche/b1ce1cba33d91f48a83baab9f816d05b to your computer and use it in GitHub Desktop.
Save wizche/b1ce1cba33d91f48a83baab9f816d05b to your computer and use it in GitHub Desktop.
BACnet loves scapy
from scapy.all import *
from scapy.layers.inet import IP, UDP
from enum import Enum
import logging
logging.getLogger("scapy").setLevel(logging.DEBUG)
BACNET_PORT = 47808
def bindLayers():
bind_layers(UDP, BVLC, sport=BACNET_PORT)
bind_layers(UDP, BVLC, dport=BACNET_PORT)
bind_layers(BVLC, NPDU)
bind_layers(NPDU, APDU)
class BVLCFunction(Enum):
RESULT = 0
WRITE_BDT = 1
READ_BDT = 2
READ_BDT_ACK = 3
FORWARDED_NPDU = 4
REGISTER_FD = 5
READ_FDT = 6
READ_FDT_ACK= 7
DEL_FDT_ENTRY = 8
DISTRIBUTE_BROADCAST_TO_NETWORK = 9
ORIGINAL_UNICAST_NPDU = 10
ORIGINAL_BROADCAST_NPDU = 11
class BVLC(Packet):
name = 'BVLC'
fields_desc = [
XByteField('type', 0x81),
ByteEnumField('function', 0, BVLCFunction),
ShortField('length', None)]
# From: `21 FORMAL DESCRIPTION OF APPLICATION PROTOCOL DATA UNITS`
_bacnet_apdu_type = {1: "BACnet-Unconfirmed-Request-PDU"}
# From: `BACnetUnconfirmedServiceChoice ::= ENUMERATED {`
_bacnet_unconfirmed_service_choice = {8: "who-Is", 0: "i-am"}
class BACnetTagLen(BitFieldLenField):
def getfield(self, pkt, s):
if not isinstance(s, tuple):
warning("BACnetTagLengetfield.getfield() - not aligned correctly!")
return None, None
# Check the length encoding
s, bn = s
v = s[0] & 0b111
#print(v & 0b1000, bin(v), s, s[0] & 0b111)
if (v & 0b1000) == 0:
# 1 byte value
return s[1:], s[0] & 0b111
elif (v & 0b101) == 0b101:
# the length of the value is encoded in the next byte (i.e. s[1])
return s[2:], s[1]
else:
warning("BACnetTagLen.getfield() - length encoding not supported")
return None, None
def addfield(self, pkt, s, val):
fld, fval = pkt.getfield_and_val(self.length_of)
raw_fval = fld.i2m(pkt, fval)
len_raw_fval = len(raw_fval)
if len_raw_fval == 1:
_len = orb(s[2] << 5)
return s[0] + struct.pack("!B", _len)
elif len_raw_fval > 1 and len_raw_fval < 256:
_tag_number_class = orb((s[2] << 3) + 0b101)
_len = orb(len_raw_fval)
return s[0] + struct.pack("!B", _tag_number_class) + struct.pack("!B", _len)
warning.warn("BACnetTagLen.addfield() - length encoding not supported")
return b""
class BACnetTag(Packet):
name = "BACnetTag"
fields_desc = [BitField("tag_number", 0, 4),
BitField("tag_class", 0, 1),
BACnetTagLen("tag_length", 1, 3, length_of="tag_value"),
XStrLenField("tag_value", b"", length_from=lambda p: p.tag_length)]
def guess_payload_class(self, payload):
return _BACnetTagWhoIS_dispatcher(payload)
def extract_padding(self, s):
return "", s
BACnetWhoISLowLimit = type("BACnetWhoISLowLimit", (BACnetTag,),
{"name": "BACnetTag - who-IS - Low Limit"})
BACnetWhoISHighLimit = type("BACnetWhoISHighLimit", (BACnetTag,),
{"name": "BACnetTag - who-IS - High Limit"})
def _BACnetTagWhoIS_dispatcher(data):
if len(data) < 1:
return Raw(data)
if orb(data[0]) >> 4 == 0:
return BACnetWhoISLowLimit(data)
elif orb(data[0]) >> 4 == 1:
return BACnetWhoISHighLimit(data)
return BACnetTag(data)
class NPDU(Packet):
name = 'NPDU'
fields_desc = [
XByteField('version', 0x81),
BitField("nsdu", 0, 1),
BitField("res1", 0, 1),
BitField("dnet_dlen_daddr", 0, 1),
BitField("res2", 0, 1),
BitField("snet_slen_saddr", 0, 1),
BitField("reply", 0, 1),
BitField("prio1", 0, 1),
BitField("prio0", 0, 1),
ConditionalField(ShortField('dest', 0x0), lambda p: p.dnet_dlen_daddr == 1),
ConditionalField(ByteField('dest_len', 0x0), lambda p: p.dnet_dlen_daddr == 1),
ConditionalField(ShortField('src_net_addr', 0x0), lambda p: p.snet_slen_saddr == 1),
ConditionalField(ByteField('src_addr_len', 0x0), lambda p: p.snet_slen_saddr == 1),
ConditionalField(ByteField('src_addr', 0x0), lambda p: p.snet_slen_saddr == 1),
ByteField('hop_count', 0x0)]
I_AM = 0
WHO_IS = 8
class APDU(Packet):
name = 'APDU'
fields_desc = [
BitEnumField("apdu_type", 1, 4, _bacnet_apdu_type),
BitField("_reserved", 0, 4),
ByteEnumField("service_choice", 8,
_bacnet_unconfirmed_service_choice),
]
def guess_payload_class(self, payload):
if self.service_choice == WHO_IS:
#print("BACnetWhoIS")
return BACnetWhoIS
if self.service_choice == I_AM:
#print("BACnetIAm")
return BACnetIAm
else:
return Raw
class BACnetWhoIS(Packet):
name = "BACnet - who-IS"
fields_desc = [
PacketListField("arguments", [], _BACnetTagWhoIS_dispatcher)]
class BACnetIAm(Packet):
name = "BACnet - I-am"
fields_desc = [
PacketField("Object_Identifier", BACnetTag(), BACnetTag),
PacketField("Max_APDU_Length_Accepted", BACnetTag(), BACnetTag),
PacketField("Segmentation_Supported", BACnetTag(), BACnetTag),
PacketField("Vendor_Identifier", BACnetTag(), BACnetTag)]
from scapy.all import *
from scapy.layers.all import UDP
from bacnet import bindLayers, BVLC, APDU, NPDU, BACnetIAm
print("Binding layers")
bindLayers()
print("reading PCAP")
pkts = rdpcap("bacnet-stack-services.cap")
whois_pkt = pkts[9]
iam_pkt = pkts[12]
#hexdump(whois_pkt[BVLC])
print(whois_pkt[NPDU].show())
#p = iam_pkt
#print(p.Object_Identifier.tag_value)
print(iam_pkt[NPDU].show())
#hexdump(ihave_pkt[BVLC])
#print(ihave_pkt[BVLC].show())
#interact(mydict=globals())
#udp_pkt = whois_pkt[UDP]
#print("\nUDP Layer:")
#print(udp_pkt[BVLC].show())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment