Skip to content

Instantly share code, notes, and snippets.

@emctoo
Last active May 14, 2025 07:31
Show Gist options
  • Select an option

  • Save emctoo/f12df77da77272dd8671da194bc4fbc1 to your computer and use it in GitHub Desktop.

Select an option

Save emctoo/f12df77da77272dd8671da194bc4fbc1 to your computer and use it in GitHub Desktop.
A simple file server

nginx

# Reverse-proxy every request whose URI starts with /my-test to the backend
# listening on localhost:8088.
# NOTE(review): the server script in this gist defaults to port 2025 (PORT
# environment variable) - confirm that 8088 matches the deployed port.
location /my-test {
    # proxy_pass carries a URI part ("/"), so the matched "/my-test" prefix is
    # replaced by "/": "/my-test/upload" is forwarded as "//upload".
    # Presumably this is why the server also registers "//..." routes.
    proxy_pass http://localhost:8088/;

    # Pass the original host, client address and scheme on to the backend.
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    # HTTP/1.1 plus the Upgrade/Connection headers used for protocol upgrades.
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}
#!/usr/bin/env python3
"""
HTTP File Server CLI Client
A command-line tool for interacting with the HTTP file server.
Uses only the Python standard library, plus python-dotenv for loading a .env file.
"""
import argparse
import json
import os
import sys
import urllib.request
import urllib.error
import urllib.parse
import http.client
import mimetypes
import uuid
import logging
import dotenv
# Load variables from a .env file (if present) into the process environment
# before the CLI reads SERVER with os.getenv().
dotenv.load_dotenv()
# Emit timestamped log records at INFO level and above.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
# Server URL used when neither --server nor the SERVER environment variable is set.
DEFAULT_SERVER = "http://localhost:2025"
def generate_boundary():
    """Return a fresh multipart boundary: a fixed prefix plus a random UUID in hex."""
    unique_suffix = uuid.uuid4().hex
    return "----WebKitFormBoundary" + unique_suffix
def create_form_data(file_path, boundary):
    """Build a multipart/form-data request body that carries a single file.

    Args:
        file_path: Path of the file to embed. Only its basename is sent as
            the filename.
        boundary: Multipart boundary string, without the leading ``--``.

    Returns:
        bytes: The encoded body, with the file in a form field named ``file``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    filename = os.path.basename(file_path)
    # A double quote or line break inside the quoted filename would terminate
    # the value or inject extra header lines; percent-encode them the way
    # browsers do for form submissions.
    safe_filename = (
        filename.replace('"', '%22').replace('\r', '%0D').replace('\n', '%0A')
    )
    mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'

    with open(file_path, 'rb') as f:
        file_data = f.read()

    header_lines = [
        f'--{boundary}',
        f'Content-Disposition: form-data; name="file"; filename="{safe_filename}"',
        f'Content-Type: {mime_type}',
        '',  # blank line separating the part headers from the payload
    ]
    parts = [line.encode() for line in header_lines]
    parts.append(file_data)
    parts.append(f'--{boundary}--'.encode())
    return b'\r\n'.join(parts)
def list_files(server_url):
    """Print the files available on the server.

    The request carries HTTP Basic credentials taken from the API_USERNAME and
    API_PASSWORD environment variables (defaults ``admin`` / ``password``).

    Args:
        server_url: Base URL of the server, without a trailing slash.

    Returns:
        None. Results are printed; failures are logged.
    """
    import base64

    url = f"{server_url}/list/"
    log.info('visiting %s', url)

    credentials = f"{os.getenv('API_USERNAME', 'admin')}:{os.getenv('API_PASSWORD', 'password')}"
    token = base64.b64encode(credentials.encode()).decode('ascii')
    request = urllib.request.Request(url, headers={'Authorization': f'Basic {token}'})

    try:
        with urllib.request.urlopen(request) as response:
            data = json.loads(response.read().decode())
    except urllib.error.HTTPError as e:
        # HTTPError is a subclass of URLError; handle it first so an HTTP
        # status such as 401 is not reported as a connection failure.
        log.error("Server returned HTTP %s %s", e.code, e.reason)
        return
    except urllib.error.URLError as e:
        log.error("Could not connect to the server. %s", e)
        return
    except json.JSONDecodeError:
        log.error("Invalid response from server.")
        return

    entries = data.get('files') if isinstance(data, dict) else None
    if not entries:
        print("No files available on the server.")
        return

    print("Available files:")
    for i, entry in enumerate(entries, 1):
        # Entries may be plain names or {"filename": ..., "download_url": ...}.
        if isinstance(entry, dict):
            name = entry.get('filename', entry)
            download_url = entry.get('download_url')
            suffix = f" ({download_url})" if download_url else ""
            print(f"{i}. {name}{suffix}")
        else:
            print(f"{i}. {entry}")
def upload_file(server_url, file_path):
    """Upload one local file to the server as multipart/form-data.

    The request carries HTTP Basic credentials taken from the API_USERNAME and
    API_PASSWORD environment variables (defaults ``admin`` / ``password``).

    Args:
        server_url: Base URL of the server, without a trailing slash.
        file_path: Path of the local file to send.

    Returns:
        None. Success and error messages are printed.
    """
    import base64

    if not os.path.exists(file_path):
        print(f"Error: File '{file_path}' does not exist.")
        return
    if not os.path.isfile(file_path):
        # Directories and other non-regular files cannot be read as a payload.
        print(f"Error: '{file_path}' is not a regular file.")
        return

    url = f"{server_url}/upload/"
    parsed_url = urllib.parse.urlparse(url)
    boundary = generate_boundary()
    try:
        form_data = create_form_data(file_path, boundary)
    except OSError as e:
        print(f"Error: {e}")
        return

    credentials = f"{os.getenv('API_USERNAME', 'admin')}:{os.getenv('API_PASSWORD', 'password')}"
    token = base64.b64encode(credentials.encode()).decode('ascii')
    headers = {
        'Content-Type': f'multipart/form-data; boundary={boundary}',
        'Content-Length': str(len(form_data)),
        'Authorization': f'Basic {token}',
    }

    path = parsed_url.path
    if parsed_url.query:
        path += '?' + parsed_url.query

    if parsed_url.scheme == 'https':
        connection_cls = http.client.HTTPSConnection
    else:
        connection_cls = http.client.HTTPConnection

    conn = None
    try:
        conn = connection_cls(parsed_url.netloc)
        conn.request('POST', path, body=form_data, headers=headers)
        response = conn.getresponse()
        body = response.read().decode()
        if response.status == 200:
            result = json.loads(body)
            print(f"Success: {result.get('status', 'File uploaded')}")
        else:
            print(f"Error: {response.status} {response.reason}")
            try:
                error_data = json.loads(body)
            except json.JSONDecodeError:
                print(body)
            else:
                if isinstance(error_data, dict) and 'detail' in error_data:
                    print(error_data['detail'])
    except Exception as e:
        # CLI boundary: report any failure instead of showing a traceback.
        print(f"Error: {str(e)}")
    finally:
        # Release the socket on the error path as well.
        if conn is not None:
            conn.close()
def download_file(server_url, filename, output_path=None):
    """Download a file from the server and write it to disk.

    The request carries HTTP Basic credentials taken from the API_USERNAME and
    API_PASSWORD environment variables (defaults ``admin`` / ``password``).

    Args:
        server_url: Base URL of the server, without a trailing slash.
        filename: Name of the file on the server.
        output_path: Destination path; defaults to ``filename``.

    Returns:
        None. Success and error messages are printed.
    """
    import base64
    import shutil

    url = f"{server_url}/download/{urllib.parse.quote(filename)}"
    if not output_path:
        output_path = filename

    credentials = f"{os.getenv('API_USERNAME', 'admin')}:{os.getenv('API_PASSWORD', 'password')}"
    token = base64.b64encode(credentials.encode()).decode('ascii')
    request = urllib.request.Request(url, headers={'Authorization': f'Basic {token}'})

    try:
        # The output file is opened only after the server has answered, so a
        # failed request does not create or truncate the destination.
        with urllib.request.urlopen(request) as response, open(output_path, 'wb') as out_file:
            # Stream in chunks instead of holding the whole body in memory.
            shutil.copyfileobj(response, out_file)
        print(f"File '{filename}' downloaded successfully to '{output_path}'")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            print(f"Error: File '{filename}' not found on the server.")
        else:
            print(f"Error: HTTP {e.code} - {e.reason}")
    except urllib.error.URLError as e:
        print(f"Error: Could not connect to the server. {e}")
    except Exception as e:
        print(f"Error downloading file: {str(e)}")
def main():
    """Parse the command line and run the requested sub-command.

    Sub-commands: ``list``, ``upload <file>`` and
    ``download <filename> [-o OUTPUT]``. The server URL comes from
    ``--server``, else the SERVER environment variable, else DEFAULT_SERVER.
    """
    parser = argparse.ArgumentParser(description="HTTP File Server CLI Client")
    parser.add_argument(
        '--server',
        default=os.getenv('SERVER', DEFAULT_SERVER),
        help=f"Server URL (default: SERVER environment variable or {DEFAULT_SERVER})",
    )

    subparsers = parser.add_subparsers(dest='command', required=True, help='Command to execute')

    # List command
    subparsers.add_parser('list', help='List files on the server')

    # Upload command
    upload_parser = subparsers.add_parser('upload', help='Upload a file to the server')
    upload_parser.add_argument('file', help='Path to the file to upload')

    # Download command
    download_parser = subparsers.add_parser('download', help='Download a file from the server')
    download_parser.add_argument('filename', help='Name of the file to download')
    download_parser.add_argument('-o', '--output', help='Output path (default: same as filename)')

    args = parser.parse_args()

    # Endpoint paths are appended as "/list/", "/upload/" and so on; drop any
    # trailing slash so the resulting URL does not contain "//".
    server = args.server.rstrip('/')

    if args.command == 'list':
        list_files(server)
    elif args.command == 'upload':
        upload_file(server, args.file)
    elif args.command == 'download':
        download_file(server, args.filename, args.output)


if __name__ == "__main__":
    main()
import os
import shutil
from typing import List, Optional
import pathlib
import logging
import secrets
from fastapi import FastAPI, UploadFile, File, HTTPException, Request, Depends, status
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import dotenv
# Load variables from a .env file (if present) before the settings below are read.
dotenv.load_dotenv()
# Emit timestamped log records at INFO level and above.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
# Get root path from environment variable or use empty string for direct access
ROOT_PATH = os.environ.get("ROOT_PATH", "")
log.info(f"Using root path: {ROOT_PATH}")
# Authentication credentials from environment variables or use defaults
# NOTE(review): the fallback credentials are the well-known pair
# admin/password; set API_USERNAME and API_PASSWORD in any real deployment.
API_USERNAME = os.environ.get("API_USERNAME", "admin")
API_PASSWORD = os.environ.get("API_PASSWORD", "password")
# Only the username is logged; the password is never written to the log.
log.info(f"Auth enabled with username: {API_USERNAME}")
# When behind Nginx with location /my-test, set ROOT_PATH="/my-test"
app = FastAPI(title="HTTP File Server", root_path=ROOT_PATH)
# Security scheme
security = HTTPBasic()
# Authentication function
def authenticate(credentials: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against API_USERNAME / API_PASSWORD.

    Args:
        credentials: Username and password supplied through the ``security``
            dependency.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username or password does not match.
    """
    # compare_digest() rejects str values containing non-ASCII characters
    # with a TypeError. Comparing UTF-8 bytes keeps the constant-time check
    # and turns such input into a normal 401 instead of a server error.
    is_username_correct = secrets.compare_digest(
        credentials.username.encode("utf8"), API_USERNAME.encode("utf8")
    )
    is_password_correct = secrets.compare_digest(
        credentials.password.encode("utf8"), API_PASSWORD.encode("utf8")
    )
    if not (is_username_correct and is_password_correct):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username
# Add CORS middleware to allow requests from other domains
# NOTE(review): every origin, method and header is allowed, together with
# allow_credentials=True. Confirm that credentialed cross-origin access from
# any site is intended; otherwise list the trusted origins explicitly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Get upload directory from environment variable or use default
UPLOAD_DIR = pathlib.Path(os.environ.get("UPLOAD_DIR", "./uploads"))
# Create the directory (and any missing parents) at import time; an existing
# directory is not an error.
os.makedirs(UPLOAD_DIR, exist_ok=True)
log.info(f"Files will be uploaded to: {os.path.abspath(UPLOAD_DIR)}")
def get_base_url(request: Request) -> str:
    """Get the base URL including the root path if present.

    Args:
        request: The incoming request; its ``Host`` and ``X-Forwarded-Proto``
            headers are used.

    Returns:
        ``<scheme>://<host><ROOT_PATH without trailing slash>``.
    """
    host = request.headers.get("host", "localhost")
    # Chained proxies may send a comma-separated list ("https, http"); the
    # first entry is the scheme the client used. Without the header, fall
    # back to the scheme of the request itself.
    forwarded_proto = request.headers.get("x-forwarded-proto", "")
    scheme = forwarded_proto.split(",")[0].strip() or request.url.scheme or "http"
    prefix = ROOT_PATH.rstrip("/")
    return f"{scheme}://{host}{prefix}"
# Add endpoint that handles both /upload/ and //upload paths
@app.post("/upload/")
@app.post("//upload")
async def upload_file(file: UploadFile = File(...), request: Request = None, username: str = Depends(authenticate)):
    """
    Upload a single file to the server

    Args:
        file: The uploaded file; only the final component of its filename is
            used as the stored name.
        request: The incoming request, annotated as plain ``Request`` like the
            other endpoints, used to build the download URL.
        username: Authenticated user, provided by ``authenticate``.

    Returns:
        A dict with the stored ``filename``, a ``status`` message and the
        ``download_url``.

    Raises:
        HTTPException: 400 when no usable filename is given or the file
            already exists; 500 when writing the file fails.
    """
    from urllib.parse import quote

    # The filename is client-controlled. Keep only its last path component so
    # names such as "../../x" or "/etc/x" cannot escape UPLOAD_DIR.
    raw_name = file.filename or ""
    safe_name = pathlib.PurePosixPath(raw_name.replace("\\", "/")).name
    if safe_name in ("", ".", ".."):
        log.info('filename is missing or invalid: %r', file.filename)
        raise HTTPException(status_code=400, detail="No file provided")

    file_path = UPLOAD_DIR / safe_name
    log.info(f"Uploading file to {file_path} by user {username}")

    created = False
    try:
        # "x" mode fails if the file already exists, so the existence check
        # and the creation happen in one step.
        with open(file_path, "xb") as buffer:
            created = True
            shutil.copyfileobj(file.file, buffer)
    except FileExistsError:
        # Raised outside the generic handler so the 400 is not turned into a 500.
        raise HTTPException(status_code=400, detail="File already exists") from None
    except Exception as e:
        log.error(f"Upload failed: {str(e)}")
        if created:
            # Remove the partial file so a retry is not rejected as a duplicate.
            file_path.unlink(missing_ok=True)
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e

    return {
        "filename": safe_name,
        "status": "File uploaded successfully",
        "download_url": f"{get_base_url(request)}/download/{quote(safe_name)}"
    }
# Also add endpoint for //upload-multiple
@app.post("/upload-multiple/")
@app.post("//upload-multiple")
async def upload_multiple_files(files: List[UploadFile] = File(...), request: Request = None, username: str = Depends(authenticate)):
    """
    Upload multiple files to the server

    Each file is handled independently: a failure is recorded in the result
    list and does not stop the remaining uploads. Unlike the single-file
    endpoint, an existing file with the same name is overwritten.

    Args:
        files: The uploaded files; only the final component of each filename
            is used as the stored name.
        request: The incoming request, used to build download URLs.
        username: Authenticated user, provided by ``authenticate``.

    Returns:
        ``{"uploaded_files": [...]}`` with one entry per processed file.
    """
    from urllib.parse import quote

    results = []
    base_url = get_base_url(request)
    for file in files:
        if file.filename is None:
            log.info('filename is None')
            continue

        # The filename is client-controlled. Keep only its last path
        # component so it cannot point outside UPLOAD_DIR.
        safe_name = pathlib.PurePosixPath(file.filename.replace("\\", "/")).name
        if safe_name in ("", ".", ".."):
            results.append({"filename": file.filename, "status": "failed", "error": "Invalid filename"})
            continue

        try:
            with open(UPLOAD_DIR / safe_name, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
        except Exception as e:
            log.error(f"Upload of {safe_name} failed: {str(e)}")
            results.append({"filename": file.filename, "status": "failed", "error": str(e)})
        else:
            results.append({
                "filename": safe_name,
                "status": "success",
                "download_url": f"{base_url}/download/{quote(safe_name)}"
            })
    return {"uploaded_files": results}
# Also add endpoint for //download/{filename}
@app.get("/download/{filename}")
@app.get("//download/{filename}")
async def download_file(filename: str, username: str = Depends(authenticate)):
    """
    Download a file from the server

    Args:
        filename: Name of a file stored directly inside UPLOAD_DIR.
        username: Authenticated user, provided by ``authenticate``.

    Returns:
        A ``FileResponse`` that sends the file under its own name.

    Raises:
        HTTPException: 404 when the name is not a plain file name or no such
            regular file exists.
    """
    # Accept only a bare file name: anything with a path separator, or the
    # special entries "." and "..", could point outside UPLOAD_DIR.
    if filename in ("", ".", "..") or os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = os.path.join(UPLOAD_DIR, filename)
    # isfile() rather than exists(): a directory cannot be sent as a file.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path, filename=filename)
# Also add endpoint for //list
@app.get("/list/")
@app.get("//list")
async def list_files(request: Request = None, username: str = Depends(authenticate)):
    """
    List all files available for download

    Args:
        request: The incoming request, used to build download URLs.
        username: Authenticated user, provided by ``authenticate``.

    Returns:
        ``{"files": [{"filename": ..., "download_url": ...}, ...]}`` sorted by
        file name.

    Raises:
        HTTPException: 500 when the upload directory cannot be read.
    """
    from urllib.parse import quote

    try:
        # Only regular files can be served by the download endpoint, so
        # sub-directories are left out. Sorting gives a stable order.
        with os.scandir(UPLOAD_DIR) as entries:
            names = sorted(entry.name for entry in entries if entry.is_file())
        # Include download URLs in the response
        base_url = get_base_url(request)
        file_details = [
            {
                "filename": name,
                "download_url": f"{base_url}/download/{quote(name)}"
            }
            for name in names
        ]
        return {"files": file_details}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list files: {str(e)}") from e
# Add root route for easy testing
@app.get("/")
@app.get("//")
async def root():
    """Return a status message together with the paths of the API endpoints."""
    endpoints = dict(
        upload="/upload/",
        upload_multiple="/upload-multiple/",
        download="/download/{filename}",
        list="/list/",
    )
    return {"message": "HTTP File Server API is running", "endpoints": endpoints}
if __name__ == "__main__":
    # uvicorn is needed only when this module is run directly as a script.
    import uvicorn

    # PORT may be set in the environment; 2025 is the fallback.
    listen_port = int(os.environ.get("PORT", 2025))
    log.info(f"Starting server on port {listen_port}")
    uvicorn.run(app, host="0.0.0.0", port=listen_port)
fastapi==0.115.12
uvicorn==0.34.2
python-multipart==0.0.20
python-dotenv==1.0.0
#!/usr/bin/env bash
# Watch a directory with watchexec and upload changed files to the HTTP file
# server with curl.
#
# Usage:       <script> <watch_directory>
# Environment: SERVER        server base URL (default http://localhost:2025)
#              API_USERNAME  HTTP Basic user name (default admin)
#              API_PASSWORD  HTTP Basic password  (default password)

# Configuration
SERVER="${SERVER:-http://localhost:2025}"
USERNAME="${API_USERNAME:-admin}"
PASSWORD="${API_PASSWORD:-password}"

# Check command line arguments
if [ $# -lt 1 ]; then
    echo "Error: Watch directory not specified." >&2
    echo "Usage: $0 <watch_directory>" >&2
    echo "Example: $0 /path/to/watch" >&2
    exit 1
fi
WATCH_DIR="$1"

# Verify that the directory exists
if [ ! -d "$WATCH_DIR" ]; then
    echo "Error: Directory '$WATCH_DIR' does not exist or is not a directory." >&2
    exit 1
fi

# Check if watchexec is installed
if ! command -v watchexec &> /dev/null; then
    echo "Error: watchexec is not installed. Please install it first." >&2
    echo "You can install it via cargo: cargo install watchexec-cli" >&2
    echo "Or check https://github.com/watchexec/watchexec for other installation methods." >&2
    exit 1
fi

# Check if curl is installed
if ! command -v curl &> /dev/null; then
    echo "Error: curl is not installed. Please install it first." >&2
    exit 1
fi

echo "Starting file watch in directory: $WATCH_DIR"
echo "Server endpoint: $SERVER"
echo "Using auth username: $USERNAME"
echo "Use Ctrl+C to stop watching"

# The command below is single-quoted so that $FILE is expanded by the shell
# that watchexec starts, not by this script (where FILE is unset). The
# settings are exported so that shell sees them through its environment
# instead of having them spliced into the command text.
export SERVER USERNAME PASSWORD

# Run watchexec to monitor file changes
# -w: Watch directory
# -e: File extensions to monitor (adjust as needed)
# --ignore: Ignore patterns (e.g., temporary files), one pattern per flag
# NOTE(review): confirm that the installed watchexec substitutes {path} in the
# command; its documentation describes passing changed paths through
# WATCHEXEC_*_PATH environment variables instead.
# NOTE(review): confirm that -e "*" matches every extension as intended.
watchexec -w "$WATCH_DIR" -e "*" \
    --ignore '*.tmp' --ignore '*.swp' --ignore '*.~' --ignore '*.log' \
    'FILE={path}; echo "File changed: $FILE"; if [ -f "$FILE" ]; then
    echo "Uploading $FILE to $SERVER/upload/";
    curl -v --user "$USERNAME:$PASSWORD" -F "file=@$FILE" "$SERVER/upload/";
    echo "";
fi'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment