Created
March 13, 2014 13:19
-
-
Save devunt/9528292 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import logging
import os
import subprocess
import threading
from threading import Thread
# Both screen commands run gnome-screensaver-command as user "devunt" via
# sudo and differ only in the final switch ("-a" to lock, "-d" to unlock).
_SCREENSAVER_CMD = ["sudo", "-u", "devunt", "/usr/bin/gnome-screensaver-command"]
SCREEN_LOCK = _SCREENSAVER_CMD + ["-a"]
SCREEN_UNLOCK = _SCREENSAVER_CMD + ["-d"]

# Absolute paths of the Bluetooth command-line tools that get spawned.
HCITOOL = "/usr/local/bin/hcitool"
HCIDUMP = "/usr/local/bin/hcidump"

# Address of the tracked device; matched against "bdaddr ... (Random)"
# lines in the hcidump output.
MAC_ADDR = "C9:C5:D6:14:51:C7"
class LEScanThread(Thread):
    """Keep ``hcitool lescan`` running in the background.

    The scanner's output is discarded. Whenever the scanner process exits it
    is started again, until :meth:`abort` is called.
    """

    # Seconds to pause before restarting a scanner that exited, so a scanner
    # that fails immediately does not turn run() into a tight respawn loop.
    RESTART_DELAY = 1.0

    def __init__(self):
        Thread.__init__(self)
        self.cmd = [HCITOOL, "lescan"]
        # Most recently spawned scanner process; None until run() starts one.
        self.p = None
        self._abort_requested = threading.Event()
        # Makes "check the abort flag, then spawn" atomic with respect to
        # abort(), so abort() can never miss a freshly started process.
        self._spawn_lock = threading.Lock()

    def run(self):
        """Spawn the scanner and respawn it on exit until aborted."""
        with open(os.devnull, "w") as devnull:
            while True:
                with self._spawn_lock:
                    if self._abort_requested.is_set():
                        return
                    self.p = subprocess.Popen(self.cmd,
                                              stdout=devnull,
                                              stderr=devnull)
                self.p.wait()
                # Returns early as soon as abort() sets the flag.
                self._abort_requested.wait(self.RESTART_DELAY)

    def abort(self):
        """Stop respawning and terminate the current scanner, if any.

        Safe to call before the thread has spawned a process and safe to
        call more than once.
        """
        with self._spawn_lock:
            self._abort_requested.set()
            proc = self.p
        if proc is not None:
            try:
                proc.terminate()
            except OSError:
                # The process had already exited; nothing left to stop.
                pass
class BTMonThread(Thread):
    """Lock or unlock the screen from the tracked device's RSSI readings.

    Reads the ``hcidump`` output line by line. A ``bdaddr <MAC_ADDR> (Random)``
    line marks the start of a report for the tracked device, and the next
    ``RSSI: <int>`` line is taken as that device's signal strength. After
    ``REQUIRED_READINGS`` consecutive readings below ``RSSI_THRESHOLD`` the
    ``SCREEN_LOCK`` command is spawned; while locked, the same number of
    consecutive readings at or above the threshold spawns ``SCREEN_UNLOCK``.
    """

    # Readings at or above this value count as "device is near".
    RSSI_THRESHOLD = -80
    # Consecutive readings contradicting the current state needed to flip it.
    REQUIRED_READINGS = 5

    def __init__(self):
        Thread.__init__(self)
        self.cmd = [HCIDUMP]
        # True once this thread has issued the lock command and not yet
        # issued a matching unlock.
        self.locked = False

    def run(self):
        """Follow the dump output until it ends, toggling the screen lock."""
        target = 'bdaddr %s (Random)' % MAC_ADDR
        expecting_rssi = False
        streak = 0
        p = subprocess.Popen(self.cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        try:
            # Each line is read exactly once; iter(readline, b'') stops at EOF.
            for raw in iter(p.stdout.readline, b''):
                # The pipe yields bytes on Python 3; decode before comparing.
                line = raw.decode('ascii', 'replace').strip()
                if line.startswith('bdaddr '):
                    # A report for another device cancels a pending match so
                    # its RSSI is never attributed to the tracked device.
                    expecting_rssi = (line == target)
                    continue
                if not (expecting_rssi and line.startswith('RSSI: ')):
                    continue
                expecting_rssi = False
                try:
                    rssi = int(line[6:])
                except ValueError:
                    logging.warning('unparsable RSSI line: %r', line)
                    continue
                logging.debug('RSSI: %d', rssi)
                near = rssi >= self.RSSI_THRESHOLD
                # Near readings argue for unlocking a locked screen; far
                # readings argue for locking an unlocked one. Anything else
                # breaks the streak.
                if near == self.locked:
                    streak += 1
                else:
                    streak = 0
                if streak >= self.REQUIRED_READINGS:
                    streak = 0
                    self.locked = not self.locked
                    if self.locked:
                        subprocess.Popen(SCREEN_LOCK)
                        logging.info('lock()')
                    else:
                        subprocess.Popen(SCREEN_UNLOCK)
                        logging.info('unlock()')
        finally:
            p.stdout.close()
            if p.poll() is None:
                p.terminate()
            p.wait()
def main():
    """Start the LE scanner and the RSSI monitor, then wait for the monitor.

    Both threads are daemons, so the process can exit on Ctrl-C. The main
    thread waits in short timed joins so that KeyboardInterrupt is raised
    here, where it is logged and the scanner process is terminated.
    """
    logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] {%(levelname)s} %(message)s')
    lescan_th = LEScanThread()
    lescan_th.daemon = True
    lescan_th.start()
    btmon_th = BTMonThread()
    btmon_th.daemon = True
    btmon_th.start()
    try:
        # start() returns immediately, so the waiting has to happen inside
        # the try block. A join with a timeout keeps the main thread
        # responsive to Ctrl-C.
        while btmon_th.is_alive():
            btmon_th.join(1.0)
    except KeyboardInterrupt:
        logging.critical('abort!')
        lescan_th.abort()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment