Skip to content

Instantly share code, notes, and snippets.

@syakesaba
Created April 24, 2016 12:15
Show Gist options
  • Save syakesaba/f1271154262bf3c3292995828b09e3fa to your computer and use it in GitHub Desktop.
Save syakesaba/f1271154262bf3c3292995828b09e3fa to your computer and use it in GitHub Desktop.
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more information
## Copyright (C) Philippe Biondi <[email protected]>
## This program is published under a GPLv2 license
"""
Fields: basic data structures that make up parts of packets.
"""
import struct,copy,socket
from .config import conf
from .volatile import *
from .data import *
from .utils import *
from .base_classes import BasePacket,Gen,Net
############
## Fields ##
############
class Field:
    """Base class of all packet fields.

    For more information on how this works, please refer to
    http://www.secdev.org/projects/scapy/files/scapydoc.pdf
    chapter ``Adding a New Field''

    A value exists in three forms: human (h), internal (i) and machine (m);
    the ``x2y`` methods convert between them.  ``fmt`` is a ``struct`` format
    string; "!" is prepended when it has no byte-order character of its own.
    """
    islist = 0
    holds_packets = 0

    def __init__(self, name, default, fmt="H"):
        self.name = name
        if fmt[0] in "@=<>!":
            self.fmt = fmt
        else:
            self.fmt = "!" + fmt
        # The default goes through any2i like any user-supplied value.
        self.default = self.any2i(None, default)
        self.sz = struct.calcsize(self.fmt)
        self.owners = []

    def register_owner(self, cls):
        """Record cls in the list of owners of this field."""
        self.owners.append(cls)

    def i2len(self, pkt, x):
        """Convert internal value to a length usable by a FieldLenField"""
        return self.sz

    def i2count(self, pkt, x):
        """Convert internal value to a number of elements usable by a FieldLenField.
        Always 1 except for list fields"""
        return 1

    def i2b(self, pkt, x):
        """Convert internal value to bytes.

        A str is converted one code point per byte; any other value is
        returned unchanged.
        """
        if type(x) is str:
            x = bytes([ord(i) for i in x])
        return x

    def i2dict(self, pkt, x):
        """Return the value as a one-entry dict keyed by the field name."""
        return {self.name: x}

    def h2i(self, pkt, x):
        """Convert human value to internal value"""
        if type(x) is str:
            x = bytes([ord(i) for i in x])
        return x

    def i2h(self, pkt, x):
        """Convert internal value to human value"""
        return x

    def m2i(self, pkt, x):
        """Convert machine value to internal value"""
        return x

    def i2m(self, pkt, x):
        """Convert internal value to machine value (None becomes 0)"""
        if x is None:
            x = 0
        return x

    def any2i(self, pkt, x):
        """Try to understand the most input values possible and make an internal value from them"""
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        """Convert internal value to a nice representation"""
        return repr(self.i2h(pkt, x))

    def addfield(self, pkt, s, val):
        """Append the packed machine form of internal value val to s"""
        return s + struct.pack(self.fmt, self.i2m(pkt, val))

    def getfield(self, pkt, s):
        """Extract an internal value from s.

        Returns (remaining data, internal value).
        """
        return s[self.sz:], self.m2i(pkt, struct.unpack(self.fmt, s[:self.sz])[0])

    def do_copy(self, x):
        """Return a copy of internal value x.

        Lists are handled first: Python 3 lists have a ``copy`` method, so
        testing ``hasattr(x, "copy")`` first would return a shallow copy and
        leave the packets inside shared with the original.
        """
        if isinstance(x, list):
            return [item.copy() if isinstance(item, BasePacket) else item
                    for item in x]
        if hasattr(x, "copy"):
            return x.copy()
        return x

    def __repr__(self):
        return "<Field (%s).%s>" % (",".join(x.__name__ for x in self.owners), self.name)

    def copy(self):
        """Return a deep copy of this field object."""
        return copy.deepcopy(self)

    def randval(self):
        """Return a volatile object whose value is both random and suitable for this field"""
        fmtt = self.fmt[-1]
        if fmtt in "BHIQ":
            return {"B": RandByte, "H": RandShort, "I": RandInt, "Q": RandLong}[fmtt]()
        elif fmtt == "s":
            # The byte count sits between the optional byte-order character
            # and the trailing "s".
            if self.fmt[0] in "0123456789":
                l = int(self.fmt[:-1])
            else:
                l = int(self.fmt[1:-1])
            return RandBin(l)
        else:
            warning("no random class for [%s] (fmt=%s)." % (self.name, self.fmt))
class Emph:
    """Proxy that wraps a field and forwards attribute access to it.

    Hashing and equality are delegated to the wrapped field, so an Emph
    compares equal to, and hashes like, the object it wraps.
    """
    fld = b""  # keeps self.fld resolvable even before __init__ has run

    def __init__(self, fld):
        self.fld = fld

    def __getattr__(self, name):
        # Only reached for attributes not found on the wrapper itself.
        wrapped = self.fld
        return getattr(wrapped, name)

    def __hash__(self):
        return hash(self.fld)

    def __eq__(self, rhs):
        return self.fld == rhs
class ActionField:
    """Field proxy that calls back into the packet from any2i.

    any2i first calls the packet method named ``action_method`` with the
    value, the wrapped field and the extra keyword arguments given at
    construction, then returns the wrapped field's own any2i result.
    Every other attribute is forwarded to the wrapped field.
    """
    _fld = None

    def __init__(self, fld, action_method, **kargs):
        self._fld = fld
        self._action_method = action_method
        self._privdata = kargs

    def any2i(self, pkt, val):
        hook = getattr(pkt, self._action_method)
        hook(val, self._fld, **self._privdata)
        return self._fld.any2i(pkt, val)

    def __getattr__(self, name):
        return getattr(self._fld, name)
class ConditionalField:
    """Field proxy that is only built/dissected when ``cond(pkt)`` is true.

    When the condition is false, getfield consumes nothing and yields None,
    and addfield leaves the data untouched.  Other attributes are forwarded
    to the wrapped field.
    """
    fld = None

    def __init__(self, fld, cond):
        self.fld = fld
        self.cond = cond

    def _evalcond(self, pkt):
        return self.cond(pkt)

    def getfield(self, pkt, s):
        if not self._evalcond(pkt):
            return s, None
        return self.fld.getfield(pkt, s)

    def addfield(self, pkt, s, val):
        if not self._evalcond(pkt):
            return s
        return self.fld.addfield(pkt, s, val)

    def __getattr__(self, name):
        return getattr(self.fld, name)
class PadField:
    """Add bytes after the proxified field so that it ends at the specified
    alignment from its beginning"""
    _fld = None

    def __init__(self, fld, align, padwith=None):
        self._fld = fld
        self._align = align
        self._padwith = padwith or b""

    def padlen(self, flen):
        """Return how many bytes must follow flen bytes to reach the alignment."""
        return (-flen) % self._align

    def getfield(self, pkt, s):
        rest, value = self._fld.getfield(pkt, s)
        consumed = len(s) - len(rest)
        skip = self.padlen(consumed)
        return rest[skip:], value

    def addfield(self, pkt, s, val):
        built = self._fld.addfield(pkt, b"", val)
        # struct's "<n>s" pads the given bytes with NULs up to n bytes.
        padding = struct.pack("%is" % (self.padlen(len(built))), self._padwith)
        return s + built + padding

    def __getattr__(self, name):
        return getattr(self._fld, name)
class MACField(Field):
    """MAC address field packed as 6 bytes (struct format "6s")."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, "6s")

    def i2m(self, pkt, x):
        """Return the 6-byte machine form; None becomes six NUL bytes."""
        if x is None:
            return b"\0\0\0\0\0\0"
        return mac2str(x)

    def m2i(self, pkt, x):
        return str2mac(x)

    def any2i(self, pkt, x):
        """Accept a raw 6-byte value and convert it with m2i; pass anything else through."""
        # Compare the length by value: "len(x) is 6" relied on CPython's
        # small-integer caching and triggers a SyntaxWarning on Python 3.8+.
        if type(x) is bytes and len(x) == 6:
            x = self.m2i(pkt, x)
        return x

    def i2repr(self, pkt, x):
        x = self.i2h(pkt, x)
        if self in conf.resolve:
            x = conf.manufdb._resolve_MAC(x)
        return x

    def randval(self):
        return RandMAC()
class IPField(Field):
    """IP address field packed as 4 bytes (struct format "4s")."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, "4s")

    def h2i(self, pkt, x):
        """Convert human value to internal value.

        A str that inet_aton rejects is wrapped in Net; lists are converted
        element by element; other values are returned unchanged.
        """
        if type(x) is str:
            try:
                inet_aton(x)
            except socket.error:
                x = Net(x)
        elif type(x) is list:
            x = [self.h2i(pkt, n) for n in x]
        return x

    def resolve(self, x):
        """Return the host name for x when this field is in conf.resolve, else x."""
        if self in conf.resolve:
            try:
                ret = socket.gethostbyaddr(x)[0]
            except Exception:
                # Best effort: any lookup failure falls back to the address.
                # A bare "except:" here would also swallow KeyboardInterrupt
                # during a slow lookup.
                pass
            else:
                if ret:
                    return ret
        return x

    def i2m(self, pkt, x):
        return inet_aton(x)

    def m2i(self, pkt, x):
        return inet_ntoa(x)

    def any2i(self, pkt, x):
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        return self.resolve(self.i2h(pkt, x))

    def randval(self):
        return RandIP()
class SourceIPField(IPField):
    """IP field whose unset (None) value is derived from routing.

    ``dstname`` names the packet attribute holding the destination that is
    used to look the route up.
    """

    def __init__(self, name, dstname):
        IPField.__init__(self, name, None)
        self.dstname = dstname

    def i2m(self, pkt, x):
        if x is None:
            _iff, x, _gw = pkt.route()
            if x is None:
                x = "0.0.0.0"
        return IPField.i2m(self, pkt, x)

    def i2h(self, pkt, x):
        if x is None:
            dst = getattr(pkt, self.dstname)
            if isinstance(dst, Gen):
                # One route per generated destination; warn when they differ.
                routes = sorted(conf.route.route(d) for d in dst)
                if routes[0] != routes[-1]:
                    warning("More than one possible route for %s"%repr(dst))
                _iff, x, _gw = routes[0]
            else:
                _iff, x, _gw = conf.route.route(dst)
        return IPField.i2h(self, pkt, x)
class ByteField(Field):
    """Field packed with struct format "B" (one unsigned byte)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="B")
class SignedByteField(Field):
    """Field packed with struct format "b" (one signed byte)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="b")
class XByteField(ByteField):
    """ByteField whose representation is produced by lhex()."""

    def i2repr(self, pkt, x):
        human = self.i2h(pkt, x)
        return lhex(human)
class OByteField(ByteField):
    """ByteField represented as a zero-padded, three-digit octal number."""

    def i2repr(self, pkt, x):
        human = self.i2h(pkt, x)
        return "%03o" % human
class X3BytesField(XByteField):
    """Three-byte field handled as a 4-byte "!I" value without its first byte."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="!I")

    def addfield(self, pkt, s, val):
        packed = struct.pack(self.fmt, self.i2m(pkt, val))
        # Drop the most significant of the four packed bytes.
        return s + packed[1:4]

    def getfield(self, pkt, s):
        # Prefix a zero byte so the three bytes unpack as a 32-bit integer.
        widened = b"\x00" + s[:3]
        number = struct.unpack(self.fmt, widened)[0]
        return s[3:], self.m2i(pkt, number)
class ShortField(Field):
    """Field packed with struct format "H" (unsigned 16-bit)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="H")
class LEShortField(Field):
    """Field packed with struct format "<H" (little-endian unsigned 16-bit)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="<H")
class XShortField(ShortField):
    """ShortField whose representation is produced by lhex()."""

    def i2repr(self, pkt, x):
        human = self.i2h(pkt, x)
        return lhex(human)
class IntField(Field):
    """Field packed with struct format "I" (unsigned 32-bit)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="I")
class SignedIntField(Field):
    """Field packed with struct format "i" (signed 32-bit)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="i")

    def randval(self):
        """Return a RandSInt volatile value."""
        return RandSInt()
class LEIntField(Field):
    """Field packed with struct format "<I" (little-endian unsigned 32-bit)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="<I")
class LESignedIntField(Field):
    """Field packed with struct format "<i" (little-endian signed 32-bit)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="<i")

    def randval(self):
        """Return a RandSInt volatile value."""
        return RandSInt()
class XIntField(IntField):
    """IntField whose representation is produced by lhex()."""

    def i2repr(self, pkt, x):
        human = self.i2h(pkt, x)
        return lhex(human)
class LongField(Field):
    """Field packed with struct format "Q" (unsigned 64-bit)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="Q")
class LELongField(Field):
    """Field packed with struct format "<Q" (little-endian unsigned 64-bit)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="<Q")
class XLongField(LongField):
    """LongField whose representation is produced by lhex()."""

    def i2repr(self, pkt, x):
        human = self.i2h(pkt, x)
        return lhex(human)
class IEEEFloatField(Field):
    """Field packed with struct format "f" (4-byte float)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="f")
class IEEEDoubleField(Field):
    """Field packed with struct format "d" (8-byte float)."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="d")
class StrField(Field):
    """Variable-length byte-string field.

    By default it consumes all the data it is given; ``remain`` is the
    number of trailing bytes to leave for the fields that follow.
    """

    def __init__(self, name, default, fmt="H", remain=0):
        Field.__init__(self, name, default, fmt)
        self.remain = remain

    def i2repr(self, pkt, x):
        """Return repr of x, shown as text when x is ASCII-decodable bytes."""
        if type(x) is bytes:
            try:
                x = x.decode('ascii')
            except UnicodeDecodeError:
                pass  # keep the raw bytes
        return repr(x)

    def i2len(self, pkt, i):
        return len(i)

    def i2m(self, pkt, x):
        """Return bytes: b"" for None, x itself if bytes, else str(x) encoded as ASCII."""
        if x is None:
            return b""
        if type(x) is bytes:
            return x
        return str(x).encode('ascii')

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)

    def getfield(self, pkt, s):
        if self.remain == 0:
            return b"", self.m2i(pkt, s)
        cut = -self.remain
        return s[cut:], self.m2i(pkt, s[:cut])

    def randval(self):
        return RandBin(RandNum(0,1200))
class PacketField(StrField):
    """Field whose value is a packet of class ``cls``."""
    holds_packets = 1

    def __init__(self, name, default, cls, remain=0):  # is remain used somewhere?
        StrField.__init__(self, name, default, remain=remain)
        self.cls = cls

    def i2m(self, pkt, i):
        return bytes(i)

    def m2i(self, pkt, m):
        return self.cls(m)

    def getfield(self, pkt, s):
        """Dissect s as ``cls``; any padding layer found is detached and
        its load returned as the remaining data."""
        parsed = self.m2i(pkt, s)
        if conf.padding_layer not in parsed:
            return b"", parsed
        pad = parsed[conf.padding_layer]
        del pad.underlayer.payload
        return pad.load, parsed
class PacketLenField(PacketField):
    """PacketField whose byte length is given by ``length_from(pkt)``."""
    holds_packets = 1

    def __init__(self, name, default, cls, length_from=None):
        PacketField.__init__(self, name, default, cls)
        self.length_from = length_from

    def getfield(self, pkt, s):
        size = self.length_from(pkt)
        chunk, rest = s[:size], s[size:]
        try:
            parsed = self.m2i(pkt, chunk)
        except Exception:
            if conf.debug_dissector:
                raise
            # Dissection failed: keep the bytes as a raw layer instead.
            parsed = conf.raw_layer(load=chunk)
        return rest, parsed
class PacketListField(PacketField):
    """Field holding a list of packets of class ``cls``.

    Dissection is bounded by ``length_from(pkt)`` (bytes) when given,
    otherwise by ``count_from(pkt)`` (packets) when given; with neither it
    continues until the data is exhausted.
    """
    islist = 1
    holds_packets = 1

    def __init__(self, name, default, cls, count_from=None, length_from=None):
        if default is None:
            default = []  # Create a new list for each instance
        PacketField.__init__(self, name, default, cls)
        self.count_from = count_from
        self.length_from = length_from

    def any2i(self, pkt, x):
        return x if type(x) is list else [x]

    def i2count(self, pkt, val):
        return len(val) if type(val) is list else 1

    def i2len(self, pkt, val):
        return sum(len(p) for p in val)

    def do_copy(self, x):
        return [p.copy() for p in x]

    def getfield(self, pkt, s):
        count = length = None
        if self.length_from is not None:
            length = self.length_from(pkt)
        elif self.count_from is not None:
            count = self.count_from(pkt)

        packets = []
        tail = b""
        data = s
        if length is not None:
            data, tail = s[:length], s[length:]

        while data:
            if count is not None:
                if count <= 0:
                    break
                count -= 1
            try:
                p = self.m2i(pkt, data)
            except Exception:
                if conf.debug_dissector:
                    raise
                # Undissectable: the rest of the data becomes one raw layer.
                p = conf.raw_layer(load=data)
                data = b""
            else:
                if conf.padding_layer in p:
                    # What the packet did not consume feeds the next round.
                    pad = p[conf.padding_layer]
                    data = pad.load
                    del pad.underlayer.payload
                else:
                    data = b""
            packets.append(p)
        return data + tail, packets

    def addfield(self, pkt, s, val):
        return s + b"".join([bytes(p) for p in val])
def addfield(self, pkt, s, val):
return s+b"".join([ bytes(i) for i in val ])
class StrFixedLenField(StrField):
    """Byte-string field of a fixed length.

    The length is either the constant ``length`` or the result of
    ``length_from(pkt)``; ``length`` wins when both are given.
    """

    def __init__(self, name, default, length=None, length_from=None):
        StrField.__init__(self, name, default)
        self.length_from = length_from
        if length is not None:
            self.length_from = lambda pkt, length=length: length

    def i2repr(self, pkt, v):
        """Return repr of v, with trailing NUL bytes stripped from bytes values."""
        if type(v) is bytes:
            v = v.rstrip(b"\0")
        return repr(v)

    def getfield(self, pkt, s):
        l = self.length_from(pkt)
        return s[l:], self.m2i(pkt, s[:l])

    def addfield(self, pkt, s, val):
        l = self.length_from(pkt)
        # "<l>s" truncates or NUL-pads the value to exactly l bytes.
        return s + struct.pack("%is" % l, self.i2m(pkt, val))

    def randval(self):
        """Return random bytes of the fixed length, or of a random length
        when the length cannot be computed without a packet."""
        try:
            l = self.length_from(None)
        except Exception:
            # length_from needs a real packet (or is unset); a bare
            # "except:" here would also swallow KeyboardInterrupt.
            l = RandNum(0,200)
        return RandBin(l)
class StrFixedLenEnumField(StrFixedLenField):
    """StrFixedLenField whose representation is annotated from ``enum``."""

    def __init__(self, name, default, length=None, enum=None, length_from=None):
        StrFixedLenField.__init__(self, name, default, length=length, length_from=length_from)
        self.enum = enum

    def i2repr(self, pkt, v):
        """Return repr of v without trailing NULs, followed by the enum label
        in parentheses when the raw or stripped value is a key of ``enum``."""
        # Strip with a pad of the same type as v: bytes.rstrip("\0") raises
        # TypeError on Python 3.
        pad = b"\0" if isinstance(v, bytes) else "\0"
        r = v.rstrip(pad)
        rr = repr(r)
        if v in self.enum:
            rr = "%s (%s)" % (rr, self.enum[v])
        elif r in self.enum:
            rr = "%s (%s)" % (rr, self.enum[r])
        return rr
class NetBIOSNameField(StrFixedLenField):
    """Fixed-length name field encoded two bytes per character.

    Each byte of the name is split into its high and low nibble and each
    nibble is added to 0x41; the encoded name is prefixed with a space.
    """

    def __init__(self, name, default, length=31):
        StrFixedLenField.__init__(self, name, default, length)

    def i2m(self, pkt, x):
        half = self.length_from(pkt) // 2
        if x is None:
            x = b""
        # Space-pad, then cut to the number of characters that fit.
        x += b" " * half
        x = x[:half]
        encoded = b"".join(
            [bytes([0x41 + (c >> 4), 0x41 + (c & 0xf)]) for c in x]
        )
        return b" " + encoded

    def m2i(self, pkt, x):
        x = x.strip(b"\x00").strip(b" ")
        # Recombine each (high, low) byte pair into one byte.
        pairs = zip(x[::2], x[1::2])
        return b"".join(
            bytes([(((hi - 1) & 0xf) << 4) + ((lo - 1) & 0xf)]) for hi, lo in pairs
        )
class StrLenField(StrField):
    """Byte-string field whose length is given by ``length_from(pkt)``.

    The ``fld`` argument is accepted but not used by this class.
    """

    def __init__(self, name, default, fld=None, length_from=None):
        StrField.__init__(self, name, default)
        self.length_from = length_from

    def getfield(self, pkt, s):
        size = self.length_from(pkt)
        head, rest = s[:size], s[size:]
        return rest, self.m2i(pkt, head)
class FieldListField(Field):
    """Field holding a list of values, each handled by the sub-field ``field``.

    Dissection is bounded by ``length_from(pkt)`` (bytes) when given,
    otherwise by ``count_from(pkt)`` (elements) when given; with neither it
    continues until the data is exhausted.
    """
    islist = 1

    def __init__(self, name, default, field, length_from=None, count_from=None):
        if default is None:
            default = []  # Create a new list for each instance
        Field.__init__(self, name, default)
        self.count_from = count_from
        self.length_from = length_from
        self.field = field

    def i2count(self, pkt, val):
        return len(val) if type(val) is list else 1

    def i2len(self, pkt, val):
        return sum(self.field.i2len(pkt, v) for v in val)

    def i2m(self, pkt, val):
        return [] if val is None else val

    def any2i(self, pkt, x):
        return x if type(x) is list else [x]

    def addfield(self, pkt, s, val):
        for item in self.i2m(pkt, val):
            s = self.field.addfield(pkt, s, item)
        return s

    def getfield(self, pkt, s):
        count = length = None
        if self.length_from is not None:
            length = self.length_from(pkt)
        elif self.count_from is not None:
            count = self.count_from(pkt)

        values = []
        tail = b""
        if length is not None:
            s, tail = s[:length], s[length:]

        while s:
            if count is not None:
                if count <= 0:
                    break
                count -= 1
            s, item = self.field.getfield(pkt, s)
            values.append(item)
        return s + tail, values
class FieldLenField(Field):
    """Field holding the length (``length_of``) or element count
    (``count_of``) of another field of the same packet.

    When the value is None it is computed at build time and passed through
    ``adjust(pkt, value)``.  ``fld`` is a deprecated alias of ``length_of``.
    """

    def __init__(self, name, default, length_of=None, fmt = "H", count_of=None, adjust=lambda pkt,x:x, fld=None):
        Field.__init__(self, name, default, fmt)
        self.length_of = length_of
        self.count_of = count_of
        self.adjust = adjust
        if fld is not None:
            FIELD_LENGTH_MANAGEMENT_DEPRECATION(self.__class__.__name__)
            self.length_of = fld

    def i2m(self, pkt, x):
        if x is not None:
            return x
        if self.length_of is not None:
            target, value = pkt.getfield_and_val(self.length_of)
            measured = target.i2len(pkt, value)
        else:
            target, value = pkt.getfield_and_val(self.count_of)
            measured = target.i2count(pkt, value)
        return self.adjust(pkt, measured)
class StrNullField(StrField):
    """Byte-string field terminated by a NUL byte."""

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val) + b"\x00"

    def getfield(self, pkt, s):
        """Split s at the first NUL byte.

        Returns (data after the NUL, internal value).  When no NUL is found
        the whole of s is returned as the value with no remaining data.
        """
        l = s.find(b"\x00")
        if l < 0:
            # XXX \x00 not found
            # The remaining data must be bytes like s, not the str "".
            return b"", s
        return s[l+1:], self.m2i(pkt, s[:l])

    def randval(self):
        return RandTermString(RandNum(0,1200),b"\x00")
class StrStopField(StrField):
    """Byte-string field that ends with the ``stop`` marker.

    ``additionnal`` is a number of extra bytes taken after the marker.
    """

    def __init__(self, name, default, stop, additionnal=0):
        Field.__init__(self, name, default)
        self.stop = stop
        self.additionnal = additionnal

    def getfield(self, pkt, s):
        pos = s.find(self.stop)
        if pos < 0:
            # Stop marker not found: the whole data is the value.
            return b"", s
        end = pos + len(self.stop) + self.additionnal
        return s[end:], s[:end]

    def randval(self):
        return RandTermString(RandNum(0,1200),self.stop)
class LenField(Field):
    """Field that defaults to the length of the packet's payload."""

    def i2m(self, pkt, x):
        return len(pkt.payload) if x is None else x
class BCDFloatField(Field):
    """Field storing a number as an integer count of 1/256 units."""

    def i2m(self, pkt, x):
        scaled = 256 * x
        return int(scaled)

    def m2i(self, pkt, x):
        return x / 256.0
class BitField(Field):
    """Field occupying ``size`` bits.

    A negative size marks the field as reversed (see ``reverse``).  While a
    byte is only partly filled, the data is carried between consecutive bit
    fields as a tuple instead of plain bytes.
    """

    def __init__(self, name, default, size):
        Field.__init__(self, name, default)
        self.rev = size < 0
        self.size = abs(size)

    def reverse(self, val):
        """Apply ntohs/ntohl to 16-/32-bit values; return others unchanged."""
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val

    def addfield(self, pkt, s, val):
        """Append the field's bits to s.

        s is either bytes or a (bytes, pending bit count, pending bits)
        tuple; the same tuple form is returned while bits remain pending.
        """
        val = self.i2m(pkt, val)
        if type(s) is tuple:
            s, pending, acc = s
        else:
            pending = 0
            acc = 0
        if self.rev:
            val = self.reverse(val)

        acc <<= self.size
        acc |= val & ((1 << self.size) - 1)
        pending += self.size

        # Emit every complete byte, most significant first.
        while pending >= 8:
            pending -= 8
            s = s + struct.pack("!B", acc >> pending)
            acc &= (1 << pending) - 1

        if pending:
            return s, pending, acc
        return s

    def getfield(self, pkt, s):
        """Extract the field's bits from s.

        s is either bytes or a (bytes, bit offset) tuple; the remaining data
        is returned in the tuple form while the offset is not byte-aligned.
        """
        if type(s) is tuple:
            s, offset = s
        else:
            offset = 0

        # we don't want to process all the string
        nbytes = (self.size + offset - 1) // 8 + 1
        window = s[:nbytes]

        # combine the bytes into one big-endian integer
        octets = struct.unpack('!%dB' % nbytes, window)
        word = 0
        for octet in octets:
            word = (word << 8) | int(octet)

        # get rid of high order bits
        word &= (1 << (nbytes * 8 - offset)) - 1
        # remove low order bits
        word = word >> (nbytes * 8 - self.size - offset)

        if self.rev:
            word = self.reverse(word)

        offset += self.size
        s = s[offset // 8:]
        offset = offset % 8
        value = self.m2i(pkt, word)
        if offset:
            return (s, offset), value
        return s, value

    def randval(self):
        return RandNum(0,2**self.size-1)
class BitFieldLenField(BitField):
    """BitField holding the length or count of another field.

    The value computation is borrowed from FieldLenField.i2m.
    """

    def __init__(self, name, default, size, length_of=None, count_of=None, adjust=lambda pkt,x:x):
        BitField.__init__(self, name, default, size)
        self.length_of = length_of
        self.count_of = count_of
        self.adjust = adjust

    def i2m(self, pkt, x):
        # Called as a plain function, since this class does not derive
        # from FieldLenField.
        compute = FieldLenField.i2m
        return compute(self, pkt, x)
class XBitField(BitField):
    """BitField whose representation is produced by lhex()."""

    def i2repr(self, pkt, x):
        human = self.i2h(pkt, x)
        return lhex(human)
class EnumField(Field):
    """Field whose values have symbolic names.

    ``enum`` is a list (index -> name) or a dict.  Two lookup tables are
    built: ``i2s`` (value -> name) and ``s2i`` (name -> value).  When any
    key of the dict is a str, the dict is taken to map names to values.
    """

    def __init__(self, name, default, enum, fmt = "H"):
        forward = self.i2s = {}
        backward = self.s2i = {}
        if type(enum) is list:
            keys = range(len(enum))
        else:
            keys = enum.keys()
        if any(type(k) is str for k in keys):
            # Keys are names: fill the tables the other way round.
            forward, backward = backward, forward
        for k in keys:
            forward[k] = enum[k]
            backward[enum[k]] = k
        Field.__init__(self, name, default, fmt)

    def any2i_one(self, pkt, x):
        """Translate a str name to its value; pass other values through."""
        if type(x) is str:
            return self.s2i[x]
        return x

    def i2repr_one(self, pkt, x):
        """Return the name of x when one is known and allowed, else repr(x)."""
        if self in conf.noenum or isinstance(x, VolatileValue) or x not in self.i2s:
            return repr(x)
        return self.i2s[x]

    def any2i(self, pkt, x):
        if type(x) is list:
            return [self.any2i_one(pkt, z) for z in x]
        return self.any2i_one(pkt, x)

    def i2repr(self, pkt, x):
        if type(x) is list:
            return [self.i2repr_one(pkt, z) for z in x]
        return self.i2repr_one(pkt, x)
class CharEnumField(EnumField):
    """EnumField over one-character values (struct format "1s" by default)."""

    def __init__(self, name, default, enum, fmt = "1s"):
        EnumField.__init__(self, name, default, enum, fmt)
        if self.i2s:
            first_key = next(iter(self.i2s))
            # Values must be the one-character side; swap the tables if the
            # first key of i2s is not a single character.
            if len(first_key) != 1:
                self.i2s, self.s2i = self.s2i, self.i2s

    def any2i_one(self, pkt, x):
        """Translate anything that is not one character long through s2i."""
        if len(x) == 1:
            return x
        return self.s2i[x]
class BitEnumField(BitField,EnumField):
    """Bit field with symbolic names.

    Initialised through EnumField, then given the size attributes that
    BitField's own constructor would have set.
    """

    def __init__(self, name, default, size, enum):
        EnumField.__init__(self, name, default, enum)
        self.rev = size < 0
        self.size = abs(size)

    def any2i(self, pkt, x):
        # Use the enum translation explicitly rather than relying on the MRO.
        return EnumField.any2i(self, pkt, x)

    def i2repr(self, pkt, x):
        return EnumField.i2repr(self, pkt, x)
class ShortEnumField(EnumField):
    """EnumField packed with struct format "H"."""

    def __init__(self, name, default, enum):
        EnumField.__init__(self, name, default, enum, fmt="H")
class LEShortEnumField(EnumField):
    """EnumField packed with struct format "<H" (little-endian)."""

    def __init__(self, name, default, enum):
        EnumField.__init__(self, name, default, enum, fmt="<H")
class ByteEnumField(EnumField):
    """EnumField packed with struct format "B"."""

    def __init__(self, name, default, enum):
        EnumField.__init__(self, name, default, enum, fmt="B")
class IntEnumField(EnumField):
    """EnumField packed with struct format "I"."""

    def __init__(self, name, default, enum):
        EnumField.__init__(self, name, default, enum, fmt="I")
class SignedIntEnumField(EnumField):
    """EnumField packed with struct format "i" (signed)."""

    def __init__(self, name, default, enum):
        EnumField.__init__(self, name, default, enum, fmt="i")

    def randval(self):
        """Return a RandSInt volatile value."""
        return RandSInt()
class LEIntEnumField(EnumField):
    """EnumField packed with struct format "<I" (little-endian)."""

    def __init__(self, name, default, enum):
        EnumField.__init__(self, name, default, enum, fmt="<I")
class XShortEnumField(ShortEnumField):
    """ShortEnumField that falls back to lhex() for values without a name."""

    def i2repr_one(self, pkt, x):
        if self in conf.noenum or isinstance(x, VolatileValue) or x not in self.i2s:
            return lhex(x)
        return self.i2s[x]
class MultiEnumField(EnumField):
    """Enum field whose name table is selected by ``depends_on(pkt)``.

    ``enum`` maps each selector value to its own value -> name dict.
    """

    def __init__(self, name, default, enum, depends_on, fmt = "H"):
        self.depends_on = depends_on
        self.i2s_multi = enum
        self.s2i_multi = {}
        self.s2i_all = {}
        for selector in enum:
            # Name -> value table for this selector, also merged into the
            # global fallback table.
            reverse = {label: value for value, label in enum[selector].items()}
            self.s2i_multi[selector] = reverse
            self.s2i_all.update(reverse)
        Field.__init__(self, name, default, fmt)

    def any2i_one(self, pkt, x):
        """Translate a str name, preferring the table of the current
        selector and falling back to the table of all names."""
        if type(x) is not str:
            return x
        selector = self.depends_on(pkt)
        if selector in self.s2i_multi:
            table = self.s2i_multi[selector]
            if x in table:
                return table[x]
        return self.s2i_all[x]

    def i2repr_one(self, pkt, x):
        selector = self.depends_on(pkt)
        if selector not in self.i2s_multi:
            return x
        return self.i2s_multi[selector].get(x, x)
class BitMultiEnumField(BitField,MultiEnumField):
    """Bit field whose name table is selected by ``depends_on(pkt)``.

    Initialised through MultiEnumField, then given the size attributes
    that BitField's own constructor would have set.
    """

    def __init__(self, name, default, size, enum, depends_on):
        # depends_on is a required argument of MultiEnumField.__init__;
        # leaving it out made every instantiation raise TypeError.
        MultiEnumField.__init__(self, name, default, enum, depends_on)
        self.rev = size < 0
        self.size = abs(size)

    def any2i(self, pkt, x):
        return MultiEnumField.any2i(self, pkt, x)

    def i2repr(self, pkt, x):
        return MultiEnumField.i2repr(self, pkt, x)
# Little endian long field
class LELongField(Field):
    """Field packed with struct format "<Q" (little-endian unsigned 64-bit).

    This repeats the identical LELongField definition found earlier in
    this file.
    """

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="<Q")
# Little endian fixed length field
class LEFieldLenField(FieldLenField):
    """FieldLenField packed little-endian (struct format "<H" by default)."""

    def __init__(self, name, default, length_of=None, fmt = "<H", count_of=None, adjust=lambda pkt,x:x, fld=None):
        # count_of must be forwarded too; it used to be silently dropped,
        # so a count-based field had nothing to compute its value from.
        FieldLenField.__init__(self, name, default, length_of=length_of, fmt=fmt, count_of=count_of, fld=fld, adjust=adjust)
class FlagsField(BitField):
    """Bit field in which each bit is a named flag.

    ``names`` is indexed by bit position, starting at the least significant
    bit.  When it is a list or tuple the field is in "multi" mode and flag
    names are joined with "+"; otherwise names are concatenated directly.
    """

    def __init__(self, name, default, size, names):
        self.multi = type(names) in (list, tuple)
        self.names = names
        BitField.__init__(self, name, default, size)

    def any2i(self, pkt, x):
        """Turn a str of flag names into the matching integer bit mask."""
        if type(x) is str:
            if self.multi:
                x = x.split("+")
            mask = 0
            for flag in x:
                mask |= 1 << self.names.index(flag)
            x = mask
        return x

    def i2repr(self, pkt, x):
        if type(x) is list or type(x) is tuple:
            return repr(x)
        picked = []
        bit = 0
        while x:
            if x & 1:
                picked.append(self.names[bit])
            bit += 1
            x >>= 1
        separator = "+" if self.multi else ""
        return separator.join(picked)

    def i2dict(self, pkt, x): # Potential compatibility issue for older code. And fields to conf.noenum for old behaviour
        """In multi mode return {flag name: bool}; otherwise {names: x}."""
        if not self.multi:
            return { self.names: x }
        flags = {}
        for flag in self.names:
            flags[flag] = bool(x & 1)
            x >>= 1
        return flags
class FixedPointField(BitField):
    """Bit field holding a fixed-point number with ``frac_bits`` fractional bits."""

    def __init__(self, name, default, size, frac_bits=16):
        # Set first: the base constructor converts the default with any2i,
        # which reads frac_bits.
        self.frac_bits = frac_bits
        BitField.__init__(self, name, default, size)

    def any2i(self, pkt, val):
        """Convert a number to its fixed-point integer form (None stays None)."""
        if val is None:
            return val
        whole = int(val)
        fraction = int((val - whole) * 2**self.frac_bits)
        return (whole << self.frac_bits) | fraction

    def i2h(self, pkt, val):
        """Convert the fixed-point integer form back to a float."""
        whole = val >> self.frac_bits
        fraction_mask = (1 << self.frac_bits) - 1
        fraction = (val & fraction_mask) / 2.0**self.frac_bits
        return whole + fraction

    def i2repr(self, pkt, val):
        return self.i2h(pkt, val)
#!/usr/bin/env python
# encoding: utf-8
from scapy.packet import Packet,bind_layers
from scapy.fields import *
class Example(Packet):  # must always inherit from the Packet class
    """
    Template for implementing your own protocol in Scapy.
    A Packet instance basically always holds all the data of the one
    packet it was built from.
    """
    name = "ProtocolName"  # protocol description shown by ls(); need not match the class name
    fields_desc = [
        ByteEnumField("code",0,{1:"code1!",2:"code2!!",3:"code3!!!",0xFF:"OMG"}),
    ]  # note: the byte layout follows the order of this list

    # Dissecting:
    #  Hooks invoked when a packet is read from a NIC or a file.
    #  They play the role of the analyzer.
    #  They are driven by the dissect function:
    #  def dissect(self, s):
    #      s = self.pre_dissect(s)
    #      s = self.do_dissect(s)
    #      s = self.post_dissect(s)
    #      payl,pad = self.extract_padding(s)
    #      self.do_dissect_payload(payl)
    #      if pad and conf.padding:
    #          self.add_payload(conf.padding_layer(pad))

    def pre_dissect(self, s):
        """
        Prepares the packet for dissection. Does nothing unless overridden.
        Do FCS checks, packet length checks and anything else that must
        happen before dissection here.
        @param s str the full received byte string.
        @return s str the full byte string handed on to do_dissect
        """
        return Packet.pre_dissect(self,s)

    def do_dissect(self,s):
        """
        The core of the dissection. Unless overridden, the data is
        dissected automatically according to fields_desc.
        @param s str the byte string handed over by pre_dissect.
        @return s str the full byte string handed on to post_dissect
        """
        return Packet.do_dissect(self,s)

    def post_dissect(self,s):
        """
        Cleans up after dissection. Does nothing unless overridden.
        Do post-dissection integrity checks or decompression of embedded
        compressed data here.
        @param s str the byte string handed over by do_dissect.
        @return s str the full byte string handed on to extract_padding
        """
        return Packet.post_dissect(self,s)

    def extract_padding(self,s):
        """
        Splits what is left after dissection into the payload (next layer)
        and the padding (Padding).
        Return pay,None when there is no padding.
        @param s str the byte string returned by post_dissect (before splitting).
        @return pay str the payload part; passed to guess_payload_class
        @return pad str the padding part; passed to the Padding class
        """
        return Packet.extract_padding(self,s)# return pay,pad

    def guess_payload_class(self,pay):
        """
        Determines the protocol of this layer's payload and returns a class
        object able to dissect it.
        Complex payload identification can be done inside this function.
        Note:
         When the payload protocol follows directly from the values of this
         packet's fields_desc (e.g. TCP destination port 80 means the HTTP
         class), rely on bind_layers instead.
         (That is the default behaviour when this is not overridden.)
        @param pay str the payload handed over by extract_padding.
        @return pktClass class the guessed payload protocol class; must inherit from Packet.
        """
        return Packet.guess_payload_class(self,pay) # rely on the bindings made with bind_layers

    def default_payload_class(self,pay):
        """
        Called when guess_payload_class cannot guess the payload protocol.
        If you override guess_payload_class yourself, you must call this
        function explicitly as its final return.
        Raw is normally the default protocol, but overriding this function
        lets you choose a different default. Rarely used.
        @param pay str the payload handed over by extract_padding.
        @return pktClass class the guessed payload protocol class; must inherit from Packet.
        """
        return Packet.default_payload_class(self,pay)

    # Building:
    #  Functions that complete a packet, e.g. when it is sent.
    #  They turn the given text and numbers into machine form (binary).
    #  They are driven by the build function.
    #  **** The most important one is post_build.
    #  def build(self):
    #      p = self.do_build()
    #      p += self.build_padding()
    #      p = self.build_done(p)
    #      return p
    #  def do_build(self):
    #      if not self.explicit:
    #          self = self.__iter__().next()
    #      pkt = self.self_build()
    #      for t in self.post_transforms:
    #          pkt = t(pkt)
    #      pay = self.do_build_payload()
    #      p = self.post_build(pkt,pay)
    #      return p

    def self_build(self, field_post_list=None):
        """
        By default, converts each field of fields_desc to machine form
        (i2m) in order. Afterwards the transform functions used for fuzzing
        are called and the result becomes one pkt.
        @param field_post_list list purpose unknown to the original author;
               defaults to a new empty list
        @return pkt str the packet's binary string after i2m of every field
        """
        # A fresh list per call instead of one shared mutable default.
        if field_post_list is None:
            field_post_list = []
        return Packet.self_build(self,field_post_list)

    def do_build_payload(self):
        """
        Calls do_build_payload on the payload.
        By default this chains through the payloads.
        @return pay str the completed payload of all layers after this one
        """
        return Packet.do_build_payload(self)

    def post_build(self, pkt, pay):
        """
        Called after the payloads of all following layers have been computed.
        pkt is the data of this class and pay is the payload part.
        Compute checksums such as a CRC, or fields that hold a length, here.
        Without implementing this function, sending and receiving rarely
        works properly.
        At this point self holds the nearly finished packet, so fields can
        be accessed by their fields_desc names.
        Fill in here whatever the field values left unset, but edit pkt,
        not self.
        pay should normally not be edited.
        @param pkt str packet data.
        @param pay str payload data; used for the final pkt+pay.
        @return p str packet data; in most cases pkt+pay.
        """
        self.show()
        # print() calls: the Python 2 print statement is a SyntaxError on
        # Python 3, and a one-argument call prints the same on both.
        print("===pkt===")
        print(pkt)
        print("===pay===")
        print(pay)
        return Packet.post_build(self, pkt, pay)

    def build_padding(self):
        """
        Calls build_padding on the payload.
        By default this chains through the payloads.
        @return pad str the completed padding of all layers after this one
        """
        return Packet.build_padding(self)

    def build_done(self,pkt):
        """
        Calls build_done on the payload.
        By default this chains through the payloads.
        @return pkt str the finally completed data of the whole packet
        """
        return Packet.build_done(self,pkt)
# Build an Example packet, set its "code" field to 0xFF and display it.
i=Example()
i.code=0xFF
i.show()
# Hand every name defined so far to scapy.main.interact
# (presumably the interactive Scapy shell -- TODO confirm).
from scapy.main import interact
interact(mydict=locals())
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more information
## Copyright (C) Philippe Biondi <[email protected]>
## This program is published under a GPLv2 license
"""
Packet class. Binding mechanism. fuzz() method.
"""
import time,itertools,os
import sys,traceback
import copy
from .fields import StrField,ConditionalField,Emph,PacketListField
from .config import conf
from .base_classes import BasePacket,Gen,SetGen,Packet_metaclass,NewDefaultValues
from .volatile import VolatileValue
from .utils import import_hexcap,tex_escape,colgen,get_temp_file
from .error import Scapy_Exception,log_runtime, warning
import subprocess
try:
import pyx
except ImportError:
pass
class RawVal:
    """Wrapper around a raw ``bytes`` value."""

    def __init__(self, val=b""):
        assert type(val) is bytes
        self.val = val

    def __str__(self):
        # str() of a bytes object gives its literal form, e.g. "b'ab'".
        raw = self.val
        return str(raw)

    def __repr__(self):
        return "<RawVal [%r]>" % self.val

    def bytes(self):
        """Return the wrapped bytes."""
        return self.val
class Packet(BasePacket, metaclass = Packet_metaclass):
name=None
fields_desc = []
aliastypes = []
overload_fields = {}
underlayer = None
sent_time = None
payload_guess = []
initialized = 0
show_indent=1
explicit = 0
raw_packet_cache = None
@classmethod
def from_hexcap(cls):
return cls(import_hexcap())
@classmethod
def upper_bonds(self):
for fval,upper in self.payload_guess:
print("%-20s %s" % (upper.__name__, ", ".join("%-12s" % ("%s=%r"%i) for i in fval.items())))
@classmethod
def lower_bonds(self):
for lower,fval in self.overload_fields.items():
print("%-20s %s" % (lower.__name__, ", ".join("%-12s" % ("%s=%r"%i) for i in fval.items())))
def __init__(self, _pkt=b"", post_transform=None, _internal=0, _underlayer=None, **fields):
self.time = time.time()
self.sent_time = 0
if self.name is None:
self.name = self.__class__.__name__
self.aliastypes = [ self.__class__ ] + self.aliastypes
self.default_fields = {}
self.overloaded_fields = {}
self.fields={}
self.fieldtype={}
self.packetfields=[]
self.__dict__["payload"] = NoPayload()
self.init_fields()
self.underlayer = _underlayer
self.initialized = 1
self.original = _pkt
if _pkt:
self.dissect(_pkt)
if not _internal:
self.dissection_done(self)
for f in fields.keys():
self.fields[f] = self.get_field(f).any2i(self,fields[f])
if type(post_transform) is list:
self.post_transforms = post_transform
elif post_transform is None:
self.post_transforms = []
else:
self.post_transforms = [post_transform]
def init_fields(self):
self.do_init_fields(self.fields_desc)
def do_init_fields(self, flist):
for f in flist:
self.default_fields[f.name] = copy.deepcopy(f.default)
self.fieldtype[f.name] = f
if f.holds_packets:
self.packetfields.append(f)
def dissection_done(self,pkt):
"""DEV: will be called after a dissection is completed"""
self.post_dissection(pkt)
self.payload.dissection_done(pkt)
def post_dissection(self, pkt):
"""DEV: is called after the dissection of the whole packet"""
pass
def get_field(self, fld):
"""DEV: returns the field instance from the name of the field"""
return self.fieldtype[fld]
def add_payload(self, payload):
if payload is None:
return
elif not isinstance(self.payload, NoPayload):
self.payload.add_payload(payload)
else:
if isinstance(payload, Packet):
self.__dict__["payload"] = payload
payload.add_underlayer(self)
for t in self.aliastypes:
if t in payload.overload_fields:
self.overloaded_fields = payload.overload_fields[t]
break
#elif type(payload) is str:
elif type(payload) is bytes:
self.__dict__["payload"] = conf.raw_layer(load=payload)
else:
raise TypeError("payload must be either 'Packet' or 'bytes', not [%s]" % repr(payload))
def remove_payload(self):
self.payload.remove_underlayer(self)
self.__dict__["payload"] = NoPayload()
self.overloaded_fields = {}
def add_underlayer(self, underlayer):
self.underlayer = underlayer
def remove_underlayer(self,other):
self.underlayer = None
def copy(self):
"""Returns a deep copy of the instance."""
clone = self.__class__()
clone.fields = self.fields.copy()
for k in clone.fields:
clone.fields[k] = self.get_field(k).do_copy(clone.fields[k])
clone.default_fields = self.default_fields.copy()
clone.overloaded_fields = self.overloaded_fields.copy()
clone.overload_fields = self.overload_fields.copy()
clone.underlayer = self.underlayer
clone.explicit = self.explicit
clone.raw_packet_cache = self.raw_packet_cache
clone.post_transforms = self.post_transforms[:]
clone.__dict__["payload"] = self.payload.copy()
clone.payload.add_underlayer(clone)
return clone
def getfieldval(self, attr):
if attr in self.fields:
return self.fields[attr]
if attr in self.overloaded_fields:
return self.overloaded_fields[attr]
if attr in self.default_fields:
return self.default_fields[attr]
return self.payload.getfieldval(attr)
def getbyteval(self, attr):
fld,v = self.getfield_and_val(attr)
return fld.i2b(self, v)
def getstrval(self, attr):
    """Return the value of field ``attr`` passed through the field's ``i2repr``."""
    field, value = self.getfield_and_val(attr)
    rendered = field.i2repr(self, value)
    return rendered
def getdictval(self, attr):
    """Return the value of field ``attr`` passed through the field's ``i2dict``."""
    field, value = self.getfield_and_val(attr)
    as_dict = field.i2dict(self, value)
    return as_dict
def getfield_and_val(self, attr):
    """Return ``(field instance, internal value)`` for field ``attr``.

    Same lookup order as getfieldval(): explicit, overloaded, default,
    then the payload.
    """
    for store in (self.fields, self.overloaded_fields, self.default_fields):
        if attr in store:
            return self.get_field(attr), store[attr]
    return self.payload.getfield_and_val(attr)
def __getattr__(self, attr):
if self.initialized:
fld,v = self.getfield_and_val(attr)
if fld is not None:
return fld.i2h(self, v)
return v
raise AttributeError(attr)
def setfieldval(self, attr, val):
    """Store ``val`` (any accepted representation) in field ``attr``.

    A field of this layer is converted with the field's ``any2i`` and
    invalidates the cached raw bytes; ``"payload"`` replaces the payload;
    any other name is forwarded to the payload.
    """
    if attr in self.default_fields:
        field = self.get_field(attr)
        internal = val if field is None else field.any2i(self, val)
        self.fields[attr] = internal
        self.explicit = 0
        self.raw_packet_cache = None
        return
    if attr == "payload":
        self.remove_payload()
        self.add_payload(val)
        return
    self.payload.setfieldval(attr, val)
def __setattr__(self, attr, val):
    """Assign a field when ``attr`` names one, else a plain instance attribute."""
    if self.initialized:
        try:
            self.setfieldval(attr, val)
            return
        except AttributeError:
            # Not a field anywhere in the stack: fall back to __dict__.
            pass
    self.__dict__[attr] = val
def delfieldval(self, attr):
    """Drop the explicit value of field ``attr``.

    A field of this layer that holds no explicit value is left alone;
    ``"payload"`` removes the payload; other names go to the payload.
    """
    if attr in self.fields:
        del self.fields[attr]
        self.explicit = 0  # in case a default value must be explicited
        self.raw_packet_cache = None
        return
    if attr in self.default_fields:
        return
    if attr == "payload":
        self.remove_payload()
        return
    self.payload.delfieldval(attr)
def __delattr__(self, attr):
    """Delete a field value when ``attr`` names one, else an instance attribute.

    Raises AttributeError when ``attr`` is neither.
    """
    if self.initialized:
        try:
            self.delfieldval(attr)
            return
        except AttributeError:
            pass
    if attr not in self.__dict__:
        raise AttributeError(attr)
    del self.__dict__[attr]
def __repr__(self):
    """Return a one-line, colour-themed rendering of the layer stack.

    Only fields holding an explicit or overloaded value are listed;
    conditional fields whose condition is false are skipped.
    """
    ct = conf.color_theme
    rendered = []
    for f in self.fields_desc:
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        if f.name in self.fields:
            source = self.fields
        elif f.name in self.overloaded_fields:
            source = self.overloaded_fields
        else:
            continue
        val = f.i2repr(self, source[f.name])
        if isinstance(f, Emph) or f in conf.emph:
            ncol = ct.emph_field_name
            vcol = ct.emph_field_value
        else:
            ncol = ct.field_name
            vcol = ct.field_value
        rendered.append(" %s%s%s" % (ncol(f.name),
                                     ct.punct("="),
                                     vcol(val)))
    s = "".join(rendered)
    return "%s%s %s %s%s%s"% (ct.punct("<"),
                              ct.layer_name(self.__class__.__name__),
                              s,
                              ct.punct("|"),
                              repr(self.payload),
                              ct.punct(">"))
#def __str__(self):
#TODO3 FIX
def __str__(self):
    """Emit a deprecation-style warning and return ``repr`` of the built bytes."""
    warning("Unless called manually, this could indicate deprecated use. Should be changed to bytes(self)")
    built = bytes(self)
    return repr(built)
def __bytes__(self):
    """Return the result of build() for this packet."""
    assembled = self.build()
    return assembled
def __div__(self, other):
    """Stack ``other`` on top of a copy of this packet (``self / other``).

    Packets are copied before stacking; ``str`` (ASCII-encoded) and
    ``bytes`` are wrapped in ``conf.raw_layer``; anything else is handed
    to ``other.__rdiv__``.
    """
    if isinstance(other, Packet):
        lower = self.copy()
        upper = other.copy()
        lower.add_payload(upper)
        return lower
    kind = type(other)
    if kind is str:
        return self/conf.raw_layer(load=other.encode('ascii'))
    if kind is bytes:
        return self/conf.raw_layer(load=other)
    return other.__rdiv__(self)
__truediv__ = __div__
def __rdiv__(self, other):
    """Handle ``other / self`` for ``str`` and ``bytes`` left operands.

    The left operand is wrapped in ``conf.raw_layer`` (``str`` is
    ASCII-encoded first); any other type raises TypeError.
    """
    kind = type(other)
    if kind is str:
        return conf.raw_layer(load=other.encode('ascii'))/self
    if kind is bytes:
        return conf.raw_layer(load=other)/self
    raise TypeError
__rtruediv__ = __rdiv__
def __mul__(self, other):
    """Return a list holding ``other`` references to this packet.

    Only a plain ``int`` multiplier is accepted; otherwise TypeError.
    """
    if type(other) is not int:
        raise TypeError
    return [self]*other
def __rmul__(self, other):
    """Reflected multiplication: same result as ``self * other``."""
    repeated = self.__mul__(other)
    return repeated
def __nonzero__(self):
    """Truth value of a packet: always True (Python 2 hook name)."""
    return True
def __bool__(self):
    """Python 3 truth hook.

    Python 3 never calls ``__nonzero__``; without ``__bool__`` truth
    testing falls back to ``__len__``, which builds the whole packet.
    Dispatching through ``self.__nonzero__()`` keeps subclasses that
    override ``__nonzero__`` working unchanged.
    """
    return self.__nonzero__()
def __len__(self):
    """Return the number of bytes of the built packet."""
    built = bytes(self)
    return len(built)
def self_build(self, field_pos_list=None):
    """Assemble the bytes of this layer only (no payload).

    Returns ``self.raw_packet_cache`` when it is set.  Otherwise every
    field of ``fields_desc`` is appended in order: RawVal values are
    inserted verbatim, other values go through the field's ``addfield``.

    field_pos_list: optional list; for each RawVal field a tuple
        ``(name, escaped value, end offset, length)`` is appended to it.
    """
    if self.raw_packet_cache is not None:
        return self.raw_packet_cache
    p = b""
    for f in self.fields_desc:
        val = self.getfieldval(f.name)
        if isinstance(val, RawVal):
            sval = bytes(val)
            p += sval
            if field_pos_list is not None:
                # bytes has no .encode() and the "string_escape" codec is
                # Python 2 only; latin-1 + unicode_escape gives the same
                # kind of printable, backslash-escaped text.
                escaped = sval.decode("latin-1").encode("unicode_escape").decode("ascii")
                field_pos_list.append((f.name, escaped, len(p), len(sval)))
        else:
            p = f.addfield(self, p, val)
    return p
def do_build_payload(self):
    """Return the result of the payload's do_build()."""
    payload_bytes = self.payload.do_build()
    return payload_bytes
def do_build(self):
    """Build this layer, apply the post transforms, then join the payload.

    A non-explicit packet is first replaced by the first packet its
    iterator yields.
    """
    if not self.explicit:
        self = next(self.__iter__())
    layer_bytes = self.self_build()
    for transform in self.post_transforms:
        layer_bytes = transform(layer_bytes)
    payload_bytes = self.do_build_payload()
    return self.post_build(layer_bytes, payload_bytes)
def build_padding(self):
    """Return the result of the payload's build_padding()."""
    trailing = self.payload.build_padding()
    return trailing
def build(self):
    """Return the built packet: do_build() + build_padding(), passed through build_done()."""
    assembled = self.do_build()
    assembled = assembled + self.build_padding()
    return self.build_done(assembled)
def post_build(self, pkt, pay):
    """DEV: called right after the current layer is built.

    pkt is this layer's bytes, pay the payload's; the base version
    returns their concatenation.
    """
    joined = pkt + pay
    return joined
def build_done(self, p):
    """Pass the built bytes ``p`` to the payload's build_done() and return its result."""
    finished = self.payload.build_done(p)
    return finished
def do_build_ps(self):
    """Build the packet while recording the bytes contributed by each field.

    Returns ``(bytes, layers)`` where ``layers`` is the payload's list
    followed by ``(self, [(field, repr of value, bytes slice), ...])``.
    """
    built = b""
    per_field = []
    previous = b""
    for f in self.fields_desc:
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        built = f.addfield(self, built, self.getfieldval(f.name))
        if type(built) is bytes:
            # What this field appended since the last bytes snapshot.
            chunk = built[len(previous):]
            previous = built
        else:
            chunk = b""
        per_field.append((f, f.i2repr(self, self.getfieldval(f.name)), chunk))
    pay_bytes, layers = self.payload.build_ps(internal=1)
    built += pay_bytes
    layers.append((self, per_field))
    return built, layers
def build_ps(self, internal=0):
    """Return ``(bytes, layers)`` as produced by do_build_ps().

    ``internal`` is currently unused.
    """
    # NOTE: an earlier draft appended the padding layers to the result
    # when not called internally; that code was disabled.
    built, layers = self.do_build_ps()
    return built, layers
def psdump(self, filename=None, **kargs):
    """psdump(filename=None, layer_shift=0, rebuild=1)
    Creates an EPS file describing a packet. If filename is not provided a temporary file is created and gs is called."""
    drawing = self.canvas_dump(**kargs)
    if filename is not None:
        drawing.writeEPSfile(filename)
        return
    fname = get_temp_file(autoext=".eps")
    drawing.writeEPSfile(fname)
    subprocess.Popen([conf.prog.psreader, fname+".eps"])
def pdfdump(self, filename=None, **kargs):
    """pdfdump(filename=None, layer_shift=0, rebuild=1)
    Creates a PDF file describing a packet. If filename is not provided a temporary file is created and xpdf is called."""
    drawing = self.canvas_dump(**kargs)
    if filename is not None:
        drawing.writePDFfile(filename)
        return
    fname = get_temp_file(autoext=".pdf")
    drawing.writePDFfile(fname)
    subprocess.Popen([conf.prog.pdfreader, fname+".pdf"])
def canvas_dump(self, layer_shift=0, rebuild=1):
    """Draw the packet on a PyX canvas and return that canvas.

    Layer names and field name/value pairs are written as a text column;
    the bytes of each field are drawn as a framed hex dump, linked to
    the field value by an arrow.

    layer_shift: amount added to the dump row counter after each layer.
    rebuild: when true, the drawing is made from
        ``self.__class__(bytes(self))`` instead of ``self``.
    """
    canvas = pyx.canvas.canvas()
    if rebuild:
        p,t = self.__class__(bytes(self)).build_ps()
    else:
        p,t = self.build_ps()
    # One text row per layer title plus one per field.
    YTXT=len(t)
    for n,l in t:
        YTXT += len(l)
    YTXT = float(YTXT)
    YDUMP=YTXT
    XSTART = 1
    XDSTART = 10
    y = 0.0
    yd = 0.0
    XMUL= 0.55
    YMUL = 0.4
    backcolor=colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
    forecolor=colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
    def hexstr(x):
        # Iterating bytes yields ints, hence the plain %02x.
        return " ".join("%02x" % c for c in x)
    def make_dump_txt(x,y,txt):
        return pyx.text.text(XDSTART+x*XMUL, (YDUMP-y)*YMUL, r"\tt{%s}"%hexstr(txt), [pyx.text.size.Large])
    def make_box(o):
        return pyx.box.rect(o.left(), o.bottom(), o.width(), o.height(), relcenter=(0.5,0.5))
    def make_frame(lst):
        # Outline enclosing the text rows of one field's dump.
        if len(lst) == 1:
            b = lst[0].bbox()
            b.enlarge(pyx.unit.u_pt)
            return b.path()
        else:
            fb = lst[0].bbox()
            fb.enlarge(pyx.unit.u_pt)
            lb = lst[-1].bbox()
            lb.enlarge(pyx.unit.u_pt)
            if len(lst) == 2 and fb.left() > lb.right():
                # Two rows that do not overlap horizontally: two open boxes.
                return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.bottom()),
                                     pyx.path.lineto(fb.right(), fb.bottom()),
                                     pyx.path.moveto(lb.left(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.bottom()),
                                     pyx.path.lineto(lb.left(), lb.bottom()))
            else:
                # XXX
                gb = lst[1].bbox()
                if gb != lb:
                    gb.enlarge(pyx.unit.u_pt)
                kb = lst[-2].bbox()
                if kb != gb and kb != lb:
                    kb.enlarge(pyx.unit.u_pt)
                return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.right(), kb.bottom()),
                                     pyx.path.lineto(lb.right(), kb.bottom()),
                                     pyx.path.lineto(lb.right(), lb.bottom()),
                                     pyx.path.lineto(lb.left(), lb.bottom()),
                                     pyx.path.lineto(lb.left(), gb.top()),
                                     pyx.path.lineto(fb.left(), gb.top()),
                                     pyx.path.closepath(),)
    def make_dump(s, shift=0, y=0, col=None, bkcol=None, larg=16):
        # Lay out bytes `s` on rows of `larg` bytes, starting at column
        # `shift` of row `y`; returns (canvas, last bbox, shift, y).
        c = pyx.canvas.canvas()
        tlist = []
        while s:
            dmp,s = s[:larg-shift],s[larg-shift:]
            txt = make_dump_txt(shift, y, dmp)
            tlist.append(txt)
            shift += len(dmp)
            # Wrap on the row width actually in use (was hard-coded 16).
            if shift >= larg:
                shift = 0
                y += 1
        if col is None:
            col = pyx.color.rgb.red
        if bkcol is None:
            # Was assigned to `col`, which left bkcol as None and
            # discarded the stroke colour.
            bkcol = pyx.color.rgb.white
        c.stroke(make_frame(tlist),[col,pyx.deco.filled([bkcol]),pyx.style.linewidth.Thick])
        for txt in tlist:
            c.insert(txt)
        return c, tlist[-1].bbox(), shift, y
    last_shift,last_y=0,0.0
    while t:
        bkcol = next(backcolor)
        proto,fields = t.pop()
        y += 0.5
        pt = pyx.text.text(XSTART, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % proto.name, [ pyx.text.size.Large])
        y += 1
        ptbb=pt.bbox()
        ptbb.enlarge(pyx.unit.u_pt*2)
        canvas.stroke(ptbb.path(),[pyx.color.rgb.black, pyx.deco.filled([bkcol])])
        canvas.insert(pt)
        for fname, fval, fdump in fields:
            col = next(forecolor)
            ft = pyx.text.text(XSTART, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fname.name))
            if isinstance(fval, str):
                if len(fval) > 18:
                    fval = fval[:18]+"[...]"
            else:
                fval=""
            vt = pyx.text.text(XSTART+3, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval))
            y += 1.0
            if fdump:
                dt,target,last_shift,last_y = make_dump(fdump, last_shift, last_y, col, bkcol)
                dtb=target
                vtb = vt.bbox()
                bxvt = make_box(vtb)
                bxdt = make_box(dtb)
                dtb.enlarge(pyx.unit.u_pt)
                try:
                    if yd < 0:
                        cnx = pyx.connector.curve(bxvt,bxdt,absangle1=0, absangle2=-90)
                    else:
                        cnx = pyx.connector.curve(bxvt,bxdt,absangle1=0, absangle2=90)
                except Exception:
                    # Best effort: a connector that cannot be computed is
                    # simply not drawn.
                    pass
                else:
                    canvas.stroke(cnx,[pyx.style.linewidth.thin,pyx.deco.earrow.small,col])
                canvas.insert(dt)
            canvas.insert(ft)
            canvas.insert(vt)
        last_y += layer_shift
    return canvas
def extract_padding(self, s):
    """DEV: to be overloaded to extract current layer's padding.

    Returns a couple (actual layer, padding); the base version keeps
    everything as layer data and reports no padding.
    """
    padding = None
    return s, padding
def post_dissect(self, s):
    """DEV: called right after the current layer has been dissected.

    Receives the remaining data and returns it; the base version returns
    it unchanged.
    """
    remaining = s
    return remaining
def pre_dissect(self, s):
    """DEV: called right before the current layer is dissected.

    Receives the data to dissect and returns it; the base version
    returns it unchanged.
    """
    data = s
    return data
def do_dissect(self, s):
    """Fill this layer's fields from ``s`` and return the unconsumed remainder.

    Fields are read in ``fields_desc`` order until the data runs out.
    The consumed bytes are kept in ``raw_packet_cache`` and the packet
    is marked explicit.
    """
    raw = s
    for f in list(self.fields_desc):
        if not s:
            break
        s, fval = f.getfield(self, s)
        self.fields[f.name] = fval
    assert raw.endswith(s)
    # raw[:-0] would be empty, so the no-remainder case is handled apart.
    self.raw_packet_cache = raw[:-len(s)] if s else raw
    self.explicit = 1
    return s
def do_dissect_payload(self, s):
    """Dissect ``s`` as this layer's payload and attach the result.

    The payload class comes from guess_payload_class().  When its
    dissection fails, the data is kept as ``conf.raw_layer`` instead;
    with ``conf.debug_dissector`` set the failure is logged and, unless
    the guessed class is None, re-raised.  Empty data is ignored.
    """
    if not s:
        return
    cls = self.guess_payload_class(s)
    try:
        p = cls(s, _internal=1, _underlayer=self)
    except Exception:
        # Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate instead of producing a Raw payload.
        if conf.debug_dissector:
            if isinstance(cls,type) and issubclass(cls,Packet):
                log_runtime.error("%s dissector failed" % cls.name)
            else:
                log_runtime.error("%s.guess_payload_class() returned [%s]" % (self.__class__.__name__,repr(cls)))
            if cls is not None:
                raise
        p = conf.raw_layer(s, _internal=1, _underlayer=self)
    self.add_payload(p)
def dissect(self, s):
    """Dissect ``s``: this layer first, then its payload, then any padding.

    Padding returned by extract_padding() is attached as
    ``conf.padding_layer`` only when ``conf.padding`` is true.
    """
    remaining = self.post_dissect(self.do_dissect(self.pre_dissect(s)))
    payl, pad = self.extract_padding(remaining)
    self.do_dissect_payload(payl)
    if pad and conf.padding:
        self.add_payload(conf.padding_layer(pad))
def guess_payload_class(self, payload):
    """DEV: Guesses the next payload class from layer bonds. Can be overloaded to use a different mechanism.

    The first ``payload_guess`` entry (over all alias types) whose field
    values all match this layer wins; otherwise default_payload_class()
    decides.
    """
    for alias in self.aliastypes:
        for fval, cls in alias.payload_guess:
            mismatch = any(
                not hasattr(self, k) or fval[k] != self.getfieldval(k)
                for k in fval.keys()
            )
            if not mismatch:
                return cls
    return self.default_payload_class(payload)
def default_payload_class(self, payload):
    """DEV: Returns the default payload class if nothing has been found by the guess_payload_class() method.

    The base version returns ``conf.raw_layer``.
    """
    fallback = conf.raw_layer
    return fallback
def hide_defaults(self):
    """Removes fields' values that are the same as default values.

    Applies to this layer and, recursively, to the payload.
    """
    redundant = [
        k for k in list(self.fields.keys())
        if k in self.default_fields and self.default_fields[k] == self.fields[k]
    ]
    for k in redundant:
        del self.fields[k]
    self.payload.hide_defaults()
def clone_with(self, payload=None, **kargs):
    """Return a new explicit instance of this class whose fields are ``kargs``.

    The timestamp, underlayer, a copy of ``overload_fields`` and the
    (shared) post-transform list are carried over; ``payload`` is
    attached when given.
    """
    twin = self.__class__()
    twin.explicit = 1
    twin.fields = kargs
    twin.time = self.time
    twin.underlayer = self.underlayer
    twin.overload_fields = self.overload_fields.copy()
    twin.post_transforms = self.post_transforms
    if payload is None:
        return twin
    twin.add_payload(payload)
    return twin
def __iter__(self):
    """Iterate over every concrete packet described by this (implicit) packet.

    Field values that are generators are expanded one by one and
    volatile values are fixed; each combination yields a clone made with
    clone_with().  The payload is iterated the same way.
    """
    def expand(pending, chosen, self=self):
        if not pending:
            # Every field of this layer has a value: expand the payload.
            if isinstance(self.payload, NoPayload):
                payloads = [None]
            else:
                payloads = self.payload
            for payl in payloads:
                fixed = chosen.copy()
                for k in fixed:
                    if isinstance(fixed[k], VolatileValue):
                        fixed[k] = fixed[k]._fix()
                yield self.clone_with(payload=payl, **fixed)
            return
        eltname = pending.pop()
        elt = self.getfieldval(eltname)
        if not isinstance(elt, Gen):
            if self.get_field(eltname).islist:
                elt = SetGen([elt])
            else:
                elt = SetGen(elt)
        for e in elt:
            chosen[eltname] = e
            yield from expand(pending[:], chosen)
    if self.explicit:
        todo = []
        done = self.fields
    else:
        volatile_names = [
            k for (k, v) in itertools.chain(self.default_fields.items(),
                                            self.overloaded_fields.items())
            if isinstance(v, VolatileValue)
        ]
        todo = volatile_names + list(self.fields.keys())
        done = {}
    return expand(todo, done)
def __gt__(self, other):
    """True if other is an answer from self (self ==> other)."""
    if isinstance(other, Packet):
        return other < self
    if type(other) is str:
        return 1
    raise TypeError((self, other))
def __lt__(self, other):
    """True if self is an answer from other (other ==> self)."""
    if isinstance(other, Packet):
        return self.answers(other)
    if type(other) is str:
        return 1
    raise TypeError((self, other))
def __eq__(self, other):
    """Two packets are equal when ``other`` is an instance of this class,
    every field of this layer exists in ``other`` with an equal value,
    and the payloads are equal."""
    if not isinstance(other, self.__class__):
        return False
    differs = any(
        f not in other.fields_desc
        or self.getfieldval(f.name) != other.getfieldval(f.name)
        for f in self.fields_desc
    )
    if differs:
        return False
    return self.payload == other.payload
def __ne__(self, other):
    """Negation of __eq__."""
    same = self.__eq__(other)
    return not same
def hashret(self):
    """DEV: returns a value that is the same for a request and its answer.

    The base version returns the payload's hashret().
    """
    digest = self.payload.hashret()
    return digest
def answers(self, other):
    """DEV: true if self is an answer from other.

    The base version requires both layers to be of the same class and
    then asks the payloads; otherwise it returns 0.
    """
    same_class = other.__class__ == self.__class__
    return self.payload.answers(other.payload) if same_class else 0
def haslayer(self, cls):
    """true if self has a layer that is an instance of cls. Superseded by "cls in self" syntax.

    ``cls`` may be a class or a class name.  Packets held in packet
    fields are searched too, then the payload.
    """
    own_class = self.__class__
    if own_class == cls or own_class.__name__ == cls:
        return 1
    for f in self.packetfields:
        candidates = self.getfieldval(f.name)
        if candidates is None:
            continue
        if not f.islist:
            candidates = SetGen(candidates, _iterpacket=0)
        for candidate in candidates:
            if not isinstance(candidate, Packet):
                continue
            found = candidate.haslayer(cls)
            if found:
                return found
    return self.payload.haslayer(cls)
def getlayer(self, cls, nb=1, _track=None):
    """Return the nb^th layer that is an instance of cls.

    ``cls`` may be a class, a layer name, ``"name.field"`` (the field
    value is returned instead of the layer) or an int (0-based layer
    index).  Returns what the payload chain returns when nothing matches.
    """
    if type(cls) is int:
        nb = cls+1
        cls = None
    if type(cls) is str and "." in cls:
        ccls, fld = cls.split(".",1)
    else:
        ccls, fld = cls, None
    if cls is None or self.__class__ == cls or self.__class__.name == ccls:
        if nb == 1:
            return self if fld is None else self.getfieldval(fld)
        nb -= 1
    for f in self.packetfields:
        candidates = self.getfieldval(f.name)
        if candidates is None:
            continue
        if not f.islist:
            candidates = SetGen(candidates, _iterpacket=0)
        for candidate in candidates:
            if not isinstance(candidate, Packet):
                continue
            track = []
            ret = candidate.getlayer(cls, nb, _track=track)
            if ret is not None:
                return ret
            # Matches consumed inside the sub-packet reduce the count.
            nb = track[0]
    return self.payload.getlayer(cls, nb, _track=_track)
def firstlayer(self):
    """Return the bottom layer, found by following ``underlayer`` links."""
    bottom = self
    while True:
        below = bottom.underlayer
        if below is None:
            return bottom
        bottom = below
def __getitem__(self, cls):
    """``pkt[cls]``: return the matching layer or raise IndexError.

    With a slice, ``pkt[cls:nb]`` selects the nb-th occurrence and
    ``pkt[cls:nb:default]`` returns ``default`` when the layer is missing.
    """
    if type(cls) is slice:
        lname = cls.start
        if cls.stop:
            ret = self.getlayer(cls.start, cls.stop)
        else:
            ret = self.getlayer(cls.start)
        if ret is None and cls.step is not None:
            ret = cls.step
    else:
        lname = cls
        ret = self.getlayer(cls)
    if ret is not None:
        return ret
    if type(lname) is Packet_metaclass:
        lname = lname.__name__
    elif type(lname) is not str:
        lname = repr(lname)
    raise IndexError("Layer [%s] not found" % lname)
def __delitem__(self, cls):
    """``del pkt[cls]``: delete the ``payload`` of the layer below the matching one."""
    layer = self[cls]
    del layer.underlayer.payload
def __setitem__(self, cls, val):
    """``pkt[cls] = val``: set ``val`` as the ``payload`` of the layer below the matching one."""
    layer = self[cls]
    layer.underlayer.payload = val
def __contains__(self, cls):
    """"cls in self" returns true if self has a layer which is an instance of cls."""
    present = self.haslayer(cls)
    return present
def route(self):
    """Return a 3-tuple of None (the base layer has no routing information)."""
    return None, None, None
def fragment(self, *args, **kargs):
    """Forward the call, arguments included, to the payload's fragment()."""
    pieces = self.payload.fragment(*args, **kargs)
    return pieces
def display(self, *args, **kargs): # Deprecated. Use show()
    """Deprecated. Use show() method."""
    show_packet = self.show
    show_packet(*args, **kargs)
def show(self, indent=3, lvl="", label_lvl=""):
    """Prints a hierarchical view of the packet. "indent" gives the size of indentation for each layer.

    ``lvl`` and ``label_lvl`` are the prefixes accumulated while
    descending into payloads and nested packets.
    """
    ct = conf.color_theme
    prefix = label_lvl+lvl
    print("%s%s %s %s" % (label_lvl,
                          ct.punct("###["),
                          ct.layer_name(self.name),
                          ct.punct("]###")))
    for f in self.fields_desc:
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        if isinstance(f, Emph) or f in conf.emph:
            ncol, vcol = ct.emph_field_name, ct.emph_field_value
        else:
            ncol, vcol = ct.field_name, ct.field_value
        fvalue = self.getfieldval(f.name)
        nested = isinstance(fvalue, Packet) or (f.islist and f.holds_packets and type(fvalue) is list)
        if nested:
            # Packet-valued field: print a header, then each sub-packet.
            print("%s \\%-10s\\" % (prefix, ncol(f.name)))
            for sub in SetGen(fvalue,_iterpacket=0):
                sub.show(indent=indent, label_lvl=prefix+" |")
            continue
        begn = "%s %-10s%s " % (prefix,
                               ncol(f.name),
                               ct.punct("="),)
        reprval = f.i2repr(self,fvalue)
        if type(reprval) is str:
            # Indent continuation lines of multi-line values.
            filler = " "*(len(label_lvl)+len(lvl)+len(f.name)+4)
            reprval = reprval.replace("\n", "\n"+filler)
        print("%s%s" % (begn,vcol(reprval)))
    self.payload.show(indent=indent, lvl=lvl+(" "*indent*self.show_indent), label_lvl=label_lvl)
def show2(self):
    """Prints a hierarchical view of an assembled version of the packet, so that automatic fields are calculated (checksums, etc.)"""
    rebuilt = self.__class__(bytes(self))
    rebuilt.show()
def sprintf(self, fmt, relax=1):
    """sprintf(format, [relax=1]) -> str
    where format is a string that can include directives. A directive begins and
    ends by % and has the following format %[fmt[r],][cls[:nb].]field%.

    fmt is a classic printf directive, "r" can be appended for raw substitution
    (ex: IP.flags=0x18 instead of SA), nb is the number of the layer we want
    (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
    Special case : "%.time%" is the creation time.
    Ex : p.sprintf("%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
                   "%03xr,IP.proto% %r,TCP.flags%")

    Moreover, the format string can include conditional statements. A conditional
    statement looks like : {layer:string} where layer is a layer name, and string
    is the string to insert in place of the condition if it is true, i.e. if layer
    is present. If layer is preceded by a "!", the result is inverted. Conditions
    can be nested. A valid statement can be :
      p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
      p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")

    A side effect is that, to obtain "{" and "}" characters, you must use
    "%(" and "%)".

    relax is only forwarded to the payload's sprintf() when a directive
    cannot be resolved on this layer.
    Raises Scapy_Exception on a malformed condition or directive.
    """
    # "%%", "%(" and "%)" produce the literal characters "%", "{" and "}".
    escape = { "%": "%",
               "(": "{",
               ")": "}" }
    # Evaluate conditions
    # The innermost (right-most) "{...}" is resolved first, so nested
    # conditions collapse from the inside out.
    while "{" in fmt:
        i = fmt.rindex("{")
        j = fmt[i+1:].index("}")
        cond = fmt[i+1:i+j+1]
        k = cond.find(":")
        if k < 0:
            raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)"%cond)
        cond,format = cond[:k],cond[k+1:]
        # res starts inverted when the layer name is prefixed by "!".
        res = False
        if cond[0] == "!":
            res = True
            cond = cond[1:]
        if self.haslayer(cond):
            res = not res
        if not res:
            format = ""
        fmt = fmt[:i]+format+fmt[i+j+2:]
    # Evaluate directives
    s = ""
    while "%" in fmt:
        i = fmt.index("%")
        s += fmt[:i]
        fmt = fmt[i+1:]
        if fmt and fmt[0] in escape:
            s += escape[fmt[0]]
            fmt = fmt[1:]
            continue
        try:
            # Split "%[fmt,][cls[:nb].]field%" into its components.
            i = fmt.index("%")
            sfclsfld = fmt[:i]
            fclsfld = sfclsfld.split(",")
            if len(fclsfld) == 1:
                f = "s"
                clsfld = fclsfld[0]
            elif len(fclsfld) == 2:
                f,clsfld = fclsfld
            else:
                raise Scapy_Exception
            if "." in clsfld:
                cls,fld = clsfld.split(".")
            else:
                cls = self.__class__.__name__
                fld = clsfld
            num = 1
            if ":" in cls:
                cls,num = cls.split(":")
                num = int(num)
            fmt = fmt[i+1:]
        except:
            raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))
        else:
            if fld == "time":
                # HH:MM:SS followed by the microseconds of self.time.
                val = time.strftime("%H:%M:%S.%%06i", time.localtime(self.time)) % int((self.time-int(self.time))*1000000)
            elif cls == self.__class__.__name__ and hasattr(self, fld):
                if num > 1:
                    # Not this occurrence of the layer: ask the payload
                    # for occurrence num-1.
                    val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f,cls,num-1,fld), relax)
                    f = "s"
                elif f[-1] == "r": # Raw field value
                    val = getattr(self,fld)
                    f = f[:-1]
                    if not f:
                        f = "s"
                else:
                    val = getattr(self,fld)
                    if fld in self.fieldtype:
                        val = self.fieldtype[fld].i2repr(self,val)
            else:
                # Directive targets another layer: delegate to the payload.
                val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
                f = "s"
            s += ("%"+f) % val
    s += fmt
    return s
def mysummary(self):
    """DEV: can be overloaded to return a string that summarizes the layer.

    Only one mysummary() is used in a whole packet summary: the one of the upper layer,
    except if a mysummary() also returns (as a couple) a list of layers whose
    mysummary() must be called if they are present.
    The base version returns an empty string.
    """
    no_summary = ""
    return no_summary
def _do_summary(self):
    """Build the summary recursively, payload first.

    Returns ``(found, text, needed)``: whether a layer already produced
    a summary, the text so far, and the layer classes whose mysummary()
    was requested by an upper layer.
    """
    found, upper_text, needed = self.payload._do_summary()
    if upper_text:
        upper_text = " / "+upper_text
    ret = ""
    if not found or self.__class__ in needed:
        ret = self.mysummary()
        if type(ret) is tuple:
            ret, extra = ret
            needed += extra
    if ret or needed:
        found = 1
    if not ret:
        ret = self.__class__.__name__
    if self.__class__ in conf.emph:
        emphasized = [
            "%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))
            for f in self.fields_desc
            if f in conf.emph
        ]
        ret = "%s [%s]" % (ret," ".join(emphasized))
    ret = "%s%s" % (ret,upper_text)
    return found,ret,needed
def summary(self, intern=0):
    """Return a one line summary of a packet (``intern`` is unused)."""
    _found, text, _needed = self._do_summary()
    return text
def lastlayer(self, layer=None):
    """Returns the uppest layer of the packet.

    Delegates to the payload, passing this layer along (``layer`` itself
    is not used here).
    """
    top = self.payload.lastlayer(self)
    return top
def decode_payload_as(self, cls):
    """Reassembles the payload and decode it using another packet class.

    The new payload's dissection_done() is then called with the bottom
    layer of the stack.
    """
    raw = bytes(self.payload)
    self.payload = cls(raw, _internal=1, _underlayer=self)
    bottom = self
    while bottom.underlayer is not None:
        bottom = bottom.underlayer
    self.payload.dissection_done(bottom)
def libnet(self):
    """Not ready yet. Should give the necessary C code that interfaces with libnet to recreate the packet.

    Prints one argument line per field of the re-dissected packet.
    """
    print("libnet_build_%s(" % self.__class__.name.lower())
    # Re-dissect from the built bytes, as show2() does; str(self) would
    # hand the dissector the repr() text of those bytes.
    det = self.__class__(bytes(self))
    for f in self.fields_desc:
        val = det.getfieldval(f.name)
        if val is None:
            val = 0
        elif type(val) is int:
            val = str(val)
        else:
            val = '"%s"' % str(val)
        print("\t%s, \t\t/* %s */" % (val,f.name))
    print(");")
def command(self):
    """Returns a string representing the command you have to type to obtain the same packet"""
    assignments = []
    for fn, fv in self.fields.items():
        fld = self.get_field(fn)
        if isinstance(fv, Packet):
            rendered = fv.command()
        elif fld.islist and fld.holds_packets and type(fv) is list:
            rendered = "[%s]" % ",".join([ Packet.command(i) for i in fv ])
        else:
            rendered = repr(fv)
        assignments.append("%s=%s" % (fn, rendered))
    text = "%s(%s)" % (self.__class__.__name__, ", ".join(assignments))
    payload_text = self.payload.command()
    if payload_text:
        text += "/"+payload_text
    return text
class NoPayload(Packet):
    """Singleton that terminates every layer stack.

    Each Packet method that recurses into ``self.payload`` ends here,
    with a neutral result or an AttributeError for field access.
    """
    def __new__(cls, *args, **kargs):
        # One shared instance per class, created on first use.
        singl = cls.__dict__.get("__singl__")
        if singl is None:
            cls.__singl__ = singl = Packet.__new__(cls)
            Packet.__init__(singl)
        return singl
    def __init__(self, *args, **kargs):
        # Initialisation happens once, in __new__.
        pass
    def dissection_done(self,pkt):
        return
    def add_payload(self, payload):
        raise Scapy_Exception("Can't add payload to NoPayload instance")
    def remove_payload(self):
        pass
    def add_underlayer(self,underlayer):
        pass
    def remove_underlayer(self,other):
        pass
    def copy(self):
        return self
    def __repr__(self):
        return ""
    def __str__(self):
        return ""
    def __nonzero__(self):
        return False
    def __bool__(self):
        # Python 3 truth hook (``__nonzero__`` is the Python 2 name).
        return False
    def do_build(self):
        return b""
    def build(self):
        return b""
    def build_padding(self):
        return b""
    def build_done(self, p):
        return p
    def build_ps(self, internal=0):
        return b"",[]
    def getfieldval(self, attr):
        raise AttributeError(attr)
    def getfield_and_val(self, attr):
        raise AttributeError(attr)
    def setfieldval(self, attr, val):
        raise AttributeError(attr)
    def delfieldval(self, attr):
        raise AttributeError(attr)
    def __getattr__(self, attr):
        if attr in self.__dict__:
            return self.__dict__[attr]
        elif attr in self.__class__.__dict__:
            return self.__class__.__dict__[attr]
        else:
            raise AttributeError(attr)
    def hide_defaults(self):
        pass
    def __iter__(self):
        return iter([])
    def __eq__(self, other):
        if isinstance(other, NoPayload):
            return True
        return False
    def hashret(self):
        return b""
    def answers(self, other):
        return isinstance(other, NoPayload) or isinstance(other, conf.padding_layer)
    def haslayer(self, cls):
        return 0
    def getlayer(self, cls, nb=1, _track=None):
        # Report the remaining occurrence count back to the caller.
        if _track is not None:
            _track.append(nb)
        return None
    def fragment(self, *args, **kargs):
        raise Scapy_Exception("cannot fragment this packet")
    def show(self, indent=3, lvl="", label_lvl=""):
        pass
    def sprintf(self, fmt, relax=1):
        # relax defaults to 1, matching Packet.sprintf().
        if relax:
            return "??"
        else:
            raise Scapy_Exception("Format not found [%s]"%fmt)
    def _do_summary(self):
        return 0,"",[]
    def lastlayer(self,layer):
        return layer
    def command(self):
        return ""
####################
## packet classes ##
####################
class Raw(Packet):
    """Generic layer holding uninterpreted bytes in its ``load`` field."""
    name = "Raw"
    fields_desc = [ StrField("load", b"") ]
    def answers(self, other):
        # NOTE: a prefix comparison between other and self.load was
        # sketched here but is disabled; everything is accepted.
        return 1
    def mysummary(self):
        """Summarize the load according to ``conf.raw_summary``."""
        cs = conf.raw_summary
        if not cs:
            return Packet.mysummary(self)
        if callable(cs):
            return "Raw %s" % cs(self.load)
        return "Raw %r" % self.load
class Padding(Raw):
    """Raw bytes that follow the packet instead of being part of it."""
    name = "Padding"
    def self_build(self, field_pos_list=None):
        # Same signature as Packet.self_build(); padding adds nothing to
        # the regular build and is emitted by build_padding().
        return b""
    def build_padding(self):
        """Return this padding's bytes followed by the payload's padding."""
        return (self.getbyteval("load") if self.raw_packet_cache is None
                else self.raw_packet_cache) + self.payload.build_padding()
# Publish the two generic layers through the global configuration, where
# the packet code looks them up (conf.raw_layer / conf.padding_layer).
conf.raw_layer = Raw
conf.padding_layer = Padding
# Raw becomes the default layer-2 class only when none is configured yet.
if conf.default_l2 is None:
    conf.default_l2 = Raw
#################
## Bind layers ##
#################
def bind_bottom_up(lower, upper, __fval=None, **fval):
    """Register ``upper`` as a payload guess of ``lower`` for the given field values.

    Field values come from the ``__fval`` dict and/or keyword arguments.
    ``lower.payload_guess`` is replaced by an extended copy.
    """
    if __fval is not None:
        fval.update(__fval)
    guesses = lower.payload_guess[:]
    guesses.append((fval, upper))
    lower.payload_guess = guesses
def bind_top_down(lower, upper, __fval=None, **fval):
    """Record the field values ``upper`` overloads on ``lower`` when stacked on it.

    ``upper.overload_fields`` is replaced by an updated copy.
    """
    if __fval is not None:
        fval.update(__fval)
    overloads = upper.overload_fields.copy()
    overloads[lower] = fval
    upper.overload_fields = overloads
@conf.commands.register
def bind_layers(lower, upper, __fval=None, **fval):
    """Bind 2 layers on some specific fields' values"""
    if __fval is not None:
        fval.update(__fval)
    # Top-down first, then bottom-up, both with the merged field values.
    for binder in (bind_top_down, bind_bottom_up):
        binder(lower, upper, **fval)
def split_bottom_up(lower, upper, __fval=None, **fval):
    """Remove from ``lower.payload_guess`` the guesses made for ``upper``
    whose field values include all of the given ones."""
    if __fval is not None:
        fval.update(__fval)
    def keep(entry, upper=upper, fval=fval):
        # Keep guesses for other classes, and guesses for `upper` that
        # differ on at least one of the requested field values.
        if entry[1] != upper:
            return True
        return any(k not in entry[0] or entry[0][k] != fval[k] for k in fval)
    lower.payload_guess = [entry for entry in lower.payload_guess if keep(entry)]
def split_top_down(lower, upper, __fval=None, **fval):
    """Drop the overload entry ``upper`` holds for ``lower``, provided it
    contains all of the given field values."""
    if __fval is not None:
        fval.update(__fval)
    if lower not in upper.overload_fields:
        return
    ofval = upper.overload_fields[lower]
    if any(k not in ofval or ofval[k] != fval[k] for k in fval):
        return
    remaining = upper.overload_fields.copy()
    del remaining[lower]
    upper.overload_fields = remaining
@conf.commands.register
def split_layers(lower, upper, __fval=None, **fval):
    """Split 2 layers previously bound"""
    if __fval is not None:
        fval.update(__fval)
    # Bottom-up first, then top-down, both with the merged field values.
    for splitter in (split_bottom_up, split_top_down):
        splitter(lower, upper, **fval)
@conf.commands.register
def ls(obj=None):
    """List available layers, or infos on a given layer

    Without argument, prints every class of ``conf.layers`` sorted by
    class name.  With a Packet class, prints its fields and defaults;
    with a Packet instance, also the current values, layer by layer.
    """
    if obj is None:
        # The former copy of builtins/globals into a local named `all`
        # was never used (and shadowed the builtin); it is gone.
        for o in sorted(conf.layers, key=lambda x:x.__name__):
            print("%-10s : %s" %(o.__name__,o.name))
    elif isinstance(obj, type) and issubclass(obj, Packet):
        for f in obj.fields_desc:
            print("%-10s : %-20s = (%s)" % (f.name, f.__class__.__name__, repr(f.default)))
    elif isinstance(obj, Packet):
        for f in obj.fields_desc:
            print("%-10s : %-20s = %-15s (%s)" % (f.name, f.__class__.__name__, repr(getattr(obj,f.name)), repr(f.default)))
        if not isinstance(obj.payload, NoPayload):
            print("--")
            ls(obj.payload)
    else:
        print("Not a packet class. Type 'ls()' to list packet classes.")
#############
## Fuzzing ##
#############
@conf.commands.register
def fuzz(p, _inplace=0):
    """Transform a layer into a fuzzy layer by replacing some default values by random objects

    Works on a copy unless ``_inplace`` is true; returns the fuzzed packet.
    """
    if not _inplace:
        p = p.copy()
    layer = p
    while not isinstance(layer, NoPayload):
        for f in layer.fields_desc:
            if isinstance(f, PacketListField):
                # Fuzz the packets held by the list field in place.
                for sub in getattr(layer, f.name):
                    print("fuzzing", repr(sub))
                    fuzz(sub, _inplace=1)
                continue
            if f.default is None:
                continue
            rnd = f.randval()
            if rnd is not None:
                layer.default_fields[f.name] = rnd
        layer = layer.payload
    return p
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment