Skip to content

Instantly share code, notes, and snippets.

@derekantrican
Created August 1, 2025 03:52
Show Gist options
  • Save derekantrican/8678fd407f95969a9197c4016826b134 to your computer and use it in GitHub Desktop.
Save derekantrican/8678fd407f95969a9197c4016826b134 to your computer and use it in GitHub Desktop.
This is a simple flask server demonstrating a small spy camera on a raspberry pi
import os
import subprocess
import threading
import time
import socket
import requests
import logging
from flask import Flask, render_template_string, request, send_from_directory, redirect, url_for, jsonify
logging.basicConfig(
filename='spycam.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s: %(message)s',
)
app = Flask(__name__)
# Discord webhook URL — replace with your actual webhook URL
DISCORD_WEBHOOK_URL = "DISCORD_WEBHOOK"
VIDEO_DIR = "recordings"
if not os.path.exists(VIDEO_DIR):
os.makedirs(VIDEO_DIR)
PREVIEW_IMAGE = "preview.jpg"
RPICAM_JPEG = "/usr/bin/rpicam-jpeg"
RPICAM_VID = "/usr/bin/rpicam-vid"
MP4BOX = "/usr/bin/MP4Box"
recording_process = None
current_h264 = None
recording_start_time = None
camera_lock = threading.Lock()
HTML = '''
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>SpyCam</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<h1>SpyCam Control</h1>
<h2>Live Preview</h2>
<img src="/preview.jpg?{{time}}" alt="Preview" width="320" height="240" style="border:1px solid #ccc"><br><br>
<form method="POST" action="/start" style="display:inline;">
<button type="submit" {{ 'disabled' if recording }}>Start Recording</button>
</form>
<form method="POST" action="/stop" style="display:inline;">
<button type="submit" {{ '' if recording else 'disabled' }}>Stop Recording</button>
</form>
<h2>Recordings</h2>
<ul>
{% for fname, is_converting in recordings %}
<li>
{{ fname }}
{% if is_converting %}
<span style="color: gray;">(Converting...)</span>
{% else %}
<a href="/download/{{ fname }}">[Download]</a>
<a href="/delete/{{ fname }}" onclick="return confirm('Delete this recording?');">[Delete]</a>
{% endif %}
</li>
{% else %}
<li>No recordings yet.</li>
{% endfor %}
</ul>
<script>
const previewImg = document.querySelector('img[alt="Preview"]');
setInterval(() => {
const timestamp = new Date().getTime();
previewImg.src = '/preview.jpg?' + timestamp;
}, 1000);
function fetchLogs() {
fetch('/logs')
.then(response => response.json())
.then(data => {
if(data.lines){
console.clear();
data.lines.forEach(line => console.log(line.trim()));
}
if(data.error){
console.error('Log error:', data.error);
}
})
.catch(err => console.error('Fetch error:', err));
}
// Fetch logs every 5 seconds
setInterval(fetchLogs, 5000);
fetchLogs();
</script>
</body>
</html>
'''
def get_ip():
"""Get the local IP address of the Pi."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
except Exception:
ip = "Unknown"
finally:
s.close()
return ip
def send_discord_startup_message():
ip = get_ip()
content = f"🛡️ SpyCam server started! Access it at: http://{ip}:5000"
try:
requests.post(DISCORD_WEBHOOK_URL, json={"content": content})
print("Discord startup message sent.")
except Exception as e:
print("Failed to send Discord message:", e)
def capture_preview():
while True:
acquired = camera_lock.acquire(timeout=0.1)
if acquired:
try:
subprocess.run([
RPICAM_JPEG,
"-o", PREVIEW_IMAGE,
"--width", "640",
"--height", "480",
"--nopreview",
"-t", "1000"
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception as e:
print("Preview capture error:", e)
finally:
camera_lock.release()
time.sleep(1)
def convert_to_mp4_async(h264_path, mp4_path):
logging.info(f"launching conversion for {h264_path} to {mp4_path}")
flag_file = mp4_path + ".converting"
open(flag_file, "w").close() # create empty flag file
try:
logging.info("starting conversion")
subprocess.run([MP4BOX, "-add", h264_path, mp4_path],
check=True, capture_output=True, text=True)
logging.info(f"Conversion complete: {mp4_path}")
os.remove(h264_path)
except subprocess.CalledProcessError as e:
logging.error(f"Conversion failed: {e}")
logging.error("stdout:\n" + e.stdout)
logging.error("stderr:\n" + e.stderr)
finally:
if os.path.exists(flag_file):
os.remove(flag_file)
@app.route("/")
def index():
recordings = []
for f in os.listdir(VIDEO_DIR):
if f.endswith(".mp4"):
full_path = os.path.join(VIDEO_DIR, f)
converting_flag = full_path + ".converting"
recordings.append((f, os.path.exists(converting_flag)))
recordings.sort(reverse=True, key=lambda x: x[0])
recording = recording_process is not None
return render_template_string(HTML, recordings=recordings, recording=recording, time=int(time.time()))
@app.route("/preview.jpg")
def preview():
return send_from_directory(".", PREVIEW_IMAGE)
@app.route("/start", methods=["POST"])
def start_recording():
global recording_process, current_h264, recording_start_time
if recording_process is not None:
return redirect(url_for("index"))
timestamp = time.strftime("%Y%m%d-%H%M%S")
current_h264 = os.path.join(VIDEO_DIR, f"{timestamp}.h264")
mp4_file = os.path.join(VIDEO_DIR, f"{timestamp}.mp4")
if os.path.exists(mp4_file):
os.remove(mp4_file)
try:
logging.info("Starting recording...")
with camera_lock:
recording_process = subprocess.Popen([
RPICAM_VID,
"-o", current_h264,
"--codec", "h264",
"--timeout", "0"
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True # Python 3.7+ for automatic decoding
)
logging.info(f"Recording process started with PID {recording_process.pid}")
except Exception as e:
logging.error(f"Failed to start recording: {e}")
recording_start_time = time.time()
return redirect(url_for("index"))
@app.route("/stop", methods=["POST"])
def stop_recording():
global recording_process, current_h264, recording_start_time
if recording_process is None:
return redirect(url_for("index"))
with camera_lock:
recording_process.terminate()
exitcode = recording_process.wait()
stdout_data, stderr_data = recording_process.communicate()
if exitcode != 0:
logging.error(f"Recording process exited with code {exitcode}")
logging.error(f"stderr: {stderr_data}")
logging.error(f"stdout: {stdout_data}")
else:
logging.info("Recording stopped successfully")
recording_process = None
if current_h264 and os.path.exists(current_h264):
h264_path = current_h264
mp4_file = h264_path.replace(".h264", ".mp4")
current_h264 = None
threading.Thread( # Kick off the conversion async
target=convert_to_mp4_async,
args=(h264_path, mp4_file),
daemon=True
).start()
recording_start_time = None
return redirect(url_for("index"))
@app.route("/download/<filename>")
def download(filename):
return send_from_directory(VIDEO_DIR, filename, as_attachment=True)
@app.route("/delete/<filename>")
def delete(filename):
path = os.path.join(VIDEO_DIR, filename)
if os.path.exists(path):
os.remove(path)
return redirect(url_for("index"))
@app.route('/logs')
def get_logs():
try:
with open('spycam.log', 'r') as f:
lines = f.readlines()
# Send last 20 lines as JSON
return jsonify(lines=lines[-20:])
except Exception as e:
return jsonify(error=str(e)), 500
if __name__ == "__main__":
send_discord_startup_message()
preview_thread = threading.Thread(target=capture_preview, daemon=True)
preview_thread.start()
app.run(host="0.0.0.0", port=5000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment