Created
April 24, 2016 12:15
-
-
Save syakesaba/f1271154262bf3c3292995828b09e3fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## This file is part of Scapy | |
## See http://www.secdev.org/projects/scapy for more informations | |
## Copyright (C) Philippe Biondi <[email protected]> | |
## This program is published under a GPLv2 license | |
""" | |
Fields: basic data structures that make up parts of packets. | |
""" | |
import struct,copy,socket | |
from .config import conf | |
from .volatile import * | |
from .data import * | |
from .utils import * | |
from .base_classes import BasePacket,Gen,Net | |
############ | |
## Fields ## | |
############ | |
class Field: | |
"""For more informations on how this work, please refer to | |
http://www.secdev.org/projects/scapy/files/scapydoc.pdf | |
chapter ``Adding a New Field''""" | |
islist=0 | |
holds_packets=0 | |
def __init__(self, name, default, fmt="H"): | |
self.name = name | |
if fmt[0] in "@=<>!": | |
self.fmt = fmt | |
else: | |
self.fmt = "!"+fmt | |
self.default = self.any2i(None,default) | |
self.sz = struct.calcsize(self.fmt) | |
self.owners = [] | |
def register_owner(self, cls): | |
self.owners.append(cls) | |
def i2len(self, pkt, x): | |
"""Convert internal value to a length usable by a FieldLenField""" | |
return self.sz | |
def i2count(self, pkt, x): | |
"""Convert internal value to a number of elements usable by a FieldLenField. | |
Always 1 except for list fields""" | |
return 1 | |
def i2b(self, pkt, x): | |
"""Convert internal value to internal value""" | |
if type(x) is str: | |
x = bytes([ ord(i) for i in x ]) | |
return x | |
def i2dict(self, pkt, x): | |
return { self.name: x } | |
def h2i(self, pkt, x): | |
"""Convert human value to internal value""" | |
if type(x) is str: | |
x = bytes([ ord(i) for i in x ]) | |
return x | |
def i2h(self, pkt, x): | |
"""Convert internal value to human value""" | |
return x | |
def m2i(self, pkt, x): | |
"""Convert machine value to internal value""" | |
return x | |
def i2m(self, pkt, x): | |
"""Convert internal value to machine value""" | |
if x is None: | |
x = 0 | |
return x | |
def any2i(self, pkt, x): | |
"""Try to understand the most input values possible and make an internal value from them""" | |
return self.h2i(pkt, x) | |
def i2repr(self, pkt, x): | |
"""Convert internal value to a nice representation""" | |
return repr(self.i2h(pkt,x)) | |
def addfield(self, pkt, s, val): | |
"""Add an internal value to a string""" | |
return s+struct.pack(self.fmt, self.i2m(pkt,val)) | |
def getfield(self, pkt, s): | |
"""Extract an internal value from a string""" | |
return s[self.sz:], self.m2i(pkt, struct.unpack(self.fmt, s[:self.sz])[0]) | |
def do_copy(self, x): | |
if hasattr(x, "copy"): | |
return x.copy() | |
if type(x) is list: | |
x = x[:] | |
for i in range(len(x)): | |
if isinstance(x[i], BasePacket): | |
x[i] = x[i].copy() | |
return x | |
def __repr__(self): | |
return "<Field (%s).%s>" % (",".join(x.__name__ for x in self.owners),self.name) | |
def copy(self): | |
return copy.deepcopy(self) | |
def randval(self): | |
"""Return a volatile object whose value is both random and suitable for this field""" | |
fmtt = self.fmt[-1] | |
if fmtt in "BHIQ": | |
return {"B":RandByte,"H":RandShort,"I":RandInt, "Q":RandLong}[fmtt]() | |
elif fmtt == "s": | |
if self.fmt[0] in "0123456789": | |
l = int(self.fmt[:-1]) | |
else: | |
l = int(self.fmt[1:-1]) | |
return RandBin(l) | |
else: | |
warning("no random class for [%s] (fmt=%s)." % (self.name, self.fmt)) | |
class Emph: | |
fld = b"" | |
def __init__(self, fld): | |
self.fld = fld | |
def __getattr__(self, attr): | |
return getattr(self.fld,attr) | |
def __hash__(self): | |
return hash(self.fld) | |
def __eq__(self, other): | |
return self.fld == other | |
class ActionField: | |
_fld = None | |
def __init__(self, fld, action_method, **kargs): | |
self._fld = fld | |
self._action_method = action_method | |
self._privdata = kargs | |
def any2i(self, pkt, val): | |
getattr(pkt, self._action_method)(val, self._fld, **self._privdata) | |
return getattr(self._fld, "any2i")(pkt, val) | |
def __getattr__(self, attr): | |
return getattr(self._fld,attr) | |
class ConditionalField: | |
fld = None | |
def __init__(self, fld, cond): | |
self.fld = fld | |
self.cond = cond | |
def _evalcond(self,pkt): | |
return self.cond(pkt) | |
def getfield(self, pkt, s): | |
if self._evalcond(pkt): | |
return self.fld.getfield(pkt,s) | |
else: | |
return s,None | |
def addfield(self, pkt, s, val): | |
if self._evalcond(pkt): | |
return self.fld.addfield(pkt,s,val) | |
else: | |
return s | |
def __getattr__(self, attr): | |
return getattr(self.fld,attr) | |
class PadField: | |
"""Add bytes after the proxified field so that it ends at the specified | |
alignment from its begining""" | |
_fld = None | |
def __init__(self, fld, align, padwith=None): | |
self._fld = fld | |
self._align = align | |
self._padwith = padwith or b"" | |
def padlen(self, flen): | |
return -flen%self._align | |
def getfield(self, pkt, s): | |
remain,val = self._fld.getfield(pkt,s) | |
padlen = self.padlen(len(s)-len(remain)) | |
return remain[padlen:], val | |
def addfield(self, pkt, s, val): | |
sval = self._fld.addfield(pkt, b"", val) | |
return s+sval+struct.pack("%is" % (self.padlen(len(sval))), self._padwith) | |
def __getattr__(self, attr): | |
return getattr(self._fld,attr) | |
class MACField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "6s") | |
def i2m(self, pkt, x): | |
if x is None: | |
return b"\0\0\0\0\0\0" | |
return mac2str(x) | |
def m2i(self, pkt, x): | |
return str2mac(x) | |
def any2i(self, pkt, x): | |
if type(x) is bytes and len(x) is 6: | |
x = self.m2i(pkt, x) | |
return x | |
def i2repr(self, pkt, x): | |
x = self.i2h(pkt, x) | |
if self in conf.resolve: | |
x = conf.manufdb._resolve_MAC(x) | |
return x | |
def randval(self): | |
return RandMAC() | |
class IPField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "4s") | |
def h2i(self, pkt, x): | |
if type(x) is str: | |
try: | |
inet_aton(x) | |
except socket.error: | |
x = Net(x) | |
elif type(x) is list: | |
x = [self.h2i(pkt, n) for n in x] | |
return x | |
def resolve(self, x): | |
if self in conf.resolve: | |
try: | |
ret = socket.gethostbyaddr(x)[0] | |
except: | |
pass | |
else: | |
if ret: | |
return ret | |
return x | |
def i2m(self, pkt, x): | |
return inet_aton(x) | |
def m2i(self, pkt, x): | |
return inet_ntoa(x) | |
def any2i(self, pkt, x): | |
return self.h2i(pkt,x) | |
def i2repr(self, pkt, x): | |
return self.resolve(self.i2h(pkt, x)) | |
def randval(self): | |
return RandIP() | |
class SourceIPField(IPField): | |
def __init__(self, name, dstname): | |
IPField.__init__(self, name, None) | |
self.dstname = dstname | |
def i2m(self, pkt, x): | |
if x is None: | |
iff,x,gw = pkt.route() | |
if x is None: | |
x = "0.0.0.0" | |
return IPField.i2m(self, pkt, x) | |
def i2h(self, pkt, x): | |
if x is None: | |
dst=getattr(pkt,self.dstname) | |
if isinstance(dst,Gen): | |
#r = map(conf.route.route, dst) | |
r = [ conf.route.route(i) for i in dst ] | |
r.sort() | |
if r[0] != r[-1]: | |
warning("More than one possible route for %s"%repr(dst)) | |
iff,x,gw = r[0] | |
else: | |
iff,x,gw = conf.route.route(dst) | |
return IPField.i2h(self, pkt, x) | |
class ByteField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "B") | |
class SignedByteField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "b") | |
class XByteField(ByteField): | |
def i2repr(self, pkt, x): | |
return lhex(self.i2h(pkt, x)) | |
class OByteField(ByteField): | |
def i2repr(self, pkt, x): | |
return "%03o"%self.i2h(pkt, x) | |
class X3BytesField(XByteField): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "!I") | |
def addfield(self, pkt, s, val): | |
return s+struct.pack(self.fmt, self.i2m(pkt,val))[1:4] | |
def getfield(self, pkt, s): | |
return s[3:], self.m2i(pkt, struct.unpack(self.fmt, b"\x00"+s[:3])[0]) | |
class ShortField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "H") | |
class LEShortField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "<H") | |
class XShortField(ShortField): | |
def i2repr(self, pkt, x): | |
return lhex(self.i2h(pkt, x)) | |
class IntField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "I") | |
class SignedIntField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "i") | |
def randval(self): | |
return RandSInt() | |
class LEIntField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "<I") | |
class LESignedIntField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "<i") | |
def randval(self): | |
return RandSInt() | |
class XIntField(IntField): | |
def i2repr(self, pkt, x): | |
return lhex(self.i2h(pkt, x)) | |
class LongField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "Q") | |
class LELongField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "<Q") | |
class XLongField(LongField): | |
def i2repr(self, pkt, x): | |
return lhex(self.i2h(pkt, x)) | |
class IEEEFloatField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "f") | |
class IEEEDoubleField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "d") | |
class StrField(Field): | |
def __init__(self, name, default, fmt="H", remain=0): | |
Field.__init__(self,name,default,fmt) | |
self.remain = remain | |
#def i2h(self, pkt, x): | |
def i2repr(self, pkt, x): | |
try: | |
if type(x) is bytes: | |
x = x.decode('ascii') | |
except UnicodeDecodeError: | |
pass | |
return repr(x) | |
#def i2repr(self, pkt, x): | |
# return repr(self.i2h(pkt,x)) | |
def i2len(self, pkt, i): | |
return len(i) | |
def i2m(self, pkt, x): | |
if x is None: | |
x = b"" | |
elif type(x) is not bytes: | |
x=str(x).encode('ascii') | |
return x | |
def addfield(self, pkt, s, val): | |
return s+self.i2m(pkt, val) | |
def getfield(self, pkt, s): | |
if self.remain == 0: | |
return b"",self.m2i(pkt, s) | |
else: | |
return s[-self.remain:],self.m2i(pkt, s[:-self.remain]) | |
def randval(self): | |
return RandBin(RandNum(0,1200)) | |
class PacketField(StrField): | |
holds_packets=1 | |
def __init__(self, name, default, cls, remain=0): #is remain used somewhere? | |
StrField.__init__(self, name, default, remain=remain) | |
self.cls = cls | |
def i2m(self, pkt, i): | |
return bytes(i) | |
def m2i(self, pkt, m): | |
return self.cls(m) | |
def getfield(self, pkt, s): | |
i = self.m2i(pkt, s) | |
remain = b"" | |
if conf.padding_layer in i: | |
r = i[conf.padding_layer] | |
del(r.underlayer.payload) | |
remain = r.load | |
return remain,i | |
class PacketLenField(PacketField): | |
holds_packets=1 | |
def __init__(self, name, default, cls, length_from=None): | |
PacketField.__init__(self, name, default, cls) | |
self.length_from = length_from | |
def getfield(self, pkt, s): | |
l = self.length_from(pkt) | |
try: | |
i = self.m2i(pkt, s[:l]) | |
except Exception: | |
if conf.debug_dissector: | |
raise | |
i = conf.raw_layer(load=s[:l]) | |
return s[l:],i | |
class PacketListField(PacketField): | |
islist = 1 | |
holds_packets=1 | |
def __init__(self, name, default, cls, count_from=None, length_from=None): | |
if default is None: | |
default = [] # Create a new list for each instance | |
PacketField.__init__(self, name, default, cls) | |
self.count_from = count_from | |
self.length_from = length_from | |
def any2i(self, pkt, x): | |
if type(x) is not list: | |
return [x] | |
else: | |
return x | |
def i2count(self, pkt, val): | |
if type(val) is list: | |
return len(val) | |
return 1 | |
def i2len(self, pkt, val): | |
return sum( len(p) for p in val ) | |
def do_copy(self, x): | |
#return map(lambda p:p.copy(), x) | |
return [ i.copy() for i in x ] | |
def getfield(self, pkt, s): | |
c = l = None | |
if self.length_from is not None: | |
l = self.length_from(pkt) | |
elif self.count_from is not None: | |
c = self.count_from(pkt) | |
lst = [] | |
ret = b"" | |
remain = s | |
if l is not None: | |
remain,ret = s[:l],s[l:] | |
while remain: | |
if c is not None: | |
if c <= 0: | |
break | |
c -= 1 | |
try: | |
p = self.m2i(pkt,remain) | |
except Exception: | |
if conf.debug_dissector: | |
raise | |
p = conf.raw_layer(load=remain) | |
remain = b"" | |
else: | |
if conf.padding_layer in p: | |
pad = p[conf.padding_layer] | |
remain = pad.load | |
del(pad.underlayer.payload) | |
else: | |
remain = b"" | |
lst.append(p) | |
return remain+ret,lst | |
def addfield(self, pkt, s, val): | |
return s+b"".join([ bytes(i) for i in val ]) | |
class StrFixedLenField(StrField): | |
def __init__(self, name, default, length=None, length_from=None): | |
StrField.__init__(self, name, default) | |
self.length_from = length_from | |
if length is not None: | |
self.length_from = lambda pkt,length=length: length | |
def i2repr(self, pkt, v): | |
if type(v) is bytes: | |
v = v.rstrip(b"\0") | |
return repr(v) | |
def getfield(self, pkt, s): | |
l = self.length_from(pkt) | |
return s[l:], self.m2i(pkt,s[:l]) | |
def addfield(self, pkt, s, val): | |
l = self.length_from(pkt) | |
return s+struct.pack("%is"%l,self.i2m(pkt, val)) | |
def randval(self): | |
try: | |
l = self.length_from(None) | |
except: | |
l = RandNum(0,200) | |
return RandBin(l) | |
class StrFixedLenEnumField(StrFixedLenField): | |
def __init__(self, name, default, length=None, enum=None, length_from=None): | |
StrFixedLenField.__init__(self, name, default, length=length, length_from=length_from) | |
self.enum = enum | |
def i2repr(self, pkt, v): | |
r = v.rstrip("\0") | |
rr = repr(r) | |
if v in self.enum: | |
rr = "%s (%s)" % (rr, self.enum[v]) | |
elif r in self.enum: | |
rr = "%s (%s)" % (rr, self.enum[r]) | |
return rr | |
class NetBIOSNameField(StrFixedLenField): | |
def __init__(self, name, default, length=31): | |
StrFixedLenField.__init__(self, name, default, length) | |
def i2m(self, pkt, x): | |
l = self.length_from(pkt)//2 | |
if x is None: | |
x = b"" | |
x += b" "*(l) | |
x = x[:l] | |
#x = b"".join(map(lambda x: chr(0x41+(ord(x)>>4))+chr(0x41+(ord(x)&0xf)), x)) | |
x = b"".join([ bytes([0x41+(i>>4),0x41+(i&0xf)]) for i in x ]) | |
x = b" "+x | |
return x | |
def m2i(self, pkt, x): | |
x = x.strip(b"\x00").strip(b" ") | |
#return b"".join(map(lambda x,y: chr((((ord(x)-1)&0xf)<<4)+((ord(y)-1)&0xf)), x[::2],x[1::2])) | |
return b"".join(map(lambda x,y: bytes([(((x-1)&0xf)<<4)+((y-1)&0xf)]), x[::2],x[1::2])) | |
class StrLenField(StrField): | |
def __init__(self, name, default, fld=None, length_from=None): | |
StrField.__init__(self, name, default) | |
self.length_from = length_from | |
def getfield(self, pkt, s): | |
l = self.length_from(pkt) | |
return s[l:], self.m2i(pkt,s[:l]) | |
class FieldListField(Field): | |
islist=1 | |
def __init__(self, name, default, field, length_from=None, count_from=None): | |
if default is None: | |
default = [] # Create a new list for each instance | |
Field.__init__(self, name, default) | |
self.count_from = count_from | |
self.length_from = length_from | |
self.field = field | |
def i2count(self, pkt, val): | |
if type(val) is list: | |
return len(val) | |
return 1 | |
def i2len(self, pkt, val): | |
return sum( self.field.i2len(pkt,v) for v in val ) | |
def i2m(self, pkt, val): | |
if val is None: | |
val = [] | |
return val | |
def any2i(self, pkt, x): | |
if type(x) is not list: | |
return [x] | |
else: | |
return x | |
def addfield(self, pkt, s, val): | |
val = self.i2m(pkt, val) | |
for v in val: | |
s = self.field.addfield(pkt, s, v) | |
return s | |
def getfield(self, pkt, s): | |
c = l = None | |
if self.length_from is not None: | |
l = self.length_from(pkt) | |
elif self.count_from is not None: | |
c = self.count_from(pkt) | |
val = [] | |
ret=b"" | |
if l is not None: | |
s,ret = s[:l],s[l:] | |
while s: | |
if c is not None: | |
if c <= 0: | |
break | |
c -= 1 | |
s,v = self.field.getfield(pkt, s) | |
val.append(v) | |
return s+ret, val | |
class FieldLenField(Field): | |
def __init__(self, name, default, length_of=None, fmt = "H", count_of=None, adjust=lambda pkt,x:x, fld=None): | |
Field.__init__(self, name, default, fmt) | |
self.length_of=length_of | |
self.count_of=count_of | |
self.adjust=adjust | |
if fld is not None: | |
FIELD_LENGTH_MANAGEMENT_DEPRECATION(self.__class__.__name__) | |
self.length_of = fld | |
def i2m(self, pkt, x): | |
if x is None: | |
if self.length_of is not None: | |
fld,fval = pkt.getfield_and_val(self.length_of) | |
f = fld.i2len(pkt, fval) | |
else: | |
fld,fval = pkt.getfield_and_val(self.count_of) | |
f = fld.i2count(pkt, fval) | |
x = self.adjust(pkt,f) | |
return x | |
class StrNullField(StrField): | |
def addfield(self, pkt, s, val): | |
return s+self.i2m(pkt, val)+b"\x00" | |
def getfield(self, pkt, s): | |
l = s.find(b"\x00") | |
if l < 0: | |
#XXX \x00 not found | |
return "",s | |
return s[l+1:],self.m2i(pkt, s[:l]) | |
def randval(self): | |
return RandTermString(RandNum(0,1200),b"\x00") | |
class StrStopField(StrField): | |
def __init__(self, name, default, stop, additionnal=0): | |
Field.__init__(self, name, default) | |
self.stop=stop | |
self.additionnal=additionnal | |
def getfield(self, pkt, s): | |
l = s.find(self.stop) | |
if l < 0: | |
return b"",s | |
# raise Scapy_Exception,"StrStopField: stop value [%s] not found" %stop | |
l += len(self.stop)+self.additionnal | |
return s[l:],s[:l] | |
def randval(self): | |
return RandTermString(RandNum(0,1200),self.stop) | |
class LenField(Field): | |
def i2m(self, pkt, x): | |
if x is None: | |
x = len(pkt.payload) | |
return x | |
class BCDFloatField(Field): | |
def i2m(self, pkt, x): | |
return int(256*x) | |
def m2i(self, pkt, x): | |
return x/256.0 | |
class BitField(Field): | |
def __init__(self, name, default, size): | |
Field.__init__(self, name, default) | |
self.rev = size < 0 | |
self.size = abs(size) | |
def reverse(self, val): | |
if self.size == 16: | |
val = socket.ntohs(val) | |
elif self.size == 32: | |
val = socket.ntohl(val) | |
return val | |
def addfield(self, pkt, s, val): | |
val = self.i2m(pkt, val) | |
if type(s) is tuple: | |
s,bitsdone,v = s | |
else: | |
bitsdone = 0 | |
v = 0 | |
if self.rev: | |
val = self.reverse(val) | |
v <<= self.size | |
v |= val & ((1<<self.size) - 1) | |
bitsdone += self.size | |
while bitsdone >= 8: | |
bitsdone -= 8 | |
s = s+struct.pack("!B", v >> bitsdone) | |
v &= (1<<bitsdone)-1 | |
if bitsdone: | |
return s,bitsdone,v | |
else: | |
return s | |
def getfield(self, pkt, s): | |
if type(s) is tuple: | |
s,bn = s | |
else: | |
bn = 0 | |
# we don't want to process all the string | |
nb_bytes = (self.size+bn-1)//8 + 1 | |
w = s[:nb_bytes] | |
# split the substring byte by byte | |
bs = struct.unpack('!%dB' % nb_bytes , w) | |
b = 0 | |
for c in range(nb_bytes): | |
b |= int(bs[c]) << (nb_bytes-c-1)*8 | |
# get rid of high order bits | |
b &= (1 << (nb_bytes*8-bn)) - 1 | |
# remove low order bits | |
b = b >> (nb_bytes*8 - self.size - bn) | |
if self.rev: | |
b = self.reverse(b) | |
bn += self.size | |
s = s[bn//8:] | |
bn = bn%8 | |
b = self.m2i(pkt, b) | |
if bn: | |
return (s,bn),b | |
else: | |
return s,b | |
def randval(self): | |
return RandNum(0,2**self.size-1) | |
class BitFieldLenField(BitField): | |
def __init__(self, name, default, size, length_of=None, count_of=None, adjust=lambda pkt,x:x): | |
BitField.__init__(self, name, default, size) | |
self.length_of=length_of | |
self.count_of=count_of | |
self.adjust=adjust | |
def i2m(self, pkt, x): | |
#return FieldLenField.i2m.im_func(self, pkt, x) | |
return FieldLenField.i2m(self, pkt, x) | |
class XBitField(BitField): | |
def i2repr(self, pkt, x): | |
return lhex(self.i2h(pkt,x)) | |
class EnumField(Field): | |
def __init__(self, name, default, enum, fmt = "H"): | |
i2s = self.i2s = {} | |
s2i = self.s2i = {} | |
if type(enum) is list: | |
keys = range(len(enum)) | |
else: | |
keys = enum.keys() | |
if list(filter(lambda x: type(x) is str, keys)): | |
i2s,s2i = s2i,i2s | |
for k in keys: | |
i2s[k] = enum[k] | |
s2i[enum[k]] = k | |
Field.__init__(self, name, default, fmt) | |
def any2i_one(self, pkt, x): | |
if type(x) is str: | |
x = self.s2i[x] | |
return x | |
def i2repr_one(self, pkt, x): | |
if self not in conf.noenum and not isinstance(x,VolatileValue) and x in self.i2s: | |
return self.i2s[x] | |
return repr(x) | |
def any2i(self, pkt, x): | |
if type(x) is list: | |
return list(map(lambda z,pkt=pkt:self.any2i_one(pkt,z), x)) | |
else: | |
return self.any2i_one(pkt,x) | |
def i2repr(self, pkt, x): | |
if type(x) is list: | |
return list(map(lambda z,pkt=pkt:self.i2repr_one(pkt,z), x)) | |
else: | |
return self.i2repr_one(pkt,x) | |
class CharEnumField(EnumField): | |
def __init__(self, name, default, enum, fmt = "1s"): | |
EnumField.__init__(self, name, default, enum, fmt) | |
k = list(self.i2s.keys()) | |
if k and len(k[0]) != 1: | |
self.i2s,self.s2i = self.s2i,self.i2s | |
def any2i_one(self, pkt, x): | |
if len(x) != 1: | |
x = self.s2i[x] | |
return x | |
class BitEnumField(BitField,EnumField): | |
def __init__(self, name, default, size, enum): | |
EnumField.__init__(self, name, default, enum) | |
self.rev = size < 0 | |
self.size = abs(size) | |
def any2i(self, pkt, x): | |
return EnumField.any2i(self, pkt, x) | |
def i2repr(self, pkt, x): | |
return EnumField.i2repr(self, pkt, x) | |
class ShortEnumField(EnumField): | |
def __init__(self, name, default, enum): | |
EnumField.__init__(self, name, default, enum, "H") | |
class LEShortEnumField(EnumField): | |
def __init__(self, name, default, enum): | |
EnumField.__init__(self, name, default, enum, "<H") | |
class ByteEnumField(EnumField): | |
def __init__(self, name, default, enum): | |
EnumField.__init__(self, name, default, enum, "B") | |
class IntEnumField(EnumField): | |
def __init__(self, name, default, enum): | |
EnumField.__init__(self, name, default, enum, "I") | |
class SignedIntEnumField(EnumField): | |
def __init__(self, name, default, enum): | |
EnumField.__init__(self, name, default, enum, "i") | |
def randval(self): | |
return RandSInt() | |
class LEIntEnumField(EnumField): | |
def __init__(self, name, default, enum): | |
EnumField.__init__(self, name, default, enum, "<I") | |
class XShortEnumField(ShortEnumField): | |
def i2repr_one(self, pkt, x): | |
if self not in conf.noenum and not isinstance(x,VolatileValue) and x in self.i2s: | |
return self.i2s[x] | |
return lhex(x) | |
class MultiEnumField(EnumField): | |
def __init__(self, name, default, enum, depends_on, fmt = "H"): | |
self.depends_on = depends_on | |
self.i2s_multi = enum | |
self.s2i_multi = {} | |
self.s2i_all = {} | |
for m in enum: | |
self.s2i_multi[m] = s2i = {} | |
for k,v in enum[m].items(): | |
s2i[v] = k | |
self.s2i_all[v] = k | |
Field.__init__(self, name, default, fmt) | |
def any2i_one(self, pkt, x): | |
if type (x) is str: | |
v = self.depends_on(pkt) | |
if v in self.s2i_multi: | |
s2i = self.s2i_multi[v] | |
if x in s2i: | |
return s2i[x] | |
return self.s2i_all[x] | |
return x | |
def i2repr_one(self, pkt, x): | |
v = self.depends_on(pkt) | |
if v in self.i2s_multi: | |
return self.i2s_multi[v].get(x,x) | |
return x | |
class BitMultiEnumField(BitField,MultiEnumField): | |
def __init__(self, name, default, size, enum, depends_on): | |
MultiEnumField.__init__(self, name, default, enum) | |
self.rev = size < 0 | |
self.size = abs(size) | |
def any2i(self, pkt, x): | |
return MultiEnumField.any2i(self, pkt, x) | |
def i2repr(self, pkt, x): | |
return MultiEnumField.i2repr(self, pkt, x) | |
# Little endian long field | |
class LELongField(Field): | |
def __init__(self, name, default): | |
Field.__init__(self, name, default, "<Q") | |
# Little endian fixed length field | |
class LEFieldLenField(FieldLenField): | |
def __init__(self, name, default, length_of=None, fmt = "<H", count_of=None, adjust=lambda pkt,x:x, fld=None): | |
FieldLenField.__init__(self, name, default, length_of=length_of, fmt=fmt, fld=fld, adjust=adjust) | |
class FlagsField(BitField): | |
def __init__(self, name, default, size, names): | |
self.multi = type(names) is list or type(names) is tuple | |
if self.multi: | |
#self.names = map(lambda x:[x], names) | |
self.names = names | |
else: | |
self.names = names | |
BitField.__init__(self, name, default, size) | |
def any2i(self, pkt, x): | |
if type(x) is str: | |
if self.multi: | |
#x = map(lambda y:[y], x.split("+")) | |
x = x.split("+") | |
y = 0 | |
for i in x: | |
y |= 1 << self.names.index(i) | |
x = y | |
return x | |
def i2repr(self, pkt, x): | |
if type(x) is list or type(x) is tuple: | |
return repr(x) | |
if self.multi: | |
r = [] | |
else: | |
r = "" | |
i=0 | |
while x: | |
if x & 1: | |
if self.multi: | |
r += [ self.names[i] ] | |
else: | |
r += self.names[i] | |
i += 1 | |
x >>= 1 | |
if self.multi: | |
r = "+".join(r) | |
return r | |
def i2dict(self, pkt, x): # Potential compatibility issue for older code. And fields to conf.noenum for old behaviour | |
if self.multi: | |
r = {} | |
else: | |
return { self.names: x } | |
for i,n in enumerate(self.names): | |
if x & 1: | |
r[n] = True | |
else: | |
r[n] = False | |
x >>= 1 | |
return r | |
class FixedPointField(BitField): | |
def __init__(self, name, default, size, frac_bits=16): | |
self.frac_bits = frac_bits | |
BitField.__init__(self, name, default, size) | |
def any2i(self, pkt, val): | |
if val is None: | |
return val | |
ival = int(val) | |
fract = int( (val-ival) * 2**self.frac_bits ) | |
return (ival << self.frac_bits) | fract | |
def i2h(self, pkt, val): | |
int_part = val >> self.frac_bits | |
frac_part = val & (1 << self.frac_bits) - 1 | |
frac_part /= 2.0**self.frac_bits | |
return int_part+frac_part | |
def i2repr(self, pkt, val): | |
return self.i2h(pkt, val) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
from scapy.packet import Packet,bind_layers | |
from scapy.fields import * | |
class Example(Packet):#必ずPacketクラスを継承すること | |
""" | |
Scapyに自作プロトコルを実装するためのモデルです。 | |
Packetクラスは基本的に受信した1つのパケットデータのすべてを常に保持します。 | |
""" | |
name = "ProtocolName" #ls()で表示されるプロトコルの説明文。クラス名と一致させる必要はない。 | |
fields_desc = [ | |
ByteEnumField("code",0,{1:"code1!",2:"code2!!",3:"code3!!!",0xFF:"OMG"}), | |
] #前から順にバイト列が決まるので注意 | |
#Dissecting: | |
# NICやファイルからパケットを読み込んだときにhookする関数群 | |
# アナライザとしての役割を果たす | |
# 元となるのはdissect関数である。 | |
# def dissect(self, s): | |
# s = self.pre_dissect(s) | |
# s = self.do_dissect(s) | |
# s = self.post_dissect(s) | |
# payl,pad = self.extract_padding(s) | |
# self.do_dissect_payload(payl) | |
# if pad and conf.padding: | |
# self.add_payload(conf.padding_layer(pad)) | |
def pre_dissect(self, s): | |
""" | |
この関数はパケットの解析の準備を担います。オーバーライドしなければ何もしません。 | |
この関数ではFCSのチェックとか、パケットの長さのチェック、 | |
その他、パケットの解析の前にすべきことを行います。 | |
@param s str 受信したバイト列全文。 | |
@return s str do_dissectにたらい回すバイト列全文 | |
""" | |
return Packet.pre_dissect(self,s) | |
def do_dissect(self,s): | |
""" | |
この関数はパケットの解析の中核を担います。オーバーライドしなければ | |
fields_descに沿った解析がいい感じに自動でなされます。 | |
@param s str pre_dieectからたらい回されたバイト列。 | |
@return s str post_dissectにたらい回すバイト列全文 | |
""" | |
return Packet.do_dissect(self,s) | |
def post_dissect(self,s): | |
""" | |
この関数はパケットの解析の後始末を担います。オーバーライドしなければ何もしません。 | |
この関数ではデータ解析後の完全性のチェックや内包圧縮データの展開を行います。 | |
@param s str do_dieectからたらい回されたバイト列。 | |
@return s str extract_paddingにたらい回すバイト列全文 | |
""" | |
return Packet.post_dissect(self,s) | |
def extract_padding(self,s): | |
""" | |
この関数ではパケット解析の後発生した | |
ペイロード(next layer)とパディング(Padding)を2つに分断します。 | |
パディングが無ければpay,Noneをreturnすること。 | |
@param s str pre_dissectで返されたバイト列。(分割前のデータ) | |
@return pay str ペイロード部。guess_payload_classに渡される | |
@return pad str パディング部。Paddingクラスに渡される | |
""" | |
return Packet.extract_padding(self,s)# return pay,pad | |
def guess_payload_class(self,pay): | |
""" | |
この関数ではこのクラスのペイロード部のプロトコルを判定し、 | |
そのプロトコルを解析しうるクラスオブジェクトを返します。 | |
この関数の中で複雑なペイロード識別を行うことができます。 | |
Note: | |
TCPの宛先ポート番号80番はHTTPクラス、というように | |
このPacketのペイロードのプロトコルがこのPacketのfields_descの値によって | |
即座に決定する場合、bind_layersに頼るべきです。 | |
(オーバーライドしない場合の標準仕様です。) | |
@param pay str extract_paddingで渡されたペイロード。 | |
@return pktClass class 推測判定したペイロードプロトコルクラス。Packetクラスを継承していること。 | |
""" | |
return Packet.guess_payload_class(self,pay) #bind_layers関数による紐付けに頼ります。 | |
def default_payload_class(self,pay): | |
""" | |
guess_payload_classでペイロードのプロトコルが推測できない場合に呼び出されます。 | |
ただし、同関数をあなたがオーバーライドしていた場合、関数の中でこの関数を | |
処理の最後にreturnとして明示的に呼び出す必要があります。 | |
本来Rawクラスがデフォルトのプロトコルですが、この関数をオーバーロードすることで | |
異なるデフォルトのプロトコルを指定できます。あまり使いません。 | |
@param pay str extract_paddingで渡されたペイロード。 | |
@return pktClass class 推測判定したペイロードプロトコルクラス。Packetクラスを継承していること。 | |
""" | |
return Packet.default_payload_class(self,pay) | |
#Building: | |
# パケットを送信するときなど、パケットを完成させるための関数群 | |
# 与えられた文字や数字をmachine語(バイナリ)になおす。 | |
# 元となるのはbuild関数である。 | |
# ****最も重要なのはpost_build関数である。 | |
# def build(self): | |
# p = self.do_build() | |
# p += self.build_padding() | |
# p = self.build_done(p) | |
# return p | |
# def do_build(self): | |
# if not self.explicit: | |
# self = self.__iter__().next() | |
# pkt = self.self_build() | |
# for t in self.post_transforms: | |
# pkt = t(pkt) | |
# pay = self.do_build_payload() | |
# p = self.post_build(pkt,pay) | |
# return p | |
def self_build(self, field_post_list=[]): | |
""" | |
デフォルトで、パケットを構成するfields_descの各フィールドを順にi2mしていく。 | |
その後、fuzzing用のtransform系関数が呼ばれ、晴れて一つのpktになる。 | |
@param field_post_list list 知らん | |
@return pkt str 各フィールドをi2mした後のパケットのバイナリ文字列 | |
""" | |
return Packet.self_build(self,field_post_list) | |
def do_build_payload(self): | |
""" | |
ペイロードに対してdo_build_payload関数を呼び出す。 | |
デフォルトでペイロードで連鎖する。 | |
@return pay str 連鎖して完成した自レイヤ以降のペイロード | |
""" | |
return Packet.do_build_payload(self) | |
def post_build(self, pkt, pay): | |
""" | |
自レイヤ以降すべてのペイロードが計算された後呼ばれる。 | |
pktはこのクラスのテキストデータであり、payはペイロード部である。 | |
この関数ではCRCなどのチェックサムを計算したり、 | |
lengthを代入するフィールドの計算をするべきである。 | |
大抵この関数を実装しないとロクに送受信できない。 | |
この関数の時点でselfには完成間近のパケットクラスが入っているので | |
fields_descのフィールド名でフィールドにアクセスできる。 | |
各フィールドの値で埋められていないところをこの関数で埋めるが、 | |
その際selfは編集せず、pktの方を編集すること。 | |
payは基本編集しないはず。 | |
@param pkt str パケットデータ。 | |
@param pay str パケットデータ。最後にpkt+payするために使う。 | |
@return p str パケットデータ。多くの場合pkt+payをreturnするだろう。 | |
""" | |
self.show() | |
print "===pkt===" | |
print pkt | |
print "===pay===" | |
print pay | |
return Packet.post_build(self, pkt, pay) | |
def build_padding(self): | |
""" | |
ペイロードに対してbuild_padding関数を呼び出す。 | |
デフォルトでペイロードで連鎖する。 | |
@return pad str 連鎖して完成した自レイヤ以降のパディング | |
""" | |
return Packet.build_padding(self) | |
def build_done(self,pkt): | |
""" | |
ペイロードに対してbuild_done関数を呼び出す。 | |
デフォルトでペイロードで連鎖する。 | |
@return pkt str 最終的に完成した全パケットデータ | |
""" | |
return Packet.build_done(self,pkt) | |
i=Example() | |
i.code=0xFF | |
i.show() | |
from scapy.main import interact | |
interact(mydict=locals()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## This file is part of Scapy | |
## See http://www.secdev.org/projects/scapy for more informations | |
## Copyright (C) Philippe Biondi <[email protected]> | |
## This program is published under a GPLv2 license | |
""" | |
Packet class. Binding mechanism. fuzz() method. | |
""" | |
import time,itertools,os | |
import sys,traceback | |
import copy | |
from .fields import StrField,ConditionalField,Emph,PacketListField | |
from .config import conf | |
from .base_classes import BasePacket,Gen,SetGen,Packet_metaclass,NewDefaultValues | |
from .volatile import VolatileValue | |
from .utils import import_hexcap,tex_escape,colgen,get_temp_file | |
from .error import Scapy_Exception,log_runtime, warning | |
import subprocess | |
try: | |
import pyx | |
except ImportError: | |
pass | |
class RawVal: | |
def __init__(self, val=b""): | |
assert type(val) == bytes | |
self.val = val | |
def __str__(self): | |
return str(self.val) | |
def __repr__(self): | |
return "<RawVal [%r]>" % self.val | |
def bytes(self): | |
return self.val | |
class Packet(BasePacket, metaclass = Packet_metaclass): | |
name=None | |
fields_desc = [] | |
aliastypes = [] | |
overload_fields = {} | |
underlayer = None | |
sent_time = None | |
payload_guess = [] | |
initialized = 0 | |
show_indent=1 | |
explicit = 0 | |
raw_packet_cache = None | |
@classmethod | |
def from_hexcap(cls): | |
return cls(import_hexcap()) | |
@classmethod | |
def upper_bonds(self): | |
for fval,upper in self.payload_guess: | |
print("%-20s %s" % (upper.__name__, ", ".join("%-12s" % ("%s=%r"%i) for i in fval.items()))) | |
@classmethod | |
def lower_bonds(self): | |
for lower,fval in self.overload_fields.items(): | |
print("%-20s %s" % (lower.__name__, ", ".join("%-12s" % ("%s=%r"%i) for i in fval.items()))) | |
def __init__(self, _pkt=b"", post_transform=None, _internal=0, _underlayer=None, **fields): | |
self.time = time.time() | |
self.sent_time = 0 | |
if self.name is None: | |
self.name = self.__class__.__name__ | |
self.aliastypes = [ self.__class__ ] + self.aliastypes | |
self.default_fields = {} | |
self.overloaded_fields = {} | |
self.fields={} | |
self.fieldtype={} | |
self.packetfields=[] | |
self.__dict__["payload"] = NoPayload() | |
self.init_fields() | |
self.underlayer = _underlayer | |
self.initialized = 1 | |
self.original = _pkt | |
if _pkt: | |
self.dissect(_pkt) | |
if not _internal: | |
self.dissection_done(self) | |
for f in fields.keys(): | |
self.fields[f] = self.get_field(f).any2i(self,fields[f]) | |
if type(post_transform) is list: | |
self.post_transforms = post_transform | |
elif post_transform is None: | |
self.post_transforms = [] | |
else: | |
self.post_transforms = [post_transform] | |
def init_fields(self): | |
self.do_init_fields(self.fields_desc) | |
def do_init_fields(self, flist): | |
for f in flist: | |
self.default_fields[f.name] = copy.deepcopy(f.default) | |
self.fieldtype[f.name] = f | |
if f.holds_packets: | |
self.packetfields.append(f) | |
def dissection_done(self,pkt): | |
"""DEV: will be called after a dissection is completed""" | |
self.post_dissection(pkt) | |
self.payload.dissection_done(pkt) | |
def post_dissection(self, pkt): | |
"""DEV: is called after the dissection of the whole packet""" | |
pass | |
def get_field(self, fld): | |
"""DEV: returns the field instance from the name of the field""" | |
return self.fieldtype[fld] | |
def add_payload(self, payload): | |
if payload is None: | |
return | |
elif not isinstance(self.payload, NoPayload): | |
self.payload.add_payload(payload) | |
else: | |
if isinstance(payload, Packet): | |
self.__dict__["payload"] = payload | |
payload.add_underlayer(self) | |
for t in self.aliastypes: | |
if t in payload.overload_fields: | |
self.overloaded_fields = payload.overload_fields[t] | |
break | |
#elif type(payload) is str: | |
elif type(payload) is bytes: | |
self.__dict__["payload"] = conf.raw_layer(load=payload) | |
else: | |
raise TypeError("payload must be either 'Packet' or 'bytes', not [%s]" % repr(payload)) | |
def remove_payload(self): | |
self.payload.remove_underlayer(self) | |
self.__dict__["payload"] = NoPayload() | |
self.overloaded_fields = {} | |
def add_underlayer(self, underlayer): | |
self.underlayer = underlayer | |
def remove_underlayer(self,other): | |
self.underlayer = None | |
def copy(self): | |
"""Returns a deep copy of the instance.""" | |
clone = self.__class__() | |
clone.fields = self.fields.copy() | |
for k in clone.fields: | |
clone.fields[k] = self.get_field(k).do_copy(clone.fields[k]) | |
clone.default_fields = self.default_fields.copy() | |
clone.overloaded_fields = self.overloaded_fields.copy() | |
clone.overload_fields = self.overload_fields.copy() | |
clone.underlayer = self.underlayer | |
clone.explicit = self.explicit | |
clone.raw_packet_cache = self.raw_packet_cache | |
clone.post_transforms = self.post_transforms[:] | |
clone.__dict__["payload"] = self.payload.copy() | |
clone.payload.add_underlayer(clone) | |
return clone | |
def getfieldval(self, attr): | |
if attr in self.fields: | |
return self.fields[attr] | |
if attr in self.overloaded_fields: | |
return self.overloaded_fields[attr] | |
if attr in self.default_fields: | |
return self.default_fields[attr] | |
return self.payload.getfieldval(attr) | |
def getbyteval(self, attr): | |
fld,v = self.getfield_and_val(attr) | |
return fld.i2b(self, v) | |
def getstrval(self, attr): | |
fld,v = self.getfield_and_val(attr) | |
return fld.i2repr(self, v) | |
def getdictval(self, attr): | |
fld,v = self.getfield_and_val(attr) | |
return fld.i2dict(self, v) | |
def getfield_and_val(self, attr): | |
if attr in self.fields: | |
return self.get_field(attr),self.fields[attr] | |
if attr in self.overloaded_fields: | |
return self.get_field(attr),self.overloaded_fields[attr] | |
if attr in self.default_fields: | |
return self.get_field(attr),self.default_fields[attr] | |
return self.payload.getfield_and_val(attr) | |
def __getattr__(self, attr): | |
if self.initialized: | |
fld,v = self.getfield_and_val(attr) | |
if fld is not None: | |
return fld.i2h(self, v) | |
return v | |
raise AttributeError(attr) | |
def setfieldval(self, attr, val): | |
if attr in self.default_fields: | |
fld = self.get_field(attr) | |
if fld is None: | |
any2i = lambda x,y: y | |
else: | |
any2i = fld.any2i | |
self.fields[attr] = any2i(self, val) | |
self.explicit = 0 | |
self.raw_packet_cache = None | |
elif attr == "payload": | |
self.remove_payload() | |
self.add_payload(val) | |
else: | |
self.payload.setfieldval(attr,val) | |
def __setattr__(self, attr, val): | |
if self.initialized: | |
try: | |
self.setfieldval(attr,val) | |
except AttributeError: | |
pass | |
else: | |
return | |
self.__dict__[attr] = val | |
def delfieldval(self, attr): | |
if attr in self.fields: | |
del(self.fields[attr]) | |
self.explicit = 0 # in case a default value must be explicited | |
self.raw_packet_cache = None | |
elif attr in self.default_fields: | |
pass | |
elif attr == "payload": | |
self.remove_payload() | |
else: | |
self.payload.delfieldval(attr) | |
def __delattr__(self, attr): | |
if self.initialized: | |
try: | |
self.delfieldval(attr) | |
except AttributeError: | |
pass | |
else: | |
return | |
if attr in self.__dict__: | |
del(self.__dict__[attr]) | |
else: | |
raise AttributeError(attr) | |
def __repr__(self): | |
s = "" | |
ct = conf.color_theme | |
for f in self.fields_desc: | |
if isinstance(f, ConditionalField) and not f._evalcond(self): | |
continue | |
if f.name in self.fields: | |
val = f.i2repr(self, self.fields[f.name]) | |
elif f.name in self.overloaded_fields: | |
val = f.i2repr(self, self.overloaded_fields[f.name]) | |
else: | |
continue | |
if isinstance(f, Emph) or f in conf.emph: | |
ncol = ct.emph_field_name | |
vcol = ct.emph_field_value | |
else: | |
ncol = ct.field_name | |
vcol = ct.field_value | |
s += " %s%s%s" % (ncol(f.name), | |
ct.punct("="), | |
vcol(val)) | |
return "%s%s %s %s%s%s"% (ct.punct("<"), | |
ct.layer_name(self.__class__.__name__), | |
s, | |
ct.punct("|"), | |
repr(self.payload), | |
ct.punct(">")) | |
#def __str__(self): | |
#TODO3 FIX | |
def __str__(self): | |
warning("Unless called manually, this could indicate deprecated use. Should be changed to bytes(self)") | |
return repr(bytes(self)) | |
def __bytes__(self): | |
return self.build() | |
def __div__(self, other): | |
if isinstance(other, Packet): | |
cloneA = self.copy() | |
cloneB = other.copy() | |
cloneA.add_payload(cloneB) | |
return cloneA | |
elif type(other) is str: | |
return self/conf.raw_layer(load=other.encode('ascii')) | |
elif type(other) is bytes: | |
return self/conf.raw_layer(load=other) | |
else: | |
return other.__rdiv__(self) | |
__truediv__ = __div__ | |
def __rdiv__(self, other): | |
if type(other) is str: | |
return conf.raw_layer(load=other.encode('ascii'))/self | |
if type(other) is bytes: | |
return conf.raw_layer(load=other)/self | |
else: | |
raise TypeError | |
__rtruediv__ = __rdiv__ | |
def __mul__(self, other): | |
if type(other) is int: | |
return [self]*other | |
else: | |
raise TypeError | |
def __rmul__(self,other): | |
return self.__mul__(other) | |
def __nonzero__(self): | |
return True | |
def __len__(self): | |
return len(bytes(self)) | |
def self_build(self, field_pos_list=None): | |
if self.raw_packet_cache is not None: | |
return self.raw_packet_cache | |
p=b"" | |
for f in self.fields_desc: | |
#print(f.name) | |
val = self.getfieldval(f.name) | |
if isinstance(val, RawVal): | |
#sval = str(val) | |
sval = bytes(val) | |
p += sval | |
if field_pos_list is not None: | |
field_pos_list.append( (f.name, sval.encode("string_escape"), len(p), len(sval) ) ) | |
else: | |
p = f.addfield(self, p, val) | |
return p | |
def do_build_payload(self): | |
return self.payload.do_build() | |
def do_build(self): | |
if not self.explicit: | |
self = next(self.__iter__()) | |
pkt = self.self_build() | |
for t in self.post_transforms: | |
pkt = t(pkt) | |
pay = self.do_build_payload() | |
p = self.post_build(pkt,pay) | |
return p | |
def build_padding(self): | |
return self.payload.build_padding() | |
def build(self): | |
p = self.do_build() | |
p += self.build_padding() | |
p = self.build_done(p) | |
return p | |
def post_build(self, pkt, pay): | |
"""DEV: called right after the current layer is build.""" | |
return pkt+pay | |
def build_done(self, p): | |
return self.payload.build_done(p) | |
def do_build_ps(self): | |
p=b"" | |
pl = [] | |
q=b"" | |
for f in self.fields_desc: | |
if isinstance(f, ConditionalField) and not f._evalcond(self): | |
continue | |
p = f.addfield(self, p, self.getfieldval(f.name) ) | |
if type(p) is bytes: | |
r = p[len(q):] | |
q = p | |
else: | |
r = b"" | |
pl.append( (f, f.i2repr(self,self.getfieldval(f.name)), r) ) | |
pkt,lst = self.payload.build_ps(internal=1) | |
p += pkt | |
lst.append( (self, pl) ) | |
return p,lst | |
def build_ps(self,internal=0): | |
p,lst = self.do_build_ps() | |
# if not internal: | |
# pkt = self | |
# while pkt.haslayer(conf.padding_layer): | |
# pkt = pkt.getlayer(conf.padding_layer) | |
# lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) ) | |
# p += pkt.load | |
# pkt = pkt.payload | |
return p,lst | |
def psdump(self, filename=None, **kargs): | |
"""psdump(filename=None, layer_shift=0, rebuild=1) | |
Creates an EPS file describing a packet. If filename is not provided a temporary file is created and gs is called.""" | |
canvas = self.canvas_dump(**kargs) | |
if filename is None: | |
fname = get_temp_file(autoext=".eps") | |
canvas.writeEPSfile(fname) | |
subprocess.Popen([conf.prog.psreader, fname+".eps"]) | |
else: | |
canvas.writeEPSfile(filename) | |
def pdfdump(self, filename=None, **kargs): | |
"""pdfdump(filename=None, layer_shift=0, rebuild=1) | |
Creates a PDF file describing a packet. If filename is not provided a temporary file is created and xpdf is called.""" | |
canvas = self.canvas_dump(**kargs) | |
if filename is None: | |
fname = get_temp_file(autoext=".pdf") | |
canvas.writePDFfile(fname) | |
subprocess.Popen([conf.prog.pdfreader, fname+".pdf"]) | |
else: | |
canvas.writePDFfile(filename) | |
def canvas_dump(self, layer_shift=0, rebuild=1): | |
canvas = pyx.canvas.canvas() | |
if rebuild: | |
p,t = self.__class__(bytes(self)).build_ps() | |
else: | |
p,t = self.build_ps() | |
YTXT=len(t) | |
for n,l in t: | |
YTXT += len(l) | |
YTXT = float(YTXT) | |
YDUMP=YTXT | |
XSTART = 1 | |
XDSTART = 10 | |
y = 0.0 | |
yd = 0.0 | |
xd = 0 | |
XMUL= 0.55 | |
YMUL = 0.4 | |
backcolor=colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb) | |
forecolor=colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb) | |
# backcolor=makecol(0.376, 0.729, 0.525, 1.0) | |
def hexstr(x): | |
s = [] | |
for c in x: | |
s.append("%02x" % c) | |
return " ".join(s) | |
def make_dump_txt(x,y,txt): | |
return pyx.text.text(XDSTART+x*XMUL, (YDUMP-y)*YMUL, r"\tt{%s}"%hexstr(txt), [pyx.text.size.Large]) | |
def make_box(o): | |
return pyx.box.rect(o.left(), o.bottom(), o.width(), o.height(), relcenter=(0.5,0.5)) | |
def make_frame(lst): | |
if len(lst) == 1: | |
b = lst[0].bbox() | |
b.enlarge(pyx.unit.u_pt) | |
return b.path() | |
else: | |
fb = lst[0].bbox() | |
fb.enlarge(pyx.unit.u_pt) | |
lb = lst[-1].bbox() | |
lb.enlarge(pyx.unit.u_pt) | |
if len(lst) == 2 and fb.left() > lb.right(): | |
return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()), | |
pyx.path.lineto(fb.left(), fb.top()), | |
pyx.path.lineto(fb.left(), fb.bottom()), | |
pyx.path.lineto(fb.right(), fb.bottom()), | |
pyx.path.moveto(lb.left(), lb.top()), | |
pyx.path.lineto(lb.right(), lb.top()), | |
pyx.path.lineto(lb.right(), lb.bottom()), | |
pyx.path.lineto(lb.left(), lb.bottom())) | |
else: | |
# XXX | |
gb = lst[1].bbox() | |
if gb != lb: | |
gb.enlarge(pyx.unit.u_pt) | |
kb = lst[-2].bbox() | |
if kb != gb and kb != lb: | |
kb.enlarge(pyx.unit.u_pt) | |
return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()), | |
pyx.path.lineto(fb.right(), fb.top()), | |
pyx.path.lineto(fb.right(), kb.bottom()), | |
pyx.path.lineto(lb.right(), kb.bottom()), | |
pyx.path.lineto(lb.right(), lb.bottom()), | |
pyx.path.lineto(lb.left(), lb.bottom()), | |
pyx.path.lineto(lb.left(), gb.top()), | |
pyx.path.lineto(fb.left(), gb.top()), | |
pyx.path.closepath(),) | |
def make_dump(s, shift=0, y=0, col=None, bkcol=None, larg=16): | |
c = pyx.canvas.canvas() | |
tlist = [] | |
while s: | |
dmp,s = s[:larg-shift],s[larg-shift:] | |
txt = make_dump_txt(shift, y, dmp) | |
tlist.append(txt) | |
shift += len(dmp) | |
if shift >= 16: | |
shift = 0 | |
y += 1 | |
if col is None: | |
col = pyx.color.rgb.red | |
if bkcol is None: | |
col = pyx.color.rgb.white | |
c.stroke(make_frame(tlist),[col,pyx.deco.filled([bkcol]),pyx.style.linewidth.Thick]) | |
for txt in tlist: | |
c.insert(txt) | |
return c, tlist[-1].bbox(), shift, y | |
last_shift,last_y=0,0.0 | |
while t: | |
bkcol = next(backcolor) | |
proto,fields = t.pop() | |
y += 0.5 | |
pt = pyx.text.text(XSTART, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % proto.name, [ pyx.text.size.Large]) | |
y += 1 | |
ptbb=pt.bbox() | |
ptbb.enlarge(pyx.unit.u_pt*2) | |
canvas.stroke(ptbb.path(),[pyx.color.rgb.black, pyx.deco.filled([bkcol])]) | |
canvas.insert(pt) | |
for fname, fval, fdump in fields: | |
col = next(forecolor) | |
ft = pyx.text.text(XSTART, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fname.name)) | |
if isinstance(fval, str): | |
if len(fval) > 18: | |
fval = fval[:18]+"[...]" | |
else: | |
fval="" | |
vt = pyx.text.text(XSTART+3, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval)) | |
y += 1.0 | |
if fdump: | |
dt,target,last_shift,last_y = make_dump(fdump, last_shift, last_y, col, bkcol) | |
dtb = dt.bbox() | |
dtb=target | |
vtb = vt.bbox() | |
bxvt = make_box(vtb) | |
bxdt = make_box(dtb) | |
dtb.enlarge(pyx.unit.u_pt) | |
try: | |
if yd < 0: | |
cnx = pyx.connector.curve(bxvt,bxdt,absangle1=0, absangle2=-90) | |
else: | |
cnx = pyx.connector.curve(bxvt,bxdt,absangle1=0, absangle2=90) | |
except: | |
pass | |
else: | |
canvas.stroke(cnx,[pyx.style.linewidth.thin,pyx.deco.earrow.small,col]) | |
canvas.insert(dt) | |
canvas.insert(ft) | |
canvas.insert(vt) | |
last_y += layer_shift | |
return canvas | |
def extract_padding(self, s): | |
"""DEV: to be overloaded to extract current layer's padding. Return a couple of strings (actual layer, padding)""" | |
return s,None | |
def post_dissect(self, s): | |
"""DEV: is called right after the current layer has been dissected""" | |
return s | |
def pre_dissect(self, s): | |
"""DEV: is called right before the current layer is dissected""" | |
return s | |
def do_dissect(self, s): | |
flist = self.fields_desc[:] | |
flist.reverse() | |
raw = s | |
while s and flist: | |
f = flist.pop() | |
#print(f, end = " = ") | |
s,fval = f.getfield(self, s) | |
#print('fval') | |
self.fields[f.name] = fval | |
assert(raw.endswith(s)) | |
if s: | |
self.raw_packet_cache = raw[:-len(s)] | |
else: | |
self.raw_packet_cache = raw | |
self.explicit = 1 | |
return s | |
def do_dissect_payload(self, s): | |
if s: | |
cls = self.guess_payload_class(s) | |
try: | |
p = cls(s, _internal=1, _underlayer=self) | |
except KeyboardInterrupt: | |
raise | |
except: | |
if conf.debug_dissector: | |
if isinstance(cls,type) and issubclass(cls,Packet): | |
log_runtime.error("%s dissector failed" % cls.name) | |
else: | |
log_runtime.error("%s.guess_payload_class() returned [%s]" % (self.__class__.__name__,repr(cls))) | |
if cls is not None: | |
raise | |
p = conf.raw_layer(s, _internal=1, _underlayer=self) | |
self.add_payload(p) | |
def dissect(self, s): | |
s = self.pre_dissect(s) | |
s = self.do_dissect(s) | |
s = self.post_dissect(s) | |
payl,pad = self.extract_padding(s) | |
self.do_dissect_payload(payl) | |
if pad and conf.padding: | |
self.add_payload(conf.padding_layer(pad)) | |
def guess_payload_class(self, payload): | |
"""DEV: Guesses the next payload class from layer bonds. Can be overloaded to use a different mechanism.""" | |
for t in self.aliastypes: | |
for fval, cls in t.payload_guess: | |
ok = 1 | |
for k in fval.keys(): | |
if not hasattr(self, k) or fval[k] != self.getfieldval(k): | |
ok = 0 | |
break | |
if ok: | |
return cls | |
return self.default_payload_class(payload) | |
def default_payload_class(self, payload): | |
"""DEV: Returns the default payload class if nothing has been found by the guess_payload_class() method.""" | |
return conf.raw_layer | |
def hide_defaults(self): | |
"""Removes fields' values that are the same as default values.""" | |
for k in list(self.fields.keys()): | |
if k in self.default_fields: | |
if self.default_fields[k] == self.fields[k]: | |
del(self.fields[k]) | |
self.payload.hide_defaults() | |
def clone_with(self, payload=None, **kargs): | |
pkt = self.__class__() | |
pkt.explicit = 1 | |
pkt.fields = kargs | |
pkt.time = self.time | |
pkt.underlayer = self.underlayer | |
pkt.overload_fields = self.overload_fields.copy() | |
pkt.post_transforms = self.post_transforms | |
if payload is not None: | |
pkt.add_payload(payload) | |
return pkt | |
def __iter__(self): | |
def loop(todo, done, self=self): | |
if todo: | |
eltname = todo.pop() | |
elt = self.getfieldval(eltname) | |
if not isinstance(elt, Gen): | |
if self.get_field(eltname).islist: | |
elt = SetGen([elt]) | |
else: | |
elt = SetGen(elt) | |
for e in elt: | |
done[eltname]=e | |
for x in loop(todo[:], done): | |
yield x | |
else: | |
if isinstance(self.payload,NoPayload): | |
payloads = [None] | |
else: | |
payloads = self.payload | |
for payl in payloads: | |
done2=done.copy() | |
for k in done2: | |
if isinstance(done2[k], VolatileValue): | |
done2[k] = done2[k]._fix() | |
pkt = self.clone_with(payload=payl, **done2) | |
yield pkt | |
if self.explicit: | |
todo = [] | |
done = self.fields | |
else: | |
todo = [ k for (k,v) in itertools.chain(self.default_fields.items(), | |
self.overloaded_fields.items()) | |
if isinstance(v, VolatileValue) ] + list(self.fields.keys()) | |
done = {} | |
return loop(todo, done) | |
def __gt__(self, other): | |
"""True if other is an answer from self (self ==> other).""" | |
if isinstance(other, Packet): | |
return other < self | |
elif type(other) is str: | |
return 1 | |
else: | |
raise TypeError((self, other)) | |
def __lt__(self, other): | |
"""True if self is an answer from other (other ==> self).""" | |
if isinstance(other, Packet): | |
return self.answers(other) | |
elif type(other) is str: | |
return 1 | |
else: | |
raise TypeError((self, other)) | |
def __eq__(self, other): | |
if not isinstance(other, self.__class__): | |
return False | |
for f in self.fields_desc: | |
if f not in other.fields_desc: | |
return False | |
if self.getfieldval(f.name) != other.getfieldval(f.name): | |
return False | |
return self.payload == other.payload | |
def __ne__(self, other): | |
return not self.__eq__(other) | |
def hashret(self): | |
"""DEV: returns a string that has the same value for a request and its answer.""" | |
return self.payload.hashret() | |
def answers(self, other): | |
"""DEV: true if self is an answer from other""" | |
if other.__class__ == self.__class__: | |
return self.payload.answers(other.payload) | |
return 0 | |
def haslayer(self, cls): | |
"""true if self has a layer that is an instance of cls. Superseded by "cls in self" syntax.""" | |
if self.__class__ == cls or self.__class__.__name__ == cls: | |
return 1 | |
for f in self.packetfields: | |
fvalue_gen = self.getfieldval(f.name) | |
if fvalue_gen is None: | |
continue | |
if not f.islist: | |
fvalue_gen = SetGen(fvalue_gen,_iterpacket=0) | |
for fvalue in fvalue_gen: | |
if isinstance(fvalue, Packet): | |
ret = fvalue.haslayer(cls) | |
if ret: | |
return ret | |
return self.payload.haslayer(cls) | |
def getlayer(self, cls, nb=1, _track=None): | |
"""Return the nb^th layer that is an instance of cls.""" | |
if type(cls) is int: | |
nb = cls+1 | |
cls = None | |
if type(cls) is str and "." in cls: | |
ccls,fld = cls.split(".",1) | |
else: | |
ccls,fld = cls,None | |
if cls is None or self.__class__ == cls or self.__class__.name == ccls: | |
if nb == 1: | |
if fld is None: | |
return self | |
else: | |
return self.getfieldval(fld) | |
else: | |
nb -=1 | |
for f in self.packetfields: | |
fvalue_gen = self.getfieldval(f.name) | |
if fvalue_gen is None: | |
continue | |
if not f.islist: | |
fvalue_gen = SetGen(fvalue_gen,_iterpacket=0) | |
for fvalue in fvalue_gen: | |
if isinstance(fvalue, Packet): | |
track=[] | |
ret = fvalue.getlayer(cls, nb, _track=track) | |
if ret is not None: | |
return ret | |
nb = track[0] | |
return self.payload.getlayer(cls,nb,_track=_track) | |
def firstlayer(self): | |
q = self | |
while q.underlayer is not None: | |
q = q.underlayer | |
return q | |
def __getitem__(self, cls): | |
if type(cls) is slice: | |
lname = cls.start | |
if cls.stop: | |
ret = self.getlayer(cls.start, cls.stop) | |
else: | |
ret = self.getlayer(cls.start) | |
if ret is None and cls.step is not None: | |
ret = cls.step | |
else: | |
lname=cls | |
ret = self.getlayer(cls) | |
if ret is None: | |
if type(lname) is Packet_metaclass: | |
lname = lname.__name__ | |
elif type(lname) is not str: | |
lname = repr(lname) | |
raise IndexError("Layer [%s] not found" % lname) | |
return ret | |
def __delitem__(self, cls): | |
del(self[cls].underlayer.payload) | |
def __setitem__(self, cls, val): | |
self[cls].underlayer.payload = val | |
def __contains__(self, cls): | |
""""cls in self" returns true if self has a layer which is an instance of cls.""" | |
return self.haslayer(cls) | |
def route(self): | |
return (None,None,None) | |
def fragment(self, *args, **kargs): | |
return self.payload.fragment(*args, **kargs) | |
def display(self,*args,**kargs): # Deprecated. Use show() | |
"""Deprecated. Use show() method.""" | |
self.show(*args,**kargs) | |
def show(self, indent=3, lvl="", label_lvl=""): | |
"""Prints a hierarchical view of the packet. "indent" gives the size of indentation for each layer.""" | |
ct = conf.color_theme | |
print("%s%s %s %s" % (label_lvl, | |
ct.punct("###["), | |
ct.layer_name(self.name), | |
ct.punct("]###"))) | |
for f in self.fields_desc: | |
if isinstance(f, ConditionalField) and not f._evalcond(self): | |
continue | |
if isinstance(f, Emph) or f in conf.emph: | |
ncol = ct.emph_field_name | |
vcol = ct.emph_field_value | |
else: | |
ncol = ct.field_name | |
vcol = ct.field_value | |
fvalue = self.getfieldval(f.name) | |
if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and type(fvalue) is list): | |
print("%s \\%-10s\\" % (label_lvl+lvl, ncol(f.name))) | |
fvalue_gen = SetGen(fvalue,_iterpacket=0) | |
for fvalue in fvalue_gen: | |
fvalue.show(indent=indent, label_lvl=label_lvl+lvl+" |") | |
else: | |
begn = "%s %-10s%s " % (label_lvl+lvl, | |
ncol(f.name), | |
ct.punct("="),) | |
reprval = f.i2repr(self,fvalue) | |
if type(reprval) is str: | |
reprval = reprval.replace("\n", "\n"+" "*(len(label_lvl) | |
+len(lvl) | |
+len(f.name) | |
+4)) | |
print("%s%s" % (begn,vcol(reprval))) | |
self.payload.show(indent=indent, lvl=lvl+(" "*indent*self.show_indent), label_lvl=label_lvl) | |
def show2(self): | |
"""Prints a hierarchical view of an assembled version of the packet, so that automatic fields are calculated (checksums, etc.)""" | |
self.__class__(bytes(self)).show() | |
def sprintf(self, fmt, relax=1): | |
"""sprintf(format, [relax=1]) -> str | |
where format is a string that can include directives. A directive begins and | |
ends by % and has the following format %[fmt[r],][cls[:nb].]field%. | |
fmt is a classic printf directive, "r" can be appended for raw substitution | |
(ex: IP.flags=0x18 instead of SA), nb is the number of the layer we want | |
(ex: for IP/IP packets, IP:2.src is the src of the upper IP layer). | |
Special case : "%.time%" is the creation time. | |
Ex : p.sprintf("%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% " | |
"%03xr,IP.proto% %r,TCP.flags%") | |
Moreover, the format string can include conditionnal statements. A conditionnal | |
statement looks like : {layer:string} where layer is a layer name, and string | |
is the string to insert in place of the condition if it is true, i.e. if layer | |
is present. If layer is preceded by a "!", the result si inverted. Conditions | |
can be imbricated. A valid statement can be : | |
p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet") | |
p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}") | |
A side effect is that, to obtain "{" and "}" characters, you must use | |
"%(" and "%)". | |
""" | |
escape = { "%": "%", | |
"(": "{", | |
")": "}" } | |
# Evaluate conditions | |
while "{" in fmt: | |
i = fmt.rindex("{") | |
j = fmt[i+1:].index("}") | |
cond = fmt[i+1:i+j+1] | |
k = cond.find(":") | |
if k < 0: | |
raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)"%cond) | |
cond,format = cond[:k],cond[k+1:] | |
res = False | |
if cond[0] == "!": | |
res = True | |
cond = cond[1:] | |
if self.haslayer(cond): | |
res = not res | |
if not res: | |
format = "" | |
fmt = fmt[:i]+format+fmt[i+j+2:] | |
# Evaluate directives | |
s = "" | |
while "%" in fmt: | |
i = fmt.index("%") | |
s += fmt[:i] | |
fmt = fmt[i+1:] | |
if fmt and fmt[0] in escape: | |
s += escape[fmt[0]] | |
fmt = fmt[1:] | |
continue | |
try: | |
i = fmt.index("%") | |
sfclsfld = fmt[:i] | |
fclsfld = sfclsfld.split(",") | |
if len(fclsfld) == 1: | |
f = "s" | |
clsfld = fclsfld[0] | |
elif len(fclsfld) == 2: | |
f,clsfld = fclsfld | |
else: | |
raise Scapy_Exception | |
if "." in clsfld: | |
cls,fld = clsfld.split(".") | |
else: | |
cls = self.__class__.__name__ | |
fld = clsfld | |
num = 1 | |
if ":" in cls: | |
cls,num = cls.split(":") | |
num = int(num) | |
fmt = fmt[i+1:] | |
except: | |
raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "...")) | |
else: | |
if fld == "time": | |
val = time.strftime("%H:%M:%S.%%06i", time.localtime(self.time)) % int((self.time-int(self.time))*1000000) | |
elif cls == self.__class__.__name__ and hasattr(self, fld): | |
if num > 1: | |
val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f,cls,num-1,fld), relax) | |
f = "s" | |
elif f[-1] == "r": # Raw field value | |
val = getattr(self,fld) | |
f = f[:-1] | |
if not f: | |
f = "s" | |
else: | |
val = getattr(self,fld) | |
if fld in self.fieldtype: | |
val = self.fieldtype[fld].i2repr(self,val) | |
else: | |
val = self.payload.sprintf("%%%s%%" % sfclsfld, relax) | |
f = "s" | |
s += ("%"+f) % val | |
s += fmt | |
return s | |
def mysummary(self): | |
"""DEV: can be overloaded to return a string that summarizes the layer. | |
Only one mysummary() is used in a whole packet summary: the one of the upper layer, | |
except if a mysummary() also returns (as a couple) a list of layers whose | |
mysummary() must be called if they are present.""" | |
return "" | |
def _do_summary(self): | |
found,s,needed = self.payload._do_summary() | |
if s: | |
s = " / "+s | |
ret = "" | |
if not found or self.__class__ in needed: | |
ret = self.mysummary() | |
if type(ret) is tuple: | |
ret,n = ret | |
needed += n | |
if ret or needed: | |
found = 1 | |
if not ret: | |
ret = self.__class__.__name__ | |
if self.__class__ in conf.emph: | |
impf = [] | |
for f in self.fields_desc: | |
if f in conf.emph: | |
impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))) | |
ret = "%s [%s]" % (ret," ".join(impf)) | |
ret = "%s%s" % (ret,s) | |
return found,ret,needed | |
def summary(self, intern=0): | |
"""Prints a one line summary of a packet.""" | |
found,s,needed = self._do_summary() | |
return s | |
def lastlayer(self,layer=None): | |
"""Returns the uppest layer of the packet""" | |
return self.payload.lastlayer(self) | |
def decode_payload_as(self,cls): | |
"""Reassembles the payload and decode it using another packet class""" | |
s = bytes(self.payload) | |
self.payload = cls(s, _internal=1, _underlayer=self) | |
pp = self | |
while pp.underlayer is not None: | |
pp = pp.underlayer | |
self.payload.dissection_done(pp) | |
def libnet(self): | |
"""Not ready yet. Should give the necessary C code that interfaces with libnet to recreate the packet""" | |
print("libnet_build_%s(" % self.__class__.name.lower()) | |
det = self.__class__(str(self)) | |
for f in self.fields_desc: | |
val = det.getfieldval(f.name) | |
if val is None: | |
val = 0 | |
elif type(val) is int: | |
val = str(val) | |
else: | |
val = '"%s"' % str(val) | |
print("\t%s, \t\t/* %s */" % (val,f.name)) | |
print(");") | |
def command(self): | |
"""Returns a string representing the command you have to type to obtain the same packet""" | |
f = [] | |
for fn,fv in self.fields.items(): | |
fld = self.get_field(fn) | |
if isinstance(fv, Packet): | |
fv = fv.command() | |
elif fld.islist and fld.holds_packets and type(fv) is list: | |
#fv = "[%s]" % ",".join( map(Packet.command, fv)) | |
fv = "[%s]" % ",".join([ Packet.command(i) for i in fv ]) | |
else: | |
fv = repr(fv) | |
f.append("%s=%s" % (fn, fv)) | |
c = "%s(%s)" % (self.__class__.__name__, ", ".join(f)) | |
pc = self.payload.command() | |
if pc: | |
c += "/"+pc | |
return c | |
class NoPayload(Packet): | |
def __new__(cls, *args, **kargs): | |
singl = cls.__dict__.get("__singl__") | |
if singl is None: | |
cls.__singl__ = singl = Packet.__new__(cls) | |
Packet.__init__(singl) | |
return singl | |
def __init__(self, *args, **kargs): | |
pass | |
def dissection_done(self,pkt): | |
return | |
def add_payload(self, payload): | |
raise Scapy_Exception("Can't add payload to NoPayload instance") | |
def remove_payload(self): | |
pass | |
def add_underlayer(self,underlayer): | |
pass | |
def remove_underlayer(self,other): | |
pass | |
def copy(self): | |
return self | |
def __repr__(self): | |
return "" | |
def __str__(self): | |
return "" | |
def __nonzero__(self): | |
return False | |
def do_build(self): | |
return b"" | |
def build(self): | |
return b"" | |
def build_padding(self): | |
return b"" | |
def build_done(self, p): | |
return p | |
def build_ps(self, internal=0): | |
return b"",[] | |
def getfieldval(self, attr): | |
raise AttributeError(attr) | |
def getfield_and_val(self, attr): | |
raise AttributeError(attr) | |
def setfieldval(self, attr, val): | |
raise AttributeError(attr) | |
def delfieldval(self, attr): | |
raise AttributeError(attr) | |
def __getattr__(self, attr): | |
if attr in self.__dict__: | |
return self.__dict__[attr] | |
elif attr in self.__class__.__dict__: | |
return self.__class__.__dict__[attr] | |
else: | |
raise AttributeError(attr) | |
def hide_defaults(self): | |
pass | |
def __iter__(self): | |
return iter([]) | |
def __eq__(self, other): | |
if isinstance(other, NoPayload): | |
return True | |
return False | |
def hashret(self): | |
return b"" | |
def answers(self, other): | |
return isinstance(other, NoPayload) or isinstance(other, conf.padding_layer) | |
def haslayer(self, cls): | |
return 0 | |
def getlayer(self, cls, nb=1, _track=None): | |
if _track is not None: | |
_track.append(nb) | |
return None | |
def fragment(self, *args, **kargs): | |
raise Scapy_Exception("cannot fragment this packet") | |
def show(self, indent=3, lvl="", label_lvl=""): | |
pass | |
def sprintf(self, fmt, relax): | |
if relax: | |
return "??" | |
else: | |
raise Scapy_Exception("Format not found [%s]"%fmt) | |
def _do_summary(self): | |
return 0,"",[] | |
def lastlayer(self,layer): | |
return layer | |
def command(self): | |
return "" | |
#################### | |
## packet classes ## | |
#################### | |
class Raw(Packet): | |
name = "Raw" | |
fields_desc = [ StrField("load", b"") ] | |
def answers(self, other): | |
return 1 | |
# s = str(other) | |
# t = self.load | |
# l = min(len(s), len(t)) | |
# return s[:l] == t[:l] | |
def mysummary(self): | |
cs = conf.raw_summary | |
if cs: | |
if callable(cs): | |
return "Raw %s" % cs(self.load) | |
else: | |
return "Raw %r" % self.load | |
return Packet.mysummary(self) | |
class Padding(Raw): | |
name = "Padding" | |
def self_build(self): | |
return b"" | |
def build_padding(self): | |
return (self.getbyteval("load") if self.raw_packet_cache is None | |
else self.raw_packet_cache) + self.payload.build_padding() | |
conf.raw_layer = Raw | |
conf.padding_layer = Padding | |
if conf.default_l2 is None: | |
conf.default_l2 = Raw | |
################# | |
## Bind layers ## | |
################# | |
def bind_bottom_up(lower, upper, __fval=None, **fval): | |
if __fval is not None: | |
fval.update(__fval) | |
lower.payload_guess = lower.payload_guess[:] | |
lower.payload_guess.append((fval, upper)) | |
def bind_top_down(lower, upper, __fval=None, **fval): | |
if __fval is not None: | |
fval.update(__fval) | |
upper.overload_fields = upper.overload_fields.copy() | |
upper.overload_fields[lower] = fval | |
@conf.commands.register | |
def bind_layers(lower, upper, __fval=None, **fval): | |
"""Bind 2 layers on some specific fields' values""" | |
if __fval is not None: | |
fval.update(__fval) | |
bind_top_down(lower, upper, **fval) | |
bind_bottom_up(lower, upper, **fval) | |
def split_bottom_up(lower, upper, __fval=None, **fval): | |
if __fval is not None: | |
fval.update(__fval) | |
#def do_filter((f,u),upper=upper,fval=fval): | |
def do_filter(s,upper=upper,fval=fval): | |
if s[1] != upper: | |
return True | |
for k in fval: | |
if k not in s[0] or s[0][k] != fval[k]: | |
return True | |
return False | |
lower.payload_guess = list(filter(do_filter, lower.payload_guess)) | |
def split_top_down(lower, upper, __fval=None, **fval): | |
if __fval is not None: | |
fval.update(__fval) | |
if lower in upper.overload_fields: | |
ofval = upper.overload_fields[lower] | |
for k in fval: | |
if k not in ofval or ofval[k] != fval[k]: | |
return | |
upper.overload_fields = upper.overload_fields.copy() | |
del(upper.overload_fields[lower]) | |
@conf.commands.register | |
def split_layers(lower, upper, __fval=None, **fval): | |
"""Split 2 layers previously bound""" | |
if __fval is not None: | |
fval.update(__fval) | |
split_bottom_up(lower, upper, **fval) | |
split_top_down(lower, upper, **fval) | |
@conf.commands.register | |
def ls(obj=None): | |
"""List available layers, or infos on a given layer""" | |
if obj is None: | |
import builtins | |
all = builtins.__dict__.copy() | |
all.update(globals()) | |
objlst = sorted(conf.layers, key=lambda x:x.__name__) | |
for o in objlst: | |
print("%-10s : %s" %(o.__name__,o.name)) | |
else: | |
if isinstance(obj, type) and issubclass(obj, Packet): | |
for f in obj.fields_desc: | |
print("%-10s : %-20s = (%s)" % (f.name, f.__class__.__name__, repr(f.default))) | |
elif isinstance(obj, Packet): | |
for f in obj.fields_desc: | |
print("%-10s : %-20s = %-15s (%s)" % (f.name, f.__class__.__name__, repr(getattr(obj,f.name)), repr(f.default))) | |
if not isinstance(obj.payload, NoPayload): | |
print("--") | |
ls(obj.payload) | |
else: | |
print("Not a packet class. Type 'ls()' to list packet classes.") | |
############# | |
## Fuzzing ## | |
############# | |
@conf.commands.register | |
def fuzz(p, _inplace=0): | |
"""Transform a layer into a fuzzy layer by replacing some default values by random objects""" | |
if not _inplace: | |
p = p.copy() | |
q = p | |
while not isinstance(q, NoPayload): | |
for f in q.fields_desc: | |
if isinstance(f, PacketListField): | |
for r in getattr(q, f.name): | |
print("fuzzing", repr(r)) | |
fuzz(r, _inplace=1) | |
elif f.default is not None: | |
rnd = f.randval() | |
if rnd is not None: | |
q.default_fields[f.name] = rnd | |
q = q.payload | |
return p | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment