Skip to content

Instantly share code, notes, and snippets.

@recalde
Created September 26, 2024 14:43
Show Gist options
  • Select an option

  • Save recalde/1386b5c9cd3589cd24f254eddfa3b2ef to your computer and use it in GitHub Desktop.

Select an option

Save recalde/1386b5c9cd3589cd24f254eddfa3b2ef to your computer and use it in GitHub Desktop.
python_load_balancer
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import requests
import random
import time
import boto3
from boto3.dynamodb.conditions import Key
app = FastAPI()
# DynamoDB client and table configuration
dynamodb = boto3.resource('dynamodb', region_name='us-east-1') # Specify your region
table = dynamodb.Table('LoadBalancerState')
class CalculationRequest(BaseModel):
data: dict
callback_url: str
class CallbackRequest(BaseModel):
service_url: str
status: str
@app.post("/calculate")
async def calculate_route(request: CalculationRequest, background_tasks: BackgroundTasks):
# Get the service to forward the request to
service_url = get_next_service()
if service_url:
# Forward the request asynchronously
background_tasks.add_task(forward_request, service_url, request.data, request.callback_url)
return {"message": f"Request forwarded to {service_url}"}
else:
return {"message": "No available services"}, 503
@app.post("/callback")
async def callback_route(request: CallbackRequest):
# Decrement the in-progress count after receiving the callback
decrement_in_progress(request.service_url)
return {"message": "Callback received, load updated"}
def get_next_service():
# Prioritize wave 1 over wave 2
for wave in ["wave1", "wave2"]:
services = get_services_by_wave(wave)
for service in services:
if service['in_progress'] < 5: # Assume max 5 concurrent requests
return service['service_url']
return None
def forward_request(service_url, data, callback_url):
# Increment the in-progress count
increment_in_progress(service_url)
try:
# Forward the request to the service
response = requests.post(service_url, json=data)
response.raise_for_status()
# Simulate a completion callback
simulate_callback(callback_url, service_url)
except Exception as e:
print(f"Failed to forward request to {service_url}: {str(e)}")
finally:
# Decrement the in-progress count after the request completes
decrement_in_progress(service_url)
def increment_in_progress(service_url):
"""Increment the in-progress counter in DynamoDB."""
try:
response = table.update_item(
Key={'service_url': service_url},
UpdateExpression='SET in_progress = if_not_exists(in_progress, :start) + :inc',
ExpressionAttributeValues={':inc': 1, ':start': 0},
ReturnValues="UPDATED_NEW"
)
print(f"Updated in-progress count: {response['Attributes']['in_progress']}")
except Exception as e:
print(f"Failed to increment in-progress for {service_url}: {str(e)}")
def decrement_in_progress(service_url):
"""Decrement the in-progress counter in DynamoDB."""
try:
response = table.update_item(
Key={'service_url': service_url},
UpdateExpression='SET in_progress = if_not_exists(in_progress, :start) - :dec',
ExpressionAttributeValues={':dec': 1, ':start': 0},
ConditionExpression="in_progress > :zero",
ExpressionAttributeValues={':zero': 0},
ReturnValues="UPDATED_NEW"
)
print(f"Updated in-progress count: {response['Attributes']['in_progress']}")
except Exception as e:
print(f"Failed to decrement in-progress for {service_url}: {str(e)}")
def get_services_by_wave(wave):
"""Fetch all services from DynamoDB belonging to the given wave."""
try:
response = table.scan(
FilterExpression=Key('wave').eq(wave)
)
return response.get('Items', [])
except Exception as e:
print(f"Failed to fetch services for wave {wave}: {str(e)}")
return []
def simulate_callback(callback_url, service_url):
"""Simulate a delayed callback when request is complete."""
time.sleep(random.randint(1, 5)) # Simulate delay
callback_data = {
'service_url': service_url,
'status': 'complete'
}
requests.post(callback_url, json=callback_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment