Skip to content

Instantly share code, notes, and snippets.

@c2h2
Created November 20, 2024 09:03
Show Gist options
  • Save c2h2/c15acbed5da90573445782665e3d2cae to your computer and use it in GitHub Desktop.
Save c2h2/c15acbed5da90573445782665e3d2cae to your computer and use it in GitHub Desktop.
flask server for st7789 lcd.
import spidev
import re
import time
from PIL import Image
from periphery import GPIO
from flask import Flask, request, redirect, url_for, render_template_string, flash
from werkzeug.utils import secure_filename
import threading
import os
import sys
import signal
# ---------------------------- Configuration ---------------------------- #
# GPIO Pin Definitions
GPIO_DC_PIN = 21 # Data/Command (DC) pin
GPIO_RST_PIN = 6 # Reset (RST) pin
# SPI Configuration
SPI_DEVICE_PATH = "/dev/spidev0.0"
SPI_MAX_SPEED_HZ = 40000000 # 8 MHz
# Display Configuration
SCREEN_WIDTH = 172
SCREEN_HEIGHT = 320
ROTATION = 0 # 0, 90, 180, 270 degrees
# ST7786 Command Definitions (与ST7789类似)
CMD_SWRESET = 0x01
CMD_SLPOUT = 0x11
CMD_DISPON = 0x29
CMD_CASET = 0x2A
CMD_RASET = 0x2B
CMD_RAMWR = 0x2C
CMD_MADCTL = 0x36
CMD_COLMOD = 0x3A
CMD_INVOFF = 0x20
CMD_INVON = 0x21
# MADCTL Settings for rotation
MADCTL_RGB = 0x00 # RGB 顺序
MADCTL_BGR = 0x08 # BGR 顺序
MADCTL_DICT = {
0: MADCTL_BGR, # 或者 MADCTL_BGR,取决于实际需要
90: MADCTL_RGB | 0x60,
180: MADCTL_RGB | 0xC0,
270: MADCTL_RGB | 0xA0
}
# Flask Configuration
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'jpg', 'jpeg'}
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.secret_key = 'supersecretkey' # Needed for flashing messages
# Ensure upload folder exists
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Thread-safe lock for display operations
display_lock = threading.Lock()
# Shared State
current_image = None
zoom_level = 1.0
pan_x = 0.5 # Centered horizontally (0.0 to 1.0)
pan_y = 0.5 # Centered vertically (0.0 to 1.0)
# ---------------------------- Utility Functions ---------------------------- #
def parse_spidev_path(path):
"""
Parse /dev/spidevB.D to get bus and device numbers.
Args:
path (str): SPI device path (e.g., "/dev/spidev0.0")
Returns:
tuple: (bus, device) as integers
"""
match = re.match(r'/dev/spidev(\d+)\.(\d+)', path)
if not match:
raise ValueError(f"Invalid SPI device path: {path}")
bus = int(match.group(1))
device = int(match.group(2))
return bus, device
def allowed_file(filename):
"""Check if the file has an allowed extension."""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# ---------------------------- ST7786 Driver ---------------------------- #
class ST7786:
def __init__(self, spi_device_path, dc_pin, rst_pin, spi_max_speed_hz=8000000):
self.spi = spidev.SpiDev()
# Parse bus and device from path
bus, device = parse_spidev_path(spi_device_path)
# Open SPI connection
self.spi.open(bus, device)
self.spi.max_speed_hz = spi_max_speed_hz
self.spi.mode = 0b00 # SPI mode 0
# Initialize GPIO
try:
self.dc = GPIO(dc_pin, "out")
self.rst = GPIO(rst_pin, "out")
except Exception as e:
print(f"Error initializing GPIO: {e}")
sys.exit(1)
self.dc.write(False) # Default to command mode
self.rst.write(False)
def set_starting_x(self, start_x):
"""设置显示屏的起始 X 坐标。"""
end_x = start_x + SCREEN_WIDTH - 1
self.send_command(CMD_CASET)
self.send_data([
(start_x >> 8) & 0xFF, start_x & 0xFF, # Start column high byte, low byte
(end_x >> 8) & 0xFF, end_x & 0xFF # End column high byte, low byte
])
def close(self):
self.spi.close()
self.dc.close()
self.rst.close()
def reset_display(self):
"""Reset the ST7786 display."""
self.rst.write(False)
time.sleep(0.1)
self.rst.write(True)
time.sleep(0.1)
def send_command(self, cmd):
"""Send a command byte to the ST7786."""
self.dc.write(False) # Command mode
self.spi.writebytes([cmd])
def send_data(self, data):
"""Send data bytes to the ST7786 in chunks to avoid OverflowError."""
CHUNK_SIZE = 4096
self.dc.write(True) # Data mode
for i in range(0, len(data), CHUNK_SIZE):
chunk = data[i:i + CHUNK_SIZE]
self.spi.writebytes(chunk)
def init_display(self, rotation=0):
"""Initialize the ST7786 display with necessary commands."""
self.reset_display()
self.send_command(CMD_SWRESET) # Software reset
time.sleep(0.150) # Wait for reset
self.send_command(CMD_SLPOUT) # Exit sleep mode
time.sleep(0.500) # Wait for sleep out
self.send_command(CMD_MADCTL) # Memory access control
self.send_data([MADCTL_DICT.get(rotation, 0x00)])
self.send_command(CMD_COLMOD) # Interface pixel format
self.send_data([0x55]) # 16-bit color
# Set column address
self.send_command(CMD_CASET)
self.send_data([
0x00, 0x00, # Start column
(SCREEN_WIDTH - 1) >> 8, # End column high byte
(SCREEN_WIDTH - 1) & 0xFF # End column low byte
])
# Set row address
self.send_command(CMD_RASET)
self.send_data([
0x00, 0x00, # Start row
(SCREEN_HEIGHT - 1) >> 8, # End row high byte
(SCREEN_HEIGHT - 1) & 0xFF # End row low byte
])
self.send_command(CMD_DISPON) # Turn display on
time.sleep(0.200) # Wait for display to turn on
self.send_command(CMD_INVON) # 关闭显示反转
time.sleep(0.1) # 根据需要添加延时
def write_display(self, pixel_data):
"""Write pixel data to the display RAM."""
self.send_command(CMD_RAMWR)
self.send_data(pixel_data)
# ---------------------------- Image Processing ---------------------------- #
def process_image(image_path):
"""Process the uploaded image for display."""
global zoom_level, pan_x, pan_y
try:
with Image.open(image_path) as img:
img = img.convert("RGB")
return img.copy()
except Exception as e:
print(f"Error processing image {image_path}: {e}")
return None
def convert_image_to_rgb565(img):
"""Convert PIL Image to RGB565 format."""
pixel_data = bytearray()
for pixel in img.getdata():
r, g, b = pixel
# Convert to RGB565
rgb565 = ((b & 0xF8) << 8) | ((g & 0xFC) << 3) | (r >> 3)
pixel_data.append((rgb565 >> 8) & 0xFF) # High byte
pixel_data.append(rgb565 & 0xFF) # Low byte
return pixel_data
def update_display(display):
"""Background thread function to update the display."""
global current_image, zoom_level, pan_x, pan_y
while True:
with display_lock:
if current_image:
# Calculate new size based on zoom level
new_width = int(SCREEN_WIDTH * zoom_level)
new_height = int(SCREEN_HEIGHT * zoom_level)
img = current_image.copy()
img = img.resize((new_width, new_height), Image.ANTIALIAS)
# Calculate cropping box for panning
left = int((new_width - SCREEN_WIDTH) * pan_x)
top = int((new_height - SCREEN_HEIGHT) * pan_y)
right = left + SCREEN_WIDTH
bottom = top + SCREEN_HEIGHT
# Ensure the box is within image boundaries
left = max(0, min(left, new_width - SCREEN_WIDTH))
top = max(0, min(top, new_height - SCREEN_HEIGHT))
right = left + SCREEN_WIDTH
bottom = top + SCREEN_HEIGHT
img = img.crop((left, top, right, bottom))
pixel_data = convert_image_to_rgb565(img)
# Update the display
display.set_starting_x(35)
display.write_display(pixel_data)
time.sleep(0.01) # Refresh rate (10 FPS)
# ---------------------------- Flask Routes ---------------------------- #
# HTML Template for Upload and Controls Page
UPLOAD_PAGE = '''
<!doctype html>
<html>
<head>
<title>SPI ST7786 Display Control</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; }
.controls { margin-top: 20px; }
.controls button { width: 100px; height: 50px; margin: 5px; font-size: 16px; }
.upload { margin-top: 20px; }
.container { display: inline-block; text-align: center; }
</style>
</head>
<body>
<h1>Upload Image to Display</h1>
<div class="upload">
<form method="post" enctype="multipart/form-data" action="/upload">
<input type="file" name="file" accept=".jpg, .jpeg" required>
<br><br>
<input type="submit" value="Upload">
</form>
</div>
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul>
{% for message in messages %}
<li>{{ message }}</li>
{% endfor %}
</ul>
{% endif %}
{% endwith %}
<div class="controls">
<h2>Controls</h2>
<form method="post" action="/control">
<button name="action" value="up">⬆️ Up</button>
<br>
<button name="action" value="left">⬅️ Left</button>
<button name="action" value="down">⬇️ Down</button>
<button name="action" value="right">➡️ Right</button>
<br><br>
<button name="action" value="zoom_in">🔍 Zoom In</button>
<button name="action" value="zoom_out">🔎 Zoom Out</button>
</form>
</div>
</body>
</html>
'''
@app.route('/', methods=['GET'])
def index():
return render_template_string(UPLOAD_PAGE)
@app.route('/upload', methods=['POST'])
def upload_file():
global current_image, zoom_level, pan_x, pan_y
if 'file' not in request.files:
flash('No file part')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('No selected file')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
img = process_image(filepath)
if img:
with display_lock:
current_image = img
zoom_level = 1.0
pan_x = 0.5
pan_y = 0.5
flash('Image successfully uploaded and displayed')
else:
flash('Failed to process the image')
return redirect(url_for('index'))
else:
flash('Allowed file types are jpg, jpeg')
return redirect(url_for('index'))
@app.route('/control', methods=['POST'])
def control():
global zoom_level, pan_x, pan_y
action = request.form.get('action')
with display_lock:
if action == 'up':
pan_y -= 0.05
elif action == 'down':
pan_y += 0.05
elif action == 'left':
pan_x -= 0.05
elif action == 'right':
pan_x += 0.05
elif action == 'zoom_in':
zoom_level *= 1.1
elif action == 'zoom_out':
zoom_level /= 1.1
# Clamp values to prevent excessive panning/zooming
zoom_level = max(0.5, min(zoom_level, 3.0))
pan_x = max(0.0, min(pan_x, 1.0))
pan_y = max(0.0, min(pan_y, 1.0))
return redirect(url_for('index'))
# ---------------------------- Cleanup ---------------------------- #
def cleanup(signum, frame):
print("Cleaning up resources...")
try:
display.close()
except Exception as e:
print(f"Error closing display: {e}")
sys.exit(0)
# Register cleanup handler for SIGINT and SIGTERM
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
# ---------------------------- Main Execution ---------------------------- #
if __name__ == '__main__':
# Initialize the display
try:
display = ST7786(SPI_DEVICE_PATH, GPIO_DC_PIN, GPIO_RST_PIN, SPI_MAX_SPEED_HZ)
display.init_display(rotation=ROTATION)
print("Display initialized successfully.")
except Exception as e:
print(f"Failed to initialize display: {e}")
sys.exit(1)
# Start the display update thread
display_thread = threading.Thread(target=update_display, args=(display,), daemon=True)
display_thread.start()
print("Display update thread started.")
# Run the Flask app
try:
app.run(host='0.0.0.0', port=5000, threaded=True)
except Exception as e:
print(f"Error running Flask app: {e}")
cleanup(None, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment