Created
August 31, 2021 05:37
-
-
Save madmo/18ff6e67a6974e4f109c5730ba73437b to your computer and use it in GitHub Desktop.
Simple webcam on/off script used to control an on-air sign running Tasmota firmware
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
import asyncio
import functools
import sys

import aiohttp
import pyinotify
def debounce(wait):
    """Return a decorator that delays calls to a function by ``wait`` seconds.

    Each call to the decorated function cancels the previously scheduled
    call (if it is still pending) and schedules a new one. In a burst of
    calls only the last one actually runs, ``wait`` seconds after it was
    made, with the arguments of that last call.

    The pending task is kept on the wrapper as the attribute ``t`` (``None``
    until the first call). There is one timer per decorated function, so
    when a method is decorated all instances share it.

    The delayed call is scheduled with ``asyncio.ensure_future``, so the
    wrapper has to be invoked where asyncio can find an event loop. The
    wrapper itself returns ``None``; the wrapped function's return value is
    discarded.

    Args:
        wait: Delay in seconds, passed to ``asyncio.sleep``.
    """

    def decorator(fn):
        @functools.wraps(fn)  # keep the wrapped function's name and docstring
        def debounced(*args, **kwargs):
            async def call_it():
                await asyncio.sleep(wait)
                fn(*args, **kwargs)

            # Drop the previous pending call; cancelling a task that has
            # already finished is a harmless no-op.
            pending = debounced.t
            if pending is not None:
                pending.cancel()
            debounced.t = asyncio.ensure_future(call_it())

        # Set after functools.wraps so the handle always starts out empty.
        debounced.t = None
        return debounced

    return decorator
class WebcamWatch:
    """Watch a device node with inotify and mirror its use to a Tasmota host.

    Open/close events on ``device`` are counted by the nested
    ``EventHandler``. When the device goes from unused to used (or back),
    ``notify`` sends a ``Power On`` / ``Power Off`` command to
    ``http://<host>/cm``.

    Args:
        loop: asyncio event loop handed to ``pyinotify.AsyncioNotifier``.
        device: Path to watch (the script uses ``/dev/video0``).
        host: Host name or address of the device receiving the commands.
    """

    def __init__(self, loop, device, host):
        self.host = host
        self.watch_manager = pyinotify.WatchManager()
        self.watch_manager.add_watch(
            device,
            pyinotify.IN_OPEN | pyinotify.IN_CLOSE_NOWRITE | pyinotify.IN_CLOSE_WRITE,
        )
        self.handler = WebcamWatch.EventHandler(self.notify)
        self.notifier = pyinotify.AsyncioNotifier(
            self.watch_manager, loop, default_proc_fun=self.handler
        )

    def __del__(self):
        # __init__ may have failed before the notifier was created; in that
        # case there is nothing to stop.
        notifier = getattr(self, "notifier", None)
        if notifier is not None:
            notifier.stop()

    class EventHandler(pyinotify.ProcessEvent):
        """Track how many times the device is open and report state changes.

        Args:
            notify: Coroutine function called with the new active state
                (``True``/``False``) after a debounced state change.
        """

        def __init__(self, notify, *args, **kwargs):
            self.count = 0  # opens seen minus closes seen, never below 0
            self.notify = notify
            self.active = False  # last state scheduled for notification
            super().__init__(*args, **kwargs)

        @debounce(1)
        def _notify(self):
            # Runs one second after the last state change; self.active is
            # read at that point, so quick on/off flips collapse into a
            # single notification carrying the final state.
            asyncio.ensure_future(self.notify(self.active))

        def process_IN_OPEN(self, event):
            """Count an open; schedule an "active" notification on first use."""
            self.count += 1
            if not self.active:
                self.active = True
                self._notify()

        def process_IN_CLOSE(self, event):
            """Count a close; schedule an "inactive" notification at zero.

            NOTE(review): presumably pyinotify routes both IN_CLOSE_WRITE
            and IN_CLOSE_NOWRITE here -- confirm against the pyinotify docs.
            """
            # Clamp at zero: closes for opens that happened before the
            # watch started must not drive the counter negative.
            self.count = max(self.count - 1, 0)
            if self.active and self.count == 0:
                self.active = False
                self._notify()

    async def notify(self, isActive):
        """Send ``Power On`` or ``Power Off`` to ``http://<host>/cm``.

        Args:
            isActive: ``True`` to switch on, anything falsy to switch off.

        Errors raised by aiohttp propagate to the caller; the session is
        closed in every case.
        """
        params = {"cmnd": "Power On" if isActive else "Power Off"}
        url = "http://{}/cm".format(self.host)
        # async with guarantees the session is closed even if the request
        # fails, which a trailing session.close() did not.
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as resp:
                await resp.text()
def main():
    """Watch ``/dev/video0`` and forward its state to the host in ``argv[1]``.

    Exits with a usage message when the host argument is missing instead of
    failing with an ``IndexError``. Runs the event loop until the process is
    terminated.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: {} <tasmota-host>".format(sys.argv[0]))

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Keep a reference for the lifetime of the loop; WebcamWatch.__del__
    # stops the notifier.
    watch = WebcamWatch(loop, "/dev/video0", sys.argv[1])
    loop.run_forever()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment