Created
August 1, 2025 03:52
-
-
Save derekantrican/8678fd407f95969a9197c4016826b134 to your computer and use it in GitHub Desktop.
This is a simple Flask server demonstrating a small spy camera on a Raspberry Pi.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import os
import socket
import subprocess
import threading
import time
from collections import deque

import requests
from flask import Flask, render_template_string, request, send_from_directory, redirect, url_for, jsonify
# Log to a file; the /logs endpoint reads this same file back for the browser.
logging.basicConfig(
    filename='spycam.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
)

app = Flask(__name__)

# Discord webhook URL — replace with your actual webhook URL
DISCORD_WEBHOOK_URL = "DISCORD_WEBHOOK"

# Directory holding the raw .h264 captures and the converted .mp4 files.
VIDEO_DIR = "recordings"
# exist_ok avoids the check-then-create race of an os.path.exists() guard.
os.makedirs(VIDEO_DIR, exist_ok=True)

# Still image refreshed by the preview thread and served at /preview.jpg.
PREVIEW_IMAGE = "preview.jpg"

# Absolute paths of the external tools launched through subprocess.
RPICAM_JPEG = "/usr/bin/rpicam-jpeg"
RPICAM_VID = "/usr/bin/rpicam-vid"
MP4BOX = "/usr/bin/MP4Box"

# State of the active recording; all three are None while idle.
recording_process = None      # subprocess.Popen handle of the rpicam-vid process
current_h264 = None           # path of the .h264 file being written
recording_start_time = None   # time.time() at which the recording started

# Held while a camera tool is being launched/stopped or a preview is captured.
camera_lock = threading.Lock()
# Jinja template for the single control page. It is rendered with:
#   time       - integer used as a cache-busting query string for the preview
#   recording  - truthy while a recording is in progress
#   recordings - iterable of (file name, is_converting) pairs
HTML = '''
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>SpyCam</title>
  <meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
  <h1>SpyCam Control</h1>
  <h2>Live Preview</h2>
  <img src="/preview.jpg?{{time}}" alt="Preview" width="320" height="240" style="border:1px solid #ccc"><br><br>
  <form method="POST" action="/start" style="display:inline;">
    <button type="submit" {{ 'disabled' if recording }}>Start Recording</button>
  </form>
  <form method="POST" action="/stop" style="display:inline;">
    <button type="submit" {{ '' if recording else 'disabled' }}>Stop Recording</button>
  </form>
  <h2>Recordings</h2>
  <ul>
    {% for fname, is_converting in recordings %}
    <li>
      {{ fname }}
      {% if is_converting %}
      <span style="color: gray;">(Converting...)</span>
      {% else %}
      <a href="/download/{{ fname }}">[Download]</a>
      <a href="/delete/{{ fname }}" onclick="return confirm('Delete this recording?');">[Delete]</a>
      {% endif %}
    </li>
    {% else %}
    <li>No recordings yet.</li>
    {% endfor %}
  </ul>
  <script>
    const previewImg = document.querySelector('img[alt="Preview"]');
    setInterval(() => {
      const timestamp = new Date().getTime();
      previewImg.src = '/preview.jpg?' + timestamp;
    }, 1000);
    function fetchLogs() {
      fetch('/logs')
        .then(response => response.json())
        .then(data => {
          if(data.lines){
            console.clear();
            data.lines.forEach(line => console.log(line.trim()));
          }
          if(data.error){
            console.error('Log error:', data.error);
          }
        })
        .catch(err => console.error('Fetch error:', err));
    }
    // Fetch logs every 5 seconds
    setInterval(fetchLogs, 5000);
    fetchLogs();
  </script>
</body>
</html>
'''
def get_ip():
    """Get the local IP address of the Pi.

    A UDP socket is "connected" to a public address and the local address
    the OS picked for that route is read back with getsockname().

    Returns:
        The local IP address as a string, or "Unknown" when the socket
        cannot be created or connected (for example, no network route).
    """
    try:
        # The context manager closes the socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        return "Unknown"
def send_discord_startup_message():
    """Post a startup notice containing the server URL to the Discord webhook.

    Best effort: any request failure (invalid URL, network error, timeout or
    an HTTP error status) is printed and otherwise ignored so that it never
    prevents the server from starting.
    """
    ip = get_ip()
    content = f"🛡️ SpyCam server started! Access it at: http://{ip}:5000"
    try:
        # Without a timeout an unreachable webhook would block startup forever.
        response = requests.post(
            DISCORD_WEBHOOK_URL,
            json={"content": content},
            timeout=10,
        )
        # Treat 4xx/5xx answers as failures instead of reporting success.
        response.raise_for_status()
        print("Discord startup message sent.")
    except requests.RequestException as e:
        print("Failed to send Discord message:", e)
def capture_preview():
    """Continuously refresh PREVIEW_IMAGE with a still from the camera.

    Intended to run forever in a background thread. Each pass tries to take
    camera_lock for up to 0.1 s; when it gets the lock it runs rpicam-jpeg,
    and in every case it then sleeps one second before the next pass.

    The still is written to a temporary file next to PREVIEW_IMAGE and moved
    into place with os.replace() only when the capture succeeded, so the
    /preview.jpg route never serves a half-written image and a failed
    capture leaves the previous preview untouched.
    """
    stem, ext = os.path.splitext(PREVIEW_IMAGE)
    tmp_image = f"{stem}.tmp{ext}"  # keeps the .jpg extension for the tool

    while True:
        if camera_lock.acquire(timeout=0.1):
            try:
                result = subprocess.run(
                    [
                        RPICAM_JPEG,
                        "-o", tmp_image,
                        "--width", "640",
                        "--height", "480",
                        "--nopreview",
                        "-t", "1000",
                    ],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    # A hung capture must not hold the camera lock forever,
                    # which would block the start/stop handlers.
                    timeout=15,
                )
                if result.returncode == 0:
                    os.replace(tmp_image, PREVIEW_IMAGE)
            except (OSError, subprocess.SubprocessError) as e:
                print("Preview capture error:", e)
            finally:
                camera_lock.release()
        time.sleep(1)
def convert_to_mp4_async(h264_path, mp4_path):
    """Wrap a raw H.264 capture into an MP4 container using MP4Box.

    Meant to be run in a worker thread. While the conversion runs, an empty
    "<mp4_path>.converting" flag file exists; it is removed when the function
    finishes, whether or not the conversion succeeded.

    Args:
        h264_path: Path of the raw .h264 input. It is deleted only after a
            successful conversion.
        mp4_path: Path of the .mp4 file to create.

    Failures are written to the log instead of being raised.
    """
    logging.info("launching conversion for %s to %s", h264_path, mp4_path)
    flag_file = mp4_path + ".converting"
    try:
        with open(flag_file, "w"):
            pass  # create empty flag file
        logging.info("starting conversion")
        subprocess.run(
            [MP4BOX, "-add", h264_path, mp4_path],
            check=True, capture_output=True, text=True,
        )
        logging.info("Conversion complete: %s", mp4_path)
        os.remove(h264_path)
    except subprocess.CalledProcessError as e:
        logging.error("Conversion failed: %s", e)
        logging.error("stdout:\n%s", e.stdout)
        logging.error("stderr:\n%s", e.stderr)
    except OSError as e:
        # E.g. the MP4Box binary is missing or a file cannot be created or
        # removed. Without this the worker thread would die without a log entry.
        logging.error("Conversion failed: %s", e)
    finally:
        try:
            os.remove(flag_file)
        except FileNotFoundError:
            pass
@app.route("/")
def index():
    """Render the control page with the list of recordings.

    A recording is listed when VIDEO_DIR contains its .mp4 file or its
    "<name>.mp4.converting" flag file. Considering the flag files as well
    makes a conversion show up as "(Converting...)" even before its .mp4
    output exists. Entries are sorted by file name in descending order.
    """
    flag_suffix = ".converting"
    finished = set()
    converting = set()
    for entry in os.listdir(VIDEO_DIR):
        if entry.endswith(".mp4"):
            finished.add(entry)
        elif entry.endswith(".mp4" + flag_suffix):
            converting.add(entry[:-len(flag_suffix)])

    recordings = [
        (name, name in converting)
        for name in sorted(finished | converting, reverse=True)
    ]
    recording = recording_process is not None
    return render_template_string(
        HTML,
        recordings=recordings,
        recording=recording,
        time=int(time.time()),
    )
@app.route("/preview.jpg")
def preview():
    """Serve the most recent preview still.

    PREVIEW_IMAGE is written relative to the process's working directory,
    whereas Flask resolves a relative directory passed to
    send_from_directory() against the application root path. Passing the
    absolute working directory makes both sides use the same file even when
    the script is started from another directory.
    """
    return send_from_directory(os.getcwd(), PREVIEW_IMAGE)
@app.route("/start", methods=["POST"])
def start_recording():
    """Start an rpicam-vid recording into a timestamped .h264 file.

    Does nothing when a recording is already running. The recording state
    (recording_process, current_h264, recording_start_time) is updated only
    after the process was launched successfully, so a failed launch leaves
    the server idle with no stale state.

    Returns:
        A redirect to the index page in every case.
    """
    global recording_process, current_h264, recording_start_time

    # Check and launch under the same lock so two concurrent requests cannot
    # both pass the "already recording" test.
    with camera_lock:
        if recording_process is not None:
            return redirect(url_for("index"))

        timestamp = time.strftime("%Y%m%d-%H%M%S")
        h264_path = os.path.join(VIDEO_DIR, f"{timestamp}.h264")
        mp4_file = os.path.join(VIDEO_DIR, f"{timestamp}.mp4")
        if os.path.exists(mp4_file):
            os.remove(mp4_file)

        try:
            logging.info("Starting recording...")
            # NOTE(review): nothing reads these pipes until /stop calls
            # communicate(). If rpicam-vid writes a lot of output, a full
            # pipe could stall it — confirm its verbosity on the target.
            process = subprocess.Popen(
                [
                    RPICAM_VID,
                    "-o", h264_path,
                    "--codec", "h264",
                    "--timeout", "0",
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,  # Python 3.7+ for automatic decoding
            )
        except Exception as e:
            logging.error(f"Failed to start recording: {e}")
            return redirect(url_for("index"))

        recording_process = process
        current_h264 = h264_path
        recording_start_time = time.time()
        logging.info(f"Recording process started with PID {process.pid}")

    return redirect(url_for("index"))
@app.route("/stop", methods=["POST"])
def stop_recording():
    """Stop the running recording and convert it to MP4 in the background.

    Does nothing when no recording is running. The recorder is terminated
    and its remaining output collected with communicate(); the recording
    state is then reset. If the .h264 file exists, a daemon thread running
    convert_to_mp4_async() is started for it.

    Returns:
        A redirect to the index page in every case.
    """
    global recording_process, current_h264, recording_start_time

    with camera_lock:
        process = recording_process
        if process is None:
            return redirect(url_for("index"))

        process.terminate()
        try:
            # communicate() drains the pipes while waiting. Calling wait()
            # first can deadlock when the child has filled a pipe.
            stdout_data, stderr_data = process.communicate(timeout=10)
        except subprocess.TimeoutExpired:
            # The recorder ignored the termination request; force it down.
            process.kill()
            stdout_data, stderr_data = process.communicate()

        exitcode = process.returncode
        if exitcode != 0:
            logging.error(f"Recording process exited with code {exitcode}")
            logging.error(f"stderr: {stderr_data}")
            logging.error(f"stdout: {stdout_data}")
        else:
            logging.info("Recording stopped successfully")

        h264_path = current_h264
        recording_process = None
        current_h264 = None
        recording_start_time = None

    if h264_path and os.path.exists(h264_path):
        # Swap only the extension; str.replace() would also rewrite any
        # ".h264" occurring earlier in the path.
        mp4_file = os.path.splitext(h264_path)[0] + ".mp4"
        threading.Thread(  # Kick off the conversion async
            target=convert_to_mp4_async,
            args=(h264_path, mp4_file),
            daemon=True,
        ).start()

    return redirect(url_for("index"))
@app.route("/download/<filename>")
def download(filename):
    """Send a file from VIDEO_DIR as an attachment.

    VIDEO_DIR is created and written relative to the process's working
    directory, whereas Flask resolves a relative directory passed to
    send_from_directory() against the application root path. The absolute
    path keeps both pointing at the same directory.
    """
    return send_from_directory(
        os.path.abspath(VIDEO_DIR), filename, as_attachment=True
    )
@app.route("/delete/<filename>")
def delete(filename):
    """Delete a file from VIDEO_DIR and return to the index page.

    Only a plain file located directly inside VIDEO_DIR is removed. Names
    that resolve to a directory (such as "..") or contain a path separator
    are ignored, and a failing removal is logged rather than turned into a
    server error.
    """
    path = os.path.join(VIDEO_DIR, filename)
    if os.path.basename(filename) == filename and os.path.isfile(path):
        try:
            os.remove(path)
        except OSError as e:
            logging.error(f"Failed to delete {path}: {e}")
    return redirect(url_for("index"))
@app.route('/logs')
def get_logs():
    """Return the last 20 lines of spycam.log as JSON.

    Returns:
        {"lines": [...]} on success, or ({"error": message}, 500) when the
        log file cannot be read.
    """
    try:
        # A bounded deque keeps only the newest 20 lines in memory, so the
        # ever-growing log is not loaded in full on every poll. Undecodable
        # bytes are replaced rather than failing the request.
        with open('spycam.log', 'r', errors='replace') as f:
            tail = deque(f, maxlen=20)
    except OSError as e:
        return jsonify(error=str(e)), 500
    # Send last 20 lines as JSON
    return jsonify(lines=list(tail))
if __name__ == "__main__":
    # Announce the server address on Discord (best effort).
    send_discord_startup_message()

    # Refresh the preview image in the background; being a daemon thread it
    # does not keep the process alive once the server exits.
    preview_thread = threading.Thread(target=capture_preview, daemon=True)
    preview_thread.start()

    # Listen on all interfaces so other machines on the network can connect.
    app.run(host="0.0.0.0", port=5000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment