Created
November 20, 2024 09:03
-
-
Save c2h2/c15acbed5da90573445782665e3d2cae to your computer and use it in GitHub Desktop.
Flask server for an ST7789 LCD.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import spidev | |
import re | |
import time | |
from PIL import Image | |
from periphery import GPIO | |
from flask import Flask, request, redirect, url_for, render_template_string, flash | |
from werkzeug.utils import secure_filename | |
import threading | |
import os | |
import sys | |
import signal | |
# ---------------------------- Configuration ---------------------------- #

# GPIO lines wired to the panel's control signals.
GPIO_DC_PIN = 21  # Data/Command (DC) pin
GPIO_RST_PIN = 6  # Reset (RST) pin

# SPI bus settings.
SPI_DEVICE_PATH = "/dev/spidev0.0"
SPI_MAX_SPEED_HZ = 40_000_000  # 40 MHz

# Panel geometry and orientation.
SCREEN_WIDTH = 172
SCREEN_HEIGHT = 320
ROTATION = 0  # 0, 90, 180 or 270 degrees

# ST7786 command opcodes (similar to the ST7789), listed by opcode value.
CMD_SWRESET = 0x01
CMD_SLPOUT = 0x11
CMD_INVOFF = 0x20
CMD_INVON = 0x21
CMD_DISPON = 0x29
CMD_CASET = 0x2A
CMD_RASET = 0x2B
CMD_RAMWR = 0x2C
CMD_MADCTL = 0x36
CMD_COLMOD = 0x3A

# MADCTL colour-order flags.
MADCTL_RGB = 0x00  # RGB order
MADCTL_BGR = 0x08  # BGR order

# MADCTL byte sent for each supported rotation. Rotation 0 selects BGR
# order (swap in MADCTL_RGB if the panel needs it); the other rotations
# combine RGB order with their orientation bits.
MADCTL_DICT = {
    0: MADCTL_BGR,
    90: MADCTL_RGB | 0x60,
    180: MADCTL_RGB | 0xC0,
    270: MADCTL_RGB | 0xA0,
}
# Flask Configuration
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'jpg', 'jpeg'}

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# Needed for flashing messages. Set the FLASK_SECRET_KEY environment
# variable to replace the built-in value, which is public in this source
# and therefore not secret; the old value remains the default.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'supersecretkey')

# Ensure upload folder exists
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Lock taken by the request handlers and the display thread around the
# shared state below.
display_lock = threading.Lock()

# Shared State
current_image = None  # image currently shown, or None before any upload
zoom_level = 1.0
pan_x = 0.5  # Centered horizontally (0.0 to 1.0)
pan_y = 0.5  # Centered vertically (0.0 to 1.0)
# ---------------------------- Utility Functions ---------------------------- # | |
def parse_spidev_path(path):
    """
    Parse /dev/spidevB.D to get bus and device numbers.

    The whole string must have that form; trailing characters after the
    device number are rejected rather than silently ignored.

    Args:
        path (str): SPI device path (e.g., "/dev/spidev0.0")

    Returns:
        tuple: (bus, device) as integers

    Raises:
        ValueError: If ``path`` is not exactly of the form /dev/spidevB.D.
    """
    match = re.fullmatch(r'/dev/spidev(\d+)\.(\d+)', path)
    if match is None:
        raise ValueError(f"Invalid SPI device path: {path}")
    bus, device = (int(number) for number in match.groups())
    return bus, device
def allowed_file(filename):
    """Return True when the name's final extension is in ALLOWED_EXTENSIONS."""
    _, dot, extension = filename.rpartition('.')
    if not dot:
        # No dot at all means there is no extension to check.
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
# ---------------------------- ST7786 Driver ---------------------------- # | |
class ST7786:
    """SPI driver for the LCD controller, using spidev plus two GPIO lines.

    The DC line selects between command bytes (low) and data bytes (high);
    the RST line drives the hardware reset.
    """

    def __init__(self, spi_device_path, dc_pin, rst_pin, spi_max_speed_hz=8000000):
        """Open the SPI device and claim the DC and RST GPIO lines.

        Args:
            spi_device_path (str): Path such as "/dev/spidev0.0".
            dc_pin (int): GPIO number of the Data/Command line.
            rst_pin (int): GPIO number of the reset line.
            spi_max_speed_hz (int): SPI clock limit in Hz.

        Raises:
            ValueError: If ``spi_device_path`` cannot be parsed.
            SystemExit: With status 1 if a GPIO line cannot be opened.
        """
        self.spi = spidev.SpiDev()
        # Parse bus and device from path
        bus, device = parse_spidev_path(spi_device_path)
        # Open SPI connection
        self.spi.open(bus, device)
        self.spi.max_speed_hz = spi_max_speed_hz
        self.spi.mode = 0b00  # SPI mode 0
        # Initialize GPIO
        self.dc = None
        self.rst = None
        try:
            self.dc = GPIO(dc_pin, "out")
            self.rst = GPIO(rst_pin, "out")
        except Exception as e:
            print(f"Error initializing GPIO: {e}")
            # Release whatever was already acquired before exiting so the
            # SPI handle (and the DC line, if it opened) is not leaked.
            if self.dc is not None:
                self.dc.close()
            self.spi.close()
            sys.exit(1)
        self.dc.write(False)  # Default to command mode
        self.rst.write(False)

    def set_starting_x(self, start_x):
        """Set the column window to SCREEN_WIDTH columns starting at start_x."""
        end_x = start_x + SCREEN_WIDTH - 1
        self.send_command(CMD_CASET)
        self.send_data([
            (start_x >> 8) & 0xFF, start_x & 0xFF,  # Start column high byte, low byte
            (end_x >> 8) & 0xFF, end_x & 0xFF,      # End column high byte, low byte
        ])

    def close(self):
        """Release the SPI device and both GPIO lines.

        Each resource is closed even if closing an earlier one raises; the
        exception still propagates to the caller afterwards.
        """
        try:
            self.spi.close()
        finally:
            try:
                self.dc.close()
            finally:
                self.rst.close()

    def reset_display(self):
        """Pulse the RST line low then high, waiting 0.1 s after each edge."""
        self.rst.write(False)
        time.sleep(0.1)
        self.rst.write(True)
        time.sleep(0.1)

    def send_command(self, cmd):
        """Send a single command byte (DC line low)."""
        self.dc.write(False)  # Command mode
        self.spi.writebytes([cmd])

    def send_data(self, data):
        """Send data bytes (DC line high) in chunks to avoid OverflowError."""
        CHUNK_SIZE = 4096
        self.dc.write(True)  # Data mode
        for i in range(0, len(data), CHUNK_SIZE):
            chunk = data[i:i + CHUNK_SIZE]
            self.spi.writebytes(chunk)

    def init_display(self, rotation=0):
        """Reset the panel and send the start-up command sequence.

        Args:
            rotation (int): Key into MADCTL_DICT (0, 90, 180 or 270). Any
                other value falls back to a MADCTL byte of 0x00.
        """
        self.reset_display()
        self.send_command(CMD_SWRESET)  # Software reset
        time.sleep(0.150)  # Wait for reset
        self.send_command(CMD_SLPOUT)  # Exit sleep mode
        time.sleep(0.500)  # Wait for sleep out
        self.send_command(CMD_MADCTL)  # Memory access control
        self.send_data([MADCTL_DICT.get(rotation, 0x00)])
        self.send_command(CMD_COLMOD)  # Interface pixel format
        self.send_data([0x55])  # 16-bit color
        # Set column address
        self.send_command(CMD_CASET)
        self.send_data([
            0x00, 0x00,                 # Start column
            (SCREEN_WIDTH - 1) >> 8,    # End column high byte
            (SCREEN_WIDTH - 1) & 0xFF,  # End column low byte
        ])
        # Set row address
        self.send_command(CMD_RASET)
        self.send_data([
            0x00, 0x00,                  # Start row
            (SCREEN_HEIGHT - 1) >> 8,    # End row high byte
            (SCREEN_HEIGHT - 1) & 0xFF,  # End row low byte
        ])
        self.send_command(CMD_DISPON)  # Turn display on
        time.sleep(0.200)  # Wait for display to turn on
        self.send_command(CMD_INVON)  # Turn display inversion ON
        time.sleep(0.1)  # Extra settle delay; adjust as needed

    def write_display(self, pixel_data):
        """Write pixel data to the display RAM."""
        self.send_command(CMD_RAMWR)
        self.send_data(pixel_data)
# ---------------------------- Image Processing ---------------------------- # | |
def process_image(image_path):
    """Load an image file and return it converted to RGB mode.

    Args:
        image_path (str): Path of the image file to open.

    Returns:
        The converted image, or None if the file could not be opened or
        converted (the error is printed).
    """
    try:
        with Image.open(image_path) as source:
            # convert() already yields a new image that does not depend on
            # the file being closed, so no further copy is needed.
            return source.convert("RGB")
    except Exception as e:
        print(f"Error processing image {image_path}: {e}")
        return None
def convert_image_to_rgb565(img):
    """Pack each (r, g, b) pixel of a PIL image into a 16-bit 5-6-5 word.

    As written, blue lands in the top five bits, green in the middle six
    and red in the bottom five. Each word is emitted high byte first.

    Returns:
        bytearray: Two bytes per pixel, in ``img.getdata()`` order.
    """
    packed = bytearray()
    for red, green, blue in img.getdata():
        word = (red >> 3) | ((green & 0xFC) << 3) | ((blue & 0xF8) << 8)
        packed.extend(((word >> 8) & 0xFF, word & 0xFF))
    return packed
def _render_frame(source, zoom, x_pan, y_pan):
    """Scale ``source`` by ``zoom`` and crop a screen-sized window from it."""
    scaled_width = int(SCREEN_WIDTH * zoom)
    scaled_height = int(SCREEN_HEIGHT * zoom)
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
    scaled = source.resize((scaled_width, scaled_height), Image.LANCZOS)
    # Position the crop window according to the pan fractions, then keep
    # its top-left corner inside the scaled image (or at 0 when the scaled
    # image is smaller than the screen).
    left = int((scaled_width - SCREEN_WIDTH) * x_pan)
    top = int((scaled_height - SCREEN_HEIGHT) * y_pan)
    left = max(0, min(left, scaled_width - SCREEN_WIDTH))
    top = max(0, min(top, scaled_height - SCREEN_HEIGHT))
    return scaled.crop((left, top, left + SCREEN_WIDTH, top + SCREEN_HEIGHT))


def update_display(display):
    """Background thread function to update the display.

    Loops forever: snapshots the shared image/zoom/pan state under
    ``display_lock``, renders it, and writes the frame to ``display``.
    The lock is held only for the snapshot so request handlers are not
    blocked while a frame is scaled and transferred. The rendered bytes
    are reused while the state is unchanged.

    Args:
        display: Object providing ``set_starting_x`` and ``write_display``.
    """
    cached_source = None
    cached_view = None
    cached_pixels = None
    while True:
        with display_lock:
            source = current_image
            view = (zoom_level, pan_x, pan_y)

        if source is not None:
            if source is not cached_source or view != cached_view:
                frame = _render_frame(source, *view)
                cached_pixels = convert_image_to_rgb565(frame)
                cached_source, cached_view = source, view
            # Update the display
            # NOTE(review): 35 is presumably the panel's column offset
            # inside the controller RAM; confirm against the hardware.
            display.set_starting_x(35)
            display.write_display(cached_pixels)
        time.sleep(0.01)  # Pause 10 ms between passes
# ---------------------------- Flask Routes ---------------------------- # | |
# Jinja template for the single page of the app: an upload form posting to
# /upload, any flashed messages, and pan/zoom buttons posting to /control.
UPLOAD_PAGE = '''
<!doctype html>
<html>
<head>
<title>SPI ST7786 Display Control</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; }
.controls { margin-top: 20px; }
.controls button { width: 100px; height: 50px; margin: 5px; font-size: 16px; }
.upload { margin-top: 20px; }
.container { display: inline-block; text-align: center; }
</style>
</head>
<body>
<h1>Upload Image to Display</h1>
<div class="upload">
<form method="post" enctype="multipart/form-data" action="/upload">
<input type="file" name="file" accept=".jpg, .jpeg" required>
<br><br>
<input type="submit" value="Upload">
</form>
</div>
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul>
{% for message in messages %}
<li>{{ message }}</li>
{% endfor %}
</ul>
{% endif %}
{% endwith %}
<div class="controls">
<h2>Controls</h2>
<form method="post" action="/control">
<button name="action" value="up">⬆️ Up</button>
<br>
<button name="action" value="left">⬅️ Left</button>
<button name="action" value="down">⬇️ Down</button>
<button name="action" value="right">➡️ Right</button>
<br><br>
<button name="action" value="zoom_in">🔍 Zoom In</button>
<button name="action" value="zoom_out">🔎 Zoom Out</button>
</form>
</div>
</body>
</html>
'''
@app.route('/', methods=['GET'])
def index():
    """Serve the upload-and-controls page rendered from UPLOAD_PAGE."""
    page = render_template_string(UPLOAD_PAGE)
    return page
@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded JPEG and make it the image shown on the display.

    Every outcome flashes a message and redirects to the index page. The
    error paths must not redirect back to this URL: it accepts POST only,
    so the browser's follow-up GET would be answered with 405.
    """
    global current_image, zoom_level, pan_x, pan_y
    if 'file' not in request.files:
        flash('No file part')
        return redirect(url_for('index'))
    uploaded = request.files['file']
    if uploaded.filename == '':
        flash('No selected file')
        return redirect(url_for('index'))
    if not allowed_file(uploaded.filename):
        flash('Allowed file types are jpg, jpeg')
        return redirect(url_for('index'))

    filename = secure_filename(uploaded.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    try:
        uploaded.save(filepath)
    except OSError as e:
        # Report a failed write as a message instead of a server error.
        print(f"Error saving upload {filepath}: {e}")
        flash('Failed to save the uploaded file')
        return redirect(url_for('index'))

    img = process_image(filepath)
    if img is not None:
        with display_lock:
            # A new image starts un-zoomed and centred.
            current_image = img
            zoom_level = 1.0
            pan_x = 0.5
            pan_y = 0.5
        flash('Image successfully uploaded and displayed')
    else:
        flash('Failed to process the image')
    return redirect(url_for('index'))
@app.route('/control', methods=['POST'])
def control():
    """Apply one pan or zoom step from the posted ``action`` field.

    Unrecognised actions change nothing. The shared values are clamped on
    every call, then the client is redirected to the index page.
    """
    global zoom_level, pan_x, pan_y
    requested = request.form.get('action')
    with display_lock:
        if requested in ('up', 'down'):
            pan_y += -0.05 if requested == 'up' else 0.05
        elif requested in ('left', 'right'):
            pan_x += -0.05 if requested == 'left' else 0.05
        elif requested == 'zoom_in':
            zoom_level *= 1.1
        elif requested == 'zoom_out':
            zoom_level /= 1.1
        # Keep zoom within [0.5, 3.0] and both pan fractions within [0, 1].
        zoom_level = max(0.5, min(zoom_level, 3.0))
        pan_x = max(0.0, min(pan_x, 1.0))
        pan_y = max(0.0, min(pan_y, 1.0))
    home = url_for('index')
    return redirect(home)
# ---------------------------- Cleanup ---------------------------- # | |
def cleanup(signum, frame):
    """Close the module-level ``display`` and exit the process with status 0.

    Used as a signal handler, hence the ``(signum, frame)`` signature; both
    arguments are ignored. A failure while closing is printed, not raised.
    """
    print("Cleaning up resources...")
    try:
        display.close()
    except Exception as close_error:
        print(f"Error closing display: {close_error}")
    sys.exit(0)
# Route both SIGINT and SIGTERM to the cleanup handler.
for _handled_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_handled_signal, cleanup)

# ---------------------------- Main Execution ---------------------------- #
if __name__ == '__main__':
    # Bring up the panel; without it there is nothing to serve.
    try:
        display = ST7786(SPI_DEVICE_PATH, GPIO_DC_PIN, GPIO_RST_PIN, SPI_MAX_SPEED_HZ)
        display.init_display(rotation=ROTATION)
        print("Display initialized successfully.")
    except Exception as init_error:
        print(f"Failed to initialize display: {init_error}")
        sys.exit(1)

    # Refresh the panel from a daemon thread so it never blocks shutdown.
    display_thread = threading.Thread(
        target=update_display,
        args=(display,),
        daemon=True,
    )
    display_thread.start()
    print("Display update thread started.")

    # Serve the web UI; once the server stops (or fails), release the hardware.
    try:
        app.run(host='0.0.0.0', port=5000, threaded=True)
    except Exception as server_error:
        print(f"Error running Flask app: {server_error}")
    cleanup(None, None)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment