# last working micropython
# Gist MarlonJD/97f5c83208572a55ad2d0c7ca73a7dac, created April 11, 2023 15:20.
import aioble | |
import uasyncio as asyncio | |
import bluetooth | |
from struct import unpack | |
from binascii import hexlify | |
from micropython import const | |
import sys | |
import network | |
import time | |
import urequests | |
# 16-bit identifiers compared against scan results to recognise beacon frames
_EDDYSTONE_UUID = bluetooth.UUID(0xFEAA)
_IBEACON_UUID = bluetooth.UUID(0x004C)

# AD type code of the "complete local name" advertising data structure
_ADV_TYPE_COMPLETE_NAME = const(0x09)
def _unique_id():
    """Return a short, platform-specific identifier as bytes.

    The value is appended to the advertised beacon name.

    Returns:
        b"LINUX" on Linux; the upper-case hex of the BLE address bytes from
        offset 4 onward on ESP32; the upper-case hex of
        ``machine.unique_id()[4:]`` on PyCom boards; b"UNKNOWN" on any other
        platform, so that callers can always concatenate the result to bytes
        instead of receiving ``None``.
    """
    platform = sys.platform
    if platform == "linux":
        return b"LINUX"
    if platform == "esp32":
        import bluetooth

        ble = bluetooth.BLE()
        # active() without an argument reports the current radio state, which
        # lets us leave the radio the way we found it.
        was_active = ble.active()
        ble.active(True)
        try:
            # config("mac") yields (addr_type, addr); keep the address bytes
            # from offset 4 onward.
            mac_tail = ble.config("mac")[1][4:]
        finally:
            if not was_active:
                ble.active(False)
        return hexlify(mac_tail).upper()
    if platform.endswith("Py"):  # PyCom
        import machine

        return hexlify(machine.unique_id()[4:]).upper()
    # Unrecognised port: still return bytes.
    return b"UNKNOWN"
class uBeaconDecorators:
    """Namespace for decorators applied to beacon ``decode`` methods."""

    @classmethod
    def remove_adv_header(cls, decorated):
        """Decorator to remove the ADV data flags header if any.

        Args:
            decorated: a method taking ``(self, adv_data)``.

        Returns:
            A wrapper with the same ``(self, adv_data)`` signature. When
            ``adv_data`` starts with the bytes 0x02 0x01 the first three bytes
            are dropped before ``decorated`` is called. The wrapper returns
            whatever ``decorated`` returns.
        """

        def inner(self, adv_data):
            # The flags structure is three bytes long: 0x02, 0x01 and one
            # value byte.
            if adv_data[:2] == bytes([0x02, 0x01]):
                adv_data = adv_data[3:]
            # Propagate the result so decorated methods may return a value.
            return decorated(self, adv_data)

        return inner
class Beacon:
    """Base class for beacon advertisement frames.

    Subclasses are expected to provide an ``adv`` attribute or property (an
    iterable of byte values forming the advertising payload) and to override
    ``decode``.
    """

    # Advertised device name; the suffix is whatever _unique_id() returns for
    # the current platform.
    name = b"uBeacon " + _unique_id()

    def __str__(self):
        adv = self.adv_bytes
        # hexlify() returns bytes; decode it so the "s" format spec receives a
        # str (CPython refuses to format bytes with "{:s}").
        return "Bytes: {:d} data: {:s}".format(len(adv), hexlify(adv).decode())

    def __repr__(self):
        return "{}({!r})".format(self.__class__.__name__, self.__dict__)

    @property
    def adv_bytes(self):
        """The advertising payload (``self.adv``) as bytes."""
        return bytes(self.adv)

    @property
    def resp(self):
        """Scan response payload as a list of byte values.

        Layout: the three bytes 0x02 0x01 0x06, then a length byte, the
        complete-local-name type code 0x09 and the bytes of ``self.name``.
        """
        name = self.name
        return [
            0x02,
            0x01,
            0x06,
            len(name) + 1,  # length covers the type byte plus the name
            0x09,
        ] + list(name)

    @property
    def resp_bytes(self):
        """The scan response payload (``self.resp``) as bytes."""
        return bytes(self.resp)

    def decode(self, adv_data):
        """Populate the instance from raw advertising data (subclass hook)."""
        raise NotImplementedError("No decode method in child class implemented")

    @staticmethod
    def validate(value, size: int) -> bytes:
        """Return *value* encoded as exactly *size* bytes.

        Args:
            value: one of
                * ``bytes`` / ``bytearray`` - returned as bytes, must already
                  be *size* bytes long;
                * ``str`` - hexadecimal text (dashes are ignored), must decode
                  to *size* bytes;
                * ``int`` - encoded big-endian; a negative value is encoded
                  in two's complement.
            size: required length of the result in bytes.

        Returns:
            The encoded value.

        Raises:
            ValueError: if the length does not match *size*, if a string is
                not valid hexadecimal, or if the type is unsupported.
        """
        if isinstance(value, (bytes, bytearray)):
            value_bytes = bytes(value)
        elif isinstance(value, str):
            from binascii import unhexlify

            value_bytes = unhexlify(value.replace("-", ""))
        elif isinstance(value, int):
            if value < 0:
                # Map onto the unsigned two's-complement equivalent because
                # to_bytes() cannot portably encode negative numbers.
                value += 1 << (8 * size)
            value_bytes = value.to_bytes(size, "big")
        else:
            raise ValueError("Value has to be int or bytes")
        if len(value_bytes) != size:
            raise ValueError("Value has to be {}-bytes long".format(size))
        return value_bytes
# Company identifier code of the beacon device manufacturer, as sent on air.
_COMPANY_ID = b"\x4c\x00"

# Code that marks the manufacturer data as an iBeacon advertisement.
_DEVICE_TYPE = b"\x02\x15"

# Length byte of the manufacturer specific ADV data structure (0x1A == 26).
_ADV_LENGHT = const(0x1A)

# Average received signal strength, measured at 1m from the advertiser.
_REFERENCE_RSSI = const(-70)
class iBeacon(Beacon):
    """iBeacon advertisement frame.

    Build one either from its fields (``uuid``, ``major``, ``minor`` and
    optionally ``reference_rssi``) or by decoding raw advertising data passed
    as the keyword-only ``adv_data`` argument.
    """

    def __init__(
        self,
        uuid=None,  # 16-bytes
        major=None,  # 0 - 65535
        minor=None,  # 0 - 65535
        reference_rssi=_REFERENCE_RSSI,  # 1-byte
        *,
        adv_data=None
    ):
        """Initialise from raw ``adv_data`` or from explicit field values.

        When ``adv_data`` is given and non-empty it takes precedence and the
        other arguments are ignored.

        Raises:
            ValueError: if neither usable ``adv_data`` nor a complete
                ``uuid`` / ``major`` / ``minor`` triple is supplied.
        """
        if adv_data:
            self.decode(adv_data)
        # major and minor are compared against None so that the valid value 0
        # is not mistaken for "missing".
        elif uuid and major is not None and minor is not None:
            self.uuid = uuid
            self.major = major
            self.minor = minor
            self.reference_rssi = reference_rssi
        else:
            raise ValueError("Could not initialize beacon")

    @property
    def adv(self):
        """Advertising payload as a list of byte values."""
        rssi = self.reference_rssi
        if isinstance(rssi, int) and -128 <= rssi < 0:
            # Encode the signed RSSI as its unsigned two's-complement byte;
            # decode() reads the same byte back as a signed value.
            rssi += 256
        return (
            [
                0x02,
                0x01,
                0x06,
                _ADV_LENGHT,
                0xFF,
            ]
            + list(_COMPANY_ID)
            + list(_DEVICE_TYPE)
            + list(self.validate(self.uuid, 16))
            + list(self.validate(self.major, 2))
            + list(self.validate(self.minor, 2))
            + [
                self.validate(rssi, 1)[0],
            ]
        )

    @uBeaconDecorators.remove_adv_header
    def decode(self, adv_data):
        """Fill uuid, major, minor and reference_rssi from advertising data.

        The decorator strips a leading flags header first, so the offsets
        below are relative to the manufacturer specific data structure.
        ``uuid`` is stored as a hexadecimal string.

        Raises:
            ValueError: if the frame is shorter than the 27 bytes read here.
        """
        if len(adv_data) < 27:
            raise ValueError("iBeacon frame too short")
        self.uuid = hexlify(adv_data[6:22]).decode()
        self.major = unpack("!H", adv_data[22:24])[0]
        self.minor = unpack("!H", adv_data[24:26])[0]
        self.reference_rssi = unpack("!b", bytes([adv_data[26]]))[0]
_URL = "http://trigger-room-load-balancer-626692971.eu-central-1.elb.amazonaws.com/create"


def _decode_ibeacon(result):
    """Return an iBeacon decoded from a scan result, or None if there is none."""
    for mfg_id, _mfg_data in result.manufacturer():
        if bluetooth.UUID(mfg_id) == _IBEACON_UUID and len(result.adv_data) > 24:
            try:
                return iBeacon(adv_data=result.adv_data)
            except (ValueError, IndexError):
                # Truncated or malformed frame: ignore it, keep scanning.
                return None
    return None


def _report_beacon(url, uuid):
    """POST a sighting of the beacon *uuid* to *url* and print the reply."""
    data = {"name": "Hello Malatya","code": "A1B1", "employee": uuid}
    try:
        response = urequests.post(url, json=data)
    except OSError as exc:
        # A network failure must not stop the scan loop.
        print("POST failed:", exc)
        return
    try:
        print(response.text)
    finally:
        # Release the underlying socket; it is not freed automatically.
        response.close()


async def scan(url=_URL, major=1, minor=33, min_rssi=-90, duration_ms=5000):
    """Scan for iBeacons forever and report each match once per scan window.

    Args:
        url: endpoint that receives the JSON report.
        major: only beacons with this major value are reported.
        minor: only beacons with this minor value are reported.
        min_rssi: results with an RSSI at or below this value are ignored.
        duration_ms: length of one scan window passed to ``aioble.scan``.

    The set of already reported UUIDs is reset at the start of every window,
    so a beacon that stays in range is reported again in the next window.
    The HTTP request is made without ``await``, so further scan results are
    only processed after it completes.
    """
    while True:
        print("start scan")
        reported = set()
        async with aioble.scan(
            duration_ms, interval_us=16000, window_us=16000
        ) as scanner:
            async for result in scanner:
                if not result.adv_data or result.rssi <= min_rssi:
                    continue
                # Results advertising the Eddystone service are not decoded.
                if _EDDYSTONE_UUID in result.services():
                    continue
                beacon = _decode_ibeacon(result)
                if beacon is None:
                    continue
                if beacon.major != major or beacon.minor != minor:
                    continue
                if beacon.uuid in reported:
                    continue
                print("MAC: {} Type: {!r} RSSI: {}".format(hexlify(result.device.addr), beacon, result.rssi))
                reported.add(beacon.uuid)
                _report_beacon(url, beacon.uuid)
def _connect_wifi(ssid, password, poll_interval_s=0.1):
    """Bring up the station interface and block until it is connected.

    Args:
        ssid: name of the network to join.
        password: key for that network.
        poll_interval_s: pause between connection checks, in seconds.

    Returns:
        The ``network.WLAN`` station interface object.
    """
    sta_if = network.WLAN(network.STA_IF)
    if not sta_if.isconnected():
        print('connecting to network...')
        sta_if.active(True)
        sta_if.connect(ssid, password)
        while not sta_if.isconnected():
            # Sleep between polls instead of spinning at full speed.
            time.sleep(poll_interval_s)
    return sta_if


if __name__ == "__main__":
    print("Connecting wifi")
    # NOTE(review): the WiFi credentials are hard-coded in source. Consider
    # loading them from a file kept out of version control.
    sta_if = _connect_wifi("Turksat_Kablonet_z99N_2.4", "uMn24P5q")
    if sta_if.isconnected():
        print('network config:', sta_if.ifconfig())
    asyncio.run(scan())
# (End of gist; GitHub sign-in footer omitted.)