Created
June 3, 2018 00:37
-
-
Save indiv0/cd1f124184dd263a036044e3257a3c8f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from curio import run | |
from curio.socket import * | |
import os | |
import sys | |
from common import send_message, read_message, b8zs_encode | |
# Fallback server address, used when the HOST / PORT environment variables
# are not set.
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8080
async def b8zs_client(address, b8zs_str):
    """Send one B8ZS-encoded string to the server listening at ``address``.

    The exchange is line based: the client sends ``request-to send``, waits
    for ``clear-to send``, sends the encoded data, then waits for
    ``acknowledged``.

    Args:
        address: ``(host, port)`` pair handed to ``sock.connect``.
        b8zs_str: Encoded text to transmit; it is converted to bytes before
            being written.

    Raises:
        ValueError: If the server answers with anything other than the
            expected handshake message.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    try:
        await sock.connect(address)
    except ConnectionRefusedError:
        print('Connection refused', address)
        # Release the socket instead of leaking it on the failure path.
        await sock.close()
        return
    print('Connected at', address)

    s = sock.as_stream()
    async with s:
        print("Sending request-to send")
        await send_message(s, b'request-to send')

        message = await read_message(s)
        if not message:
            # Nothing came back; give up without sending the payload.
            return
        elif message == b'clear-to send':
            print('Received clear-to send')
            # send_message() appends the line terminator itself, so adding
            # another '\n' here would put a stray empty line on the wire.
            await send_message(s, b8zs_str.encode())
        else:
            raise ValueError('Invalid message:', message)

        message = await read_message(s)
        if not message:
            return
        elif message == b'acknowledged':
            print('Received acknowledge')
        else:
            raise ValueError('Invalid message:', message)

    await s.close()
    print('Connection closed')
if __name__ == "__main__":
    # Server location comes from the environment, with module defaults as
    # the fallback.
    host = os.environ.get('HOST', DEFAULT_HOST)
    port = int(os.environ.get('PORT', DEFAULT_PORT))

    # The bit string to transmit is the first command-line argument.
    if len(sys.argv) < 2:
        print('Please input a string of zeroes and ones')
        sys.exit(1)
    data = sys.argv[1]

    # Encode up front so that bad input is reported before any connection
    # attempt is made.
    try:
        b8zs_str = b8zs_encode(data)
    except Exception as exc:
        print('Exception:', exc)
        sys.exit(1)

    run(b8zs_client, (host, port), b8zs_str)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from enum import Enum | |
# One or more characters, each of which is "0" or "1".
# NOTE: "$" also matches just before a trailing newline.
BINARY_STR_REGEX = re.compile(r'^([01]+)$')

# One or more characters, each of which is "0", "+" or "-".
B8ZS_STR_REGEX = re.compile(r'^([0+-]+)$')
async def read_message(stream):
    """Read one line from ``stream`` and return it stripped of surrounding whitespace.

    Args:
        stream: Object exposing an awaitable ``readline()``.

    Returns:
        The line with leading and trailing whitespace (including the line
        terminator) removed.
    """
    line = await stream.readline()
    return line.strip()
async def send_message(stream, message):
    """Write ``message`` to ``stream`` as a single newline-terminated line.

    Args:
        stream: Object exposing an awaitable ``write(data)``.
        message: Bytes to send; the ``b'\\n'`` terminator is appended here, so
            callers pass the payload without one.
    """
    framed = message + b'\n'
    await stream.write(framed)
class Polarity(Enum):
    """Sign of a voltage pulse in the encoded signal."""

    # Rendered as "-" by the encoder.
    Negative = -1
    # Rendered as "+" by the encoder.
    Positive = 1
def is_binary_str(string):
    """Return ``True`` if ``string`` is a non-empty run of ``0``/``1`` characters.

    Args:
        string: Text to validate.

    Returns:
        ``True`` when every character is ``0`` or ``1`` and the string is not
        empty, ``False`` otherwise.
    """
    # fullmatch() instead of search(): with search(), the pattern's "$" also
    # matches just before a trailing newline, so "01\n" was accepted.
    return BINARY_STR_REGEX.fullmatch(string) is not None
def is_b8zs_str(string):
    """Return ``True`` if ``string`` is a non-empty run of ``0``, ``+`` and ``-``.

    Args:
        string: Text to validate.

    Returns:
        ``True`` when every character is ``0``, ``+`` or ``-`` and the string
        is not empty, ``False`` otherwise.
    """
    # fullmatch() instead of search(): with search(), the pattern's "$" also
    # matches just before a trailing newline, so "+-\n" was accepted.
    return B8ZS_STR_REGEX.fullmatch(string) is not None
def b8zs_decode(string):
    """Convert a B8ZS symbol string back into a string of ``0`` and ``1``.

    Args:
        string: Text made up of ``0``, ``+`` and ``-`` symbols.

    Returns:
        The decoded bit string.

    Raises:
        ValueError: If ``string`` fails the ``is_b8zs_str`` check.
    """
    if not is_b8zs_str(string):
        raise ValueError('Invalid B8ZS bitstring')

    bits = string
    # Undo the zero-octet substitutions first, while the pulse signs that
    # identify them are still present.
    for substitution in ("000-+0+-", "000+-0-+"):
        bits = bits.replace(substitution, "00000000")
    # Every remaining pulse, whatever its sign, stands for a one bit.
    for pulse in ("-", "+"):
        bits = bits.replace(pulse, "1")
    return bits
def b8zs_encode(string):
    """Encode a string of ``0``/``1`` characters as B8ZS symbols.

    Each ``1`` becomes a pulse whose sign alternates (the first one is ``+``),
    each ``0`` stays ``0``, and every run of eight consecutive zeroes is
    replaced by a substitution pattern chosen from the sign of the last pulse.

    Args:
        string: Bit string to encode.

    Returns:
        The encoded string, made up of ``0``, ``+`` and ``-``.

    Raises:
        ValueError: If ``string`` is not a valid bit string.
    """
    if not is_binary_str(string):
        raise ValueError('Invalid bitstring:', string)

    # Starting from "negative" makes the first one bit a positive pulse.
    last_polarity = Polarity.Negative
    consecutive_zeroes = 0
    # Collect symbols in a list and join once at the end, rather than
    # re-slicing and concatenating an ever-growing string.
    symbols = []
    for c in string:
        if c == "1":
            consecutive_zeroes = 0
            if last_polarity is Polarity.Negative:
                symbols.append("+")
                last_polarity = Polarity.Positive
            else:
                symbols.append("-")
                last_polarity = Polarity.Negative
        elif c == "0":
            symbols.append("0")
            consecutive_zeroes += 1
            # An octet of all zeroes is encoded as "000+-0-+" if the last
            # voltage pulse preceding was positive.
            # An octet of all zeroes is encoded as "000-+0+-" if the last
            # voltage pulse preceding was negative.
            if consecutive_zeroes == 8:
                if last_polarity is Polarity.Positive:
                    substitution = "000+-0-+"
                else:
                    substitution = "000-+0+-"
                # Overwrite the eight zeroes just emitted. last_polarity is
                # left alone: each pattern ends on a pulse of the same sign
                # as the one that preceded it.
                symbols[-8:] = list(substitution)
                consecutive_zeroes = 0
        else:
            raise ValueError("Unexpected character found in binary string")
    return "".join(symbols)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from curio import run, spawn | |
from curio.socket import * | |
import os | |
from common import send_message, read_message, b8zs_decode | |
# Fallback listen address, used when the HOST / PORT environment variables
# are not set.
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8080
# Fallback value handed to sock.listen() when MAX_CONNECT_REQUESTS is not set.
DEFAULT_MAX_CONNECT_REQUESTS = 5
async def b8zs_server(address, max_connect_requests):
    """Listen on ``address`` and spawn a ``B8ZSServer`` task per connection.

    Args:
        address: ``(host, port)`` pair to bind the listening socket to.
        max_connect_requests: Value passed to ``listen()`` on the socket.
    """
    listener = socket(AF_INET, SOCK_STREAM)
    listener.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    listener.bind(address)
    listener.listen(max_connect_requests)
    print('Server listening at', address)

    async with listener:
        # Accept forever; each connection is served by its own daemon task
        # so the loop goes straight back to accepting.
        while True:
            connection = await listener.accept()
            client, addr = connection
            print('Connection from', addr)
            handler = B8ZSServer(client, addr)
            await spawn(handler.run(), daemon=True)
class B8ZSServer:
    """Serve the B8ZS exchange for one accepted client connection.

    The handler expects ``request-to send``, replies ``clear-to send``, reads
    one line of encoded data, prints its decoded form and replies
    ``acknowledged``.
    """

    def __init__(self, sock, addr):
        """Remember the client socket and the address it connected from."""
        self.sock = sock
        self.addr = addr

    async def run(self):
        """Run the exchange with the client.

        Raises:
            ValueError: If the first message is not ``request-to send``.
        """
        s = self.sock.as_stream()
        async with s:
            message = await read_message(s)
            if not message:
                # Nothing was received; end the session quietly.
                return
            elif message == b'request-to send':
                print('Received request-to send')
                await send_message(s, b'clear-to send')
            else:
                raise ValueError('Invalid message:', message)

            message = await read_message(s)
            if not message:
                return
            # The payload comes from the network and may not be valid text;
            # treat undecodable bytes like any other invalid data instead of
            # letting UnicodeDecodeError escape from the task.
            try:
                encoded = message.decode()
            except UnicodeDecodeError:
                print('Invalid data:', message)
                return
            print("Encoded data:", encoded)
            try:
                decoded = b8zs_decode(encoded)
            except ValueError:
                print('Invalid data:', encoded)
                return
            print("Decoded data:", decoded)
            await send_message(s, b"acknowledged")

        print('Connection closed')
        await s.close()
if __name__ == "__main__":
    # Listen address and backlog come from the environment, with module
    # defaults as the fallback.
    host = os.environ.get('HOST', DEFAULT_HOST)
    port = int(os.environ.get('PORT', DEFAULT_PORT))
    # Environment values are strings; convert like PORT above so that
    # listen() receives an integer when MAX_CONNECT_REQUESTS is set.
    max_connect_requests = int(
        os.environ.get('MAX_CONNECT_REQUESTS', DEFAULT_MAX_CONNECT_REQUESTS)
    )
    run(b8zs_server, (host, port), max_connect_requests)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment