Created
October 31, 2024 04:24
-
-
Save zengjie/f54f1c3e6be2c334219afc6cb25287bd to your computer and use it in GitHub Desktop.
Even Realities Smart Glasses Python Demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from bleak import BleakScanner, BleakClient | |
from enum import IntEnum | |
import struct | |
import time | |
# Service and Characteristic UUIDs | |
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" | |
UART_TX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" # Write | |
UART_RX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" # Read/Notify | |
class Commands(IntEnum): | |
# System commands | |
BLE_REQ_INIT = 0x4D | |
BLE_REQ_HEARTBEAT = 0x25 | |
BLE_REQ_EVENAI = 0x4E | |
# Voice commands | |
BLE_REQ_TRANSFER_MIC_DATA = 0xF1 | |
BLE_REQ_DEVICE_ORDER = 0xF5 | |
# Display commands | |
BLE_REQ_NORMAL_TEXT = 0x30 | |
BLE_REQ_FINAL_TEXT = 0x40 | |
BLE_REQ_MANUAL_PAGE = 0x50 | |
BLE_REQ_ERROR_TEXT = 0x60 | |
class DeviceOrders(IntEnum): | |
DISPLAY_READY = 0x00 | |
DISPLAY_BUSY = 0x11 | |
DISPLAY_UPDATE = 0x0F | |
DISPLAY_COMPLETE = 0x09 | |
class DisplayStatus(IntEnum): | |
NORMAL_TEXT = 0x30 # Initial display | |
FINAL_TEXT = 0x40 # Final display after 3 seconds | |
MANUAL_PAGE = 0x50 # Manual page turning mode | |
ERROR_TEXT = 0x60 # Error display | |
class GlassesProtocol: | |
def __init__(self): | |
self._left_client = None | |
self._right_client = None | |
self._heartbeat_seq = 0 | |
self._evenai_seq = 0 | |
self._last_device_order = None | |
self._last_evenai_response = None | |
self._received_ack = False | |
async def scan_for_glasses(self): | |
"""Scan for left and right glasses devices""" | |
devices = await BleakScanner.discover() | |
left_device = None | |
right_device = None | |
for device in devices: | |
if device.name: | |
if "_L_" in device.name: | |
left_device = device | |
elif "_R_" in device.name: | |
right_device = device | |
return left_device, right_device | |
async def connect(self): | |
"""Connect to both left and right glasses""" | |
left_device, right_device = await self.scan_for_glasses() | |
if not left_device or not right_device: | |
raise Exception("Could not find both glasses devices") | |
self._left_client = BleakClient(left_device) | |
self._right_client = BleakClient(right_device) | |
await self._left_client.connect() | |
await self._right_client.connect() | |
# Initialize both devices | |
await self._send_init_command() | |
# Start notification handlers | |
await self._setup_notifications() | |
async def _send_init_command(self): | |
"""Send initialization command to both devices""" | |
init_data = bytes([Commands.BLE_REQ_INIT, 0x01]) | |
await self._left_client.write_gatt_char(UART_TX_CHAR_UUID, init_data) | |
await self._right_client.write_gatt_char(UART_TX_CHAR_UUID, init_data) | |
async def _setup_notifications(self): | |
"""Setup notification handlers for both devices""" | |
await self._left_client.start_notify(UART_RX_CHAR_UUID, self._notification_handler) | |
await self._right_client.start_notify(UART_RX_CHAR_UUID, self._notification_handler) | |
def _notification_handler(self, sender, data): | |
"""Handle incoming notifications from glasses""" | |
cmd = data[0] | |
if cmd == Commands.BLE_REQ_HEARTBEAT: | |
print(f"Received heartbeat response: {data.hex()}") | |
elif cmd == Commands.BLE_REQ_TRANSFER_MIC_DATA: | |
print(f"Received voice data: {data[1:].hex()}") | |
elif cmd == Commands.BLE_REQ_EVENAI: | |
print(f"Received EvenAI response: {data.hex()}") | |
# Check if it's a success response (0xC9) | |
if len(data) > 1 and data[1] == 0xC9: | |
self._received_ack = True | |
elif cmd == Commands.BLE_REQ_DEVICE_ORDER: | |
order = data[1] | |
self._last_device_order = order | |
print(f"Received device order: {hex(order)}") | |
if order == DeviceOrders.DISPLAY_COMPLETE: | |
self._received_ack = True | |
else: | |
print(f"Received unknown command {hex(cmd)}: {data.hex()}") | |
async def send_heartbeat(self): | |
"""Send heartbeat command to both devices""" | |
length = 6 | |
data = struct.pack( | |
"BBBBBB", | |
Commands.BLE_REQ_HEARTBEAT, | |
length & 0xFF, | |
(length >> 8) & 0xFF, | |
self._heartbeat_seq % 0xFF, | |
0x04, | |
self._heartbeat_seq % 0xFF | |
) | |
self._heartbeat_seq += 1 | |
await self._left_client.write_gatt_char(UART_TX_CHAR_UUID, data) | |
await self._right_client.write_gatt_char(UART_TX_CHAR_UUID, data) | |
async def send_text(self, text, new_screen=1, pos=0, current_page=1, max_pages=1): | |
""" | |
Send text to display on glasses with proper formatting and status transitions | |
""" | |
# Split text into lines and format according to screen size | |
lines = self._format_text_lines(text) | |
total_pages = (len(lines) + 4) // 5 # 5 lines per page | |
if len(lines) <= 3: | |
# Special formatting for short messages | |
display_text = "\n\n" + "\n".join(lines) | |
# Send initial display | |
success = await self._send_text_packet( | |
display_text, | |
new_screen=1, | |
status=DisplayStatus.NORMAL_TEXT, | |
current_page=1, | |
max_pages=1 | |
) | |
if not success: | |
return False | |
# Wait 3 seconds then send final display | |
await asyncio.sleep(3) | |
success = await self._send_text_packet( | |
display_text, | |
new_screen=1, | |
status=DisplayStatus.FINAL_TEXT, | |
current_page=1, | |
max_pages=1 | |
) | |
return success | |
elif len(lines) <= 5: | |
# Format for 4-5 lines | |
padding = "\n" if len(lines) == 4 else "" | |
display_text = padding + "\n".join(lines) | |
# Send initial display | |
success = await self._send_text_packet( | |
display_text, | |
new_screen=1, | |
status=DisplayStatus.NORMAL_TEXT, | |
current_page=1, | |
max_pages=1 | |
) | |
if not success: | |
return False | |
# Wait 3 seconds then send final display | |
await asyncio.sleep(3) | |
success = await self._send_text_packet( | |
display_text, | |
new_screen=1, | |
status=DisplayStatus.FINAL_TEXT, | |
current_page=1, | |
max_pages=1 | |
) | |
return success | |
else: | |
# Multi-page display | |
current_page = 1 | |
start_idx = 0 | |
while start_idx < len(lines): | |
page_lines = lines[start_idx:start_idx + 5] | |
display_text = "\n".join(page_lines) | |
is_last_page = start_idx + 5 >= len(lines) | |
status = DisplayStatus.FINAL_TEXT if is_last_page else DisplayStatus.NORMAL_TEXT | |
success = await self._send_text_packet( | |
display_text, | |
new_screen=1, | |
status=status, | |
current_page=current_page, | |
max_pages=total_pages | |
) | |
if not success: | |
return False | |
if not is_last_page: | |
await asyncio.sleep(5) # Wait 5 seconds between pages | |
start_idx += 5 | |
current_page += 1 | |
return True | |
async def _send_text_packet(self, text, new_screen, status, current_page, max_pages): | |
"""Send a single text packet with proper formatting""" | |
text_bytes = text.encode('utf-8') | |
max_chunk_size = 191 | |
chunks = [text_bytes[i:i + max_chunk_size] | |
for i in range(0, len(text_bytes), max_chunk_size)] | |
# Send to Left first | |
for i, chunk in enumerate(chunks): | |
self._received_ack = False | |
header = struct.pack( | |
">BBBBBBBB", | |
Commands.BLE_REQ_EVENAI, | |
self._evenai_seq & 0xFF, | |
len(chunks), | |
i, | |
status | new_screen, # Combine status and new_screen | |
0, # pos high byte | |
0, # pos low byte | |
current_page | |
) | |
packet = header + bytes([max_pages]) + chunk | |
await self._left_client.write_gatt_char(UART_TX_CHAR_UUID, packet) | |
if not await self._wait_for_display_complete(timeout=2.0): | |
return False | |
await asyncio.sleep(0.1) | |
# Send to Right after Left completes | |
for i, chunk in enumerate(chunks): | |
self._received_ack = False | |
header = struct.pack( | |
">BBBBBBBB", | |
Commands.BLE_REQ_EVENAI, | |
self._evenai_seq & 0xFF, | |
len(chunks), | |
i, | |
status | new_screen, | |
0, | |
0, | |
current_page | |
) | |
packet = header + bytes([max_pages]) + chunk | |
await self._right_client.write_gatt_char(UART_TX_CHAR_UUID, packet) | |
if not await self._wait_for_display_complete(timeout=2.0): | |
return False | |
await asyncio.sleep(0.1) | |
self._evenai_seq += 1 | |
return True | |
def _format_text_lines(self, text): | |
"""Format text into lines that fit the display""" | |
# Split text into paragraphs | |
paragraphs = [p.strip() for p in text.split('\n') if p.strip()] | |
lines = [] | |
for paragraph in paragraphs: | |
# Simple line wrapping at ~40 characters | |
# In real implementation, should use proper text measurement | |
while len(paragraph) > 40: | |
space_idx = paragraph.rfind(' ', 0, 40) | |
if space_idx == -1: | |
space_idx = 40 | |
lines.append(paragraph[:space_idx]) | |
paragraph = paragraph[space_idx:].strip() | |
if paragraph: | |
lines.append(paragraph) | |
return lines | |
async def _wait_for_display_complete(self, timeout=2.0): | |
"""Wait for display complete status""" | |
start_time = time.time() | |
while time.time() - start_time < timeout: | |
if self._received_ack: | |
return True | |
await asyncio.sleep(0.1) | |
return False | |
async def main(): | |
glasses = GlassesProtocol() | |
try: | |
print("Connecting to glasses...") | |
await glasses.connect() | |
print("Connected successfully!") | |
# Start heartbeat loop | |
while True: | |
await glasses.send_heartbeat() | |
print("Sent heartbeat") | |
# Test message with specific formatting | |
test_message = "Test message\nLine 1\nLine 2" | |
success = await glasses.send_text( | |
text=test_message, | |
new_screen=1, | |
pos=0, | |
current_page=1, | |
max_pages=1 | |
) | |
print(f"Send text {'succeeded' if success else 'failed'}") | |
# Wait 8 seconds before next heartbeat | |
await asyncio.sleep(8) | |
except Exception as e: | |
print(f"Error: {e}") | |
finally: | |
# Cleanup | |
if glasses._left_client: | |
await glasses._left_client.disconnect() | |
if glasses._right_client: | |
await glasses._right_client.disconnect() | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for sharing this!
I get this error when trying to connect:
Connecting to glasses...
Error: Could not get GATT services: Unreachable
Any idea how to solve it?