Skip to content

Instantly share code, notes, and snippets.

@carlos-jenkins
Created September 27, 2016 22:20
Show Gist options
  • Save carlos-jenkins/a6cbb4069ff1e7f0dcd32a08a7e55c29 to your computer and use it in GitHub Desktop.
Save carlos-jenkins/a6cbb4069ff1e7f0dcd32a08a7e55c29 to your computer and use it in GitHub Desktop.
Small Software Packet Generator
from __future__ import division
from os.path import isfile
from json import dumps
from logging import getLogger
from traceback import format_exc
from base64 import b64encode, b64decode
from multiprocessing import Value, Queue, Event, Process
# Module-level logger, named after this module.
log = getLogger(__name__)
def ordp(c):
    """
    Helper that returns a printable binary data representation.

    Every element whose code lies outside the printable ASCII range
    (``32 <= code < 127``) is replaced by a dot, the others are kept.

    :param c: Binary data: a string of characters, or a Python 3
     ``bytes`` / ``bytearray`` object.
    :return: A string with as many characters as ``c`` has elements.
    """
    # Iterating Python 3 bytes yields integers, and ord() raises TypeError
    # on those. Map every byte to the character with the same code point
    # first. On Python 2 ``bytes`` is ``str``, so strings are left untouched.
    if isinstance(c, (bytes, bytearray)) and not isinstance(c, str):
        c = c.decode('latin-1')

    return ''.join(
        char if 32 <= ord(char) < 127 else '.'
        for char in c
    )
def hexdump(p):
    """
    Return a hexdump representation of binary data.

    Every output line describes up to 16 elements of ``p``: the decimal
    offset of the first one, each element in uppercase hexadecimal (with an
    extra gap after the eighth) and the printable form given by ``ordp``.

    :param p: Binary data: a string of characters, or a Python 3
     ``bytes`` / ``bytearray`` object.
    :return: The dump as a single string, each line terminated by a newline.
     Empty input gives an empty string.
    """
    # Indexing Python 3 bytes yields integers, and ord() raises TypeError
    # on those. Map every byte to the character with the same code point
    # first. On Python 2 ``bytes`` is ``str``, so strings are left untouched.
    if isinstance(p, (bytes, bytearray)) and not isinstance(p, str):
        p = p.decode('latin-1')

    output = []
    for offset in range(0, len(p), 16):
        row = p[offset:offset + 16]
        output.append('{:04d} '.format(offset))
        for col in range(16):
            if col < len(row):
                output.append('{:02X} '.format(ord(row[col])))
            else:
                # A rendered byte ('XX ') is three characters wide: pad a
                # missing one with the same width so the text column of a
                # short last row lines up with the rows above it.
                output.append('   ')
            if col == 7:
                # Extra gap between both groups of eight bytes
                output.append(' ')
        output.append(' ')
        output.append(ordp(row))
        output.append('\n')
    return ''.join(output)
def _genloop(iface, progress, confqueue, actionev, interruptev):
    """
    Function for a generation subprocess.

    Opens a raw ``AF_PACKET`` socket bound to ``iface`` and then serves
    generation jobs forever: waits for ``actionev``, reads one configuration
    from ``confqueue``, sends its packets and clears ``actionev`` again.

    :param str iface: Name of the interface to send packets through.
    :param progress: Shared value that receives the completed fraction of
     the job (``sent / total``).
    :param confqueue: Queue from which the job configuration is read. The
     configuration must provide ``_packets`` (``(base64 data, count)``
     pairs) and ``_iterations``.
    :param actionev: Event that starts a job. Cleared when the job ends.
    :param interruptev: Event that aborts the running job when set.
    """
    from socket import socket, AF_PACKET, SOCK_RAW

    s = socket(AF_PACKET, SOCK_RAW)
    s.bind((iface, 0))

    def softloop(conf):
        # The queue holds base64 text. Decode it once, outside of the send
        # loop, so that the real frame (not its base64 form) is transmitted.
        packets = [(b64decode(p), times) for p, times in conf._packets]
        burst = sum(times for _, times in packets)
        if not burst:
            # Nothing to send. This also avoids spinning forever on an
            # empty queue when iterations is 0.
            return

        # 0 iterations means "loop indefinitely": there is no total then
        # and the progress is left untouched.
        iterations = conf._iterations
        total = burst * iterations

        current = 0
        done = 0
        while iterations == 0 or done < iterations:
            for p, times in packets:
                for _ in range(times):
                    if interruptev.is_set():
                        return
                    s.send(p)
                    current += 1
                    if total:
                        progress.value = current / total
            done += 1

    while True:
        # Wait start event
        actionev.wait()
        interruptev.clear()

        try:
            # Get configuration. Queue.put() hands the object to a feeder
            # thread, so it may not be readable yet when the event fires:
            # block for a while instead of failing with an empty queue.
            conf = confqueue.get(True, 5)

            # Software loop
            softloop(conf)
        except Exception:
            # Keep the worker alive for the next job
            log.error(format_exc())
        finally:
            # Clear start event, even on failure, so that the interface
            # is not reported as active forever.
            actionev.clear()
def _caploop(iface, progress, confqueue, actionev, interruptev):
    """
    Function for a capture subprocess.

    Serves capture jobs forever: waits for ``actionev``, reads one
    configuration from ``confqueue``, dumps captured packets to the
    configured file and clears ``actionev`` again.

    :param str iface: Name of the interface to capture from.
    :param progress: Shared value that receives the filled fraction of the
     capture (``stored / capacity``).
    :param confqueue: Queue from which the job configuration is read. The
     configuration must provide ``_capture_file`` and ``_capacity``.
    :param actionev: Event that starts a job. Cleared when the job ends.
    :param interruptev: Event that aborts the running job when set.
    """
    from pcapy import open_live
    from time import sleep

    def softloop(conf):
        # Create capture device
        reader = open_live(iface, 65535, 1, 100)
        reader.setnonblock(1)
        dumper = reader.dump_open(conf._capture_file)

        # Dump callback
        def _dumpcb(hdr, data):
            dumper.dump(hdr, data)

        # Capture packets until filled or interrupted
        stored = 0
        capacity = conf._capacity
        while stored < capacity and not interruptev.is_set():
            stored += reader.dispatch(capacity - stored, _dumpcb)
            progress.value = stored / capacity
            # The reader is non-blocking: pause between polls
            sleep(0.01)

    while True:
        # Wait start event
        actionev.wait()
        interruptev.clear()

        try:
            # Get configuration. Queue.put() hands the object to a feeder
            # thread, so it may not be readable yet when the event fires:
            # block for a while instead of failing with an empty queue.
            conf = confqueue.get(True, 5)

            # Software loop
            softloop(conf)
        except Exception:
            # Keep the worker alive for the next job
            log.error(format_exc())
        finally:
            # Clear start event, even on failure, so that the interface
            # is not reported as active forever.
            actionev.clear()
# Supported counters, in display order: (public name, name of the kernel
# statistics entry the value is read from).
_COUNTER_TABLE = (
    ('TX_TOTAL', 'tx_packets'),
    ('TX_BYTES', 'tx_bytes'),
    ('TX_ERR_COLLISIONS', 'collisions'),
    ('RX_TOTAL', 'rx_packets'),
    ('RX_MCAST', 'multicast'),
    ('RX_BYTES', 'rx_bytes'),
    ('RX_ERR_DROPS', 'rx_dropped'),
    ('RX_ERR_CRC', 'rx_crc_errors'),
    ('RX_ERR_ALIGN', 'rx_frame_errors'),
    ('RX_ERR_LENGTH', 'rx_length_errors'),
    ('RX_ERR_GIANTS', 'rx_over_errors'),
)

# Public counter name -> kernel statistics entry
COUNTERS_MAP = dict(_COUNTER_TABLE)

# Public counter names, in display order
COUNTERS_ORD = list(public for public, _ in _COUNTER_TABLE)

# Public counter names, for membership checks
COUNTERS = set(COUNTERS_MAP)
class G(object):
    """
    Generation parameters object.

    Holds the queue of packets to send, stored as ``(base64 data, count)``
    pairs, and the number of times the whole queue is repeated.
    """

    def __init__(self, iterations=None):
        """
        Create an empty packet queue.

        :param int iterations: Times to repeat the packet queue. ``None``
         keeps the default of ``1``.
        :raises ValueError: If ``iterations`` is negative.
        """
        self._packets = []
        self._iterations = 1

        if iterations is not None:
            self.iterations = iterations

    @property
    def iterations(self):
        """
        Times to repeat the packet queue. ``0`` means to loop indefinitely.

        :Default: ``1``
        :Accept: ``int [>= 0]``
        """
        return self._iterations

    @iterations.setter
    def iterations(self, value):
        # 0 is a valid value (see the property documentation), so the
        # message must not claim that a strictly positive number is needed.
        if value < 0:
            raise ValueError('Iterations must be >= 0')
        self._iterations = value

    @staticmethod
    def _as_bytes(data):
        """
        Return ``data`` as a byte string.

        Byte strings are returned as they are. Text is converted mapping
        every character to the byte with the same code.
        """
        if isinstance(data, bytes):
            return data
        if isinstance(data, bytearray):
            return bytes(data)
        return data.encode('latin-1')

    def add_packet(self, packet, count=1, size=None, fill='\0'):
        """
        Add a packet to the queue.

        Packets shorter than 60 bytes are always extended to 60 bytes, and
        a requested ``size`` below 60 is raised to 60.

        :param packet: Raw data, hexadecimal string prefixed with ``0x`` or
         Scapy Packet object.
        :param int count: Amount of times to repeat this packet.
        :param int size: Size to strip or extend the packet.
        :param str fill: Data pattern to use to pad the packets when extending
         the packet (size > len(pkt)).
        :raises ValueError: If the packet must be extended and ``fill`` is
         empty.
        """
        from binascii import unhexlify

        scapy_packet = False
        try:
            from scapy.packet import Packet
        except ImportError:
            pass
        else:
            scapy_packet = isinstance(packet, Packet)

        # Native and unicode text on Python 2, plain text on Python 3
        text_types = (str, type(u''))

        # Convert from Scapy packet if supported
        if scapy_packet:
            # bytes is str on Python 2. On Python 3 str() would give a
            # description of the packet instead of its raw content.
            packet = bytes(packet)

        # Convert from hexadecimal. unhexlify() works on Python 2 and 3,
        # unlike the 'hex' codec.
        elif isinstance(packet, text_types) and packet.startswith('0x'):
            packet = unhexlify(packet[2:])

        packet = self._as_bytes(packet)

        # Check minimum size
        if size is None:
            if len(packet) < 60:
                size = 60
        elif size < 60:
            size = 60

        # Strip or extend packet
        if size is not None:
            missing = size - len(packet)
            if missing > 0:
                fill = self._as_bytes(fill)
                if not fill:
                    raise ValueError('Fill pattern must not be empty')
                # Repeat the pattern once more than needed, the excess is
                # cut below.
                packet += fill * (missing // len(fill) + 1)
            packet = packet[:size]

        # Store as text so the queue can be serialized to JSON. On Python 3
        # b64encode() returns bytes, which dumps() rejects.
        encoded = b64encode(packet)
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        self._packets.append((encoded, count))

    def to_dict(self):
        """
        Return a JSON representation of the object.
        """
        obj = {
            'iterations': self._iterations,
            'packets': self._packets
        }
        return obj

    def __str__(self):
        return dumps(self.to_dict(), sort_keys=True, indent=4)

    def __repr__(self):
        fpkts = ['Packets:']
        for index, (pkt, count) in enumerate(self._packets, 1):
            raw = b64decode(pkt)
            fpkts.append(
                ' Packet #{} x {} ({}B):'.format(index, count, len(raw))
            )
            fpkts.append(hexdump(raw).strip())
        if not self._packets:
            fpkts.append(' <No packets>')
        # Leave an empty line after the packets section
        fpkts.append('')

        out = [
            'Iterations: {}'.format(self._iterations),
            '\n'.join(fpkts)
        ]
        return '\n'.join(out)
class C(object):
    """
    Capture parameters object.

    Holds the name of the pcap file to write and the maximum number of
    packets to store in it.
    """

    def __init__(self, capture_file=None, capacity=None):
        """
        Create the parameters, using the defaults for omitted values.

        :param str capture_file: pcap filename to save captured traffic.
         ``None`` keeps the default ``'/tmp/_default.pcap'``.
        :param int capacity: Maximum number of packets to save. ``None``
         keeps the default ``100``.
        :raises ValueError: If ``capture_file`` is empty or ``capacity`` is
         lower than 1.
        """
        self._capture_file = '/tmp/_default.pcap'
        self._capacity = 100

        # Go through the properties so the values given here are validated
        # like the ones assigned later.
        if capture_file is not None:
            self.capture_file = capture_file
        if capacity is not None:
            self.capacity = capacity

    @property
    def capture_file(self):
        """
        pcap filename to save captured traffic.

        :Default: ``'/tmp/_default.pcap'``
        :Accept: ``str (path) [Not empty]``
        """
        return self._capture_file

    @capture_file.setter
    def capture_file(self, value):
        if not value:
            raise ValueError('Capture file must not be empty')
        self._capture_file = value

    @property
    def capacity(self):
        """
        Maximum number of packets to save in ``capture_file``.

        :Default: ``100``
        :Accept: ``int [>= 1]``
        """
        return self._capacity

    @capacity.setter
    def capacity(self, value):
        if value < 1:
            raise ValueError('Capacity must be >= 1')
        # Store the value. A subtraction here would discard it silently.
        self._capacity = value

    def to_dict(self):
        """
        Return a JSON representation of the object.
        """
        obj = {
            'capture_file': self._capture_file,
            'capacity': self._capacity,
        }
        return obj

    def __str__(self):
        return dumps(self.to_dict(), sort_keys=True, indent=4)

    def __repr__(self):
        out = [
            'Capacity : {}'.format(self._capacity),
            'Capture file : {}'.format(self._capture_file)
        ]
        return '\n'.join(out)
class Iface(object):
    """
    Port bound to a network interface, able to generate and capture
    traffic and to report the interface information and counters.

    Implementation details:

    - Each Iface (interface) object controls two subprocesses (in order to
      escape the GIL), one for packet generation and one for packet
      capture.
    - Packet generation (layer 2) is done using:

      ::

          s = socket(AF_PACKET, SOCK_RAW)
          s.bind((iface, 0))
          s.send(pkt)

    - Packet capture is done using ``pcapy`` library:

      ::

          reader = open_live(...)
          reader.dispatch(...)

    - Counters and info are read from:

      ::

          /sys/class/net/{iface}/*

      Please note that counters are cached in order to support clearing
      them.
    - Device listing is parsed from:

      ::

          /proc/net/dev
    """

    def __init__(self, iface):
        """
        Create the port and start its two worker subprocesses.

        :param str iface: Name of the network interface to use.
        """
        self._iface = iface

        # Counters: values read when the counters were last cleared
        self._counters_cache = {}

        # Generation process
        self._genprog = Value('d', 1.0)
        self._genconf = Queue(maxsize=1)
        self._genaction = Event()
        self._genintr = Event()
        self._genproc = Process(
            target=_genloop,
            args=(
                iface,
                self._genprog,
                self._genconf,
                self._genaction,
                self._genintr
            )
        )
        self._genparams_cache = None

        # Capture process
        self._capprog = Value('d', 1.0)
        self._capconf = Queue(maxsize=1)
        self._capaction = Event()
        self._capintr = Event()
        self._capproc = Process(
            target=_caploop,
            args=(
                iface,
                self._capprog,
                self._capconf,
                self._capaction,
                self._capintr
            )
        )
        self._capparams_cache = None

        self._genproc.daemon = True
        self._capproc.daemon = True
        self._genproc.start()
        self._capproc.start()

    @staticmethod
    def find_interfaces():
        """
        Find all network interfaces in the system.

        :return: Sorted list of interface names, empty if the listing could
         not be read.
        """
        try:
            ifaces = []
            with open('/proc/net/dev') as fd:
                # Skip the two header lines
                next(fd)
                next(fd)
                for line in fd:
                    iface = line.strip().split(':')[0]
                    ifaces.append(iface)
            ifaces.sort()
            return ifaces
        except Exception:
            # format_exc() formats the exception being handled. Its only
            # argument is a traceback depth limit, not the exception.
            log.error(format_exc())
        return []

    def _read_real_counters(self, counters):
        """
        Read counters from kernel statistics for current interface.

        :param counters: Iterable of counter names, see ``COUNTERS``.
        :return: Dictionary mapping each counter to its value, ``-1`` if
         its statistics file does not exist.
        :raises Exception: If any of the counters is unknown.
        """
        unknown = set(counters) - COUNTERS
        if unknown:
            raise Exception('Unknown counters: {}'.format(
                ', '.join(sorted(unknown))
            ))

        tpl = '/sys/class/net/{}/statistics/{}'
        values = {}
        for c in counters:
            filename = tpl.format(self._iface, COUNTERS_MAP[c])
            if not isfile(filename):
                values[c] = -1
                continue
            with open(filename) as fd:
                values[c] = int(fd.read().strip())
        return values

    def read_counters(self, counters=None):
        """
        Read given counters.

        None means all, which are defined by the COUNTERS variable.

        :return: Dictionary mapping each counter to the amount counted
         since it was last cleared, ``-1`` if the counter is not available.
        """
        if counters is None:
            counters = COUNTERS
        values = self._read_real_counters(counters)
        for c in values:
            # -1 flags an unavailable counter, it is not an amount
            if values[c] != -1:
                values[c] -= self._counters_cache.get(c, 0)
        return values

    def clear_counters(self, counters=None):
        """
        Clear given counters.

        None means all, which are defined by the COUNTERS variable.
        """
        if counters is None:
            counters = COUNTERS
        current = self._read_real_counters(counters)
        # Unavailable counters (-1) have no value to use as reference
        self._counters_cache.update(
            (c, value) for c, value in current.items() if value != -1
        )

    def config(self, genparams=None, capparams=None):
        """
        Configure this interface.

        :param genparams: Generation parameters, ``None`` to not generate.
        :param capparams: Capture parameters, ``None`` to not capture.
        """
        self._genparams_cache = genparams
        self._capparams_cache = capparams

    def start(self):
        """
        Start this interface to execute configuration.

        :raises Exception: If the interface is already active or if it was
         not configured.
        """
        if self.is_active():
            raise Exception('Interface already started.')

        if self._genparams_cache is None and self._capparams_cache is None:
            raise Exception('Interface not configured.')

        if self._genparams_cache is not None:
            self._genconf.put(self._genparams_cache, False)
            self._genprog.value = 0.0
            self._genaction.set()

        if self._capparams_cache is not None:
            self._capconf.put(self._capparams_cache, False)
            self._capprog.value = 0.0
            self._capaction.set()

    def stop(self):
        """
        Stop this interface actions.
        """
        self._genintr.set()
        self._capintr.set()

    def _read_attr(self, name):
        """
        Return the stripped content of ``/sys/class/net/{iface}/{name}``.
        """
        filename = '/sys/class/net/{}/{}'.format(self._iface, name)
        with open(filename) as fd:
            return fd.read().strip()

    def get_info(self):
        """
        Return link status, mtu, mac address and speed.

        :return: Dictionary with keys ``iface``, ``link``, ``mtu``,
         ``speed`` (``-1`` if it cannot be read) and ``mac``.
        """
        info = {
            'iface': self._iface
        }
        info['link'] = self._read_attr('operstate') == 'up'
        info['mtu'] = int(self._read_attr('mtu'))

        # Reading the speed fails for interfaces that are unable to report
        # one (the loopback device, for example). Flag it as unknown
        # instead of failing to return all the other information.
        try:
            info['speed'] = int(self._read_attr('speed'))
        except (IOError, OSError, ValueError):
            info['speed'] = -1

        info['mac'] = self._read_attr('address')
        return info

    def is_active(self):
        """
        Return if the interface is active.
        """
        return self._genaction.is_set() or self._capaction.is_set()

    def get_progress(self):
        """
        Get the progress of the jobs being executed.

        For generation, it is the completed fraction of the packets defined
        in the generation queue. For capture, it is the fraction of the
        capture file filled.

        :return: Tuple ``(generation, capture)``.
        """
        return (self._genprog.value, self._capprog.value)

    def to_dict(self):
        """
        Return a JSON representation of the object.
        """
        info = self.get_info()
        info['active'] = self.is_active()
        info['counters'] = self.read_counters()

        gen = None
        if self._genparams_cache is not None:
            gen = self._genparams_cache.to_dict()
        cap = None
        if self._capparams_cache is not None:
            cap = self._capparams_cache.to_dict()

        info['config'] = {
            'generation': gen,
            'capture': cap,
        }
        info['progress'] = self.get_progress()
        return info

    def __str__(self):
        return dumps(self.to_dict(), sort_keys=True, indent=4)

    def __repr__(self):
        obj = self.to_dict()
        out = ['[{}]'.format(obj['iface'])]

        out.append('== INFO ==')
        out.append('Speed: {}'.format(obj['speed']))
        # The link state is its own field, it must not be derived from
        # the speed.
        out.append('Link : {}'.format('UP' if obj['link'] else 'DOWN'))
        out.append('MAC : {}'.format(obj['mac']))
        out.append('MTU : {}'.format(obj['mtu']))

        out.append('== COUNTERS ==')
        for c in COUNTERS_ORD:
            out.append('{: <18}: {}'.format(c, obj['counters'][c]))

        out.append('== ACTION ==')
        out.append(
            'Is active: {} ({}, {})'.format(
                'YES' if obj['active'] else 'NO',
                obj['progress'][0],
                obj['progress'][1],
            )
        )

        out.append('== CONFIG ==')
        for c, title in zip(
                [self._genparams_cache, self._capparams_cache],
                ['Generation', 'Capture']):
            if c is not None:
                out.append(repr(c))
            else:
                out.append('No {} Configuration.'.format(title))
        return '\n'.join(out)

    def __del__(self):
        # Terminate child processes on destroy. The object may be only
        # partially built if __init__ failed, so consider just the
        # processes that exist and were started.
        procs = [
            proc for proc in (
                getattr(self, '_genproc', None),
                getattr(self, '_capproc', None),
            )
            if proc is not None and proc.pid is not None
        ]
        for proc in procs:
            proc.terminate()
        for proc in procs:
            proc.join()
def discover_ports(ifaces=None):
    """
    Create an iface port for each given interface name.

    :param ifaces: Names of the interfaces to use. ``None`` means all the
     interfaces found in the system.
    :return: Dictionary mapping port numbers, assigned from 1 following the
     sorted interface names, to their ``Iface`` object.
    :raises Exception: If any of the given interfaces does not exist.
    """
    available = Iface.find_interfaces()

    if ifaces is None:
        ifaces = available
    else:
        unknown = set(ifaces) - set(available)
        if unknown:
            raise Exception('Unknown interfaces {}'.format(
                ', '.join(sorted(unknown))
            ))

    # Sort a copy: the list given by the caller is left as it was
    names = sorted(ifaces)

    ports = {}
    for seq, iface in enumerate(names, 1):
        ports[seq] = Iface(iface)

    # Log the names used to create the ports. Iface objects have no public
    # ``iface`` attribute to read them back from.
    log.info('Using ports:')
    for seq, iface in enumerate(names, 1):
        log.info('P={} :: IF={}'.format(seq, iface))

    return ports
# Names exported by ``from <module> import *``.
__all__ = ['COUNTERS', 'G', 'C', 'Iface', 'discover_ports']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment