-
-
Save ScottJWalter/f74dadf7ff3b4eed2e6c640c4e1d6d20 to your computer and use it in GitHub Desktop.
Bluetooth scanning
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Huge thanks to: https://github.com/blakeman399/Bluetooth-Proximity-Light/blob/master/https/github.com/blakeman399/Bluetooth-Proximity-Light.py | |
# This script scans the area for a list of devices with known Bluetooth
# MAC addresses. As soon as a device is seen, a notification is sent to the
# Bomba system. After that, an occasional scan checks whether the device is
# still in the area, notifying when it is "missing" and then again several
# minutes later, when it still hasn't been seen and is certainly "gone".
import fcntl | |
import struct | |
import array | |
import bluetooth | |
import bluetooth._bluetooth as bt | |
import time | |
import os | |
import datetime | |
def bluetooth_rssi(addr):
    """Return the RSSI of the Bluetooth device at *addr*, or -255 if unseen.

    Opens an L2CAP connection to the device on PSM 1, looks up the ACL
    connection handle with the HCIGETCONNINFO ioctl and then sends an HCI
    "read RSSI" request for that handle.

    Args:
        addr: Bluetooth MAC address string of the device to probe.

    Returns:
        The signed RSSI byte taken from the HCI reply, or -255 when the
        connection attempt fails or any step of the query raises.

    Both sockets are always closed before returning; the previous version
    leaked them on every failed probe, which matters because this function
    is polled forever by the scan loop.
    """
    not_seen = -255

    # Open hci socket
    hci_sock = bt.hci_open_dev()
    bt_sock = None
    try:
        hci_fd = hci_sock.fileno()

        # Connect to device (to whatever you like)
        #
        # Original timeout was 10 seconds. For some reason at 5 seconds
        # or less it goes wonky with non-present BT MACs
        # NOTE(review): the value below (3) contradicts the note above --
        # confirm which one is intended.
        bt_sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
        bt_sock.settimeout(3)
        if bt_sock.connect_ex((addr, 1)) != 0:  # PSM 1 - Service Discovery
            return not_seen  # failed to connect before timeout

        try:
            # Get ConnInfo. The ioctl fills `request` in place (mutate flag
            # set to 1); its return value is not needed.
            # NOTE: array typecode "c" and .tostring() are Python 2 APIs,
            # matching the rest of this script.
            reqstr = struct.pack(
                "6sB17s", bt.str2ba(addr), bt.ACL_LINK, "\0" * 17)
            request = array.array("c", reqstr)
            fcntl.ioctl(hci_fd, bt.HCIGETCONNINFO, request, 1)
            # Skip 8 bytes, read the 16-bit connection handle, skip the rest.
            handle = struct.unpack("8xH14x", request.tostring())[0]

            # Get RSSI
            cmd_pkt = struct.pack('H', handle)
            reply = bt.hci_send_req(
                hci_sock, bt.OGF_STATUS_PARAM, bt.OCF_READ_RSSI,
                bt.EVT_CMD_COMPLETE, 4, cmd_pkt)
            # Byte 3 of the reply is unpacked as a signed char.
            return struct.unpack('b', reply[3])[0]
        except Exception:
            # Best effort: any failure while querying is reported as
            # "not seen". KeyboardInterrupt/SystemExit still propagate.
            return not_seen
    finally:
        # Close sockets on every path, including early returns.
        if bt_sock is not None:
            bt_sock.close()
        hci_sock.close()
# This writes a file safely without worry of access | |
def AtomicWrite(path, text):
    """Write *text* to *path* so readers never observe a partial file.

    The data is written to ``path + ".tmp"``, flushed and fsync'ed, and the
    temporary file is then renamed over *path*.

    Args:
        path: Destination file path.
        text: A string, or an iterable of strings, passed to
            ``file.writelines``; no newlines are added.

    Raises:
        Whatever ``open``, ``writelines``, ``os.fsync`` or ``os.rename``
        raise. On failure the temporary file is removed and an existing
        *path* is left untouched.
    """
    tmppath = path + ".tmp"
    completed = False
    try:
        # `with` guarantees the handle is closed even if the write fails.
        with open(tmppath, "w") as output:
            output.writelines(text)
            output.flush()
            os.fsync(output.fileno())
        os.rename(tmppath, path)  # atomic action
        completed = True
    finally:
        # Don't leave a stale temp file behind after a failed write.
        if not completed and os.path.exists(tmppath):
            os.remove(tmppath)
# ---------------------------------------------------------------------------
# Main scan loop.
#
# The outer loop (re)loads the list of MAC addresses to watch; the inner loop
# polls each device and writes "seen" / "missing" / "gone" notifications to
# the IPC directory. When the MAC list file changes on disk the inner loop
# breaks out so the list is reloaded while keeping the state of devices that
# are still listed.
#
# NOTE(review): the original indentation of this script was lost. The nesting
# below (in particular: the last-seen timestamp is refreshed on *every*
# sighting) is reconstructed from the surrounding comments -- please confirm.
# ---------------------------------------------------------------------------
MAC_LIST_PATH = "/media/ramdisk/user_MAC_IDs.txt"
SEEN_PATH = "/media/ramdisk/ipc/seen_bluetooth"
MISSING_PATH = "/media/ramdisk/ipc/missing_bluetooth"
GONE_PATH = "/media/ramdisk/ipc/depart_bluetooth"

NOT_SEEN_RSSI = -255       # value bluetooth_rssi() returns for "not seen"
IN_AREA_THRESHOLD = -100   # RSSI above this counts as "in the area"
RESCAN_INTERVAL = 60       # seconds between rescans / before "missing"
GONE_AFTER = 5 * 60        # seconds unseen before a device is "gone"

# Per-device state, kept as parallel lists indexed like MACs. Initialised
# here so the first load, or a start-up without the MAC file, does not hit
# a NameError.
MACs = []
inArea = []
rssi = []
lastSeenInArea = []

while True:
    mtime = 0
    if os.path.isfile(MAC_LIST_PATH):
        mtime = os.path.getmtime(MAC_LIST_PATH)
        with open(MAC_LIST_PATH, "r") as f:
            # Ignore blank lines and surrounding whitespace.
            newMACs = [line.strip() for line in f.read().splitlines()
                       if line.strip()]

        # Carry over the state of MACs we already track; anything new
        # starts out as "far away".
        previous = dict(zip(MACs, zip(inArea, lastSeenInArea, rssi)))
        newInArea = []
        newLastSeenInArea = []
        newRssi = []
        for MAC in newMACs:
            wasInArea, lastSeen, lastRssi = previous.get(
                MAC, (False, 0, NOT_SEEN_RSSI))
            newInArea.append(wasInArea)
            newLastSeenInArea.append(lastSeen)
            newRssi.append(lastRssi)

        MACs = newMACs
        inArea = newInArea
        rssi = newRssi
        lastSeenInArea = newLastSeenInArea

    while True:
        if (os.path.isfile(MAC_LIST_PATH)
                and mtime != os.path.getmtime(MAC_LIST_PATH)):
            print("Reloading MACs...")
            break  # break out of the loop to load/rebuild the structures

        print("Scanning...")
        seen = []     # MAC is visible
        missing = []  # MAC hasn't been seen in a bit
        gone = []     # MAC hasn't been seen for a long time

        for idx, MAC in enumerate(MACs):
            # once seen, no need to rescan for device more than once
            # a minute. abs() because a negative timestamp marks a device
            # for which a 'missing' notice was already issued.
            if (inArea[idx]
                    and time.time() - abs(lastSeenInArea[idx])
                    < RESCAN_INTERVAL):
                continue

            # read new value
            rssi[idx] = bluetooth_rssi(MAC)

            # RSSI values have a high of zero and go down from there. I.e.
            # higher is stronger.
            #   0 is almost certainly in the same room
            #   Single digits (e.g. -5) is probably the same room, but I saw
            #     0 to -6 from the room above (thru wood floors)
            #   -10 to -19 could be same room or adjacent room (thru plaster
            #     walls)
            #   -20 to -30 is probably in the house, but a distant room
            #   -30 to -150 is very distant in the house
            #   -255 is not seen at all (away from the house)
            # Values will certainly fluctuate even if the phone doesn't
            # move. For example, here are three subsequent readings from a
            # CSR 4.0 module without moving the phone:
            #   Same room:                       0    0    0
            #   Room above:                     -3   -4   -1
            #   Near room (plaster, stairs):  -255  -19  -25
            #   Near room (drywall, open door): -1   -6  -11
            #   Near room (2 drywalls):        -16  -11   -6
            #   Distant room:                  -25  -22  -26
            #   Very distant room:            -255  -27 -255
            strength = rssi[idx]
            print("Scan: [ %s ] %s  strength= %s" % (idx, MAC, strength))

            if strength > IN_AREA_THRESHOLD:
                inArea[idx] = True
                # A positive timestamp also clears any earlier 'missing' mark.
                lastSeenInArea[idx] = time.time()
                seen.append(MAC + " " + str(strength))
            else:
                # if not seen for 60 seconds, consider missing
                if (inArea[idx] and lastSeenInArea[idx] > 0
                        and time.time() - lastSeenInArea[idx]
                        >= RESCAN_INTERVAL):
                    # negative time indicates that we've issued a 'missing'
                    # notice already
                    lastSeenInArea[idx] = -lastSeenInArea[idx]
                    print("%s left the area:  %s"
                          % (datetime.datetime.now(), MAC))
                    missing.append(MAC)

                # if not seen for 5 minutes, consider gone
                if (inArea[idx]
                        and time.time() - abs(lastSeenInArea[idx])
                        >= GONE_AFTER):
                    inArea[idx] = False
                    print("%s left the area:  %s"
                          % (datetime.datetime.now(), MAC))
                    gone.append(MAC)

        # Publish notifications. Entries are newline-separated so several
        # devices reported in one pass stay distinguishable; a single entry
        # is written exactly as before (no trailing newline).
        for label, ipcPath, entries in (
                ("=== Seen: ", SEEN_PATH, seen),
                ("=== Missing: ", MISSING_PATH, missing),
                ("=== Has left: ", GONE_PATH, gone)):
            if entries:
                print("%s %s" % (label, entries))
                with open(ipcPath, "w") as f:
                    f.write("\n".join(entries))

        time.sleep(1)  # prevent rapid spinning if no MAC IDs
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment