Created
July 17, 2023 15:10
-
-
Save lachesis/2e481f695c21de0c3f6d5a67e6461f23 to your computer and use it in GitHub Desktop.
python client for https://github.com/losvedir/ephemeral2, just playing with websockets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2 | |
import collections | |
import hashlib | |
import json | |
import logging | |
import os | |
import time | |
import websocket | |
MAX_HEARTBEAT_TIME = 30 | |
class Heartbeatable(object): | |
def __init__(self, func, rate): | |
self.func = func | |
self.rate = rate | |
self.last_call = None | |
def call(self): | |
if self.last_call is None or time.time() - self.last_call > self.rate: | |
self.func() | |
self.last_call = time.time() | |
class PhoenixState(object): | |
"""An object that tracks the state of multiple phoenix channels.""" | |
def __init__(self): | |
#self.messages = collections.defaultdict(lambda *args: []) # map of topics (str) to their messages (list) | |
self.callbacks = collections.defaultdict(lambda *args: []) # map of topics (str) to their message callbacks | |
self.heartbeats = {} | |
self.register_heartbeat(self._phoenix_heartbeat, MAX_HEARTBEAT_TIME) | |
def _generate_message(self, topic, event, payload=None): | |
if payload is None: | |
payload = {} | |
return { | |
'topic': topic, | |
'event': event, | |
'payload': payload | |
} | |
def process_message(self, message): | |
"""Call this with a dictionary for message to process that message.""" | |
topic = message['topic'] | |
#self.messages[topic].append(message) | |
for callback in self.callbacks[topic]: | |
callback(message) | |
def _phoenix_heartbeat(self): | |
self.send("phoenix", "heartbeat") | |
def register_callback(self, topic, callback): | |
"""Register a callback for a given topic.""" | |
self.callbacks[topic].append(callback) | |
def register_heartbeat(self, func, rate): | |
self.heartbeats[func] = Heartbeatable(func, rate) | |
def deregister_heartbeat(self, func): | |
del self.heartbeats[func] | |
def heartbeat(self): | |
"""Call this method at least once every 30 seconds to heartbeat the connection.""" | |
for hb in self.heartbeats.values(): | |
hb.call() | |
def join(self, topic, callback): | |
"""Join a topic (and register a callback)""" | |
self.register_callback(topic, callback) | |
self.raw_send(self._generate_message( | |
topic=topic, | |
event='phx_join' | |
)) | |
def leave(self, topic): | |
"""Leave a topic (and deregister all callbacks)""" | |
self.raw_send(self._generate_message( | |
topic=topic, | |
event='phx_leave' | |
)) | |
self.callbacks[topic] = [] | |
def send(self, topic, event, payload=None): | |
"""Send a message to a topic.""" | |
self.raw_send(self._generate_message(topic, event, payload)) | |
class WSClient(object): | |
def __init__(self, url): | |
self.url = url | |
self.conn = None | |
self.ref = 1 | |
def __del__(self): | |
try: self.conn.close() | |
except Exception: pass | |
def raw_connect(self, on_open=None): | |
self.conn = websocket.WebSocketApp(self.url, | |
on_message=self.raw_process_message, | |
on_ping=self.raw_pingpong, | |
on_pong=self.raw_pingpong, | |
on_open=on_open | |
) | |
def raw_send(self, body): | |
if 'ref' not in body: | |
body['ref'] = self.ref | |
self.ref += 1 | |
bstr = json.dumps(body) | |
logging.debug("SEND %r", bstr) | |
self.conn.send(bstr) | |
def raw_pingpong(self, *args, **kwargs): | |
self.heartbeat() | |
def raw_process_message(self, ws, message): | |
logging.debug("GOT %r", message) | |
self.heartbeat() | |
self.process_message(json.loads(message)) | |
def raw_run_forever(self): | |
self.conn.run_forever(ping_interval=MAX_HEARTBEAT_TIME) | |
class WSPhoenix(PhoenixState, WSClient): | |
def __init__(self, url): | |
WSClient.__init__(self, url) | |
PhoenixState.__init__(self) | |
class EphemeralWS(object): | |
def __init__(self): | |
self.client = WSPhoenix("ws://ephemeralp2p.durazo.us/ws") | |
def get_content(self, chash, callback): | |
"""Get the given hash's content from EphemeralP2P (callback)""" | |
topic = 'want:' + chash | |
def ask_func(): | |
# Ask for the content to be sent to us | |
self.client.send(topic, 'content_request', {'hash': chash}) | |
def want_cb(message): | |
if message.get('payload', {}).get('content'): | |
# Deregister the ask func | |
self.client.deregister_heartbeat(ask_func) | |
# Call our callback | |
callback(message['payload']['content']) | |
# Leave the topic | |
self.client.leave(topic) | |
# Join the topic | |
self.client.join(topic, want_cb) | |
# Register the ask func to be retried at heartbeat | |
self.client.register_heartbeat(ask_func, 5) | |
# Heartbeat | |
self.client.heartbeat() | |
def serve_content(self, chash, content): | |
"""Register to serve content.""" | |
topic = 'have:' + chash | |
def have_cb(message): | |
if message['event'] == 'content_request': | |
logging.info('serving content for %s', chash) | |
self.client.send(topic, 'content', {'content': content, 'hash': chash}) | |
self.client.join(topic, have_cb) | |
def get_and_serve_content(self, chash, callback=None): | |
"""Fetch, then serve a hash.""" | |
def got_func(content): | |
logging.info('got content for hash %s (len: %d bytes)', chash, len(content)) | |
if callback: | |
callback(content) | |
self.serve_content(chash, content) | |
self.get_content(chash, got_func) | |
def connect(self, on_open): | |
self.client.raw_connect(on_open) | |
self.client.raw_run_forever() | |
class CachingEphemeralServer(object): | |
def __init__(self): | |
self.client = EphemeralWS() | |
self.cache_dir = '.ephem_cache' | |
def connect(self, on_open): | |
try: | |
os.mkdir(self.cache_dir) | |
except Exception: | |
pass | |
self.client.connect(on_open) | |
def serve(self, chash): | |
"""Serve a given hash. | |
If the content is available in the cache dir, get it from there. | |
If it's not, fetch it from the server, cache it, and serve. | |
""" | |
cache_path = os.path.join(self.cache_dir, chash) | |
if os.path.exists(cache_path): | |
with open(cache_path, 'r') as inp: | |
content = inp.read() | |
# Make sure the hash matches | |
calc_hash = hashlib.sha256(content).hexdigest() | |
if calc_hash != chash: | |
raise ValueError("CACHED: Calculated hash of %s not equal to passed in hash of %s" % (calc_hash, chash)) | |
logging.info('serving hash %s from cache...', chash) | |
# Serve! | |
self.client.serve_content(chash, content) | |
else: | |
def callback(content): | |
# Make sure the hash matches | |
calc_hash = hashlib.sha256(content).hexdigest() | |
if calc_hash != chash: | |
raise ValueError("NET: Calculated hash of %s not equal to passed in hash of %s" % (calc_hash, chash)) | |
logging.info('writing hash %s to cache...', chash) | |
# Write it out to the cache | |
with open(cache_path, 'w') as out: | |
out.write(content) | |
# Fetch the content from the server (p2p network) and serve it | |
self.client.get_and_serve_content(chash, callback) | |
def basic_test(): | |
# enable logging | |
logging.basicConfig() | |
client = EphemeralWS() | |
def on_open(*a, **k): | |
# Fetch, then start serving the "test123" doc | |
client.get_and_serve_content('ecd71870d1963316a97e3ac3408c9835ad8cf0f3c1bc703527c30265534f75ae') | |
# Serve (without fetching) the "ABCdef" doc | |
client.serve_content('057e5833fca53ae19901247bd5e68039100561b8535f346dff7d6a4dcc7bf996', 'ABCdef') | |
# Fetch and serve the intro doc | |
client.get_and_serve_content('2bbbf21959178ef2f935e90fc60e5b6e368d27514fe305ca7dcecc32c0134838') | |
client.connect(on_open) | |
# start mainloop | |
def caching_test(): | |
# enable logging | |
logging.basicConfig(format="[%(asctime)s] %(message)s") | |
logging.getLogger().setLevel(logging.INFO) | |
client = CachingEphemeralServer() | |
HASHES = [ | |
'ecd71870d1963316a97e3ac3408c9835ad8cf0f3c1bc703527c30265534f75ae', # "test123" | |
'057e5833fca53ae19901247bd5e68039100561b8535f346dff7d6a4dcc7bf996', # "ABCdef" | |
'2bbbf21959178ef2f935e90fc60e5b6e368d27514fe305ca7dcecc32c0134838', # the intro doc for the service | |
] | |
def on_open(*a, **k): | |
for chash in HASHES: | |
client.serve(chash) | |
# Connect | |
client.connect(on_open) | |
if __name__ == '__main__': | |
caching_test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment