Last active
May 19, 2018 06:42
-
-
Save sirex/e6446add852758da8b56fc0600c7b4bc to your computer and use it in GitHub Desktop.
Šviesoforas su asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sanic import Sanic | |
from sanic.response import json | |
from sviesoforas import TerminaloSviesoforas, SviesoforoValdiklis | |
# Web application object; the route handlers and listener below attach to it.
app = Sanic()

# Log file opened for writing at import time; it is handed to the controller,
# which passes it on to each traffic light it constructs.
stream = open('sviesoforas.log', mode='w')

# Single module-level controller shared by every request handler.
valdiklis = SviesoforoValdiklis(TerminaloSviesoforas, stream)
@app.route("/")
def index(request):
    """Return the controller's currently selected program as JSON."""
    atsakymas = {'programa': valdiklis.programa}
    return json(atsakymas)
@app.route("/<programa>")
def perjungti(request, programa):
    """Switch the controller to the program named in the URL path.

    Responds with ``{'programa': <name>}`` when the name is one of
    ``valdiklis.programos``; otherwise responds with a ``{'klaida': ...}``
    message that names the rejected value and leaves the controller untouched.
    """
    if programa not in valdiklis.programos:
        # The f-prefix is required: without it the client would receive the
        # literal text "{programa!r}" instead of the name it asked for.
        return json({
            'klaida': f"Nežinoma programa {programa!r}.",
        })

    valdiklis.perjungti(programa)
    return json({
        'programa': programa,
    })
@app.listener('after_server_start')
def paleisti_valdikli(app, loop):
    """Schedule the controller's endless program loop on the server's loop."""
    valdiklio_ciklas = valdiklis.paleisti()
    loop.create_task(valdiklio_ciklas)
if __name__ == '__main__':
    # The file object is its own context manager: leaving the block closes
    # the log stream whether the server returns normally or raises.
    with stream:
        app.run(host='0.0.0.0', port=8000)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# python==3.6.5 | |
aiofiles==0.3.2 | |
httptools==0.0.11 | |
sanic==0.7.0 | |
ujson==1.35 | |
uvloop==0.9.1 | |
websockets==4.0.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
class BazinisSviesoforas:
    """Base traffic light; subclasses override ``sviesk`` to show a state."""

    def sviesk(self, programa, i, r, g, z):
        """Show one light state; the base implementation does nothing.

        The parameter list mirrors the call made by
        ``SviesoforoValdiklis.paleisti`` and the override in
        ``TerminaloSviesoforas``, so a base instance can be driven by the
        controller without raising ``TypeError``.

        Args:
            programa: Name of the program the state belongs to.
            i: Index of the step within that program.
            r, g, z: The three values of the state tuple (presumably
                red / yellow / green on-off flags -- confirm with the
                hardware mapping).
        """
class TerminaloSviesoforas(BazinisSviesoforas):
    """Traffic light that reports every state as a text line on a stream."""

    def __init__(self, stream):
        """Remember the writable text stream that states are reported to."""
        self.stream = stream

    def sviesk(self, programa, i, r, g, z):
        """Write one ``<programa>: #<i> [r, g, z]`` line and flush at once."""
        eilute = f'{programa}: #{i} [{r}, {g}, {z}]\n'
        isvestis = self.stream
        isvestis.write(eilute)
        # Flush after every line so the output is visible immediately.
        isvestis.flush()
class SviesoforoValdiklis:
    """Drive a pair of traffic lights through a named, repeating program.

    The controller owns two light objects and a current program name.
    ``paleisti`` loops forever, stepping both lights through the current
    program; ``perjungti`` selects a different program, which the loop picks
    up after the step that is in progress.
    """

    # Program name -> list of steps. Each step holds one (r, g, z) tuple per
    # light, in the same order as ``self.sviesoforai``.
    programos = {
        'veikia': [
            [(1, 0, 0), (0, 0, 1)],
            [(1, 1, 0), (0, 1, 0)],
            [(0, 0, 1), (1, 0, 0)],
            [(0, 1, 0), (1, 1, 0)],
        ],
        'neveikia': [
            [(0, 0, 0), (0, 0, 0)],
        ],
        'naktis': [
            [(0, 1, 0), (0, 1, 0)],
            [(0, 0, 0), (0, 0, 0)],
        ],
        'svente': [
            [(1, 1, 1), (1, 1, 1)],
        ],
    }

    def __init__(self, Sviesoforas, *args, **kwargs):
        """Create two lights and start in the 'neveikia' program.

        Args:
            Sviesoforas: Callable (normally a class) that builds one light.
            *args, **kwargs: Forwarded unchanged to both ``Sviesoforas`` calls.
        """
        self.programa = 'neveikia'
        self.sviesoforai = (
            Sviesoforas(*args, **kwargs),
            Sviesoforas(*args, **kwargs),
        )

    def perjungti(self, programa):
        """Select the program that ``paleisti`` should run next.

        Raises:
            ValueError: If ``programa`` is not a key of ``programos``.
                Rejecting it here keeps the running loop from dying later
                with a ``KeyError`` on the lookup.
        """
        if programa not in self.programos:
            raise ValueError(f'Nežinoma programa {programa!r}.')
        self.programa = programa

    async def paleisti(self, intervalas=1):
        """Run the current program in an endless loop.

        Args:
            intervalas: Delay passed to ``asyncio.sleep`` after each step;
                defaults to 1, the value that was previously hard-coded.
        """
        while True:
            # Snapshot the name so a switch made while a step is being shown
            # can be detected by comparing against ``self.programa``.
            programa = self.programa
            for i, sviesos in enumerate(self.programos[programa]):
                for sviesoforas, (r, g, z) in zip(self.sviesoforai, sviesos):
                    sviesoforas.sviesk(programa, i, r, g, z)
                await asyncio.sleep(intervalas)
                # The program was switched, so abandon the remaining steps of
                # the previous program and restart with the new one.
                if programa != self.programa:
                    break
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment