-
-
Save joshgachnang/97552d7170c36e28a472 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import binascii | |
import socket | |
import struct | |
import sys | |
import select | |
LLDP_ETHERTYPE = 0x88cc | |
s1 = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0x88cc) | |
s1.bind(('ens9f0', 0x88cc)) | |
s2 = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0x88cc) | |
s2.bind(('ens9f1', 0x88cc)) | |
def _parse_tlv(buff): | |
"""Iterate over a buffer and generate structured TLV data. | |
:param buff: An ethernet packet with the header trimmed off (first | |
14 bytes) | |
""" | |
lldp_info = [] | |
while buff: | |
tlvhdr = struct.unpack('!H', buff[:2])[0] | |
tlvtype = (tlvhdr & 0xfe00) >> 9 | |
tlvlen = (tlvhdr & 0x01ff) | |
tlvdata = buff[2:tlvlen + 2] | |
buff = buff[tlvlen + 2:] | |
lldp_info.append((tlvtype, tlvdata)) | |
return lldp_info | |
def _receive_lldp_packets(sock): | |
"""Receive LLDP packets and process them. | |
:param sock: A bound socket | |
:return: A list of tuples in the form (lldp_type, lldp_data) | |
""" | |
pkt = sock.recv(1600) | |
# Filter invalid packets | |
if not pkt or len(pkt) < 14: | |
return | |
# Skip header (dst MAC, src MAC, ethertype) | |
pkt = pkt[14:] | |
return _parse_tlv(pkt) | |
def get_lldp_info(interface_name=None): | |
"""Get LLDP info from the switch(es) the agent is connected to. | |
Listens on either a single or all interfaces for LLDP packets, then | |
parses them. If no LLDP packets are received before lldp_timeout, | |
returns a dictionary in the form {'interface': [],...}. | |
:param interface_name: The interface to listen for packets on. If | |
None, will listen on each interface. | |
:return: A dictionary in the form {'interface': [lldp_info],...} | |
""" | |
lldp_info = {} | |
read_list = {} | |
interfaces = ['ens9f0', 'ens9f1'] | |
for interface in interfaces: | |
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, | |
LLDP_ETHERTYPE) | |
s.bind((interface, LLDP_ETHERTYPE)) | |
# Add interface name to object for returning data later | |
read_list[interface] = s | |
if not read_list: | |
return {} | |
while read_list: | |
r = select.select(read_list.values(), [], [], 30) | |
if not r[0]: | |
# Empty read list means timeout | |
break | |
for s in r[0]: | |
# Find interface name matching socket ready for read | |
for interface_name, sock in read_list.items(): | |
if s == sock: | |
lldp_info[interface_name] = ( | |
_receive_lldp_packets(s)) | |
# Remove successfully read sockets from read_list | |
del read_list[interface_name] | |
return lldp_info | |
if __name__ == '__main__': | |
print get_lldp_info() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment