Skip to content

Instantly share code, notes, and snippets.

@lispandfound
Created July 5, 2017 09:41
Show Gist options
  • Save lispandfound/bf2246cd9721a525cbef09ee8caa4dbe to your computer and use it in GitHub Desktop.
Save lispandfound/bf2246cd9721a525cbef09ee8caa4dbe to your computer and use it in GitHub Desktop.
''' Control and run RGB Fedora server. '''
# Asynchronous loop code
import uasyncio as asyncio
# Controlling RGB strip
import neopixel
# Controlling pin out
import machine
# Connecting to Wireless Network
import network
# Communicating over sockets
import socket
# Sleeping
import time
def connect(ssid, password):
''' Connect to internet SSID with supplied password.
Returns device ip.
>>> connect('LHS_BYOD', 'password')
'192.168.123'
'''
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while wlan.isconnected() is False:
time.sleep(1)
ip = wlan.ifconfig()[0]
return ip
class Server:
''' Server manages the state of an asynchronous UDP server.
The server runs a couple of asynchronous co-routines, the server that
listens for incoming UDP connections, and step_server, which handles the
received data. '''
def __init__(self, loop, ip, port, handler):
''' Initialize Server instance. '''
self.loop = loop
self.data = None
self.ip = ip
self.port = port
self.handler = handler
async def udp_server(self):
''' UDP Server co-routine, waits for an incoming connection and reads from the socket. '''
# Create a socket with the following arguments:
# - socket.AF_INET :: Sets the socket to be addressable using the familiar ip system
# e.g 192.168.0.1, www.google.com, etc.
# - socket.SOCK_DGRAM :: Documentation unclear on this one, however is
# recommended to be set for most usage.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Tell the kernel that it should reuse the socket rather than waiting
# for it to expire. Only really affects the program if it is run
# multiple times in quick succession.
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((self.ip, self.port))
print('Server running at', self.ip, self.port)
stream_reader = asyncio.StreamReader(s)
while True:
# The await call will cause the server to handle off control to the
# rest of the program until someone tries to send data to it.
data = await stream_reader.read(64)
self.data = data
async def step_server(self):
''' step_server takes received data and passes it off to the client
handler. '''
while True:
await asyncio.sleep(1)
if self.data is not None:
self.handler.step(data=self.data)
self.data = None
else:
self.handler.step()
def run(self):
''' Start the server instance. '''
self.loop.call_soon(self.step_server())
self.loop.run_until_complete(self.udp_server())
def dummy_profile(strip):
''' Some silly dummy profile. '''
print('Running dummy profile')
def another_profile(strip):
''' Another test profile. '''
print('Running a different profile')
# Maps names of profiles to profile functions that are run by the client
# handler.
PROFILE_MAP = {
'dummy_profile': dummy_profile,
'another_profile': another_profile
}
def parse_command(data):
''' Parse command and argument from a byteslike object.
>>> parse_command(b'SETP test_profile')
('SETP', 'test_profile')
'''
data = str(data, 'ascii').strip()
command_end = data.find(' ')
command = data[:command_end]
arg = data[command_end + 1:]
return command, arg
class ClientHandler:
''' ClientHandler deals with UDP requests passed to it by the server. '''
def __init__(self):
''' Initialize ClientHandler instance. '''
self.data = None
self.current_profile = None
self.strip = neopixel.NeoPixel(machine.Pin(5), 30)
def run_command(self, data):
''' Run a command (command is parsed from data). '''
command, arg = parse_command(data)
if command == 'SETP':
profile = PROFILE_MAP.get(arg, None)
if profile is not None:
self.current_profile = profile
self.current_profile(self.strip)
def step(self, data=None):
''' Receive data from server, running command if new data is received. '''
data = data or self.data
if data is not None:
self.run_command(data)
elif self.current_profile is not None:
self.current_profile(self.strip)
def run_server():
''' Run a server instance with ClientHandler. '''
loop = asyncio.get_event_loop()
client_handler = ClientHandler()
server = Server(loop, '0.0.0.0', 8081, client_handler)
loop.run_until_complete(server.run())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment