|
from collections import MutableMapping, namedtuple |
|
|
|
|
|
# Two-field record used for channel traffic: ``word`` is the message tag and
# ``obj`` is the payload that accompanies it.
Message = namedtuple("Message", ["word", "obj"])
|
|
|
|
|
class ExecNetSharedStorageClient(MutableMapping): |
|
def __init__(self, channel): |
|
super().__init__() |
|
self.ch = channel |
|
|
|
def _call(self, word, *args, **kwargs): |
|
self.ch.send((word, (args, kwargs))) |
|
response = Message(*self.ch.receive()) |
|
if response.word == word: |
|
return response.obj |
|
elif response.word == 'KeyError': |
|
raise KeyError(*response.obj) |
|
elif response.word == 'ValueError': |
|
raise ValueError(*response.obj) |
|
elif response.word == 'Exception': |
|
raise Exception(*response.obj) |
|
raise RuntimeError("Unexpected response: {!r}".format(response)) |
|
|
|
def __delitem__(self, key): |
|
return self._call('__delitem__', key) |
|
|
|
def __getitem__(self, key): |
|
return self._call('__getitem__', key) |
|
|
|
def __iter__(self): |
|
return iter(self._call('__iter__')) |
|
|
|
def __len__(self): |
|
return self._call('__len__') |
|
|
|
def __setitem__(self, key, value): |
|
return self._call('__setitem__', key, value) |
|
|
|
|
|
class ExecNetSharedStorageServer(dict):
    """Dict that answers mapping requests arriving over a channel.

    A request is a ``(word, (args, kwargs))`` pair. When ``word`` is one of
    ``permitted_methods`` the method of that name is called on this dict and
    the outcome is sent back over the channel as a ``(word, obj)`` pair.
    """

    # Names of the methods a request is allowed to invoke.
    permitted_methods = {'__delitem__', '__getitem__', '__iter__', '__len__', '__setitem__'}

    def __init__(self, channel, *args, **kwargs):
        """Initialise the dict from ``*args``/``**kwargs`` and keep *channel*."""
        super().__init__(*args, **kwargs)
        self.ch = channel

    def listen(self):
        """Register ``_callback`` as the channel's callback."""
        self.ch.setcallback(self._callback)

    def check(self, timeout=0):
        """Receive one message (passing *timeout* to ``receive``) and handle it.

        Whatever ``self.ch.receive`` raises propagates to the caller.
        """
        message = self.ch.receive(timeout)
        return self._callback(message)

    def _callback(self, message):
        """Handle one ``(word, obj)`` request and send the reply.

        Replies sent on the channel:
            ``(word, value)`` on success (``__iter__`` results are sent as a
            tuple), ``('KeyError' | 'ValueError', err.args)`` when the call
            raises one of those, and
            ``('Exception', ('not permitted', (word, obj)))`` when ``word``
            is not in ``permitted_methods``.
        """
        word, obj = message
        if word not in self.permitted_methods:
            # Reply through this server's own channel; there is no
            # module-level ``ch`` to fall back on.
            self.ch.send(('Exception', ('not permitted', (word, obj))))
            return

        func = getattr(self, word)
        args, kwargs = obj
        try:
            value = func(*args, **kwargs)
            if word == '__iter__':
                # Materialise the iterator so the keys can be sent as data.
                value = tuple(value)
            self.ch.send((word, value))
        except (KeyError, ValueError) as err:
            self.ch.send((type(err).__name__, err.args))

    def cycle(self, forever=False):
        """Process incoming requests.

        With ``forever`` true, block on ``check(timeout=None)`` in an endless
        loop. Otherwise handle messages with the default ``check()`` timeout
        until the channel's ``TimeoutError`` is raised.
        """
        # continue indefinitely
        while forever:
            self.check(timeout=None)
        # if not forever, only finish pending items
        while True:
            try:
                self.check()
            except self.ch.TimeoutError:
                break
|
|
|
|
|
if __name__ == "__channelexec__":
    # this module was imported and then run via gw.remote_exec(module)
    # this is only done for testing. We'll act as the Mapping server.
    #
    # NOTE(review): ``channel`` is not defined in this file; presumably it is
    # provided by the remote-exec environment -- confirm.

    # Created once, before the loop, so that every server started below
    # stays in the list instead of the list being reset on each message.
    servers = []
    while True:
        message = Message(*channel.receive())
        if message.word == 'exit':
            exit()
        if message.word == 'new':
            # ``message.obj`` is used as the channel the new server answers on.
            server = ExecNetSharedStorageServer(message.obj)
            server.listen()
            servers.append(server)
|
|
|
|
|
def test_client_server(gw, channel):
    """Create a client on a new channel of *gw* and announce it on *channel*.

    A ``('new', <client channel>)`` message is sent on *channel*, then the
    client is returned.
    """
    fresh_channel = gw.newchannel()
    client = ExecNetSharedStorageClient(fresh_channel)
    announcement = ('new', client.ch)
    channel.send(announcement)
    return client