Skip to content

Instantly share code, notes, and snippets.

@sirex
Last active May 19, 2018 06:42
Show Gist options
  • Save sirex/e6446add852758da8b56fc0600c7b4bc to your computer and use it in GitHub Desktop.
Save sirex/e6446add852758da8b56fc0600c7b4bc to your computer and use it in GitHub Desktop.
Šviesoforas su asyncio
from sanic import Sanic
from sanic.response import json
from sviesoforas import TerminaloSviesoforas, SviesoforoValdiklis
# Application object that the route and listener decorators below attach to.
app = Sanic()

# Log file handed to the controller; mode 'w' truncates it on every start.
# It is closed by the script entry point at the bottom of this file.
stream = open('sviesoforas.log', mode='w')

# Controller built with TerminaloSviesoforas as the light class and the log
# file as the argument forwarded to each light.
valdiklis = SviesoforoValdiklis(TerminaloSviesoforas, stream)
@app.route("/")
def index(request):
    """Respond with the controller's current program name as JSON."""
    payload = {'programa': valdiklis.programa}
    return json(payload)
@app.route("/<programa>")
def perjungti(request, programa):
    """Switch the controller to the program named in the URL path.

    Responds with ``{'programa': <name>}`` when the name is a key of
    ``valdiklis.programos``. Otherwise responds with ``{'klaida': <message>}``
    and does not call the controller.
    """
    if programa not in valdiklis.programos:
        # The f-prefix is required so that the requested name is interpolated;
        # without it the client receives the literal text "{programa!r}".
        return json({
            'klaida': f"Nežinoma programa {programa!r}.",
        })

    valdiklis.perjungti(programa)
    return json({
        'programa': programa,
    })
@app.listener('after_server_start')
def paleisti_valdikli(app, loop):
    """Schedule the controller's ``paleisti`` coroutine on the given loop."""
    controller_loop = valdiklis.paleisti()
    loop.create_task(controller_loop)
if __name__ == '__main__':
    # Leaving the ``with`` block closes the log file whether ``app.run``
    # returns normally or raises.
    with stream:
        app.run(host='0.0.0.0', port=8000)
# python==3.6.5
aiofiles==0.3.2
httptools==0.0.11
sanic==0.7.0
ujson==1.35
uvloop==0.9.1
websockets==4.0.1
import asyncio
class BazinisSviesoforas:
    """Base traffic light whose ``sviesk`` does nothing."""

    def sviesk(self, r, g, z):
        """Accept three light values and ignore them.

        NOTE(review): TerminaloSviesoforas overrides this method with the
        parameters ``(programa, i, r, g, z)``, which is also how
        SviesoforoValdiklis.paleisti calls it -- confirm which signature is
        the intended one.
        """
        return None
class TerminaloSviesoforas(BazinisSviesoforas):
    """Traffic light that reports each state as one text line on a stream."""

    def __init__(self, stream):
        """Keep the object whose ``write`` and ``flush`` will be called."""
        self.stream = stream

    def sviesk(self, programa, i, r, g, z):
        """Write the program name, step index and three light values, then flush."""
        out = self.stream
        line = f'{programa}: #{i} [{r}, {g}, {z}]\n'
        out.write(line)
        # Flush after every line so the output is not held in the buffer.
        out.flush()
class SviesoforoValdiklis:
    """Drive two traffic lights through a named, endlessly repeating program.

    A program is a list of steps and each step holds one ``(r, g, z)`` tuple
    per traffic light. ``paleisti`` replays the steps of the current program,
    pausing one second after each step.
    """

    # Program name -> list of steps. Each step is a two-element list:
    # the (r, g, z) values for the first light and for the second light.
    # NOTE(review): r/g/z presumably mean red/yellow/green -- confirm.
    programos = {
        'veikia': [
            [(1, 0, 0), (0, 0, 1)],
            [(1, 1, 0), (0, 1, 0)],
            [(0, 0, 1), (1, 0, 0)],
            [(0, 1, 0), (1, 1, 0)],
        ],
        'neveikia': [
            [(0, 0, 0), (0, 0, 0)],
        ],
        'naktis': [
            [(0, 1, 0), (0, 1, 0)],
            [(0, 0, 0), (0, 0, 0)],
        ],
        'svente': [
            [(1, 1, 1), (1, 1, 1)],
        ]
    }

    def __init__(self, Sviesoforas, *args, **kwargs):
        """Create the two lights and start in the 'neveikia' program.

        Args:
            Sviesoforas: callable invoked twice as
                ``Sviesoforas(*args, **kwargs)`` to build the two lights.
            *args: positional arguments forwarded to ``Sviesoforas``.
            **kwargs: keyword arguments forwarded to ``Sviesoforas``.
        """
        self.programa = 'neveikia'
        self.sviesoforai = (
            Sviesoforas(*args, **kwargs),
            Sviesoforas(*args, **kwargs),
        )

    def perjungti(self, programa):
        """Select the program that ``paleisti`` should run next.

        Args:
            programa: a key of ``programos``.

        Raises:
            ValueError: if ``programa`` is not a key of ``programos``. The
                current program is left unchanged in that case.
        """
        # Reject unknown names here: otherwise the lookup in ``paleisti``
        # raises KeyError inside the running coroutine and stops the loop.
        if programa not in self.programos:
            raise ValueError(f"Nežinoma programa {programa!r}.")
        self.programa = programa

    async def paleisti(self):
        """Replay the current program forever; never returns normally.

        For every step, each light's ``sviesk`` is called with the program
        name, the step index and that light's ``(r, g, z)`` values, followed
        by a one-second ``asyncio.sleep``.
        """
        while True:
            programa = self.programa
            for i, sviesos in enumerate(self.programos[programa]):
                for sviesoforas, (r, g, z) in zip(self.sviesoforai, sviesos):
                    sviesoforas.sviesk(programa, i, r, g, z)
                await asyncio.sleep(1)
                # The program was switched during the sleep, so abandon the
                # remaining steps of the previous program and restart the
                # outer loop with the new one.
                if programa != self.programa:
                    break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment