@fakedrake
Created March 26, 2025 21:43
Dummy implementation of the Ollama API (Flask mock server)
#!/usr/bin/env python3
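"""Mock Ollama API server.

A dummy Flask implementation of the Ollama HTTP API. It fakes the /api/*
endpoints (version, tags, ps, generate, chat, create, blobs, show, copy,
delete, pull, push, embed and the deprecated embeddings) with canned
responses, simulated processing delays, and in-memory model/blob state.
"""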
import json
import time
import datetime
import random
import string
import hashlib
from flask import Flask, request, jsonify, Response, stream_with_context
app = Flask(__name__)
# --- Helper Functions ---
def generate_dummy_digest():
"""Generates a realistic-looking dummy SHA256 digest."""
return hashlib.sha256(str(random.random()).encode()).hexdigest()
def generate_dummy_embedding(size=384):
"""Generates a dummy embedding vector."""
return [random.uniform(-1, 1) for _ in range(size)]
def get_iso_timestamp():
"""Returns the current UTC time in ISO 8601 format with Z."""
return datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
def simulate_processing_time(min_ms=50, max_ms=500):
"""Sleeps for a random duration to simulate work."""
time.sleep(random.uniform(min_ms / 1000.0, max_ms / 1000.0))
# --- In-memory 'storage' for mock state ---
MOCK_MODELS = {
"llama3:latest": {
"name": "llama3:latest",
"modified_at": get_iso_timestamp(),
"size": 3825819519,
"digest": generate_dummy_digest(),
"details": {
"format": "gguf",
"family": "llama",
"families": ["llama"],
"parameter_size": "7B",
"quantization_level": "Q4_0"
}
},
"codellama:13b": {
"name": "codellama:13b",
"modified_at": get_iso_timestamp(),
"size": 7365960935,
"digest": generate_dummy_digest(),
"details": {
"format": "gguf",
"family": "llama",
"families": ["llama"],
"parameter_size": "13B",
"quantization_level": "Q4_0"
}
},
"llava:latest": {
"name": "llava:latest",
"modified_at": get_iso_timestamp(),
"size": 4100000000,
"digest": generate_dummy_digest(),
"details": {
"format": "gguf",
"family": "llava",
"families": ["llama", "clip"],
"parameter_size": "7B",
"quantization_level": "Q4_K_M"
}
},
"all-minilm:latest": {
"name": "all-minilm:latest",
"modified_at": get_iso_timestamp(),
"size": 134000000,
"digest": generate_dummy_digest(),
"details": {
"format": "gguf",
"family": "bert",
"families": ["bert"],
"parameter_size": "33M",
"quantization_level": "Q4_K_M" # Embedding models often smaller
}
},
}
MOCK_RUNNING_MODELS = {} # track which models are 'loaded' with expiry
MOCK_BLOBS = set() # store digests of 'uploaded' blobs
# --- API Endpoints ---
@app.route('/api/version', methods=['GET'])
def get_version():
return jsonify({"version": "0.5.1-mock"})
@app.route('/api/tags', methods=['GET'])
def list_local_models():
return jsonify({"models": list(MOCK_MODELS.values())})
@app.route('/api/ps', methods=['GET'])
def list_running_models():
now = datetime.datetime.now(datetime.timezone.utc)
running = []
# Clean up expired models
expired_keys = [k for k, v in MOCK_RUNNING_MODELS.items() if v['expires_at'] < now]
for k in expired_keys:
del MOCK_RUNNING_MODELS[k]
for name, data in MOCK_RUNNING_MODELS.items():
if name in MOCK_MODELS:
model_details = MOCK_MODELS[name]
running.append({
"name": name,
"model": name, # Ollama API uses both name and model key
"size": model_details['size'],
"digest": model_details['digest'],
"details": model_details['details'],
"expires_at": data['expires_at'].isoformat().replace("+00:00", "Z"),
"size_vram": model_details['size'] # Mock VRAM usage as model size
})
return jsonify({"models": running})
def _load_model_if_needed(model_name, keep_alive_str="5m"):
"""Simulates loading a model into memory."""
if model_name not in MOCK_MODELS:
return False # Model doesn't exist
# Calculate expiry time based on keep_alive
if isinstance(keep_alive_str, (int, float)) and keep_alive_str <= 0:
# Unload immediately if exists
if model_name in MOCK_RUNNING_MODELS:
del MOCK_RUNNING_MODELS[model_name]
return True # Signal unload request
elif isinstance(keep_alive_str, str) and keep_alive_str.endswith('m'):
try:
minutes = int(keep_alive_str[:-1])
delta = datetime.timedelta(minutes=minutes)
except ValueError:
delta = datetime.timedelta(minutes=5) # Default
else: # Default keep_alive
delta = datetime.timedelta(minutes=5)
now = datetime.datetime.now(datetime.timezone.utc)
MOCK_RUNNING_MODELS[model_name] = {"expires_at": now + delta}
return True # Signal loaded or updated
def _unload_model(model_name):
if model_name in MOCK_RUNNING_MODELS:
del MOCK_RUNNING_MODELS[model_name]
return True
@app.route('/api/generate', methods=['POST'])
def generate_completion():
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON"}), 400
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
model_name = data.get('model')
prompt = data.get('prompt', "")
stream = data.get('stream', True)
keep_alive = data.get('keep_alive', '5m') # Default 5 minutes
images = data.get('images')
format_req = data.get('format')
raw_mode = data.get('raw', False)
if not model_name:
return jsonify({"error": "'model' field is required"}), 400
if model_name not in MOCK_MODELS:
return jsonify({"error": f"Model '{model_name}' not found"}), 404
# Handle loading/unloading based on empty prompt and keep_alive
if not prompt:
if isinstance(keep_alive, (int, float)) and keep_alive <= 0:
_unload_model(model_name)
return jsonify({
"model": model_name,
"created_at": get_iso_timestamp(),
"response": "",
"done": True,
"done_reason": "unload"
})
else:
_load_model_if_needed(model_name, keep_alive)
return jsonify({
"model": model_name,
"created_at": get_iso_timestamp(),
"response": "",
"done": True,
"done_reason": "load" # Added for clarity
})
# Actual generation simulation
_load_model_if_needed(model_name, keep_alive) # Ensure model is 'loaded'
start_time_ns = time.time_ns()
load_duration_ns = random.randint(5000000, 200000000) # Simulate some load time if just loaded
prompt_eval_count = len(prompt.split()) # Rough token count
prompt_eval_duration_ns = prompt_eval_count * random.randint(5000000, 15000000)
dummy_response_words = ["This", "is", "a", "dummy", "response", "generated", "by", "the", "mock", "Ollama", "server."]
if images:
dummy_response_words.insert(4, f"(processing {len(images)} image(s))")
if format_req == "json":
dummy_response_words = ['{\n "answer": "This is a dummy JSON response." \n}']
elif isinstance(format_req, dict):
dummy_response_words = [f'{{\n "comment": "Structured output requested, providing dummy JSON.",\n "field1": {random.randint(1,100)},\n "field2": "{random.choice(["A", "B", "C"])}"\n}}']
def generate_stream():
eval_count = 0
eval_duration_ns = 0
context = [random.randint(1, 10000) for _ in range(10)] # Dummy context
for i, word in enumerate(dummy_response_words):
chunk_start_ns = time.time_ns()
resp_chunk = word + (" " if i < len(dummy_response_words) - 1 else "")
yield json.dumps({
"model": model_name,
"created_at": get_iso_timestamp(),
"response": resp_chunk,
"done": False
}) + '\n'
simulate_processing_time(10, 50)
chunk_end_ns = time.time_ns()
eval_count += len(word.split()) # Approx tokens
eval_duration_ns += (chunk_end_ns - chunk_start_ns)
final_response = {
"model": model_name,
"created_at": get_iso_timestamp(),
"response": "", # Empty in final streaming response
"done": True,
"total_duration": time.time_ns() - start_time_ns,
"load_duration": load_duration_ns,
"prompt_eval_count": prompt_eval_count,
"prompt_eval_duration": prompt_eval_duration_ns,
"eval_count": eval_count,
"eval_duration": eval_duration_ns,
}
# Context is not returned in raw mode
if not raw_mode:
final_response["context"] = context
yield json.dumps(final_response) + '\n'
if stream:
return Response(stream_with_context(generate_stream()), mimetype='application/x-ndjson')
else:
# Simulate the whole process for non-streaming
eval_count_total = 0
eval_duration_total_ns = 0
full_response_text = ""
for word in dummy_response_words:
simulate_processing_time(10, 50)
eval_count_total += len(word.split())
full_response_text += word + " "
eval_duration_total_ns = eval_count_total * random.randint(4000000, 12000000)
final_response = {
"model": model_name,
"created_at": get_iso_timestamp(),
"response": full_response_text.strip(),
"done": True,
"total_duration": time.time_ns() - start_time_ns,
"load_duration": load_duration_ns,
"prompt_eval_count": prompt_eval_count,
"prompt_eval_duration": prompt_eval_duration_ns,
"eval_count": eval_count_total,
"eval_duration": eval_duration_total_ns,
}
if not raw_mode:
final_response["context"] = [random.randint(1, 10000) for _ in range(10)]
return jsonify(final_response)
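# Illustrative shape of the NDJSON stream emitted by generate_completion above
# (values vary per run; "context" is omitted when raw mode is requested):
#   {"model": "llama3:latest", "created_at": "...", "response": "This ", "done": false}
#   ...more chunks...
#   {"model": "llama3:latest", "created_at": "...", "response": "", "done": true,
#    "total_duration": ..., "load_duration": ..., "prompt_eval_count": ...,
#    "prompt_eval_duration": ..., "eval_count": ..., "eval_duration": ..., "context": [...]}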
@app.route('/api/chat', methods=['POST'])
def generate_chat_completion():
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON"}), 400
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
model_name = data.get('model')
messages = data.get('messages', [])
stream = data.get('stream', True)
keep_alive = data.get('keep_alive', '5m')
format_req = data.get('format')
tools = data.get('tools')
if not model_name:
return jsonify({"error": "'model' field is required"}), 400
if model_name not in MOCK_MODELS:
return jsonify({"error": f"Model '{model_name}' not found"}), 404
# Handle loading/unloading based on empty messages and keep_alive
if not messages:
if isinstance(keep_alive, (int, float)) and keep_alive <= 0:
_unload_model(model_name)
return jsonify({
"model": model_name,
"created_at": get_iso_timestamp(),
"message": {"role": "assistant", "content": ""},
"done_reason": "unload",
"done": True
})
else:
_load_model_if_needed(model_name, keep_alive)
return jsonify({
"model": model_name,
"created_at": get_iso_timestamp(),
"message": {"role": "assistant", "content": ""},
"done_reason": "load",
"done": True
})
# Actual chat simulation
_load_model_if_needed(model_name, keep_alive) # Ensure model is 'loaded'
start_time_ns = time.time_ns()
load_duration_ns = random.randint(5000000, 100000000)
    prompt_eval_count = sum(len((msg.get('content') or '').split()) for msg in messages)  # Rough token count; tolerates null content
prompt_eval_duration_ns = prompt_eval_count * random.randint(5000000, 15000000)
dummy_response_words = ["This", "is", "a", "dummy", "chat", "response."]
has_images = any(msg.get('images') for msg in messages)
if has_images:
dummy_response_words.append("(Acknowledging images in input)")
# Simulate tool use if tools are provided
tool_calls = None
if tools and random.random() > 0.5: # Randomly decide to use a tool
dummy_response_words = [] # Tool call replaces content
chosen_tool = random.choice(tools)
func_name = chosen_tool.get('function', {}).get('name', 'unknown_function')
params = chosen_tool.get('function', {}).get('parameters', {}).get('properties', {})
args = {}
for param, details in params.items():
if details.get('type') == 'string':
args[param] = f"dummy_{param}_{random.randint(1,100)}"
elif details.get('type') == 'integer':
args[param] = random.randint(1, 100)
elif details.get('type') == 'boolean':
args[param] = random.choice([True, False])
elif 'enum' in details:
args[param] = random.choice(details['enum'])
else:
args[param] = None # Default for unhandled types
tool_calls = [{ "function": { "name": func_name, "arguments": args } }]
if format_req == "json":
dummy_response_words = ['{\n "chat_answer": "This is a dummy JSON chat response." \n}']
elif isinstance(format_req, dict):
dummy_response_words = [f'{{\n "comment": "Structured output requested for chat.",\n "chat_field": "{random.choice(["Yes", "No", "Maybe"])}"\n}}']
def chat_stream():
eval_count = 0
eval_duration_ns = 0
for i, word in enumerate(dummy_response_words):
chunk_start_ns = time.time_ns()
resp_chunk = word + (" " if i < len(dummy_response_words) - 1 else "")
yield json.dumps({
"model": model_name,
"created_at": get_iso_timestamp(),
"message": {
"role": "assistant",
"content": resp_chunk,
# Images are typically only in user messages
},
"done": False
}) + '\n'
simulate_processing_time(10, 50)
chunk_end_ns = time.time_ns()
eval_count += len(word.split()) # Approx tokens
eval_duration_ns += (chunk_end_ns - chunk_start_ns)
# Simulate tool call generation if applicable (appears after content chunks)
if tool_calls:
yield json.dumps({
"model": model_name,
"created_at": get_iso_timestamp(),
"message": {
"role": "assistant",
"content": "", # Content might be empty if only tool calls generated
"tool_calls": tool_calls
},
"done": False # Usually tool call is intermediate
}) + '\n'
eval_count += 10 # Add some dummy count for tool call generation
final_response = {
"model": model_name,
"created_at": get_iso_timestamp(),
"message": {
"role": "assistant",
"content": "" # Empty in final streaming response
},
"done": True,
"total_duration": time.time_ns() - start_time_ns,
"load_duration": load_duration_ns,
"prompt_eval_count": prompt_eval_count,
"prompt_eval_duration": prompt_eval_duration_ns,
"eval_count": eval_count,
"eval_duration": eval_duration_ns
}
yield json.dumps(final_response) + '\n'
if stream:
return Response(stream_with_context(chat_stream()), mimetype='application/x-ndjson')
else:
# Simulate the whole process for non-streaming
eval_count_total = 0
eval_duration_total_ns = 0
full_response_text = ""
for word in dummy_response_words:
simulate_processing_time(10, 50)
eval_count_total += len(word.split())
full_response_text += word + " "
eval_duration_total_ns = eval_count_total * random.randint(4000000, 12000000)
final_message = {"role": "assistant"}
if tool_calls:
final_message["tool_calls"] = tool_calls
final_message["content"] = "" # Or potentially some text leading to the call
eval_count_total += 10 # Add dummy count for tool call
else:
final_message["content"] = full_response_text.strip()
final_response = {
"model": model_name,
"created_at": get_iso_timestamp(),
"message": final_message,
"done": True,
"total_duration": time.time_ns() - start_time_ns,
"load_duration": load_duration_ns,
"prompt_eval_count": prompt_eval_count,
"prompt_eval_duration": prompt_eval_duration_ns,
"eval_count": eval_count_total,
"eval_duration": eval_duration_total_ns,
}
return jsonify(final_response)
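# Illustrative request body for /api/chat above (only fields the mock reads;
# values are made up):
#   {"model": "llama3:latest",
#    "messages": [{"role": "user", "content": "Hello"}],
#    "stream": false,
#    "keep_alive": "5m"}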
@app.route('/api/create', methods=['POST'])
def create_model():
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON"}), 400
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
model_name = data.get('model')
from_model = data.get('from')
files = data.get('files')
quantize = data.get('quantize')
stream_resp = data.get('stream', True) # Note: Renamed variable to avoid conflict
if not model_name:
return jsonify({"error": "'model' field is required"}), 400
def create_stream():
yield json.dumps({"status": "reading model metadata"}) + '\n'
simulate_processing_time(100, 300)
if from_model:
if from_model not in MOCK_MODELS:
yield json.dumps({"status": f"error: base model '{from_model}' not found"}) + '\n'
return # Stop simulation
yield json.dumps({"status": f"creating model '{model_name}' from '{from_model}'"}) + '\n'
simulate_processing_time(50, 150)
# Simulate using existing layers
for _ in range(random.randint(2, 5)):
yield json.dumps({"status": f"using already created layer sha256:{generate_dummy_digest()[:12]}..."}) + '\n'
simulate_processing_time(20, 80)
# Simulate writing new layers (e.g., system prompt)
yield json.dumps({"status": f"writing layer sha256:{generate_dummy_digest()[:12]}..."}) + '\n'
simulate_processing_time(100, 400)
elif files:
is_gguf = any(fname.endswith(".gguf") for fname in files.keys())
is_safetensors = any(fname.endswith(".safetensors") for fname in files.keys())
if is_gguf:
yield json.dumps({"status": "parsing GGUF"}) + '\n'
simulate_processing_time(200, 600)
for fname, digest in files.items():
if digest not in MOCK_BLOBS:
yield json.dumps({"status": f"error: blob {digest} not found"}) + '\n'
return
yield json.dumps({"status": f"using layer {digest}"}) + '\n'
simulate_processing_time(50, 100)
elif is_safetensors:
yield json.dumps({"status": "converting model"}) + '\n'
simulate_processing_time(500, 1500)
missing_blobs = [d for d in files.values() if d not in MOCK_BLOBS]
if missing_blobs:
yield json.dumps({"status": f"error: blobs not found: {', '.join(missing_blobs)}"}) + '\n'
return
yield json.dumps({"status": "creating new layer sha256:..."}) + '\n'
simulate_processing_time(200, 500)
yield json.dumps({"status": "using autodetected template dummy-template"}) + '\n'
else:
yield json.dumps({"status": "error: unknown file types provided"}) + '\n'
return
else:
yield json.dumps({"status": "error: must provide 'from' or 'files'"}) + '\n'
return # Stop simulation
if quantize:
yield json.dumps({"status": f"quantizing model to {quantize}"}) + '\n'
simulate_processing_time(1000, 5000) # Quantization takes time
yield json.dumps({"status": "creating new quantized layer sha256:..."}) + '\n'
simulate_processing_time(200, 600)
yield json.dumps({"status": "writing manifest"}) + '\n'
simulate_processing_time(50, 150)
yield json.dumps({"status": "success"}) + '\n'
# Add the new model to our mock list
new_digest = generate_dummy_digest()
MOCK_MODELS[model_name] = {
"name": model_name,
"modified_at": get_iso_timestamp(),
"size": random.randint(1000000000, 8000000000),
"digest": new_digest,
"details": {
"format": "gguf", # Assume GGUF output for simplicity
"family": "unknown",
"families": [],
"parameter_size": "N/A",
"quantization_level": quantize if quantize else "N/A"
}
}
# Add its digest to blobs as it 'exists' now
MOCK_BLOBS.add(f"sha256:{new_digest}")
if stream_resp:
return Response(stream_with_context(create_stream()), mimetype='application/x-ndjson')
else:
# Simulate the process non-streamingly (just check for errors)
if from_model and from_model not in MOCK_MODELS:
return jsonify({"status": f"error: base model '{from_model}' not found"}), 404
if files:
missing_blobs = [d for d in files.values() if d not in MOCK_BLOBS]
if missing_blobs:
return jsonify({"status": f"error: blobs not found: {', '.join(missing_blobs)}"}), 400
if not from_model and not files:
return jsonify({"status": "error: must provide 'from' or 'files'"}), 400
# If checks pass, simulate success and add model
new_digest = generate_dummy_digest()
MOCK_MODELS[model_name] = {
"name": model_name,
"modified_at": get_iso_timestamp(),
"size": random.randint(1000000000, 8000000000),
"digest": new_digest,
"details": {
"format": "gguf",
"family": "unknown",
"families": [],
"parameter_size": "N/A",
"quantization_level": quantize if quantize else "N/A"
}
}
MOCK_BLOBS.add(f"sha256:{new_digest}")
return jsonify({"status": "success"})
@app.route('/api/blobs/<digest>', methods=['HEAD'])
def check_blob_exists(digest):
# Digest format is like sha256:abcdef...
if digest in MOCK_BLOBS:
return Response(status=200)
else:
# Randomly pretend some exist even if not explicitly created?
# if random.random() > 0.8:
# return Response(status=200)
return Response(status=404)
@app.route('/api/blobs/<digest>', methods=['POST'])
def push_blob(digest):
# Simulate receiving the file data (request.data)
# In a real scenario, you'd stream this to a file and verify the hash.
# Here, we just check if the provided digest looks valid and add it.
if not digest.startswith("sha256:") or len(digest) != 71: # sha256: + 64 hex chars
return Response("Invalid digest format", status=400)
# Simulate potential mismatch (optional)
# if random.random() < 0.1: # 10% chance of mismatch
# return Response("Digest mismatch", status=400)
MOCK_BLOBS.add(digest)
print(f"Mock: Blob {digest} 'uploaded'.")
return Response(status=201) # Created
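# Illustrative upload against the mock blob endpoint (the request body is
# ignored here; the digest must be "sha256:" plus 64 hex characters):
#   curl -X POST --data-binary @model.gguf \
#        http://localhost:11434/api/blobs/sha256:<64-hex-digest>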
@app.route('/api/show', methods=['POST'])
def show_model_info():
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON"}), 400
model_name = data.get('model')
verbose = data.get('verbose', False)
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
if not model_name:
return jsonify({"error": "'model' field is required"}), 400
if model_name not in MOCK_MODELS:
return jsonify({"error": f"Model '{model_name}' not found"}), 404
model_data = MOCK_MODELS[model_name]
# Generate dummy verbose data if requested
tokens_data = ["[UNUSED]", "[UNK]", "[CLS]", "hello", "world"] if verbose else []
merges_data = ["h e", "e l", "l o"] if verbose else []
token_types_data = [1, 1, 1, 1, 1] if verbose else []
response_data = {
"modelfile": f"# Mock Modelfile for {model_name}\nFROM sha256:{model_data['digest']}\nTEMPLATE \"\"\"{{{{ .Prompt }}}}\"\"\"\nPARAMETER stop \"\\n\"",
"parameters": "num_ctx 4096\nstop \"<|endoftext|>\"\n",
"template": "{{ .Prompt }}",
"details": model_data['details'],
"model_info": {
"general.architecture": model_data['details'].get('family', 'unknown'),
"general.file_type": 1, # Dummy value
"general.parameter_count": model_data['size'] * 2, # Wild guess
"general.quantization_version": 2,
f"{model_data['details'].get('family', 'unknown')}.context_length": 4096,
f"{model_data['details'].get('family', 'unknown')}.embedding_length": 4096,
f"{model_data['details'].get('family', 'unknown')}.block_count": 32,
# Add more dummy keys based on family if needed
"tokenizer.ggml.model": "gpt2", # common default
"tokenizer.ggml.tokens": tokens_data,
"tokenizer.ggml.merges": merges_data,
"tokenizer.ggml.token_type": token_types_data,
"tokenizer.ggml.bos_token_id": 1,
"tokenizer.ggml.eos_token_id": 2,
}
}
# Add optional fields based on verbose or specific model types if needed
if verbose:
response_data["license"] = "Mock License: Apache 2.0"
response_data["system"] = f"This is a mock system prompt for {model_name}."
return jsonify(response_data)
@app.route('/api/copy', methods=['POST'])
def copy_model():
try:
data = request.get_json()
source = data.get('source')
destination = data.get('destination')
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
if not source or not destination:
return jsonify({"error": "'source' and 'destination' fields are required"}), 400
if source not in MOCK_MODELS:
return jsonify({"error": f"Source model '{source}' not found"}), 404
if destination in MOCK_MODELS:
# Ollama seems to overwrite, so we mimic that behavior
print(f"Mock: Overwriting existing model '{destination}' during copy.")
pass
# Create a copy with a new modified time
MOCK_MODELS[destination] = MOCK_MODELS[source].copy()
MOCK_MODELS[destination]["name"] = destination
MOCK_MODELS[destination]["modified_at"] = get_iso_timestamp()
print(f"Mock: Copied model '{source}' to '{destination}'.")
return Response(status=200)
@app.route('/api/delete', methods=['DELETE'])
def delete_model():
try:
data = request.get_json()
model_name = data.get('model')
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
if not model_name:
return jsonify({"error": "'model' field is required"}), 400
if model_name in MOCK_MODELS:
# Also remove associated blob digest maybe? Depends on real Ollama logic.
# digest_to_remove = f"sha256:{MOCK_MODELS[model_name]['digest']}"
# if digest_to_remove in MOCK_BLOBS:
# MOCK_BLOBS.remove(digest_to_remove)
del MOCK_MODELS[model_name]
# Also unload if running
_unload_model(model_name)
print(f"Mock: Deleted model '{model_name}'.")
return Response(status=200)
else:
return jsonify({"error": f"Model '{model_name}' not found"}), 404
@app.route('/api/pull', methods=['POST'])
def pull_model():
try:
data = request.get_json()
model_name = data.get('model')
stream_resp = data.get('stream', True) # Renamed variable
insecure = data.get('insecure', False) # Parameter exists but unused in mock
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
if not model_name:
return jsonify({"error": "'model' field is required"}), 400
# Simulate checking if model already exists locally
if model_name in MOCK_MODELS:
print(f"Mock: Model '{model_name}' already exists locally.")
if stream_resp:
def already_exists_stream():
yield json.dumps({"status": "success"}) + '\n'
return Response(stream_with_context(already_exists_stream()), mimetype='application/x-ndjson')
else:
return jsonify({"status": "success"})
def pull_stream():
yield json.dumps({"status": "pulling manifest"}) + '\n'
simulate_processing_time(200, 500)
num_layers = random.randint(1, 5)
total_size = random.randint(100000000, 5000000000)
layer_size = total_size // num_layers
new_digest = generate_dummy_digest() # For the final model
for i in range(num_layers):
layer_digest = generate_dummy_digest()
MOCK_BLOBS.add(f"sha256:{layer_digest}") # Add blob as it's 'downloaded'
yield json.dumps({
"status": f"downloading sha256:{layer_digest[:12]}...",
"digest": f"sha256:{layer_digest}",
"total": layer_size,
"completed": 0
}) + '\n'
completed = 0
while completed < layer_size:
simulate_processing_time(50, 200)
increment = min(layer_size // random.randint(5, 15), layer_size - completed)
completed += increment
yield json.dumps({
"status": f"downloading sha256:{layer_digest[:12]}...",
"digest": f"sha256:{layer_digest}",
"total": layer_size,
"completed": completed
}) + '\n'
yield json.dumps({
"status": f"verifying sha256 digest",
"digest": f"sha256:{layer_digest}", # Added digest for context
}) + '\n'
simulate_processing_time(50, 150)
yield json.dumps({"status": "writing manifest"}) + '\n'
simulate_processing_time(50, 100)
yield json.dumps({"status": "removing any unused layers"}) + '\n'
simulate_processing_time(20, 50)
yield json.dumps({"status": "success"}) + '\n'
# Add the pulled model to our mock list
MOCK_MODELS[model_name] = {
"name": model_name,
"modified_at": get_iso_timestamp(),
"size": total_size,
"digest": new_digest,
"details": { # Generic details for pulled model
"format": "gguf",
"family": "unknown",
"families": [],
"parameter_size": f"{(total_size / 1e9):.1f}B",
"quantization_level": "Q4_0" # Common default
}
}
MOCK_BLOBS.add(f"sha256:{new_digest}")
if stream_resp:
return Response(stream_with_context(pull_stream()), mimetype='application/x-ndjson')
else:
# Simulate non-streaming pull (just add the model)
total_size = random.randint(100000000, 5000000000)
new_digest = generate_dummy_digest()
MOCK_MODELS[model_name] = {
"name": model_name,
"modified_at": get_iso_timestamp(),
"size": total_size,
"digest": new_digest,
"details": {
"format": "gguf",
"family": "unknown",
"families": [],
"parameter_size": f"{(total_size / 1e9):.1f}B",
"quantization_level": "Q4_0"
}
}
# Simulate adding blobs for layers + manifest
for _ in range(random.randint(2, 6)):
MOCK_BLOBS.add(f"sha256:{generate_dummy_digest()}")
MOCK_BLOBS.add(f"sha256:{new_digest}")
print(f"Mock: Pulled model '{model_name}' (non-streaming).")
return jsonify({"status": "success"})
@app.route('/api/push', methods=['POST'])
def push_model():
try:
data = request.get_json()
model_name = data.get('model') # Expects <namespace>/<model>:<tag>
stream_resp = data.get('stream', True) # Renamed variable
insecure = data.get('insecure', False) # Parameter exists but unused in mock
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
if not model_name or '/' not in model_name:
return jsonify({"error": "'model' field must be in the format <namespace>/<model>:<tag>"}), 400
    # Extract the base model name to check if it exists locally; fall back to
    # the bare name with an implicit :latest tag.
    base_model_name = model_name.split('/')[-1]
    if base_model_name not in MOCK_MODELS:
        base_model_name_latest = base_model_name.split(':')[0] + ":latest"
        if base_model_name_latest in MOCK_MODELS:
            base_model_name = base_model_name_latest
        else:
            return jsonify({"error": f"Model '{base_model_name}' not found locally"}), 404
local_model_data = MOCK_MODELS[base_model_name]
def push_stream():
yield json.dumps({"status": "retrieving manifest"}) + '\n'
simulate_processing_time(100, 300)
# Simulate checking layers/blobs on the 'remote'
# Assume some layers need uploading
num_layers_to_upload = random.randint(1, 3)
layer_digests = [f"sha256:{generate_dummy_digest()}" for _ in range(num_layers_to_upload)]
total_upload_size = local_model_data['size'] // random.randint(2, 5) # Simulate partial upload
layer_size = total_upload_size // num_layers_to_upload if num_layers_to_upload > 0 else 0
for layer_digest in layer_digests:
yield json.dumps({
"status": "starting upload",
"digest": layer_digest,
"total": layer_size
}) + '\n'
completed = 0
while completed < layer_size:
simulate_processing_time(50, 200)
increment = min(layer_size // random.randint(5, 15), layer_size - completed)
completed += increment
yield json.dumps({
"status": "uploading", # Different status? API doc uses 'starting upload' repeatedly
"digest": layer_digest,
"total": layer_size,
"completed": completed
}) + '\n'
yield json.dumps({"status": "pushing manifest"}) + '\n'
simulate_processing_time(100, 200)
yield json.dumps({"status": "success"}) + '\n'
if stream_resp:
return Response(stream_with_context(push_stream()), mimetype='application/x-ndjson')
else:
# Simulate non-streaming push
print(f"Mock: Pushed model '{model_name}' (non-streaming).")
return jsonify({"status": "success"})
@app.route('/api/embed', methods=['POST'])
def generate_embeddings_new():
try:
data = request.get_json()
model_name = data.get('model')
input_data = data.get('input') # Can be string or list of strings
# options = data.get('options') # Unused in mock
# keep_alive = data.get('keep_alive', '5m') # Unused in mock
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
if not model_name:
return jsonify({"error": "'model' field is required"}), 400
if not input_data:
return jsonify({"error": "'input' field is required"}), 400
# Basic check for embedding model type (optional)
# if "minilm" not in model_name and "bert" not in model_name:
# print(f"Warning: Model '{model_name}' might not be an embedding model.")
if model_name not in MOCK_MODELS:
# Let's auto-create a dummy embedding model if not present
MOCK_MODELS[model_name] = {
"name": model_name, "modified_at": get_iso_timestamp(), "size": 134000000,
"digest": generate_dummy_digest(),
"details": {"format": "gguf", "family": "bert", "families": ["bert"],
"parameter_size": "33M", "quantization_level": "Q4_K_M"}
}
print(f"Mock: Auto-created dummy embedding model '{model_name}'.")
start_time_ns = time.time_ns()
_load_model_if_needed(model_name) # Simulate loading
load_duration_ns = random.randint(1000000, 50000000)
if isinstance(input_data, str):
inputs = [input_data]
elif isinstance(input_data, list):
inputs = input_data
else:
return jsonify({"error": "'input' must be a string or a list of strings"}), 400
embeddings = []
prompt_eval_count = 0
for text in inputs:
simulate_processing_time(10, 100)
embeddings.append(generate_dummy_embedding())
prompt_eval_count += len(text.split()) # Approx tokens
total_duration_ns = time.time_ns() - start_time_ns
response_data = {
"model": model_name,
"embeddings": embeddings,
# Add dummy stats matching the example format
"total_duration": total_duration_ns,
"load_duration": load_duration_ns,
"prompt_eval_count": prompt_eval_count,
# eval_duration, prompt_eval_duration are missing in the example, so omit for now
}
return jsonify(response_data)
@app.route('/api/embeddings', methods=['POST'])
def generate_embeddings_old():
print("Warning: /api/embeddings is deprecated, use /api/embed instead.")
try:
data = request.get_json()
model_name = data.get('model')
prompt = data.get('prompt')
# options = data.get('options') # Unused in mock
# keep_alive = data.get('keep_alive', '5m') # Unused in mock
except Exception as e:
return jsonify({"error": f"Failed to parse JSON: {e}"}), 400
if not model_name:
return jsonify({"error": "'model' field is required"}), 400
if not prompt:
return jsonify({"error": "'prompt' field is required"}), 400
# Reuse model check/creation from /api/embed
if model_name not in MOCK_MODELS:
MOCK_MODELS[model_name] = {
"name": model_name, "modified_at": get_iso_timestamp(), "size": 134000000,
"digest": generate_dummy_digest(),
"details": {"format": "gguf", "family": "bert", "families": ["bert"],
"parameter_size": "33M", "quantization_level": "Q4_K_M"}
}
print(f"Mock: Auto-created dummy embedding model '{model_name}' for deprecated endpoint.")
_load_model_if_needed(model_name) # Simulate loading
simulate_processing_time(10, 100)
embedding = generate_dummy_embedding()
return jsonify({"embedding": embedding})
# --- Main Execution ---
if __name__ == '__main__':
print("Starting Mock Ollama API server on port 11434...")
print(f"Mock Models Available: {list(MOCK_MODELS.keys())}")
# Run on 0.0.0.0 to be accessible externally, default port 11434 used by Ollama
app.run(host='0.0.0.0', port=11434, debug=True)
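# A few illustrative calls against the running mock (assumes curl is available):
#   curl http://localhost:11434/api/version
#   curl http://localhost:11434/api/tags
#   curl -X POST http://localhost:11434/api/generate \
#        -H "Content-Type: application/json" \
#        -d '{"model": "llama3:latest", "prompt": "Hello", "stream": false}'
#   curl -X POST http://localhost:11434/api/embed \
#        -H "Content-Type: application/json" \
#        -d '{"model": "all-minilm:latest", "input": "Hello world"}'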