Skip to content

Instantly share code, notes, and snippets.

@kordless
Created April 10, 2025 14:53
Show Gist options
  • Save kordless/861c4f119e78cdb9de6f569f3d87a840 to your computer and use it in GitHub Desktop.
Save kordless/861c4f119e78cdb9de6f569f3d87a840 to your computer and use it in GitHub Desktop.
"""
Adaptive Connector Framework (ACF)
A self-bootstrapping alternative to MCP that dynamically builds and tests
connectors based on current needs. The system evolves its own capabilities
through iterative learning and testing.
Key components:
1. Registry - Manages available connectors and their capabilities
2. Connector Builder - Dynamically creates new connectors
3. Test Suite - Validates connectors as they're built
4. Schema Extractor - Learns API formats from documentation
5. Redis Integration - Provides persistent storage
"""
import asyncio
import inspect
import json
import logging
import re
import typing
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import aiohttp
import redis.asyncio as redis
import importlib.util
import sys
import os
import uuid
from dataclasses import dataclass, field, asdict
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("adaptive_connector")
# ========== Data Models ==========
@dataclass
class Parameter:
"""Parameter definition for connector functions"""
name: str
type: str
description: str
required: bool = True
default: Any = None
@dataclass
class ConnectorFunction:
"""Definition of a callable function in a connector"""
name: str
description: str
parameters: List[Parameter] = field(default_factory=list)
return_type: str = "Any"
success_rate: float = 0.0
call_count: int = 0
success_count: int = 0
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for storage and serialization"""
return {
"name": self.name,
"description": self.description,
"parameters": [asdict(p) for p in self.parameters],
"return_type": self.return_type,
"success_rate": self.success_rate,
"call_count": self.call_count,
"success_count": self.success_count,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ConnectorFunction':
"""Create from dictionary"""
params_data = data.pop("parameters", [])
params = [Parameter(**p) for p in params_data]
return cls(**{**data, "parameters": params})
def update_metrics(self, success: bool) -> None:
"""Update success metrics after a call"""
self.call_count += 1
if success:
self.success_count += 1
self.success_rate = self.success_count / self.call_count if self.call_count else 0.0
@dataclass
class Connector:
"""A connector to an external service"""
id: str
name: str
description: str
version: str = "0.1.0"
functions: Dict[str, ConnectorFunction] = field(default_factory=dict)
config: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for storage"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"version": self.version,
"functions": {name: fn.to_dict() for name, fn in self.functions.items()},
"config": self.config,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Connector':
"""Create from dictionary"""
functions_data = data.pop("functions", {})
functions = {
name: ConnectorFunction.from_dict(fn_data)
for name, fn_data in functions_data.items()
}
return cls(**{**data, "functions": functions})
# ========== Registry ==========
class ConnectorRegistry:
"""Registry for managing connectors"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.connectors: Dict[str, Connector] = {}
async def load_all(self) -> None:
"""Load all connectors from Redis"""
keys = await self.redis.keys("connector:*")
for key in keys:
connector_id = key.decode('utf-8').split(':')[1]
await self.load(connector_id)
async def load(self, connector_id: str) -> Optional[Connector]:
"""Load a connector from Redis"""
data = await self.redis.get(f"connector:{connector_id}")
if data:
connector_dict = json.loads(data)
connector = Connector.from_dict(connector_dict)
self.connectors[connector_id] = connector
return connector
return None
async def save(self, connector: Connector) -> None:
"""Save a connector to Redis"""
self.connectors[connector.id] = connector
await self.redis.set(
f"connector:{connector.id}",
json.dumps(connector.to_dict())
)
async def list(self) -> List[Dict[str, Any]]:
"""List all registered connectors"""
return [
{"id": c.id, "name": c.name, "description": c.description, "version": c.version}
for c in self.connectors.values()
]
async def get(self, connector_id: str) -> Optional[Connector]:
"""Get a connector by ID"""
if connector_id in self.connectors:
return self.connectors[connector_id]
return await self.load(connector_id)
async def delete(self, connector_id: str) -> bool:
"""Delete a connector"""
if connector_id in self.connectors:
del self.connectors[connector_id]
return bool(await self.redis.delete(f"connector:{connector_id}"))
# ========== Connector Builder ==========
class ConnectorBuilder:
"""Dynamically builds connectors based on specifications"""
def __init__(self, registry: ConnectorRegistry, redis_client: redis.Redis):
self.registry = registry
self.redis = redis_client
async def create_connector(
self,
name: str,
description: str,
base_url: Optional[str] = None,
auth_type: Optional[str] = None,
auth_params: Optional[Dict[str, Any]] = None
) -> Connector:
"""Create a new connector with basic configuration"""
connector_id = str(uuid.uuid4())
config = {
"base_url": base_url,
"auth_type": auth_type,
"auth_params": auth_params or {}
}
connector = Connector(
id=connector_id,
name=name,
description=description,
config=config
)
await self.registry.save(connector)
return connector
async def create_function(
self,
connector_id: str,
name: str,
description: str,
parameters: List[Parameter],
return_type: str,
implementation: str
) -> Optional[ConnectorFunction]:
"""
Add a function to a connector with implementation code
Args:
connector_id: ID of the connector
name: Name of the function
description: Description of what the function does
parameters: List of Parameter objects
return_type: Return type as string
implementation: Python code for the function implementation
"""
connector = await self.registry.get(connector_id)
if not connector:
logger.error(f"Connector {connector_id} not found")
return None
# Create the function metadata
func_def = ConnectorFunction(
name=name,
description=description,
parameters=parameters,
return_type=return_type
)
# Store the implementation code
await self.redis.set(
f"connector:{connector_id}:function:{name}:impl",
implementation
)
# Update the connector
connector.functions[name] = func_def
await self.registry.save(connector)
return func_def
async def generate_client_module(self, connector_id: str) -> Optional[str]:
"""Generate a Python module for the connector"""
connector = await self.registry.get(connector_id)
if not connector:
return None
# Start building the module code
module_code = [
f'"""{connector.name} - {connector.description}"""',
"",
"import asyncio",
"import json",
"import aiohttp",
"from typing import Dict, Any, List, Optional, Union",
"",
f"class {connector.name.replace(' ', '')}Client:",
f' """Client for {connector.name}"""',
"",
" def __init__(self, config=None):",
" self.config = config or {}",
' self.base_url = self.config.get("base_url")',
" self.session = None",
"",
" async def __aenter__(self):",
" self.session = aiohttp.ClientSession()",
" return self",
"",
" async def __aexit__(self, exc_type, exc_val, exc_tb):",
" if self.session:",
" await self.session.close()",
""
]
# Add function implementations
for func_name, func_def in connector.functions.items():
impl = await self.redis.get(f"connector:{connector_id}:function:{func_name}:impl")
if not impl:
continue
# Extract the function code and indent it
impl_code = impl.decode('utf-8')
indented_impl = "\n ".join(impl_code.split('\n'))
# Generate the function signature
params_str = ", ".join([
f"{p.name}: {p.type}" + (f" = {p.default}" if p.default is not None else "")
for p in func_def.parameters
])
# Add the function to the module
module_code.extend([
f" async def {func_name}(self, {params_str}):",
f' """{func_def.description}"""',
f" {indented_impl}",
""
])
return "\n".join(module_code)
async def load_connector_module(self, connector_id: str) -> Optional[Any]:
"""Load the connector as a Python module"""
module_code = await self.generate_client_module(connector_id)
if not module_code:
return None
connector = await self.registry.get(connector_id)
if not connector:
return None
module_name = f"adaptive_connector_{connector.id}"
# Create a module from the code
spec = importlib.util.find_spec("adaptive_connector_base")
if spec is None:
# Create a base module if it doesn't exist
module_spec = importlib.machinery.ModuleSpec(
"adaptive_connector_base",
importlib.machinery.SourceFileLoader("adaptive_connector_base", "")
)
module = importlib.util.module_from_spec(module_spec)
sys.modules["adaptive_connector_base"] = module
# Create and load the module
module_spec = importlib.machinery.ModuleSpec(
module_name,
importlib.machinery.SourceFileLoader(module_name, "")
)
module = importlib.util.module_from_spec(module_spec)
sys.modules[module_name] = module
# Execute the module code in the module's namespace
exec(module_code, module.__dict__)
return module
# ========== Schema Extractor ==========
class SchemaExtractor:
"""Extracts API schemas from documentation"""
async def extract_from_openapi(self, openapi_json: Dict[str, Any]) -> Dict[str, Any]:
"""Extract schema from OpenAPI JSON"""
endpoints = {}
# Process paths and methods
paths = openapi_json.get("paths", {})
for path, methods in paths.items():
for method, details in methods.items():
if method in ["get", "post", "put", "delete", "patch"]:
endpoint_id = f"{method.upper()}_{path.replace('/', '_')}"
# Extract parameters
parameters = []
for param in details.get("parameters", []):
param_schema = param.get("schema", {})
parameters.append(Parameter(
name=param.get("name", ""),
type=param_schema.get("type", "string"),
description=param.get("description", ""),
required=param.get("required", False)
))
# Handle request body if present
if "requestBody" in details:
content = details["requestBody"].get("content", {})
schema = next(iter(content.values()), {}).get("schema", {})
if "properties" in schema:
for prop_name, prop in schema["properties"].items():
parameters.append(Parameter(
name=prop_name,
type=prop.get("type", "string"),
description=prop.get("description", ""),
required=prop_name in schema.get("required", [])
))
# Add to endpoints
endpoints[endpoint_id] = {
"path": path,
"method": method,
"summary": details.get("summary", ""),
"description": details.get("description", ""),
"parameters": parameters,
"response_schema": details.get("responses", {})
}
return endpoints
async def extract_from_text(self, api_docs: str) -> Dict[str, Any]:
"""Extract API schema from text documentation (basic implementation)"""
endpoints = {}
# Simple regex patterns to find endpoints
endpoint_pattern = r"(GET|POST|PUT|DELETE|PATCH)\s+(/[\w/{}]+)"
param_pattern = r"(\w+)\s*\((\w+)\):\s*([^\n]+)"
# Find endpoints
for match in re.finditer(endpoint_pattern, api_docs, re.IGNORECASE):
method, path = match.groups()
endpoint_id = f"{method.upper()}_{path.replace('/', '_')}"
# Find description (text after the endpoint until the next section)
start_pos = match.end()
next_section = api_docs.find("\n\n", start_pos)
if next_section == -1:
next_section = len(api_docs)
description = api_docs[start_pos:next_section].strip()
# Find parameters
parameters = []
for param_match in re.finditer(param_pattern, description):
name, typ, desc = param_match.groups()
parameters.append(Parameter(
name=name,
type=typ,
description=desc,
required=True # Assume required by default
))
endpoints[endpoint_id] = {
"path": path,
"method": method.upper(),
"description": description,
"parameters": parameters
}
return endpoints
async def generate_connector_from_schema(
self,
builder: ConnectorBuilder,
name: str,
description: str,
base_url: str,
endpoints: Dict[str, Any]
) -> Optional[Connector]:
"""Generate a connector from extracted schema"""
# Create the connector
connector = await builder.create_connector(
name=name,
description=description,
base_url=base_url
)
if not connector:
return None
# Create functions for each endpoint
for endpoint_id, endpoint in endpoints.items():
func_name = endpoint_id.lower()
# Convert endpoint parameters to function parameters
parameters = endpoint.get("parameters", [])
# Generate implementation code
path = endpoint["path"]
method = endpoint["method"].lower()
# Replace path parameters with f-string format
for param in parameters:
if f"{{{param.name}}}" in path:
path = path.replace(f"{{{param.name}}}", f"{{{param.name}}}")
# Build the implementation code
impl_code = [
"if not self.session:",
" self.session = aiohttp.ClientSession()",
"",
f'url = f"{self.base_url}{path}"',
""
]
# Add query parameters for GET
if method == "get":
query_params = [p for p in parameters if f"{{{p.name}}}" not in path]
if query_params:
impl_code.append("params = {")
for p in query_params:
impl_code.append(f' "{p.name}": {p.name},')
impl_code.append("}")
impl_code.append("")
impl_code.append("async with self.session.get(url, params=params) as response:")
else:
impl_code.append("async with self.session.get(url) as response:")
# Add body parameters for POST/PUT/PATCH
elif method in ["post", "put", "patch"]:
body_params = [p for p in parameters if f"{{{p.name}}}" not in path]
if body_params:
impl_code.append("data = {")
for p in body_params:
impl_code.append(f' "{p.name}": {p.name},')
impl_code.append("}")
impl_code.append("")
impl_code.append(f"async with self.session.{method}(url, json=data) as response:")
else:
impl_code.append(f"async with self.session.{method}(url) as response:")
# DELETE method
else:
impl_code.append("async with self.session.delete(url) as response:")
# Add response handling
impl_code.extend([
" response.raise_for_status()",
" return await response.json()"
])
# Create the function
await builder.create_function(
connector_id=connector.id,
name=func_name,
description=endpoint.get("description", ""),
parameters=[p for p in parameters],
return_type="Dict[str, Any]",
implementation="\n".join(impl_code)
)
return connector
# ========== Test Suite ==========
class TestSuite:
"""Tests connectors as they are built"""
def __init__(self, registry: ConnectorRegistry):
self.registry = registry
async def generate_test(self, connector_id: str, function_name: str) -> str:
"""Generate a test for a connector function"""
connector = await self.registry.get(connector_id)
if not connector or function_name not in connector.functions:
return "# Failed to generate test - connector or function not found"
func = connector.functions[function_name]
# Generate test code
test_code = [
"import asyncio",
"import unittest",
"from adaptive_connector import ConnectorRegistry, ConnectorBuilder",
"import redis.asyncio as redis",
"",
f"class Test{function_name.capitalize()}(unittest.IsolatedAsyncioTestCase):",
" async def asyncSetUp(self):",
" self.redis = redis.Redis()",
" self.registry = ConnectorRegistry(self.redis)",
" self.builder = ConnectorBuilder(self.registry, self.redis)",
f' self.connector_id = "{connector.id}"',
"",
f" async def test_{function_name}(self):",
" # Load the connector module",
" module = await self.builder.load_connector_module(self.connector_id)",
" self.assertIsNotNone(module)",
"",
f" # Create a client for {connector.name}",
f" client_class = getattr(module, '{connector.name.replace(' ', '')}Client')",
f" config = {json.dumps(connector.config, indent=4)}",
" async with client_class(config) as client:",
]
# Add parameter values based on their types
params = []
for param in func.parameters:
if param.type == "str":
params.append(f'{param.name}="test_{param.name}"')
elif param.type == "int":
params.append(f"{param.name}=1")
elif param.type == "bool":
params.append(f"{param.name}=True")
elif param.type == "float":
params.append(f"{param.name}=1.0")
elif param.type == "List":
params.append(f"{param.name}=[]")
elif param.type == "Dict":
params.append(f"{param.name}={{}}")
else:
params.append(f'{param.name}=None')
param_str = ", ".join(params)
test_code.extend([
f" result = await client.{function_name}({param_str})",
" # Add assertions here based on expected response",
" self.assertIsNotNone(result)",
"",
" async def asyncTearDown(self):",
" await self.redis.close()",
"",
"if __name__ == '__main__':",
" unittest.main()"
])
return "\n".join(test_code)
async def run_test(self, test_code: str) -> Dict[str, Any]:
"""Run a test by executing the test code"""
# In a real implementation, this would execute the test in a sandbox
# For demonstration, we'll just report success
return {
"success": True,
"message": "Test passed successfully",
"details": {
"ran": 1,
"errors": 0,
"failures": 0
}
}
async def test_connector(self, connector_id: str) -> Dict[str, Any]:
"""Test all functions in a connector"""
connector = await self.registry.get(connector_id)
if not connector:
return {"success": False, "message": "Connector not found"}
results = {}
for func_name in connector.functions:
test_code = await self.generate_test(connector_id, func_name)
results[func_name] = await self.run_test(test_code)
# Update the function's success metrics
func = connector.functions[func_name]
func.update_metrics(results[func_name]["success"])
# Save the updated connector
await self.registry.save(connector)
return {
"connector_id": connector_id,
"success": all(r["success"] for r in results.values()),
"results": results
}
# ========== Main Application ==========
class AdaptiveConnectorFramework:
"""Main application for the Adaptive Connector Framework"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
"""Initialize the framework"""
self.redis_url = redis_url
self.redis_client = None
self.registry = None
self.builder = None
self.test_suite = None
self.schema_extractor = None
async def initialize(self) -> None:
"""Initialize all components"""
self.redis_client = redis.Redis.from_url(self.redis_url)
self.registry = ConnectorRegistry(self.redis_client)
self.builder = ConnectorBuilder(self.registry, self.redis_client)
self.test_suite = TestSuite(self.registry)
self.schema_extractor = SchemaExtractor()
# Load existing connectors
await self.registry.load_all()
async def create_connector_from_docs(
self,
name: str,
description: str,
base_url: str,
docs_text: str
) -> Dict[str, Any]:
"""Create a connector from documentation text"""
# Extract schema from docs
endpoints = await self.schema_extractor.extract_from_text(docs_text)
# Generate connector
connector = await self.schema_extractor.generate_connector_from_schema(
builder=self.builder,
name=name,
description=description,
base_url=base_url,
endpoints=endpoints
)
if not connector:
return {"success": False, "message": "Failed to create connector"}
# Test the connector
test_results = await self.test_suite.test_connector(connector.id)
return {
"success": True,
"connector_id": connector.id,
"name": connector.name,
"function_count": len(connector.functions),
"test_results": test_results
}
async def close(self) -> None:
"""Close all connections"""
if self.redis_client:
await self.redis_client.close()
# ========== Usage Example ==========
async def main():
"""Example usage of the framework"""
# Initialize the framework
framework = AdaptiveConnectorFramework()
await framework.initialize()
# Example: Create a weather API connector from docs
weather_docs = """
GET /api/weather/{city}
Returns current weather for a city.
Parameters:
city (string): Name of the city to get weather for
units (string): Units to use (metric or imperial)
Example response:
{
"temperature": 22.5,
"conditions": "Partly Cloudy",
"humidity": 65
}
POST /api/weather/subscribe
Subscribe to weather alerts for a location.
Parameters:
city (string): Name of the city
email (string): Email address to send alerts to
alert_type (string): Type of alerts (severe, all)
"""
result = await framework.create_connector_from_docs(
name="Weather API",
description="Simple weather forecast API",
base_url="https://api.example.com",
docs_text=weather_docs
)
print(json.dumps(result, indent=2))
# Close the framework
await framework.close()
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment