Skip to content

Instantly share code, notes, and snippets.

@joshgachnang
Created September 26, 2014 18:22
Show Gist options
  • Save joshgachnang/97552d7170c36e28a472 to your computer and use it in GitHub Desktop.
Save joshgachnang/97552d7170c36e28a472 to your computer and use it in GitHub Desktop.
import binascii
import socket
import struct
import sys
import select
# EtherType value used both as the socket protocol and as the bind protocol
# for every LLDP socket in this file.
LLDP_ETHERTYPE = 0x88cc

# Two raw packet sockets are opened and bound at import time, one per
# hard-coded interface. Nothing else in the visible part of this file
# reads from s1 or s2.
s1 = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, LLDP_ETHERTYPE)
s1.bind(('ens9f0', LLDP_ETHERTYPE))

s2 = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, LLDP_ETHERTYPE)
s2.bind(('ens9f1', LLDP_ETHERTYPE))
def _parse_tlv(buff):
"""Iterate over a buffer and generate structured TLV data.
:param buff: An ethernet packet with the header trimmed off (first
14 bytes)
"""
lldp_info = []
while buff:
tlvhdr = struct.unpack('!H', buff[:2])[0]
tlvtype = (tlvhdr & 0xfe00) >> 9
tlvlen = (tlvhdr & 0x01ff)
tlvdata = buff[2:tlvlen + 2]
buff = buff[tlvlen + 2:]
lldp_info.append((tlvtype, tlvdata))
return lldp_info
def _receive_lldp_packets(sock):
"""Receive LLDP packets and process them.
:param sock: A bound socket
:return: A list of tuples in the form (lldp_type, lldp_data)
"""
pkt = sock.recv(1600)
# Filter invalid packets
if not pkt or len(pkt) < 14:
return
# Skip header (dst MAC, src MAC, ethertype)
pkt = pkt[14:]
return _parse_tlv(pkt)
def get_lldp_info(interface_name=None, lldp_timeout=30):
    """Get LLDP info received on one or all of the known interfaces.

    Opens one raw LLDP socket per interface, waits for a packet on each of
    them and parses the first packet each interface delivers. An interface
    is no longer waited on once it has delivered a packet. If a wait times
    out, listening stops and only the interfaces that already delivered a
    packet appear in the result. All sockets opened here are closed before
    returning, including when an error is raised.

    :param interface_name: The interface to listen for packets on. If
                           None, will listen on each of the hard-coded
                           interfaces ('ens9f0' and 'ens9f1').
    :param lldp_timeout: Seconds to wait in each ``select`` call for any
                         of the remaining interfaces to become readable.
    :return: A dictionary in the form {'interface': [lldp_info],...}
    """
    if interface_name is None:
        interfaces = ['ens9f0', 'ens9f1']
    else:
        interfaces = [interface_name]

    lldp_info = {}
    # Maps every socket still waiting for a packet to its interface name.
    pending = {}
    try:
        for interface in interfaces:
            s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                              LLDP_ETHERTYPE)
            # Register before binding so the socket is closed by the
            # cleanup below even if bind() fails.
            pending[s] = interface
            s.bind((interface, LLDP_ETHERTYPE))

        while pending:
            ready, _, _ = select.select(list(pending), [], [], lldp_timeout)
            if not ready:
                # Empty read list means timeout
                break
            for s in ready:
                # Remove the socket from the pending set before reading;
                # it is closed whether or not the read succeeds.
                interface = pending.pop(s)
                try:
                    lldp_info[interface] = _receive_lldp_packets(s)
                finally:
                    s.close()
    finally:
        # Close whatever is still open (timeout or error path).
        for s in pending:
            s.close()
    return lldp_info
if __name__ == '__main__':
    # Parenthesised single-argument print behaves the same under the
    # Python 2 print statement and the Python 3 print function.
    print(get_lldp_info())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment