Skip to content

Instantly share code, notes, and snippets.

@elcritch
Last active September 22, 2025 23:41
Show Gist options
  • Save elcritch/3ef18bbd1b718b9d6bc26774a870b1b8 to your computer and use it in GitHub Desktop.
Save elcritch/3ef18bbd1b718b9d6bc26774a870b1b8 to your computer and use it in GitHub Desktop.
Simple Pyauto Rest API
import pyautogui
import subprocess
import time
import logging
from datetime import datetime
# Create logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
# Create formatters
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# # Create file handler
# file_handler = logging.FileHandler('app.log')
# file_handler.setLevel(logging.INFO)
# file_handler.setFormatter(formatter)
# Create console handler
# console_handler = logging.StreamHandler()
# console_handler.setLevel(logging.DEBUG)
# console_handler.setFormatter(formatter)
# Add handlers to logger
# logger.addHandler(file_handler)
# logger.addHandler(console_handler)
def launch_trinamix_app():
"""Launch the trinamiX Spectroscopy UWP app"""
try:
# Method 1: Using subprocess.run (recommended)
result = subprocess.run([
"explorer.exe",
"shell:AppsFolder\\trinamiXGmbH.trinamiXSpectroscopy_fsz33ynyg1jg0!App"
], capture_output=True, text=True)
if result.returncode == 0:
logging.info("trinamiX app launched successfully")
return True
else:
logging.error(f"Failed to launch app. Return code: {result.returncode}")
logging.error(f"Error output: {result.stderr}")
return False
except Exception as e:
logging.error(f"Exception launching app: {str(e)}")
return False
def setup_automation():
try:
# Log script start
logging.info("Script started - waiting 60 seconds before beginning messages")
print("Script started - waiting 60 seconds before beginning messages")
# Disable pyautogui failsafe to prevent accidental termination
pyautogui.FAILSAFE = False
# Wait 60 seconds after startup before beginning
# time.sleep(6)
logging.info("Starting message display loop")
if launch_trinamix_app():
logging.info("Initial app launch successful")
else:
logging.error("Initial app launch failed, trying alternative method")
except KeyboardInterrupt:
logging.info("Script stopped by user")
except Exception as e:
logging.error(f"Fatal error: {str(e)}")
# if __name__ == "__main__":
# setup_automation()
import asyncio
import json
import logging
from datetime import datetime
from typing import List, Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn
import pyautogui
from io import BytesIO
import os
# Import our local automation module
import pyauto
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
pyauto.setup_automation()
# Data models
class Message(BaseModel):
content: str
type: str = "message"
class BroadcastMessage(BaseModel):
message: str
type: str = "broadcast"
class PingMessage(BaseModel):
type: str = "ping"
class ClickRequest(BaseModel):
x: int
y: int
clicks: int = 1
button: str = "left" # left, right, middle
interval: Optional[float] = None
class FindImageRequest(BaseModel):
image_path: str
confidence: Optional[float] = 0.8 # Confidence level for image matching
return_center: bool = True # Return center coordinates instead of top-left
class EnterTextRequest(BaseModel):
text: str
interval: Optional[float] = None # Seconds between keystrokes
presses: Optional[List[str]] = None # Optional sequence of keys to press after typing
# Global state for broadcasting
broadcast_queue: asyncio.Queue = asyncio.Queue()
polling_clients: List[asyncio.Queue] = []
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for FastAPI"""
logger.info("Starting simple REST server...")
try:
yield
finally:
logger.info("Shutting down simple REST server...")
app = FastAPI(
title="Simple Automation Server REST API",
description="Simplified HTTP REST API with broadcasting to all polling clients",
version="1.0.0",
lifespan=lifespan
)
@app.post("/api/message")
async def send_message(message: Message):
"""Send a message and get echo response"""
logger.info(f"Received message: {message.content}")
# Echo the message back
response = {
"type": "echo",
"original_message": message.content,
"timestamp": datetime.now().isoformat()
}
return response
@app.post("/api/ping")
async def ping():
"""Handle ping request"""
logger.info("Received ping")
# Send pong response
response = {
"type": "pong",
"timestamp": datetime.now().isoformat()
}
return response
@app.post("/api/broadcast")
async def broadcast_message(broadcast_msg: BroadcastMessage):
"""Broadcast a message to all polling clients"""
logger.info(f"Broadcasting message: {broadcast_msg.message}")
# Create broadcast message
broadcast_data = {
"type": "broadcast",
"message": broadcast_msg.message,
"timestamp": datetime.now().isoformat()
}
# Send to all polling clients
clients_notified = 0
for client_queue in polling_clients.copy(): # Use copy to avoid modification during iteration
try:
await client_queue.put(broadcast_data)
clients_notified += 1
except Exception as e:
logger.error(f"Failed to send broadcast to client: {e}")
# Remove broken client queue
try:
polling_clients.remove(client_queue)
except ValueError:
pass
logger.info(f"Broadcast sent to {clients_notified} polling clients")
# Return confirmation
return {
"type": "broadcast_sent",
"message": f"Message broadcasted to {clients_notified} clients",
"timestamp": datetime.now().isoformat()
}
@app.post("/api/enter_text")
async def enter_text(req: EnterTextRequest):
"""Type text using the keyboard with optional interval and post-typing key presses"""
logger.info(
(
"Entering text of length %d with interval=%s, presses=%s"
% (len(req.text), str(req.interval), str(req.presses))
)
)
try:
kwargs = {}
if req.interval is not None:
kwargs["interval"] = req.interval
pyautogui.write(req.text, **kwargs)
# Execute any requested key presses after typing
if req.presses:
for key in req.presses:
try:
pyautogui.press(key)
except Exception as e:
logger.error(f"Failed to press key '{key}': {e}")
raise
return {
"type": "enter_text_success",
"message": "Text entered successfully",
"chars": len(req.text),
"presses": req.presses or [],
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Error entering text: {e}")
raise HTTPException(status_code=500, detail=f"Failed to enter text: {str(e)}")
@app.post("/api/click")
async def click_gui(click_request: ClickRequest):
"""Perform a GUI click at specified coordinates"""
logger.info(f"Performing GUI click at ({click_request.x}, {click_request.y}) with {click_request.clicks} clicks")
try:
# Perform the click using pyautogui
click_kwargs = {
"x": click_request.x,
"y": click_request.y,
"clicks": click_request.clicks,
"button": click_request.button
}
# Only add interval if it's not None
if click_request.interval is not None:
click_kwargs["interval"] = click_request.interval
pyautogui.click(**click_kwargs)
response = {
"type": "click_success",
"message": f"Successfully clicked at ({click_request.x}, {click_request.y})",
"coordinates": {"x": click_request.x, "y": click_request.y},
"clicks": click_request.clicks,
"button": click_request.button,
"timestamp": datetime.now().isoformat()
}
logger.info(f"GUI click successful: {response}")
return response
except Exception as e:
logger.error(f"Error performing GUI click: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to perform GUI click: {str(e)}"
)
@app.post("/api/find_image")
async def find_image(find_request: FindImageRequest):
"""Find an image on the screen and return its coordinates"""
logger.info(f"Searching for image: {find_request.image_path}")
try:
image_path = os.path.join(os.path.dirname(__file__), find_request.image_path)
# Check if image file exists
if not os.path.exists(image_path):
logger.error(f"Image file not found: {image_path}")
raise HTTPException(
status_code=404,
detail=f"Image file not found: {image_path}"
)
# Try to find the image on screen
try:
if find_request.return_center:
# Find center coordinates
location = pyautogui.locateCenterOnScreen(image_path, confidence=find_request.confidence)
else:
# Find top-left coordinates
location = pyautogui.locateOnScreen(image_path, confidence=find_request.confidence)
except pyautogui.ImageNotFoundException:
location = None
if location is None:
# Image not found
response = {
"type": "image_not_found",
"message": f"Image not found on screen: {image_path}",
"image_path": image_path,
"confidence": find_request.confidence,
"timestamp": datetime.now().isoformat()
}
logger.info(f"Image not found: {image_path}")
return response
else:
# Image found
if find_request.return_center:
# location is a Point-like object with x, y coordinates
coordinates = {"x": int(location[0]), "y": int(location[1])}
else:
# location is a Box-like object with left, top, width, height
coordinates = {
"left": location.left,
"top": location.top,
"width": location.width,
"height": location.height,
"center_x": location.left + location.width // 2,
"center_y": location.top + location.height // 2
}
response = {
"type": "image_found",
"message": f"Image found on screen: {find_request.image_path}",
"image_path": find_request.image_path,
"coordinates": coordinates,
"confidence": find_request.confidence,
"return_center": find_request.return_center,
"timestamp": datetime.now().isoformat()
}
logger.info(f"Image found: {image_path} at {coordinates}")
return response
except Exception as e:
logger.error(f"Error finding image: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to find image: {str(e)}"
)
@app.get("/api/screenshot")
async def take_screenshot(filename: str = "screenshot.png"):
"""Take a screenshot and return it as a downloadable file"""
logger.info(f"Taking screenshot with filename: {filename}")
try:
# Take screenshot using pyautogui
screenshot = pyautogui.screenshot()
# Save to an in-memory buffer as PNG
buffer = BytesIO()
screenshot.save(buffer, format="PNG")
buffer.seek(0)
logger.info(f"Screenshot generated in-memory ({buffer.getbuffer().nbytes} bytes)")
# Stream the PNG directly without writing to disk
return StreamingResponse(
buffer,
media_type="image/png",
)
except Exception as e:
logger.error(f"Error taking screenshot: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to take screenshot: {str(e)}"
)
@app.get("/api/poll")
async def poll_messages(timeout: int = 30):
"""Long polling endpoint for receiving broadcast messages"""
# Create a queue for this polling client
client_queue = asyncio.Queue()
polling_clients.append(client_queue)
try:
# Wait for a message with timeout
message = await asyncio.wait_for(client_queue.get(), timeout=timeout)
logger.info(f"Sending broadcast to polling client: {message}")
return message
except asyncio.TimeoutError:
# Return timeout response
return {
"type": "timeout",
"message": "No broadcasts received",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error in polling: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
# Remove client queue when done
try:
polling_clients.remove(client_queue)
except ValueError:
pass
@app.get("/api/status")
async def get_server_status():
"""Get server status"""
return {
"server_status": "running",
"polling_clients": len(polling_clients),
"timestamp": datetime.now().isoformat()
}
@app.get("/")
async def root():
"""Root endpoint with API information"""
return {
"message": "Simple Automation Server REST API",
"version": "1.0.0",
"endpoints": {
"send_message": "POST /api/message",
"ping": "POST /api/ping",
"broadcast": "POST /api/broadcast",
"click": "POST /api/click",
"enter_text": "POST /api/enter_text",
"find_image": "POST /api/find_image",
"screenshot": "GET /api/screenshot?filename=screenshot.png",
"poll": "GET /api/poll?timeout=30",
"status": "GET /api/status"
},
"documentation": "/docs",
"timestamp": datetime.now().isoformat()
}
if __name__ == "__main__":
try:
uvicorn.run(
app,
host="0.0.0.0",
port=8766,
log_level="info",
access_log=True
)
except KeyboardInterrupt:
logger.info("Server stopped by user")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment