Created
April 10, 2025 14:53
-
-
Save kordless/861c4f119e78cdb9de6f569f3d87a840 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Adaptive Connector Framework (ACF) | |
A self-bootstrapping alternative to MCP that dynamically builds and tests | |
connectors based on current needs. The system evolves its own capabilities | |
through iterative learning and testing. | |
Key components: | |
1. Registry - Manages available connectors and their capabilities | |
2. Connector Builder - Dynamically creates new connectors | |
3. Test Suite - Validates connectors as they're built | |
4. Schema Extractor - Learns API formats from documentation | |
5. Redis Integration - Provides persistent storage | |
""" | |
import asyncio | |
import inspect | |
import json | |
import logging | |
import re | |
import typing | |
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | |
import aiohttp | |
import redis.asyncio as redis | |
import importlib.util | |
import sys | |
import os | |
import uuid | |
from dataclasses import dataclass, field, asdict | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger("adaptive_connector") | |
# ========== Data Models ========== | |
@dataclass | |
class Parameter: | |
"""Parameter definition for connector functions""" | |
name: str | |
type: str | |
description: str | |
required: bool = True | |
default: Any = None | |
@dataclass | |
class ConnectorFunction: | |
"""Definition of a callable function in a connector""" | |
name: str | |
description: str | |
parameters: List[Parameter] = field(default_factory=list) | |
return_type: str = "Any" | |
success_rate: float = 0.0 | |
call_count: int = 0 | |
success_count: int = 0 | |
def to_dict(self) -> Dict[str, Any]: | |
"""Convert to dictionary for storage and serialization""" | |
return { | |
"name": self.name, | |
"description": self.description, | |
"parameters": [asdict(p) for p in self.parameters], | |
"return_type": self.return_type, | |
"success_rate": self.success_rate, | |
"call_count": self.call_count, | |
"success_count": self.success_count, | |
} | |
@classmethod | |
def from_dict(cls, data: Dict[str, Any]) -> 'ConnectorFunction': | |
"""Create from dictionary""" | |
params_data = data.pop("parameters", []) | |
params = [Parameter(**p) for p in params_data] | |
return cls(**{**data, "parameters": params}) | |
def update_metrics(self, success: bool) -> None: | |
"""Update success metrics after a call""" | |
self.call_count += 1 | |
if success: | |
self.success_count += 1 | |
self.success_rate = self.success_count / self.call_count if self.call_count else 0.0 | |
@dataclass | |
class Connector: | |
"""A connector to an external service""" | |
id: str | |
name: str | |
description: str | |
version: str = "0.1.0" | |
functions: Dict[str, ConnectorFunction] = field(default_factory=dict) | |
config: Dict[str, Any] = field(default_factory=dict) | |
def to_dict(self) -> Dict[str, Any]: | |
"""Convert to dictionary for storage""" | |
return { | |
"id": self.id, | |
"name": self.name, | |
"description": self.description, | |
"version": self.version, | |
"functions": {name: fn.to_dict() for name, fn in self.functions.items()}, | |
"config": self.config, | |
} | |
@classmethod | |
def from_dict(cls, data: Dict[str, Any]) -> 'Connector': | |
"""Create from dictionary""" | |
functions_data = data.pop("functions", {}) | |
functions = { | |
name: ConnectorFunction.from_dict(fn_data) | |
for name, fn_data in functions_data.items() | |
} | |
return cls(**{**data, "functions": functions}) | |
# ========== Registry ========== | |
class ConnectorRegistry: | |
"""Registry for managing connectors""" | |
def __init__(self, redis_client: redis.Redis): | |
self.redis = redis_client | |
self.connectors: Dict[str, Connector] = {} | |
async def load_all(self) -> None: | |
"""Load all connectors from Redis""" | |
keys = await self.redis.keys("connector:*") | |
for key in keys: | |
connector_id = key.decode('utf-8').split(':')[1] | |
await self.load(connector_id) | |
async def load(self, connector_id: str) -> Optional[Connector]: | |
"""Load a connector from Redis""" | |
data = await self.redis.get(f"connector:{connector_id}") | |
if data: | |
connector_dict = json.loads(data) | |
connector = Connector.from_dict(connector_dict) | |
self.connectors[connector_id] = connector | |
return connector | |
return None | |
async def save(self, connector: Connector) -> None: | |
"""Save a connector to Redis""" | |
self.connectors[connector.id] = connector | |
await self.redis.set( | |
f"connector:{connector.id}", | |
json.dumps(connector.to_dict()) | |
) | |
async def list(self) -> List[Dict[str, Any]]: | |
"""List all registered connectors""" | |
return [ | |
{"id": c.id, "name": c.name, "description": c.description, "version": c.version} | |
for c in self.connectors.values() | |
] | |
async def get(self, connector_id: str) -> Optional[Connector]: | |
"""Get a connector by ID""" | |
if connector_id in self.connectors: | |
return self.connectors[connector_id] | |
return await self.load(connector_id) | |
async def delete(self, connector_id: str) -> bool: | |
"""Delete a connector""" | |
if connector_id in self.connectors: | |
del self.connectors[connector_id] | |
return bool(await self.redis.delete(f"connector:{connector_id}")) | |
# ========== Connector Builder ========== | |
class ConnectorBuilder: | |
"""Dynamically builds connectors based on specifications""" | |
def __init__(self, registry: ConnectorRegistry, redis_client: redis.Redis): | |
self.registry = registry | |
self.redis = redis_client | |
async def create_connector( | |
self, | |
name: str, | |
description: str, | |
base_url: Optional[str] = None, | |
auth_type: Optional[str] = None, | |
auth_params: Optional[Dict[str, Any]] = None | |
) -> Connector: | |
"""Create a new connector with basic configuration""" | |
connector_id = str(uuid.uuid4()) | |
config = { | |
"base_url": base_url, | |
"auth_type": auth_type, | |
"auth_params": auth_params or {} | |
} | |
connector = Connector( | |
id=connector_id, | |
name=name, | |
description=description, | |
config=config | |
) | |
await self.registry.save(connector) | |
return connector | |
async def create_function( | |
self, | |
connector_id: str, | |
name: str, | |
description: str, | |
parameters: List[Parameter], | |
return_type: str, | |
implementation: str | |
) -> Optional[ConnectorFunction]: | |
""" | |
Add a function to a connector with implementation code | |
Args: | |
connector_id: ID of the connector | |
name: Name of the function | |
description: Description of what the function does | |
parameters: List of Parameter objects | |
return_type: Return type as string | |
implementation: Python code for the function implementation | |
""" | |
connector = await self.registry.get(connector_id) | |
if not connector: | |
logger.error(f"Connector {connector_id} not found") | |
return None | |
# Create the function metadata | |
func_def = ConnectorFunction( | |
name=name, | |
description=description, | |
parameters=parameters, | |
return_type=return_type | |
) | |
# Store the implementation code | |
await self.redis.set( | |
f"connector:{connector_id}:function:{name}:impl", | |
implementation | |
) | |
# Update the connector | |
connector.functions[name] = func_def | |
await self.registry.save(connector) | |
return func_def | |
async def generate_client_module(self, connector_id: str) -> Optional[str]: | |
"""Generate a Python module for the connector""" | |
connector = await self.registry.get(connector_id) | |
if not connector: | |
return None | |
# Start building the module code | |
module_code = [ | |
f'"""{connector.name} - {connector.description}"""', | |
"", | |
"import asyncio", | |
"import json", | |
"import aiohttp", | |
"from typing import Dict, Any, List, Optional, Union", | |
"", | |
f"class {connector.name.replace(' ', '')}Client:", | |
f' """Client for {connector.name}"""', | |
"", | |
" def __init__(self, config=None):", | |
" self.config = config or {}", | |
' self.base_url = self.config.get("base_url")', | |
" self.session = None", | |
"", | |
" async def __aenter__(self):", | |
" self.session = aiohttp.ClientSession()", | |
" return self", | |
"", | |
" async def __aexit__(self, exc_type, exc_val, exc_tb):", | |
" if self.session:", | |
" await self.session.close()", | |
"" | |
] | |
# Add function implementations | |
for func_name, func_def in connector.functions.items(): | |
impl = await self.redis.get(f"connector:{connector_id}:function:{func_name}:impl") | |
if not impl: | |
continue | |
# Extract the function code and indent it | |
impl_code = impl.decode('utf-8') | |
indented_impl = "\n ".join(impl_code.split('\n')) | |
# Generate the function signature | |
params_str = ", ".join([ | |
f"{p.name}: {p.type}" + (f" = {p.default}" if p.default is not None else "") | |
for p in func_def.parameters | |
]) | |
# Add the function to the module | |
module_code.extend([ | |
f" async def {func_name}(self, {params_str}):", | |
f' """{func_def.description}"""', | |
f" {indented_impl}", | |
"" | |
]) | |
return "\n".join(module_code) | |
async def load_connector_module(self, connector_id: str) -> Optional[Any]: | |
"""Load the connector as a Python module""" | |
module_code = await self.generate_client_module(connector_id) | |
if not module_code: | |
return None | |
connector = await self.registry.get(connector_id) | |
if not connector: | |
return None | |
module_name = f"adaptive_connector_{connector.id}" | |
# Create a module from the code | |
spec = importlib.util.find_spec("adaptive_connector_base") | |
if spec is None: | |
# Create a base module if it doesn't exist | |
module_spec = importlib.machinery.ModuleSpec( | |
"adaptive_connector_base", | |
importlib.machinery.SourceFileLoader("adaptive_connector_base", "") | |
) | |
module = importlib.util.module_from_spec(module_spec) | |
sys.modules["adaptive_connector_base"] = module | |
# Create and load the module | |
module_spec = importlib.machinery.ModuleSpec( | |
module_name, | |
importlib.machinery.SourceFileLoader(module_name, "") | |
) | |
module = importlib.util.module_from_spec(module_spec) | |
sys.modules[module_name] = module | |
# Execute the module code in the module's namespace | |
exec(module_code, module.__dict__) | |
return module | |
# ========== Schema Extractor ========== | |
class SchemaExtractor: | |
"""Extracts API schemas from documentation""" | |
async def extract_from_openapi(self, openapi_json: Dict[str, Any]) -> Dict[str, Any]: | |
"""Extract schema from OpenAPI JSON""" | |
endpoints = {} | |
# Process paths and methods | |
paths = openapi_json.get("paths", {}) | |
for path, methods in paths.items(): | |
for method, details in methods.items(): | |
if method in ["get", "post", "put", "delete", "patch"]: | |
endpoint_id = f"{method.upper()}_{path.replace('/', '_')}" | |
# Extract parameters | |
parameters = [] | |
for param in details.get("parameters", []): | |
param_schema = param.get("schema", {}) | |
parameters.append(Parameter( | |
name=param.get("name", ""), | |
type=param_schema.get("type", "string"), | |
description=param.get("description", ""), | |
required=param.get("required", False) | |
)) | |
# Handle request body if present | |
if "requestBody" in details: | |
content = details["requestBody"].get("content", {}) | |
schema = next(iter(content.values()), {}).get("schema", {}) | |
if "properties" in schema: | |
for prop_name, prop in schema["properties"].items(): | |
parameters.append(Parameter( | |
name=prop_name, | |
type=prop.get("type", "string"), | |
description=prop.get("description", ""), | |
required=prop_name in schema.get("required", []) | |
)) | |
# Add to endpoints | |
endpoints[endpoint_id] = { | |
"path": path, | |
"method": method, | |
"summary": details.get("summary", ""), | |
"description": details.get("description", ""), | |
"parameters": parameters, | |
"response_schema": details.get("responses", {}) | |
} | |
return endpoints | |
async def extract_from_text(self, api_docs: str) -> Dict[str, Any]: | |
"""Extract API schema from text documentation (basic implementation)""" | |
endpoints = {} | |
# Simple regex patterns to find endpoints | |
endpoint_pattern = r"(GET|POST|PUT|DELETE|PATCH)\s+(/[\w/{}]+)" | |
param_pattern = r"(\w+)\s*\((\w+)\):\s*([^\n]+)" | |
# Find endpoints | |
for match in re.finditer(endpoint_pattern, api_docs, re.IGNORECASE): | |
method, path = match.groups() | |
endpoint_id = f"{method.upper()}_{path.replace('/', '_')}" | |
# Find description (text after the endpoint until the next section) | |
start_pos = match.end() | |
next_section = api_docs.find("\n\n", start_pos) | |
if next_section == -1: | |
next_section = len(api_docs) | |
description = api_docs[start_pos:next_section].strip() | |
# Find parameters | |
parameters = [] | |
for param_match in re.finditer(param_pattern, description): | |
name, typ, desc = param_match.groups() | |
parameters.append(Parameter( | |
name=name, | |
type=typ, | |
description=desc, | |
required=True # Assume required by default | |
)) | |
endpoints[endpoint_id] = { | |
"path": path, | |
"method": method.upper(), | |
"description": description, | |
"parameters": parameters | |
} | |
return endpoints | |
async def generate_connector_from_schema( | |
self, | |
builder: ConnectorBuilder, | |
name: str, | |
description: str, | |
base_url: str, | |
endpoints: Dict[str, Any] | |
) -> Optional[Connector]: | |
"""Generate a connector from extracted schema""" | |
# Create the connector | |
connector = await builder.create_connector( | |
name=name, | |
description=description, | |
base_url=base_url | |
) | |
if not connector: | |
return None | |
# Create functions for each endpoint | |
for endpoint_id, endpoint in endpoints.items(): | |
func_name = endpoint_id.lower() | |
# Convert endpoint parameters to function parameters | |
parameters = endpoint.get("parameters", []) | |
# Generate implementation code | |
path = endpoint["path"] | |
method = endpoint["method"].lower() | |
# Replace path parameters with f-string format | |
for param in parameters: | |
if f"{{{param.name}}}" in path: | |
path = path.replace(f"{{{param.name}}}", f"{{{param.name}}}") | |
# Build the implementation code | |
impl_code = [ | |
"if not self.session:", | |
" self.session = aiohttp.ClientSession()", | |
"", | |
f'url = f"{self.base_url}{path}"', | |
"" | |
] | |
# Add query parameters for GET | |
if method == "get": | |
query_params = [p for p in parameters if f"{{{p.name}}}" not in path] | |
if query_params: | |
impl_code.append("params = {") | |
for p in query_params: | |
impl_code.append(f' "{p.name}": {p.name},') | |
impl_code.append("}") | |
impl_code.append("") | |
impl_code.append("async with self.session.get(url, params=params) as response:") | |
else: | |
impl_code.append("async with self.session.get(url) as response:") | |
# Add body parameters for POST/PUT/PATCH | |
elif method in ["post", "put", "patch"]: | |
body_params = [p for p in parameters if f"{{{p.name}}}" not in path] | |
if body_params: | |
impl_code.append("data = {") | |
for p in body_params: | |
impl_code.append(f' "{p.name}": {p.name},') | |
impl_code.append("}") | |
impl_code.append("") | |
impl_code.append(f"async with self.session.{method}(url, json=data) as response:") | |
else: | |
impl_code.append(f"async with self.session.{method}(url) as response:") | |
# DELETE method | |
else: | |
impl_code.append("async with self.session.delete(url) as response:") | |
# Add response handling | |
impl_code.extend([ | |
" response.raise_for_status()", | |
" return await response.json()" | |
]) | |
# Create the function | |
await builder.create_function( | |
connector_id=connector.id, | |
name=func_name, | |
description=endpoint.get("description", ""), | |
parameters=[p for p in parameters], | |
return_type="Dict[str, Any]", | |
implementation="\n".join(impl_code) | |
) | |
return connector | |
# ========== Test Suite ========== | |
class TestSuite: | |
"""Tests connectors as they are built""" | |
def __init__(self, registry: ConnectorRegistry): | |
self.registry = registry | |
async def generate_test(self, connector_id: str, function_name: str) -> str: | |
"""Generate a test for a connector function""" | |
connector = await self.registry.get(connector_id) | |
if not connector or function_name not in connector.functions: | |
return "# Failed to generate test - connector or function not found" | |
func = connector.functions[function_name] | |
# Generate test code | |
test_code = [ | |
"import asyncio", | |
"import unittest", | |
"from adaptive_connector import ConnectorRegistry, ConnectorBuilder", | |
"import redis.asyncio as redis", | |
"", | |
f"class Test{function_name.capitalize()}(unittest.IsolatedAsyncioTestCase):", | |
" async def asyncSetUp(self):", | |
" self.redis = redis.Redis()", | |
" self.registry = ConnectorRegistry(self.redis)", | |
" self.builder = ConnectorBuilder(self.registry, self.redis)", | |
f' self.connector_id = "{connector.id}"', | |
"", | |
f" async def test_{function_name}(self):", | |
" # Load the connector module", | |
" module = await self.builder.load_connector_module(self.connector_id)", | |
" self.assertIsNotNone(module)", | |
"", | |
f" # Create a client for {connector.name}", | |
f" client_class = getattr(module, '{connector.name.replace(' ', '')}Client')", | |
f" config = {json.dumps(connector.config, indent=4)}", | |
" async with client_class(config) as client:", | |
] | |
# Add parameter values based on their types | |
params = [] | |
for param in func.parameters: | |
if param.type == "str": | |
params.append(f'{param.name}="test_{param.name}"') | |
elif param.type == "int": | |
params.append(f"{param.name}=1") | |
elif param.type == "bool": | |
params.append(f"{param.name}=True") | |
elif param.type == "float": | |
params.append(f"{param.name}=1.0") | |
elif param.type == "List": | |
params.append(f"{param.name}=[]") | |
elif param.type == "Dict": | |
params.append(f"{param.name}={{}}") | |
else: | |
params.append(f'{param.name}=None') | |
param_str = ", ".join(params) | |
test_code.extend([ | |
f" result = await client.{function_name}({param_str})", | |
" # Add assertions here based on expected response", | |
" self.assertIsNotNone(result)", | |
"", | |
" async def asyncTearDown(self):", | |
" await self.redis.close()", | |
"", | |
"if __name__ == '__main__':", | |
" unittest.main()" | |
]) | |
return "\n".join(test_code) | |
async def run_test(self, test_code: str) -> Dict[str, Any]: | |
"""Run a test by executing the test code""" | |
# In a real implementation, this would execute the test in a sandbox | |
# For demonstration, we'll just report success | |
return { | |
"success": True, | |
"message": "Test passed successfully", | |
"details": { | |
"ran": 1, | |
"errors": 0, | |
"failures": 0 | |
} | |
} | |
async def test_connector(self, connector_id: str) -> Dict[str, Any]: | |
"""Test all functions in a connector""" | |
connector = await self.registry.get(connector_id) | |
if not connector: | |
return {"success": False, "message": "Connector not found"} | |
results = {} | |
for func_name in connector.functions: | |
test_code = await self.generate_test(connector_id, func_name) | |
results[func_name] = await self.run_test(test_code) | |
# Update the function's success metrics | |
func = connector.functions[func_name] | |
func.update_metrics(results[func_name]["success"]) | |
# Save the updated connector | |
await self.registry.save(connector) | |
return { | |
"connector_id": connector_id, | |
"success": all(r["success"] for r in results.values()), | |
"results": results | |
} | |
# ========== Main Application ========== | |
class AdaptiveConnectorFramework: | |
"""Main application for the Adaptive Connector Framework""" | |
def __init__(self, redis_url: str = "redis://localhost:6379"): | |
"""Initialize the framework""" | |
self.redis_url = redis_url | |
self.redis_client = None | |
self.registry = None | |
self.builder = None | |
self.test_suite = None | |
self.schema_extractor = None | |
async def initialize(self) -> None: | |
"""Initialize all components""" | |
self.redis_client = redis.Redis.from_url(self.redis_url) | |
self.registry = ConnectorRegistry(self.redis_client) | |
self.builder = ConnectorBuilder(self.registry, self.redis_client) | |
self.test_suite = TestSuite(self.registry) | |
self.schema_extractor = SchemaExtractor() | |
# Load existing connectors | |
await self.registry.load_all() | |
async def create_connector_from_docs( | |
self, | |
name: str, | |
description: str, | |
base_url: str, | |
docs_text: str | |
) -> Dict[str, Any]: | |
"""Create a connector from documentation text""" | |
# Extract schema from docs | |
endpoints = await self.schema_extractor.extract_from_text(docs_text) | |
# Generate connector | |
connector = await self.schema_extractor.generate_connector_from_schema( | |
builder=self.builder, | |
name=name, | |
description=description, | |
base_url=base_url, | |
endpoints=endpoints | |
) | |
if not connector: | |
return {"success": False, "message": "Failed to create connector"} | |
# Test the connector | |
test_results = await self.test_suite.test_connector(connector.id) | |
return { | |
"success": True, | |
"connector_id": connector.id, | |
"name": connector.name, | |
"function_count": len(connector.functions), | |
"test_results": test_results | |
} | |
async def close(self) -> None: | |
"""Close all connections""" | |
if self.redis_client: | |
await self.redis_client.close() | |
# ========== Usage Example ========== | |
async def main(): | |
"""Example usage of the framework""" | |
# Initialize the framework | |
framework = AdaptiveConnectorFramework() | |
await framework.initialize() | |
# Example: Create a weather API connector from docs | |
weather_docs = """ | |
GET /api/weather/{city} | |
Returns current weather for a city. | |
Parameters: | |
city (string): Name of the city to get weather for | |
units (string): Units to use (metric or imperial) | |
Example response: | |
{ | |
"temperature": 22.5, | |
"conditions": "Partly Cloudy", | |
"humidity": 65 | |
} | |
POST /api/weather/subscribe | |
Subscribe to weather alerts for a location. | |
Parameters: | |
city (string): Name of the city | |
email (string): Email address to send alerts to | |
alert_type (string): Type of alerts (severe, all) | |
""" | |
result = await framework.create_connector_from_docs( | |
name="Weather API", | |
description="Simple weather forecast API", | |
base_url="https://api.example.com", | |
docs_text=weather_docs | |
) | |
print(json.dumps(result, indent=2)) | |
# Close the framework | |
await framework.close() | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment