Created March 26, 2025 21:43
Dummy implementation of the Ollama API: a mock Flask server that imitates Ollama's HTTP endpoints for local testing.
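To exercise the mock once the server below is running, a minimal sketch (assumptions: the server is started with `python` on Ollama's default port 11434, the `requests` package is installed, and the model names match the `MOCK_MODELS` entries defined in the file):

import requests

# Non-streaming completion against the mock server.
resp = requests.post(
    "http://localhost:11434/api/generate",
    json={"model": "llama3:latest", "prompt": "Hello", "stream": False},
)
print(resp.json()["response"])

# List the models the mock pretends to have installed.
print(requests.get("http://localhost:11434/api/tags").json())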
#!/usr/bin/env python3
import json
import time
import datetime
import random
import string
import hashlib

from flask import Flask, request, jsonify, Response, stream_with_context

app = Flask(__name__)

# --- Helper Functions ---

def generate_dummy_digest():
    """Generates a realistic-looking dummy SHA256 digest."""
    return hashlib.sha256(str(random.random()).encode()).hexdigest()


def generate_dummy_embedding(size=384):
    """Generates a dummy embedding vector."""
    return [random.uniform(-1, 1) for _ in range(size)]


def get_iso_timestamp():
    """Returns the current UTC time in ISO 8601 format with Z."""
    return datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")


def simulate_processing_time(min_ms=50, max_ms=500):
    """Sleeps for a random duration to simulate work."""
    time.sleep(random.uniform(min_ms / 1000.0, max_ms / 1000.0))

# --- In-memory 'storage' for mock state ---

MOCK_MODELS = {
    "llama3:latest": {
        "name": "llama3:latest",
        "modified_at": get_iso_timestamp(),
        "size": 3825819519,
        "digest": generate_dummy_digest(),
        "details": {
            "format": "gguf",
            "family": "llama",
            "families": ["llama"],
            "parameter_size": "7B",
            "quantization_level": "Q4_0"
        }
    },
    "codellama:13b": {
        "name": "codellama:13b",
        "modified_at": get_iso_timestamp(),
        "size": 7365960935,
        "digest": generate_dummy_digest(),
        "details": {
            "format": "gguf",
            "family": "llama",
            "families": ["llama"],
            "parameter_size": "13B",
            "quantization_level": "Q4_0"
        }
    },
    "llava:latest": {
        "name": "llava:latest",
        "modified_at": get_iso_timestamp(),
        "size": 4100000000,
        "digest": generate_dummy_digest(),
        "details": {
            "format": "gguf",
            "family": "llava",
            "families": ["llama", "clip"],
            "parameter_size": "7B",
            "quantization_level": "Q4_K_M"
        }
    },
    "all-minilm:latest": {
        "name": "all-minilm:latest",
        "modified_at": get_iso_timestamp(),
        "size": 134000000,
        "digest": generate_dummy_digest(),
        "details": {
            "format": "gguf",
            "family": "bert",
            "families": ["bert"],
            "parameter_size": "33M",
            "quantization_level": "Q4_K_M"  # Embedding models often smaller
        }
    },
}

MOCK_RUNNING_MODELS = {}  # track which models are 'loaded' with expiry
MOCK_BLOBS = set()  # store digests of 'uploaded' blobs

# --- API Endpoints ---

@app.route('/api/version', methods=['GET'])
def get_version():
    return jsonify({"version": "0.5.1-mock"})


@app.route('/api/tags', methods=['GET'])
def list_local_models():
    return jsonify({"models": list(MOCK_MODELS.values())})


@app.route('/api/ps', methods=['GET'])
def list_running_models():
    now = datetime.datetime.now(datetime.timezone.utc)
    running = []
    # Clean up expired models
    expired_keys = [k for k, v in MOCK_RUNNING_MODELS.items() if v['expires_at'] < now]
    for k in expired_keys:
        del MOCK_RUNNING_MODELS[k]
    for name, data in MOCK_RUNNING_MODELS.items():
        if name in MOCK_MODELS:
            model_details = MOCK_MODELS[name]
            running.append({
                "name": name,
                "model": name,  # Ollama API uses both name and model key
                "size": model_details['size'],
                "digest": model_details['digest'],
                "details": model_details['details'],
                "expires_at": data['expires_at'].isoformat().replace("+00:00", "Z"),
                "size_vram": model_details['size']  # Mock VRAM usage as model size
            })
    return jsonify({"models": running})

def _load_model_if_needed(model_name, keep_alive_str="5m"):
    """Simulates loading a model into memory."""
    if model_name not in MOCK_MODELS:
        return False  # Model doesn't exist
    # Calculate expiry time based on keep_alive
    if isinstance(keep_alive_str, (int, float)) and keep_alive_str <= 0:
        # Unload immediately if exists
        if model_name in MOCK_RUNNING_MODELS:
            del MOCK_RUNNING_MODELS[model_name]
        return True  # Signal unload request
    elif isinstance(keep_alive_str, str) and keep_alive_str.endswith('m'):
        try:
            minutes = int(keep_alive_str[:-1])
            delta = datetime.timedelta(minutes=minutes)
        except ValueError:
            delta = datetime.timedelta(minutes=5)  # Default
    else:  # Default keep_alive
        delta = datetime.timedelta(minutes=5)
    now = datetime.datetime.now(datetime.timezone.utc)
    MOCK_RUNNING_MODELS[model_name] = {"expires_at": now + delta}
    return True  # Signal loaded or updated


def _unload_model(model_name):
    if model_name in MOCK_RUNNING_MODELS:
        del MOCK_RUNNING_MODELS[model_name]
        return True

@app.route('/api/generate', methods=['POST'])
def generate_completion():
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "Invalid JSON"}), 400
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    model_name = data.get('model')
    prompt = data.get('prompt', "")
    stream = data.get('stream', True)
    keep_alive = data.get('keep_alive', '5m')  # Default 5 minutes
    images = data.get('images')
    format_req = data.get('format')
    raw_mode = data.get('raw', False)

    if not model_name:
        return jsonify({"error": "'model' field is required"}), 400
    if model_name not in MOCK_MODELS:
        return jsonify({"error": f"Model '{model_name}' not found"}), 404

    # Handle loading/unloading based on empty prompt and keep_alive
    if not prompt:
        if isinstance(keep_alive, (int, float)) and keep_alive <= 0:
            _unload_model(model_name)
            return jsonify({
                "model": model_name,
                "created_at": get_iso_timestamp(),
                "response": "",
                "done": True,
                "done_reason": "unload"
            })
        else:
            _load_model_if_needed(model_name, keep_alive)
            return jsonify({
                "model": model_name,
                "created_at": get_iso_timestamp(),
                "response": "",
                "done": True,
                "done_reason": "load"  # Added for clarity
            })

    # Actual generation simulation
    _load_model_if_needed(model_name, keep_alive)  # Ensure model is 'loaded'
    start_time_ns = time.time_ns()
    load_duration_ns = random.randint(5000000, 200000000)  # Simulate some load time if just loaded
    prompt_eval_count = len(prompt.split())  # Rough token count
    prompt_eval_duration_ns = prompt_eval_count * random.randint(5000000, 15000000)

    dummy_response_words = ["This", "is", "a", "dummy", "response", "generated", "by", "the", "mock", "Ollama", "server."]
    if images:
        dummy_response_words.insert(4, f"(processing {len(images)} image(s))")
    if format_req == "json":
        dummy_response_words = ['{\n "answer": "This is a dummy JSON response." \n}']
    elif isinstance(format_req, dict):
        dummy_response_words = [f'{{\n "comment": "Structured output requested, providing dummy JSON.",\n "field1": {random.randint(1,100)},\n "field2": "{random.choice(["A", "B", "C"])}"\n}}']

    def generate_stream():
        eval_count = 0
        eval_duration_ns = 0
        context = [random.randint(1, 10000) for _ in range(10)]  # Dummy context
        for i, word in enumerate(dummy_response_words):
            chunk_start_ns = time.time_ns()
            resp_chunk = word + (" " if i < len(dummy_response_words) - 1 else "")
            yield json.dumps({
                "model": model_name,
                "created_at": get_iso_timestamp(),
                "response": resp_chunk,
                "done": False
            }) + '\n'
            simulate_processing_time(10, 50)
            chunk_end_ns = time.time_ns()
            eval_count += len(word.split())  # Approx tokens
            eval_duration_ns += (chunk_end_ns - chunk_start_ns)

        final_response = {
            "model": model_name,
            "created_at": get_iso_timestamp(),
            "response": "",  # Empty in final streaming response
            "done": True,
            "total_duration": time.time_ns() - start_time_ns,
            "load_duration": load_duration_ns,
            "prompt_eval_count": prompt_eval_count,
            "prompt_eval_duration": prompt_eval_duration_ns,
            "eval_count": eval_count,
            "eval_duration": eval_duration_ns,
        }
        # Context is not returned in raw mode
        if not raw_mode:
            final_response["context"] = context
        yield json.dumps(final_response) + '\n'

    if stream:
        return Response(stream_with_context(generate_stream()), mimetype='application/x-ndjson')
    else:
        # Simulate the whole process for non-streaming
        eval_count_total = 0
        eval_duration_total_ns = 0
        full_response_text = ""
        for word in dummy_response_words:
            simulate_processing_time(10, 50)
            eval_count_total += len(word.split())
            full_response_text += word + " "
        eval_duration_total_ns = eval_count_total * random.randint(4000000, 12000000)
        final_response = {
            "model": model_name,
            "created_at": get_iso_timestamp(),
            "response": full_response_text.strip(),
            "done": True,
            "total_duration": time.time_ns() - start_time_ns,
            "load_duration": load_duration_ns,
            "prompt_eval_count": prompt_eval_count,
            "prompt_eval_duration": prompt_eval_duration_ns,
            "eval_count": eval_count_total,
            "eval_duration": eval_duration_total_ns,
        }
        if not raw_mode:
            final_response["context"] = [random.randint(1, 10000) for _ in range(10)]
        return jsonify(final_response)
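
# Example request against /api/generate (an illustrative sketch, assuming the mock is
# running locally on the default port and using a model name from MOCK_MODELS):
#   curl http://localhost:11434/api/generate \
#        -H "Content-Type: application/json" \
#        -d '{"model": "llama3:latest", "prompt": "Hello", "stream": false}'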

@app.route('/api/chat', methods=['POST'])
def generate_chat_completion():
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "Invalid JSON"}), 400
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    model_name = data.get('model')
    messages = data.get('messages', [])
    stream = data.get('stream', True)
    keep_alive = data.get('keep_alive', '5m')
    format_req = data.get('format')
    tools = data.get('tools')

    if not model_name:
        return jsonify({"error": "'model' field is required"}), 400
    if model_name not in MOCK_MODELS:
        return jsonify({"error": f"Model '{model_name}' not found"}), 404

    # Handle loading/unloading based on empty messages and keep_alive
    if not messages:
        if isinstance(keep_alive, (int, float)) and keep_alive <= 0:
            _unload_model(model_name)
            return jsonify({
                "model": model_name,
                "created_at": get_iso_timestamp(),
                "message": {"role": "assistant", "content": ""},
                "done_reason": "unload",
                "done": True
            })
        else:
            _load_model_if_needed(model_name, keep_alive)
            return jsonify({
                "model": model_name,
                "created_at": get_iso_timestamp(),
                "message": {"role": "assistant", "content": ""},
                "done_reason": "load",
                "done": True
            })

    # Actual chat simulation
    _load_model_if_needed(model_name, keep_alive)  # Ensure model is 'loaded'
    start_time_ns = time.time_ns()
    load_duration_ns = random.randint(5000000, 100000000)
    prompt_eval_count = sum(len(msg.get('content', '').split()) for msg in messages)  # Rough token count
    prompt_eval_duration_ns = prompt_eval_count * random.randint(5000000, 15000000)

    dummy_response_words = ["This", "is", "a", "dummy", "chat", "response."]
    has_images = any(msg.get('images') for msg in messages)
    if has_images:
        dummy_response_words.append("(Acknowledging images in input)")

    # Simulate tool use if tools are provided
    tool_calls = None
    if tools and random.random() > 0.5:  # Randomly decide to use a tool
        dummy_response_words = []  # Tool call replaces content
        chosen_tool = random.choice(tools)
        func_name = chosen_tool.get('function', {}).get('name', 'unknown_function')
        params = chosen_tool.get('function', {}).get('parameters', {}).get('properties', {})
        args = {}
        for param, details in params.items():
            if details.get('type') == 'string':
                args[param] = f"dummy_{param}_{random.randint(1,100)}"
            elif details.get('type') == 'integer':
                args[param] = random.randint(1, 100)
            elif details.get('type') == 'boolean':
                args[param] = random.choice([True, False])
            elif 'enum' in details:
                args[param] = random.choice(details['enum'])
            else:
                args[param] = None  # Default for unhandled types
        tool_calls = [{"function": {"name": func_name, "arguments": args}}]

    if format_req == "json":
        dummy_response_words = ['{\n "chat_answer": "This is a dummy JSON chat response." \n}']
    elif isinstance(format_req, dict):
        dummy_response_words = [f'{{\n "comment": "Structured output requested for chat.",\n "chat_field": "{random.choice(["Yes", "No", "Maybe"])}"\n}}']

    def chat_stream():
        eval_count = 0
        eval_duration_ns = 0
        for i, word in enumerate(dummy_response_words):
            chunk_start_ns = time.time_ns()
            resp_chunk = word + (" " if i < len(dummy_response_words) - 1 else "")
            yield json.dumps({
                "model": model_name,
                "created_at": get_iso_timestamp(),
                "message": {
                    "role": "assistant",
                    "content": resp_chunk,
                    # Images are typically only in user messages
                },
                "done": False
            }) + '\n'
            simulate_processing_time(10, 50)
            chunk_end_ns = time.time_ns()
            eval_count += len(word.split())  # Approx tokens
            eval_duration_ns += (chunk_end_ns - chunk_start_ns)

        # Simulate tool call generation if applicable (appears after content chunks)
        if tool_calls:
            yield json.dumps({
                "model": model_name,
                "created_at": get_iso_timestamp(),
                "message": {
                    "role": "assistant",
                    "content": "",  # Content might be empty if only tool calls generated
                    "tool_calls": tool_calls
                },
                "done": False  # Usually tool call is intermediate
            }) + '\n'
            eval_count += 10  # Add some dummy count for tool call generation

        final_response = {
            "model": model_name,
            "created_at": get_iso_timestamp(),
            "message": {
                "role": "assistant",
                "content": ""  # Empty in final streaming response
            },
            "done": True,
            "total_duration": time.time_ns() - start_time_ns,
            "load_duration": load_duration_ns,
            "prompt_eval_count": prompt_eval_count,
            "prompt_eval_duration": prompt_eval_duration_ns,
            "eval_count": eval_count,
            "eval_duration": eval_duration_ns
        }
        yield json.dumps(final_response) + '\n'

    if stream:
        return Response(stream_with_context(chat_stream()), mimetype='application/x-ndjson')
    else:
        # Simulate the whole process for non-streaming
        eval_count_total = 0
        eval_duration_total_ns = 0
        full_response_text = ""
        for word in dummy_response_words:
            simulate_processing_time(10, 50)
            eval_count_total += len(word.split())
            full_response_text += word + " "
        eval_duration_total_ns = eval_count_total * random.randint(4000000, 12000000)
        final_message = {"role": "assistant"}
        if tool_calls:
            final_message["tool_calls"] = tool_calls
            final_message["content"] = ""  # Or potentially some text leading to the call
            eval_count_total += 10  # Add dummy count for tool call
        else:
            final_message["content"] = full_response_text.strip()
        final_response = {
            "model": model_name,
            "created_at": get_iso_timestamp(),
            "message": final_message,
            "done": True,
            "total_duration": time.time_ns() - start_time_ns,
            "load_duration": load_duration_ns,
            "prompt_eval_count": prompt_eval_count,
            "prompt_eval_duration": prompt_eval_duration_ns,
            "eval_count": eval_count_total,
            "eval_duration": eval_duration_total_ns,
        }
        return jsonify(final_response)
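
# Example request against /api/chat (an illustrative sketch, assuming the mock is
# running locally on the default port):
#   curl http://localhost:11434/api/chat \
#        -H "Content-Type: application/json" \
#        -d '{"model": "llama3:latest", "messages": [{"role": "user", "content": "Hi"}], "stream": false}'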

@app.route('/api/create', methods=['POST'])
def create_model():
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "Invalid JSON"}), 400
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    model_name = data.get('model')
    from_model = data.get('from')
    files = data.get('files')
    quantize = data.get('quantize')
    stream_resp = data.get('stream', True)  # Note: Renamed variable to avoid conflict

    if not model_name:
        return jsonify({"error": "'model' field is required"}), 400

    def create_stream():
        yield json.dumps({"status": "reading model metadata"}) + '\n'
        simulate_processing_time(100, 300)

        if from_model:
            if from_model not in MOCK_MODELS:
                yield json.dumps({"status": f"error: base model '{from_model}' not found"}) + '\n'
                return  # Stop simulation
            yield json.dumps({"status": f"creating model '{model_name}' from '{from_model}'"}) + '\n'
            simulate_processing_time(50, 150)
            # Simulate using existing layers
            for _ in range(random.randint(2, 5)):
                yield json.dumps({"status": f"using already created layer sha256:{generate_dummy_digest()[:12]}..."}) + '\n'
                simulate_processing_time(20, 80)
            # Simulate writing new layers (e.g., system prompt)
            yield json.dumps({"status": f"writing layer sha256:{generate_dummy_digest()[:12]}..."}) + '\n'
            simulate_processing_time(100, 400)
        elif files:
            is_gguf = any(fname.endswith(".gguf") for fname in files.keys())
            is_safetensors = any(fname.endswith(".safetensors") for fname in files.keys())
            if is_gguf:
                yield json.dumps({"status": "parsing GGUF"}) + '\n'
                simulate_processing_time(200, 600)
                for fname, digest in files.items():
                    if digest not in MOCK_BLOBS:
                        yield json.dumps({"status": f"error: blob {digest} not found"}) + '\n'
                        return
                    yield json.dumps({"status": f"using layer {digest}"}) + '\n'
                    simulate_processing_time(50, 100)
            elif is_safetensors:
                yield json.dumps({"status": "converting model"}) + '\n'
                simulate_processing_time(500, 1500)
                missing_blobs = [d for d in files.values() if d not in MOCK_BLOBS]
                if missing_blobs:
                    yield json.dumps({"status": f"error: blobs not found: {', '.join(missing_blobs)}"}) + '\n'
                    return
                yield json.dumps({"status": "creating new layer sha256:..."}) + '\n'
                simulate_processing_time(200, 500)
                yield json.dumps({"status": "using autodetected template dummy-template"}) + '\n'
            else:
                yield json.dumps({"status": "error: unknown file types provided"}) + '\n'
                return
        else:
            yield json.dumps({"status": "error: must provide 'from' or 'files'"}) + '\n'
            return  # Stop simulation

        if quantize:
            yield json.dumps({"status": f"quantizing model to {quantize}"}) + '\n'
            simulate_processing_time(1000, 5000)  # Quantization takes time
            yield json.dumps({"status": "creating new quantized layer sha256:..."}) + '\n'
            simulate_processing_time(200, 600)

        yield json.dumps({"status": "writing manifest"}) + '\n'
        simulate_processing_time(50, 150)
        yield json.dumps({"status": "success"}) + '\n'

        # Add the new model to our mock list
        new_digest = generate_dummy_digest()
        MOCK_MODELS[model_name] = {
            "name": model_name,
            "modified_at": get_iso_timestamp(),
            "size": random.randint(1000000000, 8000000000),
            "digest": new_digest,
            "details": {
                "format": "gguf",  # Assume GGUF output for simplicity
                "family": "unknown",
                "families": [],
                "parameter_size": "N/A",
                "quantization_level": quantize if quantize else "N/A"
            }
        }
        # Add its digest to blobs as it 'exists' now
        MOCK_BLOBS.add(f"sha256:{new_digest}")

    if stream_resp:
        return Response(stream_with_context(create_stream()), mimetype='application/x-ndjson')
    else:
        # Simulate the process non-streamingly (just check for errors)
        if from_model and from_model not in MOCK_MODELS:
            return jsonify({"status": f"error: base model '{from_model}' not found"}), 404
        if files:
            missing_blobs = [d for d in files.values() if d not in MOCK_BLOBS]
            if missing_blobs:
                return jsonify({"status": f"error: blobs not found: {', '.join(missing_blobs)}"}), 400
        if not from_model and not files:
            return jsonify({"status": "error: must provide 'from' or 'files'"}), 400
        # If checks pass, simulate success and add model
        new_digest = generate_dummy_digest()
        MOCK_MODELS[model_name] = {
            "name": model_name,
            "modified_at": get_iso_timestamp(),
            "size": random.randint(1000000000, 8000000000),
            "digest": new_digest,
            "details": {
                "format": "gguf",
                "family": "unknown",
                "families": [],
                "parameter_size": "N/A",
                "quantization_level": quantize if quantize else "N/A"
            }
        }
        MOCK_BLOBS.add(f"sha256:{new_digest}")
        return jsonify({"status": "success"})

@app.route('/api/blobs/<digest>', methods=['HEAD'])
def check_blob_exists(digest):
    # Digest format is like sha256:abcdef...
    if digest in MOCK_BLOBS:
        return Response(status=200)
    else:
        # Randomly pretend some exist even if not explicitly created?
        # if random.random() > 0.8:
        #     return Response(status=200)
        return Response(status=404)


@app.route('/api/blobs/<digest>', methods=['POST'])
def push_blob(digest):
    # Simulate receiving the file data (request.data).
    # In a real scenario, you'd stream this to a file and verify the hash.
    # Here, we just check if the provided digest looks valid and add it.
    if not digest.startswith("sha256:") or len(digest) != 71:  # "sha256:" + 64 hex chars
        return Response("Invalid digest format", status=400)
    # Simulate potential mismatch (optional)
    # if random.random() < 0.1:  # 10% chance of mismatch
    #     return Response("Digest mismatch", status=400)
    MOCK_BLOBS.add(digest)
    print(f"Mock: Blob {digest} 'uploaded'.")
    return Response(status=201)  # Created

@app.route('/api/show', methods=['POST'])
def show_model_info():
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "Invalid JSON"}), 400
        model_name = data.get('model')
        verbose = data.get('verbose', False)
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    if not model_name:
        return jsonify({"error": "'model' field is required"}), 400
    if model_name not in MOCK_MODELS:
        return jsonify({"error": f"Model '{model_name}' not found"}), 404

    model_data = MOCK_MODELS[model_name]

    # Generate dummy verbose data if requested
    tokens_data = ["[UNUSED]", "[UNK]", "[CLS]", "hello", "world"] if verbose else []
    merges_data = ["h e", "e l", "l o"] if verbose else []
    token_types_data = [1, 1, 1, 1, 1] if verbose else []

    response_data = {
        "modelfile": f"# Mock Modelfile for {model_name}\nFROM sha256:{model_data['digest']}\nTEMPLATE \"\"\"{{{{ .Prompt }}}}\"\"\"\nPARAMETER stop \"\\n\"",
        "parameters": "num_ctx 4096\nstop \"<|endoftext|>\"\n",
        "template": "{{ .Prompt }}",
        "details": model_data['details'],
        "model_info": {
            "general.architecture": model_data['details'].get('family', 'unknown'),
            "general.file_type": 1,  # Dummy value
            "general.parameter_count": model_data['size'] * 2,  # Wild guess
            "general.quantization_version": 2,
            f"{model_data['details'].get('family', 'unknown')}.context_length": 4096,
            f"{model_data['details'].get('family', 'unknown')}.embedding_length": 4096,
            f"{model_data['details'].get('family', 'unknown')}.block_count": 32,
            # Add more dummy keys based on family if needed
            "tokenizer.ggml.model": "gpt2",  # common default
            "tokenizer.ggml.tokens": tokens_data,
            "tokenizer.ggml.merges": merges_data,
            "tokenizer.ggml.token_type": token_types_data,
            "tokenizer.ggml.bos_token_id": 1,
            "tokenizer.ggml.eos_token_id": 2,
        }
    }

    # Add optional fields based on verbose or specific model types if needed
    if verbose:
        response_data["license"] = "Mock License: Apache 2.0"
        response_data["system"] = f"This is a mock system prompt for {model_name}."

    return jsonify(response_data)

@app.route('/api/copy', methods=['POST'])
def copy_model():
    try:
        data = request.get_json()
        source = data.get('source')
        destination = data.get('destination')
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    if not source or not destination:
        return jsonify({"error": "'source' and 'destination' fields are required"}), 400
    if source not in MOCK_MODELS:
        return jsonify({"error": f"Source model '{source}' not found"}), 404
    if destination in MOCK_MODELS:
        # Ollama seems to overwrite, so we mimic that behavior
        print(f"Mock: Overwriting existing model '{destination}' during copy.")

    # Create a copy with a new modified time
    MOCK_MODELS[destination] = MOCK_MODELS[source].copy()
    MOCK_MODELS[destination]["name"] = destination
    MOCK_MODELS[destination]["modified_at"] = get_iso_timestamp()
    print(f"Mock: Copied model '{source}' to '{destination}'.")
    return Response(status=200)

@app.route('/api/delete', methods=['DELETE'])
def delete_model():
    try:
        data = request.get_json()
        model_name = data.get('model')
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    if not model_name:
        return jsonify({"error": "'model' field is required"}), 400

    if model_name in MOCK_MODELS:
        # Also remove associated blob digest maybe? Depends on real Ollama logic.
        # digest_to_remove = f"sha256:{MOCK_MODELS[model_name]['digest']}"
        # if digest_to_remove in MOCK_BLOBS:
        #     MOCK_BLOBS.remove(digest_to_remove)
        del MOCK_MODELS[model_name]
        # Also unload if running
        _unload_model(model_name)
        print(f"Mock: Deleted model '{model_name}'.")
        return Response(status=200)
    else:
        return jsonify({"error": f"Model '{model_name}' not found"}), 404

@app.route('/api/pull', methods=['POST'])
def pull_model():
    try:
        data = request.get_json()
        model_name = data.get('model')
        stream_resp = data.get('stream', True)  # Renamed variable
        insecure = data.get('insecure', False)  # Parameter exists but unused in mock
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    if not model_name:
        return jsonify({"error": "'model' field is required"}), 400

    # Simulate checking if model already exists locally
    if model_name in MOCK_MODELS:
        print(f"Mock: Model '{model_name}' already exists locally.")
        if stream_resp:
            def already_exists_stream():
                yield json.dumps({"status": "success"}) + '\n'
            return Response(stream_with_context(already_exists_stream()), mimetype='application/x-ndjson')
        else:
            return jsonify({"status": "success"})

    def pull_stream():
        yield json.dumps({"status": "pulling manifest"}) + '\n'
        simulate_processing_time(200, 500)
        num_layers = random.randint(1, 5)
        total_size = random.randint(100000000, 5000000000)
        layer_size = total_size // num_layers
        new_digest = generate_dummy_digest()  # For the final model
        for i in range(num_layers):
            layer_digest = generate_dummy_digest()
            MOCK_BLOBS.add(f"sha256:{layer_digest}")  # Add blob as it's 'downloaded'
            yield json.dumps({
                "status": f"downloading sha256:{layer_digest[:12]}...",
                "digest": f"sha256:{layer_digest}",
                "total": layer_size,
                "completed": 0
            }) + '\n'
            completed = 0
            while completed < layer_size:
                simulate_processing_time(50, 200)
                increment = min(layer_size // random.randint(5, 15), layer_size - completed)
                completed += increment
                yield json.dumps({
                    "status": f"downloading sha256:{layer_digest[:12]}...",
                    "digest": f"sha256:{layer_digest}",
                    "total": layer_size,
                    "completed": completed
                }) + '\n'
        yield json.dumps({
            "status": "verifying sha256 digest",
            "digest": f"sha256:{layer_digest}",  # Added digest for context
        }) + '\n'
        simulate_processing_time(50, 150)
        yield json.dumps({"status": "writing manifest"}) + '\n'
        simulate_processing_time(50, 100)
        yield json.dumps({"status": "removing any unused layers"}) + '\n'
        simulate_processing_time(20, 50)
        yield json.dumps({"status": "success"}) + '\n'

        # Add the pulled model to our mock list
        MOCK_MODELS[model_name] = {
            "name": model_name,
            "modified_at": get_iso_timestamp(),
            "size": total_size,
            "digest": new_digest,
            "details": {  # Generic details for pulled model
                "format": "gguf",
                "family": "unknown",
                "families": [],
                "parameter_size": f"{(total_size / 1e9):.1f}B",
                "quantization_level": "Q4_0"  # Common default
            }
        }
        MOCK_BLOBS.add(f"sha256:{new_digest}")

    if stream_resp:
        return Response(stream_with_context(pull_stream()), mimetype='application/x-ndjson')
    else:
        # Simulate non-streaming pull (just add the model)
        total_size = random.randint(100000000, 5000000000)
        new_digest = generate_dummy_digest()
        MOCK_MODELS[model_name] = {
            "name": model_name,
            "modified_at": get_iso_timestamp(),
            "size": total_size,
            "digest": new_digest,
            "details": {
                "format": "gguf",
                "family": "unknown",
                "families": [],
                "parameter_size": f"{(total_size / 1e9):.1f}B",
                "quantization_level": "Q4_0"
            }
        }
        # Simulate adding blobs for layers + manifest
        for _ in range(random.randint(2, 6)):
            MOCK_BLOBS.add(f"sha256:{generate_dummy_digest()}")
        MOCK_BLOBS.add(f"sha256:{new_digest}")
        print(f"Mock: Pulled model '{model_name}' (non-streaming).")
        return jsonify({"status": "success"})

@app.route('/api/push', methods=['POST'])
def push_model():
    try:
        data = request.get_json()
        model_name = data.get('model')  # Expects <namespace>/<model>:<tag>
        stream_resp = data.get('stream', True)  # Renamed variable
        insecure = data.get('insecure', False)  # Parameter exists but unused in mock
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    if not model_name or '/' not in model_name:
        return jsonify({"error": "'model' field must be in the format <namespace>/<model>:<tag>"}), 400

    # Extract base model name to check if it exists locally
    base_model_name = model_name.split('/')[-1]
    if base_model_name not in MOCK_MODELS:
        # Try without tag
        base_model_name_no_tag = base_model_name.split(':')[0] + ":latest"
        if base_model_name_no_tag not in MOCK_MODELS:
            # Try just the name part assuming :latest
            if base_model_name.split(':')[0] + ":latest" in MOCK_MODELS:
                base_model_name = base_model_name.split(':')[0] + ":latest"
            else:
                return jsonify({"error": f"Model '{base_model_name}' not found locally"}), 404
        else:
            base_model_name = base_model_name_no_tag

    local_model_data = MOCK_MODELS[base_model_name]

    def push_stream():
        yield json.dumps({"status": "retrieving manifest"}) + '\n'
        simulate_processing_time(100, 300)
        # Simulate checking layers/blobs on the 'remote';
        # assume some layers need uploading
        num_layers_to_upload = random.randint(1, 3)
        layer_digests = [f"sha256:{generate_dummy_digest()}" for _ in range(num_layers_to_upload)]
        total_upload_size = local_model_data['size'] // random.randint(2, 5)  # Simulate partial upload
        layer_size = total_upload_size // num_layers_to_upload if num_layers_to_upload > 0 else 0
        for layer_digest in layer_digests:
            yield json.dumps({
                "status": "starting upload",
                "digest": layer_digest,
                "total": layer_size
            }) + '\n'
            completed = 0
            while completed < layer_size:
                simulate_processing_time(50, 200)
                increment = min(layer_size // random.randint(5, 15), layer_size - completed)
                completed += increment
                yield json.dumps({
                    "status": "uploading",  # Different status? API doc uses 'starting upload' repeatedly
                    "digest": layer_digest,
                    "total": layer_size,
                    "completed": completed
                }) + '\n'
        yield json.dumps({"status": "pushing manifest"}) + '\n'
        simulate_processing_time(100, 200)
        yield json.dumps({"status": "success"}) + '\n'

    if stream_resp:
        return Response(stream_with_context(push_stream()), mimetype='application/x-ndjson')
    else:
        # Simulate non-streaming push
        print(f"Mock: Pushed model '{model_name}' (non-streaming).")
        return jsonify({"status": "success"})

@app.route('/api/embed', methods=['POST'])
def generate_embeddings_new():
    try:
        data = request.get_json()
        model_name = data.get('model')
        input_data = data.get('input')  # Can be a string or a list of strings
        # options = data.get('options')  # Unused in mock
        # keep_alive = data.get('keep_alive', '5m')  # Unused in mock
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    if not model_name:
        return jsonify({"error": "'model' field is required"}), 400
    if not input_data:
        return jsonify({"error": "'input' field is required"}), 400

    # Basic check for embedding model type (optional)
    # if "minilm" not in model_name and "bert" not in model_name:
    #     print(f"Warning: Model '{model_name}' might not be an embedding model.")

    if model_name not in MOCK_MODELS:
        # Auto-create a dummy embedding model if not present
        MOCK_MODELS[model_name] = {
            "name": model_name, "modified_at": get_iso_timestamp(), "size": 134000000,
            "digest": generate_dummy_digest(),
            "details": {"format": "gguf", "family": "bert", "families": ["bert"],
                        "parameter_size": "33M", "quantization_level": "Q4_K_M"}
        }
        print(f"Mock: Auto-created dummy embedding model '{model_name}'.")

    start_time_ns = time.time_ns()
    _load_model_if_needed(model_name)  # Simulate loading
    load_duration_ns = random.randint(1000000, 50000000)

    if isinstance(input_data, str):
        inputs = [input_data]
    elif isinstance(input_data, list):
        inputs = input_data
    else:
        return jsonify({"error": "'input' must be a string or a list of strings"}), 400

    embeddings = []
    prompt_eval_count = 0
    for text in inputs:
        simulate_processing_time(10, 100)
        embeddings.append(generate_dummy_embedding())
        prompt_eval_count += len(text.split())  # Approx tokens

    total_duration_ns = time.time_ns() - start_time_ns
    response_data = {
        "model": model_name,
        "embeddings": embeddings,
        # Add dummy stats matching the example format
        "total_duration": total_duration_ns,
        "load_duration": load_duration_ns,
        "prompt_eval_count": prompt_eval_count,
        # eval_duration, prompt_eval_duration are missing in the example, so omit for now
    }
    return jsonify(response_data)
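
# Example request against /api/embed (an illustrative sketch, assuming the mock is
# running locally; 'all-minilm:latest' is seeded in MOCK_MODELS):
#   curl http://localhost:11434/api/embed \
#        -H "Content-Type: application/json" \
#        -d '{"model": "all-minilm:latest", "input": ["hello", "world"]}'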

@app.route('/api/embeddings', methods=['POST'])
def generate_embeddings_old():
    print("Warning: /api/embeddings is deprecated, use /api/embed instead.")
    try:
        data = request.get_json()
        model_name = data.get('model')
        prompt = data.get('prompt')
        # options = data.get('options')  # Unused in mock
        # keep_alive = data.get('keep_alive', '5m')  # Unused in mock
    except Exception as e:
        return jsonify({"error": f"Failed to parse JSON: {e}"}), 400

    if not model_name:
        return jsonify({"error": "'model' field is required"}), 400
    if not prompt:
        return jsonify({"error": "'prompt' field is required"}), 400

    # Reuse model check/creation from /api/embed
    if model_name not in MOCK_MODELS:
        MOCK_MODELS[model_name] = {
            "name": model_name, "modified_at": get_iso_timestamp(), "size": 134000000,
            "digest": generate_dummy_digest(),
            "details": {"format": "gguf", "family": "bert", "families": ["bert"],
                        "parameter_size": "33M", "quantization_level": "Q4_K_M"}
        }
        print(f"Mock: Auto-created dummy embedding model '{model_name}' for deprecated endpoint.")

    _load_model_if_needed(model_name)  # Simulate loading
    simulate_processing_time(10, 100)
    embedding = generate_dummy_embedding()
    return jsonify({"embedding": embedding})

# --- Main Execution ---
if __name__ == '__main__':
    print("Starting Mock Ollama API server on port 11434...")
    print(f"Mock Models Available: {list(MOCK_MODELS.keys())}")
    # Run on 0.0.0.0 to be accessible externally; 11434 is Ollama's default port
    app.run(host='0.0.0.0', port=11434, debug=True)