Skip to content

Instantly share code, notes, and snippets.

@williballenthin
Last active September 21, 2015 14:11
Show Gist options
  • Select an option

  • Save williballenthin/20c429aa1284d1c3edce to your computer and use it in GitHub Desktop.

Select an option

Save williballenthin/20c429aa1284d1c3edce to your computer and use it in GitHub Desktop.
Test client and server to demo Vivisect Synapse
import synapse.link as s_link
import synapse.daemon as s_daemon
import synapse.telepath as s_tele
def log(*args, **kwargs):
    """Print the received positional and keyword arguments.

    The output is a single line of the form ``log <args tuple> <kwargs dict>``.
    Any call signature is accepted.
    """
    line = "log %s %s" % (args, kwargs)
    print(line)
def main():
    """Run the demo client against a daemon at tcp://127.0.0.1:8081.

    Builds a link client from the URL, sends one "testmsg" message and logs
    the value returned by ``sendAndRecv``, then obtains a telepath proxy for
    the ``/adder`` path and logs the result of ``add(1, 3)``.

    The link client is always released with ``fini()``, even when one of the
    requests raises.
    """
    print("starting up")

    link = s_link.chopLinkUrl("tcp://127.0.0.1:8081/?zerosig=1")
    relay = s_link.initLinkRelay(link)
    client = relay.initLinkClient()
    try:
        # Raw message round trip over the link client.
        log(client.sendAndRecv("testmsg", body="msgbody"))

        # Method call through a telepath proxy addressed by the "/adder" path.
        a = s_tele.getProxy("tcp://127.0.0.1:8081/adder?zerosig=1")
        log(a.add(1, 3))
    finally:
        # Previously skipped on any exception above, leaking the client.
        client.fini()
# Run the demo client only when this file is executed as a script.
if __name__ == "__main__":
    main()
import time
import synapse.common as s_common
import synapse.link as s_link
import synapse.daemon as s_daemon
def log(*args, **kwargs):
    """Print the received positional and keyword arguments.

    The output is a single line of the form ``log <args tuple> <kwargs dict>``.
    Any call signature is accepted, which lets this function be passed as a
    generic callback.
    """
    line = "log %s %s" % (args, kwargs)
    print(line)
def main():
    """Start the demo daemon on tcp://127.0.0.1:8081 and keep the process alive.

    The daemon is set up to:
      * pass the "link:sock:init", "link:sock:mesg", "link:sock:fini" and
        "auth:info" events to ``log``;
      * handle "testmsg" messages with ``handle_testmsg``;
      * share an ``Adder`` instance under the name "adder".

    This function never returns; it sleeps in an endless loop after the link
    server has been started.
    """
    print("starting up")

    class Adder(object):
        """Object shared with clients under the name "adder"."""

        def add(self, a, b):
            """Return ``a + b``."""
            return a + b

    def handle_testmsg(sock, msg):
        """Log the incoming message, then fire "testresp" on ``sock``."""
        log(msg)
        sock.fireobj("testresp", body="rspbody")

    endpoint = s_link.chopLinkUrl("tcp://127.0.0.1:8081/?zerosig=1")
    server = s_daemon.Daemon()

    # Route every event of interest to the same logging callback.
    for event_name in (
        "link:sock:init",
        "link:sock:mesg",
        "link:sock:fini",
        "auth:info",
    ):
        server.on(event_name, log)

    server.setMesgMeth("testmsg", handle_testmsg)
    server.addSharedObject("adder", Adder())

    server.runLinkServer(endpoint)

    # Keep the main thread alive; presumably runLinkServer serves in the
    # background and returns immediately -- TODO confirm.
    while True:
        time.sleep(1)
# Run the demo server only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment