Skip to content

Instantly share code, notes, and snippets.

@jkent
Last active August 15, 2022 21:05
Show Gist options
  • Save jkent/8aef028414ac71ceb0ee8350eb239e38 to your computer and use it in GitHub Desktop.
Save jkent/8aef028414ac71ceb0ee8350eb239e38 to your computer and use it in GitHub Desktop.
Simple python RPC server/client
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: set ts=4 et:
import json
import socket
class RpcException(Exception):
    """Error type raised by the RPC client code in this file."""
class RpcClient(object):
    """Blocking client for a newline-delimited JSON RPC protocol.

    Any attribute that is not found on the instance is treated as the name
    of a remote endpoint: ``client.greet_me('bob')`` sends one ``call``
    request line and returns the value carried by the matching response line.

    The code runs unchanged on Python 2 and Python 3: all socket traffic is
    handled as bytes and decoded as UTF-8 before JSON parsing.
    """

    # Maximum number of bytes requested from the socket per recv() call.
    _RECV_SIZE = 16384

    def __init__(self, addr):
        """Open a TCP connection to *addr*, a ``(host, port)`` tuple."""
        self._s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._s.connect(addr)
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            self._s.close()
            raise
        self._input_buf = b''

    def close(self):
        """Close the underlying socket."""
        self._s.close()

    def __getattr__(self, name):
        """Return a callable that invokes the remote endpoint *name*.

        The returned function forwards its positional arguments as the
        ``args`` list of the request and returns the ``result`` field of the
        response.  It raises RpcException when the connection is closed, the
        response is malformed, or the server reports an error.
        """
        # Explicit dunder probes (copy, pickle, ...) are protocol lookups,
        # not remote endpoints.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)

        def func(*args):
            # The endpoint name lives in the closure, so callables obtained
            # earlier keep targeting their own endpoint.
            request = {'action': 'call', 'endpoint': name, 'args': args}
            payload = (json.dumps(request) + '\n').encode('utf-8')
            # sendall() keeps writing until the whole request is out.
            self._s.sendall(payload)
            return self._parse_response(self._read_line())
        return func

    def _read_line(self):
        """Return the next complete response line (bytes, newline removed)."""
        # A response may arrive in several pieces; keep reading until the
        # buffer holds at least one full line.
        while b'\n' not in self._input_buf:
            chunk = self._s.recv(self._RECV_SIZE)
            if not chunk:
                raise RpcException('socket closed')
            self._input_buf += chunk
        line, self._input_buf = self._input_buf.split(b'\n', 1)
        return line

    @staticmethod
    def _parse_response(line):
        """Validate one raw response line and return its result value."""
        try:
            response = json.loads(line.decode('utf-8'))
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON.
            raise RpcException('json parse error')
        if not isinstance(response, dict):
            raise RpcException('response is not an instance of dict')
        if 'type' not in response:
            raise RpcException('response type missing')
        kind = response['type']
        if kind == u'error':
            if 'reason' not in response:
                raise RpcException('error reponse does not have reason')
            # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3.
            if not isinstance(response['reason'], type(u'')):
                raise RpcException('error response reason is not unicode')
            raise RpcException(response['reason'])
        if kind == u'result':
            if 'result' not in response:
                raise RpcException('result missing')
            return response['result']
        # Previously an unrecognised type silently produced None.
        raise RpcException('unknown response type')
if __name__ == '__main__':
    # Demo: call the two endpoints exported by the example server.
    # ``raw_input`` exists only on Python 2; Python 3 renamed it to ``input``.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    client = RpcClient(('localhost', 54321))
    try:
        # print(...) with a single argument behaves the same on Python 2 and 3.
        print(client.greeting())
        name = read_line('enter a name: ')
        print(client.greet_me(name))
    finally:
        # Always release the connection, even if a call fails.
        client.close()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: set ts=4 et:
import json
import socket
from select import select
class RpcSession(object):
    """Server-side state for one connected client.

    Incoming bytes are buffered and split into newline-terminated JSON
    commands; each reply is appended to ``output_buf`` and written out by
    ``do_write``.  The object exposes ``fileno``, ``can_read``, ``can_write``,
    ``do_read`` and ``do_write`` for the owning core's select loop.

    The code runs unchanged on Python 2 and Python 3: buffers hold bytes and
    are decoded/encoded as UTF-8 at the JSON boundary.
    """

    # Maximum number of bytes moved per recv()/send() call.
    _CHUNK = 16384
    # ``unicode`` on Python 2, ``str`` on Python 3.
    _TEXT = type(u'')

    def __init__(self, core, s):
        """Wrap the connected socket *s*; *core* supplies ``endpoints`` and
        ``selectable``."""
        self.core = core
        self.s = s
        self.output_buf = b''
        self.input_buf = b''
        self._closed = False

    def fileno(self):
        """Return the socket's file descriptor for use with select()."""
        return self.s.fileno()

    @property
    def can_read(self):
        """A session always wants to be polled for input."""
        return True

    @property
    def can_write(self):
        """True while reply bytes are waiting to be sent."""
        return len(self.output_buf) > 0

    def do_read(self):
        """Read available bytes and process every complete command line."""
        try:
            buf = self.s.recv(self._CHUNK)
        except socket.error:
            # A reset connection is handled like an orderly close.
            buf = b''
        if not buf:
            self._drop()
            return
        self.input_buf += buf
        # Only complete lines are consumed; a trailing partial line stays
        # buffered until the rest of it arrives.
        while b'\n' in self.input_buf:
            line, self.input_buf = self.input_buf.split(b'\n', 1)
            self._handle_line(line)

    def _handle_line(self, line):
        """Validate one raw command line and dispatch it."""
        try:
            command = json.loads(line.decode('utf-8'))
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON.
            self.error('json parse error')
            return
        if not isinstance(command, dict):
            self.error('command is not instance of dict')
            return
        if 'action' not in command:
            self.error('no action')
            return
        if not isinstance(command['action'], self._TEXT):
            self.error('action is not unicode')
            return
        if command['action'] != u'call':
            self.error('command is not known')
            return
        if 'endpoint' not in command:
            self.error('no endpoint')
            return
        if not isinstance(command['endpoint'], self._TEXT):
            self.error('endpoint is not unicode')
            return
        if 'args' not in command:
            self.error('no args')
            return
        if not isinstance(command['args'], list):
            self.error('args is not list')
            return
        self.call(command['endpoint'], *command['args'])

    def do_write(self):
        """Send up to one chunk of pending output, keeping whatever the
        socket did not accept."""
        if self._closed or not self.output_buf:
            return
        try:
            sent = self.s.send(self.output_buf[:self._CHUNK])
        except socket.error:
            self._drop()
            return
        # send() may accept fewer bytes than offered; drop only those.
        self.output_buf = self.output_buf[sent:]

    def call(self, name, *args):
        """Invoke the exported endpoint *name* with *args* and queue the
        reply (a result on success, an error otherwise)."""
        if name not in self.core.endpoints:
            self.error("endpoint unknown")
            return
        endpoint = self.core.endpoints[name]
        # Arity is read from the function's code object, so the argument
        # count must match the declared positional parameters exactly.
        if len(args) != endpoint.__code__.co_argcount:
            self.error("endpoint argument count mismatch")
            return
        try:
            result = endpoint(*args)
        except Exception:
            self.error("calling endpoint failed")
            return
        try:
            payload = json.dumps({'type': 'result', 'result': result})
        except (TypeError, ValueError):
            # A return value JSON cannot represent must not take down the
            # server loop.
            self.error("endpoint result is not serializable")
            return
        self._queue(payload)

    def error(self, reason):
        """Queue an error reply carrying *reason*."""
        self._queue(json.dumps({'type': 'error', 'reason': reason}))

    def _queue(self, text):
        """Append one newline-terminated reply line to the output buffer."""
        self.output_buf += (text + '\n').encode('utf-8')

    def _drop(self):
        """Unregister from the core and close the socket (idempotent)."""
        if self._closed:
            return
        self._closed = True
        self.output_buf = b''
        if self in self.core.selectable:
            self.core.selectable.remove(self)
        self.s.close()
class RpcServer(object):
    """Listening TCP socket that creates an RpcSession per accepted client.

    Exposes ``fileno``, ``can_read``, ``can_write`` and ``do_read`` so the
    owning core can poll it with select() alongside the sessions.
    """

    def __init__(self, core, addr):
        """Bind to *addr* (a ``(host, port)`` tuple) and start listening.

        Any exception raised while configuring the socket is re-raised after
        the socket has been closed.
        """
        self.core = core
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.s.bind(addr)
            self.s.listen(5)
        except Exception:
            # Do not leak the descriptor when e.g. the address is in use.
            self.s.close()
            raise

    def fileno(self):
        """Return the listening socket's file descriptor."""
        return self.s.fileno()

    @property
    def can_read(self):
        """Always poll for incoming connections."""
        return True

    @property
    def can_write(self):
        """A listening socket never has data to write."""
        return False

    def do_read(self):
        """Accept one pending connection and register a session for it."""
        conn, _ = self.s.accept()
        session = RpcSession(self.core, conn)
        self.core.selectable.append(session)
class RpcCore(object):
    """Select-based event loop plus the registry of exported endpoints.

    ``selectable`` holds every object polled by ``tick`` (each must provide
    ``fileno``, ``can_read``, ``can_write``, ``do_read`` and ``do_write``);
    ``endpoints`` maps endpoint names to the functions registered with
    ``export``.
    """

    def __init__(self):
        self.selectable = []
        self.endpoints = {}
        # Defined up front so the flag can be inspected before run() starts.
        self.terminate = False

    def listen(self, addr=('0.0.0.0', 54321)):
        """Start listening on *addr* and add the listener to the poll set."""
        listener = RpcServer(self, addr)
        self.selectable.append(listener)

    def run(self):
        """Call tick() repeatedly until shutdown() is requested."""
        self.terminate = False
        while not self.terminate:
            self.tick()

    def tick(self):
        """Run one select() pass (0.25 s timeout) and service ready objects."""
        read_objs = [obj for obj in self.selectable if obj.can_read]
        write_objs = [obj for obj in self.selectable if obj.can_write]
        readable, writeable, _ = select(read_objs, write_objs, [], 0.25)
        for obj in readable:
            obj.do_read()
        for obj in writeable:
            # do_read() may have unregistered this object (for example a
            # closed connection) earlier in the same pass; do not write to it.
            if obj in self.selectable:
                obj.do_write()

    def shutdown(self, reason=''):
        """Ask run() to stop after the current tick; *reason* is unused."""
        self.terminate = True

    def export(self, f):
        """Register *f* under its ``__name__``; usable as a decorator since
        it returns *f* unchanged."""
        name = f.__name__
        self.endpoints[name] = f
        return f
if __name__ == '__main__':
    # Demo server: listen on every interface, port 54321, and expose two
    # endpoints before entering the event loop.
    core = RpcCore()
    core.listen(('0.0.0.0', 54321))

    def greeting():
        return 'hello world!'

    def greet_me(name):
        return 'hello %s' % name

    # export() hands the function back unchanged, so calling it directly is
    # equivalent to using it as a decorator.
    greeting = core.export(greeting)
    greet_me = core.export(greet_me)

    core.run()
@jkent
Copy link
Author

jkent commented Mar 26, 2021

Hi anirey, this is Python 2 code; it needs to be converted before it will run on Python 3.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment