Created
July 5, 2017 09:41
-
-
Save lispandfound/bf2246cd9721a525cbef09ee8caa4dbe to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' Control and run RGB Fedora server. ''' | |
# Asynchronous loop code | |
import uasyncio as asyncio | |
# Controlling RGB strip | |
import neopixel | |
# Controlling pin out | |
import machine | |
# Connecting to Wireless Network | |
import network | |
# Communicating over sockets | |
import socket | |
# Sleeping | |
import time | |
def connect(ssid, password): | |
''' Connect to internet SSID with supplied password. | |
Returns device ip. | |
>>> connect('LHS_BYOD', 'password') | |
'192.168.123' | |
''' | |
wlan = network.WLAN(network.STA_IF) | |
wlan.active(True) | |
wlan.connect(ssid, password) | |
while wlan.isconnected() is False: | |
time.sleep(1) | |
ip = wlan.ifconfig()[0] | |
return ip | |
class Server: | |
''' Server manages the state of an asynchronous UDP server. | |
The server runs a couple of asynchronous co-routines, the server that | |
listens for incoming UDP connections, and step_server, which handles the | |
received data. ''' | |
def __init__(self, loop, ip, port, handler): | |
''' Initialize Server instance. ''' | |
self.loop = loop | |
self.data = None | |
self.ip = ip | |
self.port = port | |
self.handler = handler | |
async def udp_server(self): | |
''' UDP Server co-routine, waits for an incoming connection and reads from the socket. ''' | |
# Create a socket with the following arguments: | |
# - socket.AF_INET :: Sets the socket to be addressable using the familiar ip system | |
# e.g 192.168.0.1, www.google.com, etc. | |
# - socket.SOCK_DGRAM :: Documentation unclear on this one, however is | |
# recommended to be set for most usage. | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
# Tell the kernel that it should reuse the socket rather than waiting | |
# for it to expire. Only really affects the program if it is run | |
# multiple times in quick succession. | |
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
s.bind((self.ip, self.port)) | |
print('Server running at', self.ip, self.port) | |
stream_reader = asyncio.StreamReader(s) | |
while True: | |
# The await call will cause the server to handle off control to the | |
# rest of the program until someone tries to send data to it. | |
data = await stream_reader.read(64) | |
self.data = data | |
async def step_server(self): | |
''' step_server takes received data and passes it off to the client | |
handler. ''' | |
while True: | |
await asyncio.sleep(1) | |
if self.data is not None: | |
self.handler.step(data=self.data) | |
self.data = None | |
else: | |
self.handler.step() | |
def run(self): | |
''' Start the server instance. ''' | |
self.loop.call_soon(self.step_server()) | |
self.loop.run_until_complete(self.udp_server()) | |
def dummy_profile(strip): | |
''' Some silly dummy profile. ''' | |
print('Running dummy profile') | |
def another_profile(strip): | |
''' Another test profile. ''' | |
print('Running a different profile') | |
# Maps names of profiles to profile functions that are run by the client | |
# handler. | |
PROFILE_MAP = { | |
'dummy_profile': dummy_profile, | |
'another_profile': another_profile | |
} | |
def parse_command(data): | |
''' Parse command and argument from a byteslike object. | |
>>> parse_command(b'SETP test_profile') | |
('SETP', 'test_profile') | |
''' | |
data = str(data, 'ascii').strip() | |
command_end = data.find(' ') | |
command = data[:command_end] | |
arg = data[command_end + 1:] | |
return command, arg | |
class ClientHandler: | |
''' ClientHandler deals with UDP requests passed to it by the server. ''' | |
def __init__(self): | |
''' Initialize ClientHandler instance. ''' | |
self.data = None | |
self.current_profile = None | |
self.strip = neopixel.NeoPixel(machine.Pin(5), 30) | |
def run_command(self, data): | |
''' Run a command (command is parsed from data). ''' | |
command, arg = parse_command(data) | |
if command == 'SETP': | |
profile = PROFILE_MAP.get(arg, None) | |
if profile is not None: | |
self.current_profile = profile | |
self.current_profile(self.strip) | |
def step(self, data=None): | |
''' Receive data from server, running command if new data is received. ''' | |
data = data or self.data | |
if data is not None: | |
self.run_command(data) | |
elif self.current_profile is not None: | |
self.current_profile(self.strip) | |
def run_server(): | |
''' Run a server instance with ClientHandler. ''' | |
loop = asyncio.get_event_loop() | |
client_handler = ClientHandler() | |
server = Server(loop, '0.0.0.0', 8081, client_handler) | |
loop.run_until_complete(server.run()) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment