Skip to content

Instantly share code, notes, and snippets.

@victorypoint
Last active November 20, 2024 18:19
Show Gist options
  • Save victorypoint/69b47099f177ad7a9cae819908b24ea7 to your computer and use it in GitHub Desktop.
Save victorypoint/69b47099f177ad7a9cae819908b24ea7 to your computer and use it in GitHub Desktop.
Retrieve iFit2 Workout Metrics From Screen via OCR
# iFit2-Metrics-OCR - Retrieve iFit2 Workout Metrics via OCR
# Author: Al Udell
# Revised: November 20, 2024
# --- functions ---
# Validate RPM values function
def validate_rpm(value):
    """Return the RPM reading from OCR text, dropping any "/..." suffix.

    Args:
        value: Text recognised in the RPM slot, e.g. ``"85"`` or ``"85/90"``.

    Returns:
        For a string: the part before the first ``/`` with surrounding
        whitespace removed (a string without ``/`` is returned stripped).
        Any non-string input (``None``, numbers, ...) is returned unchanged.
    """
    # Only strings can carry the "value/other" form; pass everything else
    # through untouched instead of relying on a TypeError from ``in``.
    if not isinstance(value, str):
        return value
    # Strip so OCR output such as "85 / 90" yields "85" rather than "85 ".
    return value.partition('/')[0].strip()
# --- main ---
import subprocess
from PIL import Image
import requests
import io
import os
import socket
from datetime import datetime
from io import BytesIO
import gzip
# Text file that receives one timestamped line per OCR pass (opened in append mode)
ocrlogFile = 'ocr-logfile.txt'

# Destination for the metric messages: limited-broadcast address, UDP port 8002
broadcast_address = ('255.255.255.255', 8002)

# Datagram (UDP) socket with SO_BROADCAST enabled so it may send to the
# broadcast address above
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

# Equipment type in use; selects which label set names the OCR slots
equipment = "bike"

# Label for each on-screen slot, in slot order, per equipment type
_labels_by_equipment = {
    "bike": [
        "kilometers", "elapsed", "rpm", "watts",
        "calories", "incline", "resistance",
    ],
    "treadmill": [
        "kilometers", "elapsed", "pace/km", "calories",
        "vertical gain", "incline", "speed",
    ],
}
slot_labels = _labels_by_equipment[equipment]

# Crop box for each slot as (left, upper, right, lower) pixel coordinates,
# in the same order as slot_labels
crop_coordinates = [
    # Slots 1-5
    (420, 30, 590, 68),
    (675, 30, 845, 68),
    (925, 30, 1095, 68),
    (1180, 30, 1350, 68),
    (1435, 30, 1605, 68),
    # Slots 6-7
    (560, 1015, 645, 1048),
    (1270, 1015, 1355, 1048),
]

# Every cropped slot is scaled to this height (pixels) before OCR
new_height = 100
# Endpoint of the OCR service the script posts each slot image to.
OCR_URL = "http://localhost:32168/v1/vision/ocr"
# Upper bound (seconds) on a single OCR request, so a stalled server raises
# an error instead of hanging the capture loop forever.
OCR_TIMEOUT_SECONDS = 30


def _capture_screenshot():
    """Return the device's current screen as a PIL image.

    The screenshot is produced by ``adb exec-out``, gzip-compressed on the
    device side (``gzip -9``) and decompressed here, entirely in memory.

    Raises:
        RuntimeError: if adb produced no output at all.
    """
    # subprocess.run collects stdout and closes the pipe itself, so no file
    # descriptor is left open per iteration; adb's stderr is discarded.
    result = subprocess.run(
        ["adb", "exec-out", "screencap -p | gzip -9"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    if not result.stdout:
        # Fail with a clear message rather than an image-decoding error later.
        raise RuntimeError("adb returned no screenshot data")
    return Image.open(BytesIO(gzip.decompress(result.stdout)))


def _ocr_slot(image, crop):
    """Return the text the OCR service reads inside one crop box of *image*.

    Args:
        image: Full screenshot (PIL image).
        crop: ``(left, upper, right, lower)`` box to cut out.

    Returns:
        The ``label`` fields of all returned predictions, concatenated; an
        empty string when the response carries no predictions.
    """
    slot_image = image.crop(crop)

    # Scale to new_height while preserving the aspect ratio.
    scale = new_height / float(slot_image.height)
    new_width = int(slot_image.width * scale)
    resized_image = slot_image.resize((new_width, new_height), Image.LANCZOS)

    # Encode as PNG in memory; nothing is written to disk.
    with BytesIO() as buffer:
        resized_image.save(buffer, format="PNG")
        png_bytes = buffer.getvalue()

    ocr_response = requests.post(
        OCR_URL,
        files={"image": png_bytes},
        timeout=OCR_TIMEOUT_SECONDS,
    ).json()

    # "or []" also covers a response whose "predictions" is null.
    predictions = ocr_response.get('predictions') or []
    return ''.join(prediction['label'] for prediction in predictions)


try:
    # Capture, OCR and broadcast repeatedly until interrupted (Ctrl+C).
    # NOTE: there is no delay between iterations; the loop runs as fast as
    # screenshot capture and OCR allow.
    while True:
        # --- Screenshot Section ---
        image = _capture_screenshot()

        # --- Image Processing Section ---
        slot_values = []
        for i, crop in enumerate(crop_coordinates):
            value = _ocr_slot(image, crop)

            # Slot 3 (index 2) is RPM for the bike layout.
            # NOTE: this runs for every equipment type, including layouts
            # where slot 3 holds a different metric.
            if i == 2:
                value = validate_rpm(value)
                if value is None:
                    value = "Invalid RPM"

            slot_values.append(value)

        # --- Output Section ---
        message = " | ".join(
            f"{label}: {value}" for label, value in zip(slot_labels, slot_values)
        )

        # Redraw the console with the latest reading.
        os.system('cls' if os.name == 'nt' else 'clear')
        print(f"\r{message}")

        # Append the reading to the log file. An explicit encoding keeps the
        # write from failing on OCR text the locale codec cannot represent.
        dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(ocrlogFile, "a", encoding="utf-8") as file:
            file.write(f"{dt}, {message}\n")

        # Build the UDP messages for the configured equipment.
        if equipment == "bike":
            udp_messages = [
                f"Changed RPM {slot_values[2]}",
                f"Changed Watts {slot_values[3]}",
                f"Changed Grade {slot_values[5]}",
                f"Changed Resistance {slot_values[6]}",
            ]
        elif equipment == "treadmill":
            udp_messages = [
                f"Changed Grade {slot_values[5]}",
                f"Changed KPH {slot_values[6]}",
            ]
        else:
            # Unknown equipment: nothing to broadcast.
            udp_messages = []

        for udp_message in udp_messages:
            sock.sendto(udp_message.encode('utf-8'), broadcast_address)
except KeyboardInterrupt:
    print("\nLoop interrupted.")
finally:
    # Always release the socket and drop the adb connection on exit.
    sock.close()
    subprocess.run(["adb", "disconnect"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment