# Reverse-proxy everything under /my-test/ to the backend on port 8088,
# stripping the /my-test prefix from the forwarded URI.
#
# The location prefix ends with a slash to match the trailing slash of the
# proxy_pass URI: nginx replaces the matched prefix with that URI, so with a
# bare "/my-test" prefix a request for /my-test/upload/ would be forwarded as
# //upload/ (doubled slash). A request for exactly /my-test is answered by
# nginx with a 301 redirect to /my-test/.
location /my-test/ {
    proxy_pass http://localhost:8088/;

    # Preserve the original request details for the backend.
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;

    # HTTP/1.1 plus Upgrade/Connection headers allow protocol upgrades
    # (e.g. WebSocket) to pass through the proxy.
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}
Last active
May 14, 2025 07:31
-
-
Save emctoo/f12df77da77272dd8671da194bc4fbc1 to your computer and use it in GitHub Desktop.
A simple file server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| .env | |
| .envrc |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| HTTP File Server CLI Client | |
| A command-line tool for interacting with the HTTP file server. | |
| Uses only Python standard library. | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| import urllib.request | |
| import urllib.error | |
| import urllib.parse | |
| import http.client | |
| import mimetypes | |
| import uuid | |
| import logging | |
| import dotenv | |
| dotenv.load_dotenv() | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| log = logging.getLogger(__name__) | |
| DEFAULT_SERVER = "http://localhost:2025" | |
def generate_boundary():
    """Return a fresh boundary string for a multipart/form-data body.

    The result is the fixed ``----WebKitFormBoundary`` prefix followed by the
    32 hexadecimal digits of a random UUID4.
    """
    random_suffix = uuid.uuid4().hex
    return "----WebKitFormBoundary" + random_suffix
def create_form_data(file_path, boundary):
    """Build a multipart/form-data body containing a single file part.

    Args:
        file_path: Path of the file to embed. The whole file is read into
            memory.
        boundary: Multipart boundary string, without the leading ``--``.

    Returns:
        The encoded body as ``bytes``: one part named ``file`` followed by
        the closing boundary, with CRLF separators between lines.

    Raises:
        OSError: If ``file_path`` cannot be opened or read.
    """
    # The filename sits inside a double-quoted header parameter. Percent-escape
    # the characters that would terminate the quoted value or start a new
    # header line (the same escaping browsers apply to form submissions).
    filename = (
        os.path.basename(file_path)
        .replace('"', '%22')
        .replace('\r', '%0D')
        .replace('\n', '%0A')
    )
    mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
    with open(file_path, 'rb') as f:
        file_data = f.read()
    parts = [
        f'--{boundary}'.encode(),
        f'Content-Disposition: form-data; name="file"; filename="{filename}"'.encode(),
        f'Content-Type: {mime_type}'.encode(),
        b'',  # blank line separating the part headers from the payload
        file_data,
        f'--{boundary}--'.encode(),
    ]
    return b'\r\n'.join(parts)
def list_files(server_url):
    """List all files available on the server.

    Sends ``GET {server_url}/list/`` with HTTP Basic credentials taken from
    the ``API_USERNAME`` / ``API_PASSWORD`` environment variables (defaults
    ``admin`` / ``password``) and prints one numbered line per entry of the
    ``files`` array in the JSON response. Entries may be plain names or
    objects carrying a ``filename`` key.

    Args:
        server_url: Base URL of the file server, without a trailing slash.

    Returns:
        None. Connection, HTTP, URL and JSON errors are logged, not raised.
    """
    import base64

    url = f"{server_url}/list/"
    log.info('visiting %s', url)
    credentials = f"{os.getenv('API_USERNAME', 'admin')}:{os.getenv('API_PASSWORD', 'password')}"
    auth_header = 'Basic ' + base64.b64encode(credentials.encode()).decode('ascii')
    try:
        # Request() validates the URL and raises ValueError when it has no scheme.
        request = urllib.request.Request(url, headers={'Authorization': auth_header})
    except ValueError as e:
        log.error("Invalid server URL. %s", e)
        return
    try:
        with urllib.request.urlopen(request) as response:
            data = json.loads(response.read().decode())
    except urllib.error.HTTPError as e:
        # HTTPError is a URLError subclass; report it separately because the
        # server was reached and answered with an error status.
        log.error("Server returned HTTP %s %s", e.code, e.reason)
        return
    except urllib.error.URLError as e:
        log.error("Could not connect to the server. %s", e)
        return
    except json.JSONDecodeError:
        log.error("Invalid response from server.")
        return

    files = data.get('files') if isinstance(data, dict) else None
    if not files:
        print("No files available on the server.")
        return
    print("Available files:")
    for i, entry in enumerate(files, 1):
        # Show the file name rather than the raw object for dict entries.
        name = entry.get('filename', entry) if isinstance(entry, dict) else entry
        print(f"{i}. {name}")
def upload_file(server_url, file_path):
    """Upload a file to the server.

    Posts the file as multipart/form-data to ``{server_url}/upload/`` with
    HTTP Basic credentials taken from the ``API_USERNAME`` / ``API_PASSWORD``
    environment variables (defaults ``admin`` / ``password``), then prints
    the outcome.

    Args:
        server_url: Base URL of the file server, without a trailing slash.
        file_path: Path of the local file to upload.

    Returns:
        None. All failures are reported on stdout rather than raised.
    """
    import base64

    if not os.path.exists(file_path):
        print(f"Error: File '{file_path}' does not exist.")
        return
    # A directory passes the existence check but cannot be opened for reading.
    if not os.path.isfile(file_path):
        print(f"Error: '{file_path}' is not a regular file.")
        return

    url = f"{server_url}/upload/"
    parsed_url = urllib.parse.urlparse(url)
    if parsed_url.scheme == 'https':
        connection_class = http.client.HTTPSConnection
    else:
        connection_class = http.client.HTTPConnection
    path = parsed_url.path
    if parsed_url.query:
        path += '?' + parsed_url.query
    credentials = f"{os.getenv('API_USERNAME', 'admin')}:{os.getenv('API_PASSWORD', 'password')}"
    auth_header = 'Basic ' + base64.b64encode(credentials.encode()).decode('ascii')

    conn = None
    try:
        boundary = generate_boundary()
        form_data = create_form_data(file_path, boundary)
        headers = {
            'Content-Type': f'multipart/form-data; boundary={boundary}',
            'Content-Length': str(len(form_data)),
            'Authorization': auth_header,
        }
        conn = connection_class(parsed_url.netloc)
        conn.request('POST', path, body=form_data, headers=headers)
        response = conn.getresponse()
        payload = response.read().decode()
        if response.status == 200:
            result = json.loads(payload)
            print(f"Success: {result.get('status', 'File uploaded')}")
        else:
            print(f"Error: {response.status} {response.reason}")
            try:
                error_data = json.loads(payload)
            except json.JSONDecodeError:
                # Not JSON: show the raw response body instead.
                print(payload)
            else:
                if isinstance(error_data, dict) and 'detail' in error_data:
                    print(error_data['detail'])
    except Exception as e:
        print(f"Error: {str(e)}")
    finally:
        # Close the connection on the error path as well as on success.
        if conn is not None:
            conn.close()
def download_file(server_url, filename, output_path=None):
    """Download a file from the server.

    Requests ``{server_url}/download/<filename>`` (the name is URL-quoted)
    with HTTP Basic credentials taken from the ``API_USERNAME`` /
    ``API_PASSWORD`` environment variables (defaults ``admin`` /
    ``password``) and writes the response body to ``output_path``.

    Args:
        server_url: Base URL of the file server, without a trailing slash.
        filename: Name of the file on the server.
        output_path: Local destination path; defaults to ``filename``.

    Returns:
        None. All failures are reported on stdout rather than raised.
    """
    import base64

    url = f"{server_url}/download/{urllib.parse.quote(filename)}"
    if not output_path:
        output_path = filename
    credentials = f"{os.getenv('API_USERNAME', 'admin')}:{os.getenv('API_PASSWORD', 'password')}"
    auth_header = 'Basic ' + base64.b64encode(credentials.encode()).decode('ascii')
    try:
        request = urllib.request.Request(url, headers={'Authorization': auth_header})
        with urllib.request.urlopen(request) as response:
            # The output file is only created once the server has answered
            # successfully, so a failed request leaves no empty file behind.
            with open(output_path, 'wb') as out_file:
                out_file.write(response.read())
        print(f"File '{filename}' downloaded successfully to '{output_path}'")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            print(f"Error: File '{filename}' not found on the server.")
        else:
            print(f"Error: HTTP {e.code} - {e.reason}")
    except urllib.error.URLError as e:
        print(f"Error: Could not connect to the server. {e}")
    except Exception as e:
        print(f"Error downloading file: {str(e)}")
def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands are ``list``, ``upload <file>`` and
    ``download <filename> [-o OUTPUT]``. The server URL comes from
    ``--server``, falling back to the ``SERVER`` environment variable and
    then to ``DEFAULT_SERVER``.
    """
    parser = argparse.ArgumentParser(description="HTTP File Server CLI Client")
    parser.add_argument(
        '--server',
        default=os.getenv('SERVER', DEFAULT_SERVER),
        help=f"Server URL (default: {DEFAULT_SERVER})",
    )
    commands = parser.add_subparsers(dest='command', required=True, help='Command to execute')

    # list: takes no extra arguments
    commands.add_parser('list', help='List files on the server')

    # upload: one positional path
    upload_cmd = commands.add_parser('upload', help='Upload a file to the server')
    upload_cmd.add_argument('file', help='Path to the file to upload')

    # download: server-side name plus optional local destination
    download_cmd = commands.add_parser('download', help='Download a file from the server')
    download_cmd.add_argument('filename', help='Name of the file to download')
    download_cmd.add_argument('-o', '--output', help='Output path (default: same as filename)')

    args = parser.parse_args()

    # Map each sub-command to a thunk; only the selected one is invoked.
    handlers = {
        'list': lambda: list_files(args.server),
        'upload': lambda: upload_file(args.server, args.file),
        'download': lambda: download_file(args.server, args.filename, args.output),
    }
    handler = handlers.get(args.command)
    if handler is not None:
        handler()
| if __name__ == "__main__": | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import shutil | |
| from typing import List, Optional | |
| import pathlib | |
| import logging | |
| import secrets | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Request, Depends, status | |
| from fastapi.responses import FileResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
| import dotenv | |
| dotenv.load_dotenv() | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| log = logging.getLogger(__name__) | |
| # Get root path from environment variable or use empty string for direct access | |
| ROOT_PATH = os.environ.get("ROOT_PATH", "") | |
| log.info(f"Using root path: {ROOT_PATH}") | |
| # Authentication credentials from environment variables or use defaults | |
| API_USERNAME = os.environ.get("API_USERNAME", "admin") | |
| API_PASSWORD = os.environ.get("API_PASSWORD", "password") | |
| log.info(f"Auth enabled with username: {API_USERNAME}") | |
| # When behind Nginx with location /my-test, set ROOT_PATH="/my-test" | |
| app = FastAPI(title="HTTP File Server", root_path=ROOT_PATH) | |
| # Security scheme | |
| security = HTTPBasic() | |
| # Authentication function | |
def authenticate(credentials: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against API_USERNAME / API_PASSWORD.

    Args:
        credentials: Username and password supplied by the ``security``
            dependency.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username or the password does not match.
    """
    # secrets.compare_digest() only accepts str arguments that are pure ASCII
    # and raises TypeError otherwise, so compare the UTF-8 encoded bytes.
    # Both comparisons are always evaluated before the result is checked.
    is_username_correct = secrets.compare_digest(
        credentials.username.encode("utf-8"), API_USERNAME.encode("utf-8")
    )
    is_password_correct = secrets.compare_digest(
        credentials.password.encode("utf-8"), API_PASSWORD.encode("utf-8")
    )
    if not (is_username_correct and is_password_correct):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username
| # Add CORS middleware to allow requests from other domains | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Get upload directory from environment variable or use default | |
| UPLOAD_DIR = pathlib.Path(os.environ.get("UPLOAD_DIR", "./uploads")) | |
| os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| log.info(f"Files will be uploaded to: {os.path.abspath(UPLOAD_DIR)}") | |
def get_base_url(request: Request) -> str:
    """Return ``scheme://host`` plus ROOT_PATH for building absolute links.

    The scheme is read from the ``x-forwarded-proto`` request header
    (default ``http``) and the host from the ``host`` header (default
    ``localhost``). ROOT_PATH is appended with any trailing slashes removed.
    """
    request_headers = request.headers
    forwarded_scheme = request_headers.get("x-forwarded-proto", "http")
    host_name = request_headers.get("host", "localhost")
    root = ROOT_PATH.rstrip("/")
    return "".join((forwarded_scheme, "://", host_name, root))
| # Add endpoint that handles both /upload/ and //upload paths | |
@app.post("/upload/")
@app.post("//upload")
async def upload_file(file: UploadFile = File(...), request: Request = None, username: str = Depends(authenticate)):
    """
    Upload a single file to the server.

    The file is stored directly inside UPLOAD_DIR under the final path
    component of the submitted filename. An existing file is never
    overwritten.

    Returns:
        A dict with the stored ``filename``, a ``status`` message and the
        ``download_url`` of the new file.

    Raises:
        HTTPException: 400 when no usable filename was supplied or the file
            already exists; 500 when writing the file fails.
    """
    # NOTE: `request` is annotated as plain `Request`, matching the other
    # endpoints in this file, so that FastAPI injects the request object.
    if file.filename is None:
        log.info('filename is None')
        raise HTTPException(status_code=400, detail="No file provided")

    # The filename is client-controlled. Keep only its last component so that
    # "../x", "/abs/x" or "dir\\x" cannot write outside UPLOAD_DIR.
    safe_name = pathlib.PurePath(file.filename.replace("\\", "/")).name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    file_path = UPLOAD_DIR / safe_name
    log.info(f"Uploading file to {file_path} by user {username}")
    try:
        # Mode "xb" creates the file exclusively: the existence check and the
        # creation are a single step, so concurrent uploads cannot overwrite
        # each other.
        with open(file_path, "xb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except FileExistsError:
        # Handled outside the generic handler so the 400 is not turned into a 500.
        raise HTTPException(status_code=400, detail="File already exists") from None
    except Exception as e:
        log.error(f"Upload failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e

    return {
        "filename": safe_name,
        "status": "File uploaded successfully",
        "download_url": f"{get_base_url(request)}/download/{safe_name}"
    }
| # Also add endpoint for //upload-multiple | |
@app.post("/upload-multiple/")
@app.post("//upload-multiple")
async def upload_multiple_files(files: List[UploadFile] = File(...), request: Request = None, username: str = Depends(authenticate)):
    """
    Upload multiple files to the server.

    Each file is stored directly inside UPLOAD_DIR under the final path
    component of its submitted filename, replacing any existing file of the
    same name. A failure of one file does not abort the others.

    Returns:
        ``{"uploaded_files": [...]}`` with one entry per processed file:
        ``status`` is ``"success"`` (with ``download_url``) or ``"failed"``
        (with ``error``). Parts without a filename are skipped.
    """
    results = []
    base_url = get_base_url(request)
    for file in files:
        if file.filename is None:
            log.info('filename is None')
            continue

        # The filename is client-controlled. Keep only its last component so
        # that "../x", "/abs/x" or "dir\\x" cannot write outside UPLOAD_DIR.
        safe_name = pathlib.PurePath(file.filename.replace("\\", "/")).name
        if safe_name in ("", ".", ".."):
            results.append({"filename": file.filename, "status": "failed", "error": "Invalid filename"})
            continue

        file_path = UPLOAD_DIR / safe_name
        log.info(f"Uploading file to {file_path} by user {username}")
        try:
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
        except Exception as e:
            results.append({"filename": safe_name, "status": "failed", "error": str(e)})
        else:
            results.append({
                "filename": safe_name,
                "status": "success",
                "download_url": f"{base_url}/download/{safe_name}"
            })
    return {"uploaded_files": results}
| # Also add endpoint for //download/{filename} | |
@app.get("/download/{filename}")
@app.get("//download/{filename}")
async def download_file(filename: str, username: str = Depends(authenticate)):
    """
    Download a file from the server.

    Args:
        filename: Name of a file stored directly inside UPLOAD_DIR.

    Raises:
        HTTPException: 404 when the name does not refer to a regular file
            directly inside UPLOAD_DIR.
    """
    file_path = UPLOAD_DIR / filename
    # Accept only a bare file name that resolves to a regular file. This
    # rejects "..", names with path separators and sub-directories, which a
    # plain existence check would let through to FileResponse.
    is_plain_name = pathlib.PurePath(filename).name == filename
    if not is_plain_name or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path, filename=filename)
| # Also add endpoint for //list | |
@app.get("/list/")
@app.get("//list")
async def list_files(request: Request = None, username: str = Depends(authenticate)):
    """
    List all files available for download.

    Returns:
        ``{"files": [...]}`` with one ``{"filename", "download_url"}`` object
        per regular file found directly inside UPLOAD_DIR, sorted by name.

    Raises:
        HTTPException: 500 when the upload directory cannot be read.
    """
    try:
        # Only regular files are listed: directories and other entries have no
        # downloadable content. Sorting keeps the order stable between calls.
        names = sorted(entry.name for entry in UPLOAD_DIR.iterdir() if entry.is_file())
        # Include download URLs in the response
        base_url = get_base_url(request)
        file_details = [
            {
                "filename": name,
                "download_url": f"{base_url}/download/{name}"
            }
            for name in names
        ]
        return {"files": file_details}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list files: {str(e)}") from e
| # Add root route for easy testing | |
@app.get("/")
@app.get("//")
async def root():
    """Report that the API is running and enumerate its endpoint paths."""
    endpoint_paths = dict(
        upload="/upload/",
        upload_multiple="/upload-multiple/",
        download="/download/{filename}",
        list="/list/",
    )
    return dict(
        message="HTTP File Server API is running",
        endpoints=endpoint_paths,
    )
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.environ.get("PORT", 2025)) | |
| log.info(f"Starting server on port {port}") | |
| uvicorn.run(app, host="0.0.0.0", port=port) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fastapi==0.115.12 | |
| uvicorn==0.34.2 | |
| python-multipart==0.0.20 | |
| python-dotenv==1.0.0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Watch a directory with watchexec and upload changed files to the HTTP file
# server with curl, authenticating with HTTP Basic auth.
#
# Usage: <script> <watch_directory>
# Environment:
#   SERVER        Server base URL        (default: http://localhost:2025)
#   API_USERNAME  Basic-auth user name   (default: admin)
#   API_PASSWORD  Basic-auth password    (default: password)

# Configuration
SERVER="${SERVER:-http://localhost:2025}"
USERNAME="${API_USERNAME:-admin}"
PASSWORD="${API_PASSWORD:-password}"

# Check command line arguments
if [ $# -lt 1 ]; then
    echo "Error: Watch directory not specified." >&2
    echo "Usage: $0 <watch_directory>" >&2
    echo "Example: $0 /path/to/watch" >&2
    exit 1
fi

WATCH_DIR="$1"

# Verify that the directory exists
if [ ! -d "$WATCH_DIR" ]; then
    echo "Error: Directory '$WATCH_DIR' does not exist or is not a directory." >&2
    exit 1
fi

# Check if watchexec is installed
if ! command -v watchexec &> /dev/null; then
    echo "Error: watchexec is not installed. Please install it first." >&2
    echo "You can install it via cargo: cargo install watchexec-cli" >&2
    echo "Or check https://github.com/watchexec/watchexec for other installation methods." >&2
    exit 1
fi

# Check if curl is installed
if ! command -v curl &> /dev/null; then
    echo "Error: curl is not installed. Please install it first." >&2
    exit 1
fi

echo "Starting file watch in directory: $WATCH_DIR"
echo "Server endpoint: $SERVER"
echo "Using auth username: $USERNAME"
echo "Use Ctrl+C to stop watching"

# The command handed to watchexec is single-quoted so that $FILE, $SERVER,
# $USERNAME and $PASSWORD are expanded by the shell that watchexec spawns for
# each event, not by this script (where FILE is unset and would expand to an
# empty string). Export the settings so that child shell can see them.
export SERVER USERNAME PASSWORD

# Run watchexec to monitor file changes
# -w: Watch directory
# -e: File extensions to monitor (adjust as needed)
# --ignore: Ignore patterns (e.g., temporary files)
# NOTE(review): the {path} placeholder is kept from the original; confirm that
# the installed watchexec version substitutes it with the changed file's path
# (otherwise the path has to be taken from the event data watchexec provides).
watchexec -w "$WATCH_DIR" -e "*" --ignore "*.tmp,*.swp,*.~,*.log" \
    'FILE="{path}"; echo "File changed: $FILE"; if [ -f "$FILE" ]; then
        echo "Uploading $FILE to $SERVER/upload/";
        curl -v --user "$USERNAME:$PASSWORD" -F "file=@$FILE" "$SERVER/upload/";
        echo "";
    fi'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment