Skip to content

Instantly share code, notes, and snippets.

@zengjie
Created October 31, 2024 04:24
Show Gist options
  • Save zengjie/f54f1c3e6be2c334219afc6cb25287bd to your computer and use it in GitHub Desktop.
Save zengjie/f54f1c3e6be2c334219afc6cb25287bd to your computer and use it in GitHub Desktop.
Even Realities Smart Glasses Python Demo
import asyncio
from bleak import BleakScanner, BleakClient
from enum import IntEnum
import struct
import time
# Service and Characteristic UUIDs
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" # Write
UART_RX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" # Read/Notify
class Commands(IntEnum):
# System commands
BLE_REQ_INIT = 0x4D
BLE_REQ_HEARTBEAT = 0x25
BLE_REQ_EVENAI = 0x4E
# Voice commands
BLE_REQ_TRANSFER_MIC_DATA = 0xF1
BLE_REQ_DEVICE_ORDER = 0xF5
# Display commands
BLE_REQ_NORMAL_TEXT = 0x30
BLE_REQ_FINAL_TEXT = 0x40
BLE_REQ_MANUAL_PAGE = 0x50
BLE_REQ_ERROR_TEXT = 0x60
class DeviceOrders(IntEnum):
DISPLAY_READY = 0x00
DISPLAY_BUSY = 0x11
DISPLAY_UPDATE = 0x0F
DISPLAY_COMPLETE = 0x09
class DisplayStatus(IntEnum):
NORMAL_TEXT = 0x30 # Initial display
FINAL_TEXT = 0x40 # Final display after 3 seconds
MANUAL_PAGE = 0x50 # Manual page turning mode
ERROR_TEXT = 0x60 # Error display
class GlassesProtocol:
def __init__(self):
self._left_client = None
self._right_client = None
self._heartbeat_seq = 0
self._evenai_seq = 0
self._last_device_order = None
self._last_evenai_response = None
self._received_ack = False
async def scan_for_glasses(self):
"""Scan for left and right glasses devices"""
devices = await BleakScanner.discover()
left_device = None
right_device = None
for device in devices:
if device.name:
if "_L_" in device.name:
left_device = device
elif "_R_" in device.name:
right_device = device
return left_device, right_device
async def connect(self):
"""Connect to both left and right glasses"""
left_device, right_device = await self.scan_for_glasses()
if not left_device or not right_device:
raise Exception("Could not find both glasses devices")
self._left_client = BleakClient(left_device)
self._right_client = BleakClient(right_device)
await self._left_client.connect()
await self._right_client.connect()
# Initialize both devices
await self._send_init_command()
# Start notification handlers
await self._setup_notifications()
async def _send_init_command(self):
"""Send initialization command to both devices"""
init_data = bytes([Commands.BLE_REQ_INIT, 0x01])
await self._left_client.write_gatt_char(UART_TX_CHAR_UUID, init_data)
await self._right_client.write_gatt_char(UART_TX_CHAR_UUID, init_data)
async def _setup_notifications(self):
"""Setup notification handlers for both devices"""
await self._left_client.start_notify(UART_RX_CHAR_UUID, self._notification_handler)
await self._right_client.start_notify(UART_RX_CHAR_UUID, self._notification_handler)
def _notification_handler(self, sender, data):
"""Handle incoming notifications from glasses"""
cmd = data[0]
if cmd == Commands.BLE_REQ_HEARTBEAT:
print(f"Received heartbeat response: {data.hex()}")
elif cmd == Commands.BLE_REQ_TRANSFER_MIC_DATA:
print(f"Received voice data: {data[1:].hex()}")
elif cmd == Commands.BLE_REQ_EVENAI:
print(f"Received EvenAI response: {data.hex()}")
# Check if it's a success response (0xC9)
if len(data) > 1 and data[1] == 0xC9:
self._received_ack = True
elif cmd == Commands.BLE_REQ_DEVICE_ORDER:
order = data[1]
self._last_device_order = order
print(f"Received device order: {hex(order)}")
if order == DeviceOrders.DISPLAY_COMPLETE:
self._received_ack = True
else:
print(f"Received unknown command {hex(cmd)}: {data.hex()}")
async def send_heartbeat(self):
"""Send heartbeat command to both devices"""
length = 6
data = struct.pack(
"BBBBBB",
Commands.BLE_REQ_HEARTBEAT,
length & 0xFF,
(length >> 8) & 0xFF,
self._heartbeat_seq % 0xFF,
0x04,
self._heartbeat_seq % 0xFF
)
self._heartbeat_seq += 1
await self._left_client.write_gatt_char(UART_TX_CHAR_UUID, data)
await self._right_client.write_gatt_char(UART_TX_CHAR_UUID, data)
async def send_text(self, text, new_screen=1, pos=0, current_page=1, max_pages=1):
"""
Send text to display on glasses with proper formatting and status transitions
"""
# Split text into lines and format according to screen size
lines = self._format_text_lines(text)
total_pages = (len(lines) + 4) // 5 # 5 lines per page
if len(lines) <= 3:
# Special formatting for short messages
display_text = "\n\n" + "\n".join(lines)
# Send initial display
success = await self._send_text_packet(
display_text,
new_screen=1,
status=DisplayStatus.NORMAL_TEXT,
current_page=1,
max_pages=1
)
if not success:
return False
# Wait 3 seconds then send final display
await asyncio.sleep(3)
success = await self._send_text_packet(
display_text,
new_screen=1,
status=DisplayStatus.FINAL_TEXT,
current_page=1,
max_pages=1
)
return success
elif len(lines) <= 5:
# Format for 4-5 lines
padding = "\n" if len(lines) == 4 else ""
display_text = padding + "\n".join(lines)
# Send initial display
success = await self._send_text_packet(
display_text,
new_screen=1,
status=DisplayStatus.NORMAL_TEXT,
current_page=1,
max_pages=1
)
if not success:
return False
# Wait 3 seconds then send final display
await asyncio.sleep(3)
success = await self._send_text_packet(
display_text,
new_screen=1,
status=DisplayStatus.FINAL_TEXT,
current_page=1,
max_pages=1
)
return success
else:
# Multi-page display
current_page = 1
start_idx = 0
while start_idx < len(lines):
page_lines = lines[start_idx:start_idx + 5]
display_text = "\n".join(page_lines)
is_last_page = start_idx + 5 >= len(lines)
status = DisplayStatus.FINAL_TEXT if is_last_page else DisplayStatus.NORMAL_TEXT
success = await self._send_text_packet(
display_text,
new_screen=1,
status=status,
current_page=current_page,
max_pages=total_pages
)
if not success:
return False
if not is_last_page:
await asyncio.sleep(5) # Wait 5 seconds between pages
start_idx += 5
current_page += 1
return True
async def _send_text_packet(self, text, new_screen, status, current_page, max_pages):
"""Send a single text packet with proper formatting"""
text_bytes = text.encode('utf-8')
max_chunk_size = 191
chunks = [text_bytes[i:i + max_chunk_size]
for i in range(0, len(text_bytes), max_chunk_size)]
# Send to Left first
for i, chunk in enumerate(chunks):
self._received_ack = False
header = struct.pack(
">BBBBBBBB",
Commands.BLE_REQ_EVENAI,
self._evenai_seq & 0xFF,
len(chunks),
i,
status | new_screen, # Combine status and new_screen
0, # pos high byte
0, # pos low byte
current_page
)
packet = header + bytes([max_pages]) + chunk
await self._left_client.write_gatt_char(UART_TX_CHAR_UUID, packet)
if not await self._wait_for_display_complete(timeout=2.0):
return False
await asyncio.sleep(0.1)
# Send to Right after Left completes
for i, chunk in enumerate(chunks):
self._received_ack = False
header = struct.pack(
">BBBBBBBB",
Commands.BLE_REQ_EVENAI,
self._evenai_seq & 0xFF,
len(chunks),
i,
status | new_screen,
0,
0,
current_page
)
packet = header + bytes([max_pages]) + chunk
await self._right_client.write_gatt_char(UART_TX_CHAR_UUID, packet)
if not await self._wait_for_display_complete(timeout=2.0):
return False
await asyncio.sleep(0.1)
self._evenai_seq += 1
return True
def _format_text_lines(self, text):
"""Format text into lines that fit the display"""
# Split text into paragraphs
paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
lines = []
for paragraph in paragraphs:
# Simple line wrapping at ~40 characters
# In real implementation, should use proper text measurement
while len(paragraph) > 40:
space_idx = paragraph.rfind(' ', 0, 40)
if space_idx == -1:
space_idx = 40
lines.append(paragraph[:space_idx])
paragraph = paragraph[space_idx:].strip()
if paragraph:
lines.append(paragraph)
return lines
async def _wait_for_display_complete(self, timeout=2.0):
"""Wait for display complete status"""
start_time = time.time()
while time.time() - start_time < timeout:
if self._received_ack:
return True
await asyncio.sleep(0.1)
return False
async def main():
glasses = GlassesProtocol()
try:
print("Connecting to glasses...")
await glasses.connect()
print("Connected successfully!")
# Start heartbeat loop
while True:
await glasses.send_heartbeat()
print("Sent heartbeat")
# Test message with specific formatting
test_message = "Test message\nLine 1\nLine 2"
success = await glasses.send_text(
text=test_message,
new_screen=1,
pos=0,
current_page=1,
max_pages=1
)
print(f"Send text {'succeeded' if success else 'failed'}")
# Wait 8 seconds before next heartbeat
await asyncio.sleep(8)
except Exception as e:
print(f"Error: {e}")
finally:
# Cleanup
if glasses._left_client:
await glasses._left_client.disconnect()
if glasses._right_client:
await glasses._right_client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
@droidXrobot
Copy link

Thanks for sharing this!

I get this error when trying to connect:

Connecting to glasses...
Error: Could not get GATT services: Unreachable

Any idea how to solve it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment