Skip to content

Instantly share code, notes, and snippets.

@N8python
Created November 1, 2024 01:07
Show Gist options
  • Save N8python/b31c84d04caca53e0de3361161646883 to your computer and use it in GitHub Desktop.
import os
import platform
import queue
import subprocess
import threading
import time
import tkinter as tk
from datetime import datetime

from pynput import keyboard

# ML imports
import mlx.core as mx
from mlx_vlm import load, generate
from mlx_vlm.prompt_utils import apply_chat_template
from mlx_vlm.utils import load_config, stream_generate
from mlx_lm import load as lm_load, generate as lm_generate, stream_generate as lm_stream_generate
# Create screenshots directory if it doesn't exist.
# Anchored to this script's real path (not the cwd) so captures always land
# in the same place regardless of where the script is launched from.
SCREENSHOT_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'screenshots')
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
class StatusOverlay:
    """Always-on-top Tk status window for the screenshot pipeline.

    Worker threads report progress through ``update_status`` and
    ``update_answer``, which only enqueue updates; the Tk main loop drains
    the queue every 100 ms so every widget mutation happens on the GUI
    thread (tkinter is not thread-safe).
    """

    # Internal queue marker distinguishing streamed-answer updates from
    # (status, answer) tuples.
    _ANSWER = object()

    def __init__(self):
        self.root = tk.Tk()
        self.root.title("Status")
        # Platform-specific window flags to ensure it stays on top of everything.
        # BUG FIX: the original compared the *function object* os.system to
        # 'Darwin' (always False), so the macOS branch was dead code. Use
        # platform.system() for a real OS check.
        if platform.system() == 'Darwin':  # macOS
            self.root.attributes('-alpha', 0.8, '-topmost', True, '-fullscreen', False)
            try:
                # '-float' is not a standard wm attribute on every Tk build;
                # this branch never ran before, so guard it defensively.
                self.root.attributes('-float', True)  # Makes it float above all windows
            except tk.TclError:
                pass
            try:
                # Try to use PyObjC for additional window level control if available
                from Foundation import NSAppleScript
                script = '''
tell application "System Events"
set frontmost of every process to false
end tell
'''
                NSAppleScript.alloc().initWithSource_(script).executeAndReturnError_(None)
            except ImportError:
                pass
        else:  # Linux/Windows
            self.root.attributes('-alpha', 0.8, '-topmost', True)
            try:
                # '-type dock' is an X11-only window hint; ignore it where
                # unsupported (e.g. Windows) instead of crashing at startup.
                self.root.attributes('-type', 'dock')
            except tk.TclError:
                pass
        self.root.lift()
        self.root.attributes('-topmost', True)  # Double-ensure topmost
        self.root.overrideredirect(True)  # Remove window decorations

        # Processing timer state: start_time is stamped when a screenshot is
        # taken; elapsed time is shown when the status returns to Idle.
        self.start_time = None
        self.processing = False

        # Re-assert stacking order periodically so other windows cannot
        # cover the overlay.
        def keep_on_top():
            self.root.lift()
            self.root.attributes('-topmost', True)
            self.root.after(100, keep_on_top)  # Check every 100ms
        keep_on_top()

        # Calculate position (top right corner with padding).
        screen_width = self.root.winfo_screenwidth()

        # Main frame with a subtle border for a rounded-corners effect.
        self.main_frame = tk.Frame(
            self.root,
            bg='#1a1a1a',  # Dark background
            highlightbackground='#2a2a2a',  # Border color
            highlightthickness=2,
        )
        self.main_frame.pack(padx=2, pady=2, fill='both', expand=True)

        # Status row: colored dot + status text + right-aligned timer.
        self.status_frame = tk.Frame(self.main_frame, bg='#1a1a1a')
        self.status_frame.pack(fill='x', padx=10, pady=(10, 5))
        self.status_icon = tk.Label(
            self.status_frame,
            text="●",  # Dot icon
            font=("Arial", 14),
            bg='#1a1a1a',
            fg='#4CAF50'  # Green dot by default
        )
        self.status_icon.pack(side='left', padx=(0, 5))
        self.label = tk.Label(
            self.status_frame,
            text="Idle",
            font=("SF Pro Display", 12, "bold"),  # Modern font
            bg='#1a1a1a',
            fg='#ffffff',
        )
        self.label.pack(side='left')
        # Timer label (right-aligned).
        self.timer_label = tk.Label(
            self.status_frame,
            text="",
            font=("SF Pro Display", 10),
            bg='#1a1a1a',
            fg='#888888',
        )
        self.timer_label.pack(side='right')

        # Answer display (hidden until there is answer text to show).
        self.answer_frame = tk.Frame(self.main_frame, bg='#1a1a1a')
        self.answer_frame.pack(fill='x', padx=10, pady=(0, 10), expand=True)
        self.answer_label = tk.Label(
            self.answer_frame,
            text="",
            font=("SF Pro Display", 11),
            bg='#1a1a1a',
            fg='#cccccc',
            wraplength=200,
            justify='left'
        )
        self.answer_label.place(x=0, y=0, anchor='nw')  # Fixed position at top-left

        # Position window in the top-right corner with 20px padding.
        self.root.geometry(f"250x0+{screen_width - 250}+20")
        self.fade_alpha = 0.0  # Used by the (currently unused) fade animation
        self.root.update()
        self.root.geometry(f"250x{int(self.root.winfo_reqheight() + 20)}")

        # Queue for thread-safe updates from worker threads.
        self.update_queue = queue.Queue()
        self.check_queue()

    def check_queue(self):
        """Drain pending updates from worker threads (runs on the Tk thread)."""
        try:
            while True:
                update = self.update_queue.get_nowait()
                if isinstance(update, tuple):
                    first, second = update
                    if first is self._ANSWER:  # Streamed answer text
                        self._apply_answer(second)
                    else:  # (status, answer) pair
                        self.update_display(first, second)
                else:  # Plain status string
                    self.update_display(update)
        except queue.Empty:
            pass
        self.root.after(100, self.check_queue)

    def update_display(self, status, answer=None):
        """Update the status label/icon and, optionally, the answer text.

        Must be called on the Tk thread (use update_status from workers).
        """
        self.label.config(text=status)
        if status == "Idle":
            self.status_icon.config(fg='#4CAF50')  # Green
            self.answer_label.config(text="")
            self.answer_frame.pack_forget()  # Hide answer frame
            if self.processing:
                # Show how long the whole screenshot->answer cycle took.
                end_time = time.time()
                processing_time = end_time - self.start_time
                self.timer_label.config(text=f"{processing_time:.1f}s")
                self.processing = False
        elif "Error" in status:
            self.status_icon.config(fg='#f44336')  # Red
            self.answer_frame.pack_forget()  # Hide answer frame
            self.processing = False
        elif status == "Taking Screenshot":
            self.status_icon.config(fg='#FFA726')  # Orange
            self.start_time = time.time()  # Start the processing timer
            self.processing = True
            self.timer_label.config(text="")
        elif status == "Speaking":
            self.status_icon.config(fg='#2196F3')  # Blue
            if answer:
                self.answer_label.config(text=answer)
                self.answer_frame.pack(fill='x', padx=10, pady=(0, 10), expand=True)  # Show answer
        else:
            self.status_icon.config(fg='#FFA726')  # Orange for any other processing state

    def update_answer(self, answer):
        """Thread-safe: queue new answer text for display.

        BUG FIX: the original mutated Tk widgets directly from worker
        threads, which is undefined behavior in tkinter; updates are now
        marshaled onto the Tk main loop via the queue (applied within
        ~100 ms by check_queue).
        """
        self.update_queue.put((self._ANSWER, answer))

    def _apply_answer(self, answer):
        """Render answer text and resize the window (Tk thread only)."""
        self.answer_label.config(text=answer)
        # Don't pack the frame again if it's already packed.
        if not self.answer_frame.winfo_ismapped():
            self.answer_frame.pack(fill='x', padx=10, pady=(0, 10), expand=True)
        self.root.update_idletasks()
        # Grow the frame to fit the text, capped to the screen height.
        text_height = self.answer_label.winfo_reqheight()
        self.answer_frame.configure(height=text_height + 20)  # Add padding
        screen_height_limit = self.root.winfo_screenheight() - 100
        self.root.geometry(f"250x{min(int(self.root.winfo_reqheight()), screen_height_limit)}")

    def fade_in(self):
        """Create a subtle fade-in effect (currently not wired up)."""
        self.fade_alpha = 0.7
        self.root.attributes('-alpha', self.fade_alpha)
        self.fade_step()

    def fade_step(self):
        """Gradually increase opacity until it reaches ~0.9."""
        if self.fade_alpha < 0.9:
            self.fade_alpha += 0.1
            self.root.attributes('-alpha', self.fade_alpha)
            self.root.after(20, self.fade_step)

    def update_status(self, status, answer=None):
        """Thread-safe method to update status and optional answer text."""
        self.update_queue.put((status, answer) if answer else status)

    def start(self):
        """Enter the Tk main loop (blocks the calling thread)."""
        self.root.mainloop()
class ScreenshotAnalyzer:
    """Pipeline: screenshot -> VLM text/LaTeX extraction -> math-LLM solve
    -> plain-language summary -> text-to-speech playback.

    All heavy work runs on a background thread so the StatusOverlay UI
    stays responsive; the overlay itself is updated via its thread-safe
    methods.
    """

    def __init__(self):
        self.status_overlay = StatusOverlay()
        self.processing_thread = None
        print("Loading Qwen2-VL model...")
        # NOTE: this status is enqueued but the overlay's mainloop has not
        # started yet, so it only becomes visible once loading finishes.
        self.status_overlay.update_status("Loading Models")
        model_path = "mlx-community/pixtral-12b-4bit"
        # Vision-language model used to extract problem text from screenshots.
        self.model, self.processor = load(model_path)
        self.config = load_config(model_path)
        # Math-specialized model that produces the step-by-step solution.
        self.answer_model, self.answer_tokenizer = lm_load("mlx-community/Qwen2.5-Math-7B-Instruct-4bit")
        # Small model that rewrites the solution into speakable plain text.
        self.summarizer_model, self.summarizer_tokenizer = lm_load("mlx-community/Llama-3.2-3B-Instruct-4bit")
        print("Model loaded successfully!")
        self.status_overlay.update_status("Idle")

    def analyze_image(self, image_path):
        """Extract problem text/LaTeX from the screenshot via the VLM.

        Returns the raw generated text. The model is instructed to emit the
        literal marker ``\\skip`` when the image contains nothing math-related.
        """
        self.status_overlay.update_status("Analyzing Image")
        # BUG FIX: '\\skip' is now properly escaped — the original "\skip"
        # relied on the invalid escape sequence '\s' (SyntaxWarning on modern
        # CPython, slated to become an error). The runtime string value is
        # byte-identical.
        prompt = "Extract all text, problem statements, and math notation from the image, and format the result in LaTeX. If there is a problem statement, you *must* extract it. Do NOT attempt to solve the problem. If the image doesn't contain anything math-related, output the latex \\skip."
        formatted_prompt = apply_chat_template(
            self.processor,
            self.config,
            prompt,
            num_images=1
        )
        output = ""
        for token in stream_generate(
            self.model,
            self.processor,
            [image_path],
            formatted_prompt,
            verbose=False,
            max_tokens=4096,
            temp=0.0  # Greedy decoding for deterministic extraction
        ):
            output += token
            # Stream partial text to the overlay as it is generated.
            self.status_overlay.update_answer(output)
        return output

    def solve_problem(self, problem_text):
        """Solve the extracted problem, then summarize it for speech.

        Returns the summarizer's plain-text answer (negative numbers are
        spelled out, per the system prompt, so TTS reads them correctly).
        """
        self.status_overlay.update_status("Solving Problem")
        prompt = f"Solve the following problem - ensure you simplify your answer as much as possible. Think step by step to get to your conclusion. The problem is below: \n{problem_text}"
        messages = [{"role": "user", "content": prompt}]
        prompt = self.answer_tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        output = ""
        for token in lm_stream_generate(
            self.answer_model,
            self.answer_tokenizer,
            prompt,
            max_tokens=4096,
            temp=0.0,
            min_p=0.05,
            repetition_penalty=1.1
        ):
            output += token
            self.status_overlay.update_answer(output)
        print(output)
        self.status_overlay.update_status("Summarizing")
        summarize_prompt = f"Below is a solved math problem - write out the answer presented without any fancy formatting: \n{output}"
        formatted_summarize_prompt = self.summarizer_tokenizer.apply_chat_template(
            [
                {"role": "system", "content": "You are an AI that summarizes solutions. When doing so, you write negative numbers as 'negative 2' instead of '-2'." },
                {"role": "user", "content": summarize_prompt},
            ], tokenize=False, add_generation_prompt=True
        )
        output = ""
        for token in lm_stream_generate(
            self.summarizer_model,
            self.summarizer_tokenizer,
            formatted_summarize_prompt,
            max_tokens=4096,
            temp=0.0,
            min_p=0.05
        ):
            output += token
            self.status_overlay.update_answer(output)
        return output

    def process_screenshot(self, filepath):
        """Analyze, solve, and speak one screenshot (runs on a worker thread)."""
        try:
            analysis = self.analyze_image(filepath)
            # BUG FIX: escaped "\\skip" (was the invalid escape "\skip");
            # same runtime bytes as before.
            if "\\skip" in analysis:
                self.status_overlay.update_status("No Math Found")
                time.sleep(2)  # Let the user read the message before going Idle
            else:
                print("\n=== Problem ===")
                print(analysis)
                solution = self.solve_problem(analysis)
                print("\n=== AI Solution ===")
                print(solution)
                self.status_overlay.update_status("Speaking", solution)
                # Stream audio: echo -> piper (TTS) -> play, all via pipes so
                # speech starts without writing intermediate files.
                process1 = subprocess.Popen(
                    ['echo', solution],
                    stdout=subprocess.PIPE
                )
                process2 = subprocess.Popen(
                    ['piper', '-m', 'en_US-lessac-high', '--output-raw'],
                    stdin=process1.stdout,
                    stdout=subprocess.PIPE
                )
                process3 = subprocess.Popen(
                    ['play', '-r', '22050', '-b', '16', '-e', 'signed', '-t', 'raw', '-'],
                    stdin=process2.stdout
                )
                # Close the parent's copies of the pipe write ends so the
                # downstream processes see EOF when their producer exits.
                process1.stdout.close()
                process2.stdout.close()
                # Wait for the audio to finish.
                process3.wait()
        except Exception as e:
            print(f"Error analyzing screenshot: {str(e)}")
            self.status_overlay.update_status(f"Error: {str(e)[:20]}...")
            time.sleep(3)  # Keep the error visible briefly
        finally:
            self.status_overlay.update_status("Idle")

    def on_activate(self):
        """Hotkey callback: capture an interactive screenshot and process it."""
        # Drop the request if the previous screenshot is still being processed.
        if self.processing_thread and self.processing_thread.is_alive():
            print("Still processing previous screenshot...")
            return
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'screenshot_{timestamp}.png'
        filepath = os.path.join(SCREENSHOT_DIR, filename)
        self.status_overlay.update_status("Taking Screenshot")
        # NOTE: 'screencapture' is macOS-only; '-i' = interactive selection,
        # '-x' = no camera sound.
        subprocess.run(['screencapture', '-i', '-x', filepath])
        # The user may cancel the interactive capture, in which case no file
        # is written and we simply stay Idle.
        if os.path.exists(filepath):
            print(f'\nScreenshot saved to: {filepath}')
            self.processing_thread = threading.Thread(
                target=self.process_screenshot,
                args=(filepath,),
                daemon=True  # Don't block interpreter exit
            )
            self.processing_thread.start()
def main():
    """Load the models, register the Ctrl+M hotkey, and run the overlay UI."""
    analyzer = ScreenshotAnalyzer()

    # Global hotkey: Ctrl+M triggers an interactive screenshot capture.
    hotkey = keyboard.HotKey(
        keyboard.HotKey.parse('<ctrl>+m'),
        analyzer.on_activate,
    )

    def for_canonical(handler):
        # Canonicalize keys (normalizes modifier variants) before forwarding
        # to the HotKey state machine.
        return lambda key: handler(listener.canonical(key))

    # Listen for key events on a background thread.
    listener = keyboard.Listener(
        on_press=for_canonical(hotkey.press),
        on_release=for_canonical(hotkey.release),
    )
    listener.start()

    print('Screenshot analysis utility started!')
    print(f'Press Ctrl+M to take a screenshot. Files will be saved to: {SCREENSHOT_DIR}')
    print('Each screenshot will be automatically analyzed by AI')
    print('Press Ctrl+C to exit.')

    # The Tk mainloop must own the main thread; this blocks until the
    # overlay window is closed.
    analyzer.status_overlay.start()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment