Created
September 26, 2024 14:43
-
-
Save recalde/1386b5c9cd3589cd24f254eddfa3b2ef to your computer and use it in GitHub Desktop.
python_load_balancer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from fastapi import FastAPI, BackgroundTasks | |
| from pydantic import BaseModel | |
| import requests | |
| import random | |
| import time | |
| import boto3 | |
| from boto3.dynamodb.conditions import Key | |
| app = FastAPI() | |
| # DynamoDB client and table configuration | |
| dynamodb = boto3.resource('dynamodb', region_name='us-east-1') # Specify your region | |
| table = dynamodb.Table('LoadBalancerState') | |
| class CalculationRequest(BaseModel): | |
| data: dict | |
| callback_url: str | |
| class CallbackRequest(BaseModel): | |
| service_url: str | |
| status: str | |
| @app.post("/calculate") | |
| async def calculate_route(request: CalculationRequest, background_tasks: BackgroundTasks): | |
| # Get the service to forward the request to | |
| service_url = get_next_service() | |
| if service_url: | |
| # Forward the request asynchronously | |
| background_tasks.add_task(forward_request, service_url, request.data, request.callback_url) | |
| return {"message": f"Request forwarded to {service_url}"} | |
| else: | |
| return {"message": "No available services"}, 503 | |
| @app.post("/callback") | |
| async def callback_route(request: CallbackRequest): | |
| # Decrement the in-progress count after receiving the callback | |
| decrement_in_progress(request.service_url) | |
| return {"message": "Callback received, load updated"} | |
| def get_next_service(): | |
| # Prioritize wave 1 over wave 2 | |
| for wave in ["wave1", "wave2"]: | |
| services = get_services_by_wave(wave) | |
| for service in services: | |
| if service['in_progress'] < 5: # Assume max 5 concurrent requests | |
| return service['service_url'] | |
| return None | |
| def forward_request(service_url, data, callback_url): | |
| # Increment the in-progress count | |
| increment_in_progress(service_url) | |
| try: | |
| # Forward the request to the service | |
| response = requests.post(service_url, json=data) | |
| response.raise_for_status() | |
| # Simulate a completion callback | |
| simulate_callback(callback_url, service_url) | |
| except Exception as e: | |
| print(f"Failed to forward request to {service_url}: {str(e)}") | |
| finally: | |
| # Decrement the in-progress count after the request completes | |
| decrement_in_progress(service_url) | |
| def increment_in_progress(service_url): | |
| """Increment the in-progress counter in DynamoDB.""" | |
| try: | |
| response = table.update_item( | |
| Key={'service_url': service_url}, | |
| UpdateExpression='SET in_progress = if_not_exists(in_progress, :start) + :inc', | |
| ExpressionAttributeValues={':inc': 1, ':start': 0}, | |
| ReturnValues="UPDATED_NEW" | |
| ) | |
| print(f"Updated in-progress count: {response['Attributes']['in_progress']}") | |
| except Exception as e: | |
| print(f"Failed to increment in-progress for {service_url}: {str(e)}") | |
| def decrement_in_progress(service_url): | |
| """Decrement the in-progress counter in DynamoDB.""" | |
| try: | |
| response = table.update_item( | |
| Key={'service_url': service_url}, | |
| UpdateExpression='SET in_progress = if_not_exists(in_progress, :start) - :dec', | |
| ExpressionAttributeValues={':dec': 1, ':start': 0}, | |
| ConditionExpression="in_progress > :zero", | |
| ExpressionAttributeValues={':zero': 0}, | |
| ReturnValues="UPDATED_NEW" | |
| ) | |
| print(f"Updated in-progress count: {response['Attributes']['in_progress']}") | |
| except Exception as e: | |
| print(f"Failed to decrement in-progress for {service_url}: {str(e)}") | |
| def get_services_by_wave(wave): | |
| """Fetch all services from DynamoDB belonging to the given wave.""" | |
| try: | |
| response = table.scan( | |
| FilterExpression=Key('wave').eq(wave) | |
| ) | |
| return response.get('Items', []) | |
| except Exception as e: | |
| print(f"Failed to fetch services for wave {wave}: {str(e)}") | |
| return [] | |
| def simulate_callback(callback_url, service_url): | |
| """Simulate a delayed callback when request is complete.""" | |
| time.sleep(random.randint(1, 5)) # Simulate delay | |
| callback_data = { | |
| 'service_url': service_url, | |
| 'status': 'complete' | |
| } | |
| requests.post(callback_url, json=callback_data) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment