Created
July 17, 2018 23:51
-
-
Save Sasszem/090a3860b4c6a792128a641be7c10508 to your computer and use it in GitHub Desktop.
A socket wrapper to help write custom protocols
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import struct | |
import array | |
def _toBytes(list):
    """Pack an iterable of small integers into a byte string.

    Args:
        list: iterable of integers, each in ``range(256)``. (The parameter
            name shadows the builtin; it is kept so existing callers are
            not affected.)

    Returns:
        A ``bytes`` object holding one byte per input value, in order.

    Raises:
        OverflowError: if a value does not fit in an unsigned byte.
    """
    # array.tostring() was a deprecated alias that Python 3.9 removed;
    # tobytes() is the direct replacement and yields the same bytes.
    return array.array('B', list).tobytes()
def _toId(int):
    """Encode a numeric type id as its two-byte wire form, low byte first.

    The value is split into a low byte (``int % 256``) and a high byte
    (``int // 256``), which are then handed to ``_toBytes``.
    """
    high_byte, low_byte = divmod(int, 256)
    return _toBytes((low_byte, high_byte))
class SocketConnector(object):
    """Send and receive typed values over a socket-like object.

    Every message on the wire is a two-byte type id followed by the payload
    produced by that type's sender function.  The type tables (``IDs``,
    ``senders``, ``readers``) are class attributes, so types registered
    through ``load`` are shared by every instance.

    Built-in types: ``byte`` (a one-byte bytes object), ``short``, ``int``
    and ``float`` (packed with the ``struct`` formats "h", "i" and "f").
    """

    # Type ids travel as two bytes, low byte first (the same layout that the
    # module-level _toId helper produces).
    _ID_STRUCT = struct.Struct("<H")

    IDs = {
        "byte": b"\x01\x00",
        "short": b"\x02\x00",
        "int": b"\x03\x00",
        "float": b"\x04\x00",
    }  # Name => Id
    _IDs_reversed = {v: k for k, v in IDs.items()}  # Id => Name
    _lastid = len(IDs)  # highest numeric id handed out so far

    @classmethod
    def _createType(cls, type):
        """Register one type descriptor in the class-wide tables.

        Args:
            type: either ``(name, id, reader, sender)`` for a type backed by
                two functions, or ``(name, id, fields)`` for a compound type
                whose payload is the concatenation of the named field types.

        Raises:
            ValueError: if the descriptor has neither 3 nor 4 items.
        """
        if len(type) == 4:  # function based type
            name, type_id, reader, sender = type
        elif len(type) == 3:  # compound type
            name, type_id = type[0], type[1]
            fields = tuple(type[2])

            # Field senders/readers are looked up at call time, so a compound
            # type may name fields that are registered after it.
            def sender(self, data):
                return b"".join(
                    self.senders[field](self, data[i])
                    for i, field in enumerate(fields)
                )

            def reader(self):
                return [self.readers[self.IDs[field]](self) for field in fields]
        else:
            raise ValueError(
                "type descriptor must have 3 or 4 items, got %d" % len(type)
            )

        cls.IDs[name] = type_id
        cls._IDs_reversed[type_id] = name
        cls.senders[name] = sender
        cls.readers[type_id] = reader

    @classmethod
    def load(cls, files):
        """Register the types described by one or more JSON files.

        Args:
            files: a path string, or a list/tuple of path strings.

        Raises:
            TypeError: if ``files`` is neither a string nor a list/tuple.
        """
        if isinstance(files, (list, tuple)):
            for path in files:
                cls._load(path)
        elif isinstance(files, str):
            cls._load(files)
        else:
            # TypeError is still an Exception, so existing handlers keep working.
            raise TypeError("Sc.load takes str or list of multiple str, not %s" % type(files))
        # The original code also mirrors id => name entries into IDs after a
        # load; kept as-is in case callers look ids up there.
        cls.IDs.update({v: k for k, v in cls.IDs.items()})

    @classmethod
    def _load(cls, file):
        """Register the types described by a single JSON file.

        Recognised keys:
            "module" / "func_types": a module name plus a list of type names;
                the module must provide ``read_<name>`` and ``send_<name>``.
            "comp_types": a list of ``[name, [field type names]]`` pairs.
        """
        # Imported locally because the file header only imports json, struct
        # and array.
        import importlib

        # The context manager closes the file even if the JSON is malformed.
        with open(file, "r") as f:
            data = json.load(f)

        if "module" in data:
            # import_module returns the named module itself even for dotted
            # names, where __import__ would return the top-level package.
            m = importlib.import_module(data["module"])
            for tp in data.get("func_types", ()):
                cls._lastid += 1
                reader = getattr(m, "read_" + tp)
                sender = getattr(m, "send_" + tp)
                cls._createType(
                    (tp, cls._ID_STRUCT.pack(cls._lastid), reader, sender)
                )
                print("Registrated type: %s with id %s" % (tp, cls._lastid))

        for tp in data.get("comp_types", ()):
            cls._lastid += 1
            name, fields = tp[0], tp[1]
            cls._createType((name, cls._ID_STRUCT.pack(cls._lastid), fields))
            print("Registrated type: %s with id %s" % (name, cls._lastid))

    def __init__(self, sock, return_types=False, return_types_as_str=1):
        """Wrap ``sock``.

        Args:
            sock: socket-like object providing ``recv``, ``sendall`` and
                ``close``.
            return_types: when true, ``read()`` returns ``(type, value)``
                instead of just the value.
            return_types_as_str: when true the reported type is its name,
                otherwise the raw two-byte id.
        """
        self.sock = sock
        # NOTE(review): formats without a byte-order prefix use the native
        # byte order, while the type id is always low byte first -- confirm
        # whether peers on other architectures must be supported.
        self.intpacker = struct.Struct("i")
        self.shortpacker = struct.Struct("h")
        self.floatpacker = struct.Struct("f")
        if return_types_as_str:
            self._typeConverter = lambda x: self._IDs_reversed[x]
        else:
            self._typeConverter = lambda x: x
        if return_types:
            # Shadow the class-level read() on this instance only.
            self.read = self.read_types

    def send(self, id, payload):
        """Send ``payload`` framed as the type named ``id``."""
        header = self.IDs[id]
        body = self.senders[id](self, payload)
        self.send_raw(header + body)

    def read(self):
        """Read one message and return its decoded value."""
        type_id = self._read_exact(2)
        return self.readers[type_id](self)

    def read_types(self):
        """Read one message and return ``(type, value)``."""
        type_id = self._read_exact(2)
        return self._typeConverter(type_id), self.readers[type_id](self)

    def send_byte(self, b):
        """Return the payload for a ``byte``: ``b`` is passed through as-is."""
        return b

    def send_short(self, s):
        """Return ``s`` packed with the "h" struct format."""
        return self.shortpacker.pack(s)

    def send_int(self, i):
        """Return ``i`` packed with the "i" struct format."""
        return self.intpacker.pack(i)

    def send_float(self, f):
        """Return ``f`` packed with the "f" struct format."""
        return self.floatpacker.pack(f)

    senders = {
        "byte": send_byte,
        "short": send_short,
        "int": send_int,
        "float": send_float,
    }  # Name => function

    def read_byte(self):
        """Read a single byte and return it as a bytes object."""
        # The original called sock.read(), which socket objects do not have.
        return self._read_exact(1)

    def read_short(self):
        """Read and unpack one "h" value."""
        return self.shortpacker.unpack(self._read_exact(self.shortpacker.size))[0]

    def read_int(self):
        """Read and unpack one "i" value."""
        return self.intpacker.unpack(self._read_exact(self.intpacker.size))[0]

    def read_float(self):
        """Read and unpack one "f" value."""
        return self.floatpacker.unpack(self._read_exact(self.floatpacker.size))[0]

    readers = {
        IDs["byte"]: read_byte,
        IDs["short"]: read_short,
        IDs["int"]: read_int,
        IDs["float"]: read_float,
    }  # ID => function

    def close(self):
        """Close the wrapped socket."""
        self.sock.close()

    def send_raw(self, data):
        """Write ``data`` to the socket in full."""
        # send() may transmit only part of the buffer; sendall() keeps going
        # until everything has been written.
        self.sock.sendall(data)

    def read_raw(self, l):
        """Return the result of a single ``recv(l)`` call (may be short)."""
        return self.sock.recv(l)

    def _read_exact(self, l):
        """Call ``recv`` repeatedly until ``l`` bytes have been collected.

        A single ``recv`` may return fewer bytes than requested.  If ``recv``
        returns an empty result before ``l`` bytes arrive, the bytes gathered
        so far are returned, so the failure surfaces exactly where it did
        before (id lookup or ``struct`` unpacking).
        """
        chunks = []
        remaining = l
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Release the socket when the with-block ends; exceptions propagate.
        self.close()


# Short alias; the load() error message refers to the class as "Sc".
Sc = SocketConnector
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment