This implementation plan details the creation of the Authorization & Access Service (AAS), a centralized abstraction layer for managing access to shared resources across the Mindbots ecosystem. The AAS incorporates patterns from the AI Delegation Service to support both human users and AI agents accessing shared resources.
The AI Delegation Service provides critical patterns that enhance our AAS design:
- AI agents can access resources on behalf of users
- Delegation tracked with unique IDs and scoped permissions
- Consent management built into the flow
- Delegated tokens contain special claims (delegation_id, allowed_contexts)
- Tokens can be revoked instantly
- Audit trail of all delegations
allowed_tags_or_contexts
pattern for fine-grained control- Supports wildcards (
*
) for broad access - Context-aware permission checking
1. Create Service Structure
cd repos/mindbots-services
mkdir authorization-access-service
cd authorization-access-service
# Initialize Poetry project
poetry init --name authorization-access-service \
--description "Centralized authorization and access service for shared resources" \
--author "Mindbots Team" \
--python "^3.11"
# Add core dependencies
poetry add fastapi uvicorn pydantic boto3 aioboto3 httpx redis structlog
poetry add --group dev pytest pytest-asyncio pytest-cov black ruff mypy
2. Project Structure
authorization-access-service/
├── app/
│ ├── api/
│ │ └── v1/
│ │ ├── endpoints/
│ │ │ ├── resources.py # Unified resource endpoints
│ │ │ ├── health.py # Health checks
│ │ │ └── __init__.py
│ │ └── router.py
│ ├── core/
│ │ ├── config.py # Configuration management
│ │ ├── security.py # Auth middleware
│ │ ├── exceptions.py # Custom exceptions
│ │ └── logging.py # Structured logging
│ ├── models/
│ │ ├── resource.py # Unified resource model
│ │ ├── permission.py # Permission models
│ │ ├── delegation.py # AI delegation support
│ │ └── auth.py # Auth context models
│ ├── services/
│ │ ├── authorization_engine.py # Core auth logic
│ │ ├── resource_manager.py # Resource orchestration
│ │ ├── cache_manager.py # Redis caching
│ │ └── audit_logger.py # Audit trail
│ ├── adapters/
│ │ ├── base.py # Abstract adapter
│ │ ├── todo_adapter.py # Todo service adapter
│ │ └── __init__.py
│ ├── clients/
│ │ ├── base.py # Base client with retry
│ │ ├── sharing_service.py # Sharing service client
│ │ ├── todo_service.py # Todo service client
│ │ ├── delegation_service.py # AI delegation client
│ │ └── __init__.py
│ └── __init__.py
├── tests/
│ ├── unit/
│ ├── integration/
│ └── conftest.py
├── main.py
├── Dockerfile
├── docker-compose.yml
├── .env.example
└── pyproject.toml
1. Permission Model with AI Support
# app/models/permission.py
from enum import Enum
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
class Permission(str, Enum):
"""Permission levels matching sharing service"""
VIEW = "view"
EDIT = "edit"
DELETE = "delete"
SHARE = "share"
OWNER = "owner" # Special permission for resource owners
class AuthorizationContext(BaseModel):
"""Context for authorization decisions"""
user_id: str
resource_type: str
resource_id: str
required_permission: Permission
is_ai_agent: bool = False
delegation_id: Optional[str] = None
allowed_contexts: List[str] = Field(default_factory=list)
class AuthorizationResult(BaseModel):
"""Result of authorization check"""
allowed: bool
reason: str # "owner", "shared", "delegated", "no_access"
permission_level: Optional[Permission] = None
restrictions: Dict[str, Any] = Field(default_factory=dict)
delegation_info: Optional[Dict[str, Any]] = None
cached: bool = False
cache_ttl: Optional[int] = None
def to_cache(self) -> Dict[str, Any]:
"""Serialize for caching"""
return self.model_dump(exclude={"cached"})
@classmethod
def from_cache(cls, data: Dict[str, Any]) -> "AuthorizationResult":
"""Deserialize from cache"""
result = cls(**data)
result.cached = True
return result
2. Unified Resource Model
# app/models/resource.py
from enum import Enum
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from datetime import datetime
class ResourceType(str, Enum):
"""Supported resource types"""
TASK = "task"
PROJECT = "project"
CHAT = "chat"
DOCUMENT = "document"
class ResourceFilter(str, Enum):
"""Filter options for listing resources"""
OWNED = "owned"
SHARED = "shared"
ALL = "all"
class Resource(BaseModel):
"""Unified resource representation"""
id: str
type: ResourceType
attributes: Dict[str, Any] # Resource-specific data
owner_id: str
created_at: datetime
updated_at: datetime
# Optional fields populated based on context
permissions: Optional[Dict[str, bool]] = None
sharing_info: Optional[Dict[str, Any]] = None
is_shared: bool = False
permission_level: Optional[str] = None
delegation_info: Optional[Dict[str, Any]] = None
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
3. AI Delegation Integration
# app/models/delegation.py
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime
class DelegationContext(BaseModel):
"""AI delegation context from token"""
delegation_id: str
allowed_tags_or_contexts: List[str]
user_id: str # The human user who granted delegation
ai_agent_id: str # The AI agent using the delegation
granted_at: datetime
def has_context(self, required_context: str) -> bool:
"""Check if delegation includes required context"""
return (
"*" in self.allowed_tags_or_contexts or
required_context in self.allowed_tags_or_contexts
)
def can_access_resource_type(self, resource_type: str) -> bool:
"""Check if delegation allows access to resource type"""
contexts = [
f"read:{resource_type}",
f"write:{resource_type}",
f"all:{resource_type}",
resource_type,
"*"
]
return any(self.has_context(ctx) for ctx in contexts)
Complete Authorization Engine Implementation
# app/services/authorization_engine.py
import asyncio
from typing import Optional, List, Dict
from datetime import datetime
import structlog
from app.models.permission import (
Permission, AuthorizationContext, AuthorizationResult
)
from app.models.resource import ResourceType
from app.models.delegation import DelegationContext
from app.clients import (
SharingServiceClient, DelegationServiceClient
)
from app.services.cache_manager import CacheManager
from app.services.audit_logger import AuditLogger
logger = structlog.get_logger()
class AuthorizationEngine:
"""Core authorization logic with AI delegation support"""
def __init__(
self,
sharing_client: SharingServiceClient,
delegation_client: DelegationServiceClient,
cache_manager: CacheManager,
audit_logger: AuditLogger,
):
self.sharing_client = sharing_client
self.delegation_client = delegation_client
self.cache = cache_manager
self.audit = audit_logger
async def authorize(
self,
context: AuthorizationContext
) -> AuthorizationResult:
"""
Main authorization method supporting both humans and AI agents
"""
# Log authorization attempt
await self.audit.log_authorization_attempt(context)
# Check cache first
cache_key = self._build_cache_key(context)
cached_result = await self.cache.get(cache_key)
if cached_result:
result = AuthorizationResult.from_cache(cached_result)
await self.audit.log_authorization_result(context, result)
return result
# For AI agents, verify delegation first
if context.is_ai_agent:
delegation_result = await self._check_ai_delegation(context)
if not delegation_result.allowed:
await self._cache_result(cache_key, delegation_result, ttl=60)
await self.audit.log_authorization_result(context, delegation_result)
return delegation_result
# Check ownership
ownership_result = await self._check_ownership(context)
if ownership_result.allowed:
await self._cache_result(cache_key, ownership_result, ttl=300)
await self.audit.log_authorization_result(context, ownership_result)
return ownership_result
# Check sharing permissions
sharing_result = await self._check_sharing(context)
if sharing_result.allowed:
# For AI agents, combine delegation and sharing checks
if context.is_ai_agent:
sharing_result.delegation_info = delegation_result.delegation_info
await self._cache_result(cache_key, sharing_result, ttl=300)
await self.audit.log_authorization_result(context, sharing_result)
return sharing_result
# No access
no_access_result = AuthorizationResult(
allowed=False,
reason="no_access"
)
await self._cache_result(cache_key, no_access_result, ttl=60)
await self.audit.log_authorization_result(context, no_access_result)
return no_access_result
async def authorize_batch(
self,
user_id: str,
resource_type: ResourceType,
resource_ids: List[str],
required_permission: Permission,
is_ai_agent: bool = False,
delegation_id: Optional[str] = None,
allowed_contexts: List[str] = None
) -> Dict[str, AuthorizationResult]:
"""
Authorize multiple resources in parallel
"""
contexts = [
AuthorizationContext(
user_id=user_id,
resource_type=resource_type.value,
resource_id=resource_id,
required_permission=required_permission,
is_ai_agent=is_ai_agent,
delegation_id=delegation_id,
allowed_contexts=allowed_contexts or []
)
for resource_id in resource_ids
]
# Parallel authorization
results = await asyncio.gather(
*[self.authorize(ctx) for ctx in contexts],
return_exceptions=True
)
# Build result map
result_map = {}
for resource_id, result in zip(resource_ids, results):
if isinstance(result, Exception):
logger.error(
"Authorization error",
resource_id=resource_id,
error=str(result)
)
result_map[resource_id] = AuthorizationResult(
allowed=False,
reason="error"
)
else:
result_map[resource_id] = result
return result_map
async def _check_ai_delegation(
self,
context: AuthorizationContext
) -> AuthorizationResult:
"""Verify AI agent delegation"""
if not context.delegation_id:
return AuthorizationResult(
allowed=False,
reason="missing_delegation_id"
)
try:
# Get delegation details
delegation = await self.delegation_client.get_delegation(
user_id=context.user_id,
delegation_id=context.delegation_id
)
if not delegation or delegation.status != "granted":
return AuthorizationResult(
allowed=False,
reason="invalid_delegation"
)
# Create delegation context
delegation_ctx = DelegationContext(
delegation_id=delegation.delegation_id,
allowed_tags_or_contexts=delegation.allowed_tags_or_contexts,
user_id=delegation.user_id,
ai_agent_id=context.user_id, # In delegated context
granted_at=delegation.grant_timestamp
)
# Check if delegation allows resource type
if not delegation_ctx.can_access_resource_type(context.resource_type):
return AuthorizationResult(
allowed=False,
reason="delegation_scope_mismatch"
)
# Check permission level in delegation
permission_context = f"{context.required_permission}:{context.resource_type}"
if not delegation_ctx.has_context(permission_context):
# Fallback to read-only for view permission
if context.required_permission == Permission.VIEW:
read_context = f"read:{context.resource_type}"
if not delegation_ctx.has_context(read_context):
return AuthorizationResult(
allowed=False,
reason="insufficient_delegation_permission"
)
return AuthorizationResult(
allowed=True,
reason="delegated",
delegation_info={
"delegation_id": delegation.delegation_id,
"granted_by": delegation.user_id,
"granted_at": delegation.grant_timestamp.isoformat(),
"allowed_contexts": delegation.allowed_tags_or_contexts
}
)
except Exception as e:
logger.error(
"Delegation check failed",
delegation_id=context.delegation_id,
error=str(e)
)
return AuthorizationResult(
allowed=False,
reason="delegation_check_error"
)
async def _check_ownership(
self,
context: AuthorizationContext
) -> AuthorizationResult:
"""Check if user owns the resource"""
# This will be implemented by resource adapters
# For now, return not owned
return AuthorizationResult(
allowed=False,
reason="not_owner"
)
async def _check_sharing(
self,
context: AuthorizationContext
) -> AuthorizationResult:
"""Check sharing permissions"""
try:
share = await self.sharing_client.get_share(
entity_type=context.resource_type,
entity_id=context.resource_id,
shared_with_user_id=context.user_id
)
if not share:
return AuthorizationResult(
allowed=False,
reason="not_shared"
)
# Check permission level
if self._has_permission(share.permission_level, context.required_permission):
return AuthorizationResult(
allowed=True,
reason="shared",
permission_level=Permission(share.permission_level),
restrictions={
"shared_by": share.shared_by_user_id,
"shared_at": share.created_at.isoformat()
}
)
return AuthorizationResult(
allowed=False,
reason="insufficient_permission"
)
except Exception as e:
logger.error(
"Sharing check failed",
resource_id=context.resource_id,
error=str(e)
)
return AuthorizationResult(
allowed=False,
reason="sharing_check_error"
)
def _has_permission(
self,
granted_level: str,
required_level: Permission
) -> bool:
"""Check if granted permission satisfies required permission"""
permission_hierarchy = {
Permission.VIEW: 1,
Permission.EDIT: 2,
Permission.DELETE: 3,
Permission.SHARE: 4,
Permission.OWNER: 5
}
granted_value = permission_hierarchy.get(Permission(granted_level), 0)
required_value = permission_hierarchy.get(required_level, 5)
return granted_value >= required_value
def _build_cache_key(self, context: AuthorizationContext) -> str:
"""Build cache key for authorization result"""
key_parts = [
"auth",
context.user_id,
context.resource_type,
context.resource_id,
context.required_permission.value
]
if context.is_ai_agent and context.delegation_id:
key_parts.extend(["ai", context.delegation_id])
return ":".join(key_parts)
async def _cache_result(
self,
key: str,
result: AuthorizationResult,
ttl: int
) -> None:
"""Cache authorization result"""
result.cache_ttl = ttl
await self.cache.set(key, result.to_cache(), ttl=ttl)
1. Base Resource Adapter
# app/adapters/base.py
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from app.models.resource import Resource, ResourceType
from app.models.auth import UserContext
class ResourceAdapter(ABC):
"""
Abstract base class for resource adapters.
Each resource type implements this interface.
"""
@abstractmethod
def get_resource_type(self) -> ResourceType:
"""Return the type of resource this adapter handles"""
pass
@abstractmethod
async def get_single(
self,
resource_id: str,
include_details: bool = True
) -> Optional[Resource]:
"""Fetch a single resource by ID"""
pass
@abstractmethod
async def get_batch(
self,
resource_ids: List[str],
include_details: bool = True
) -> List[Resource]:
"""Fetch multiple resources by IDs"""
pass
@abstractmethod
async def list_owned_by_user(
self,
user_id: str,
limit: int = 50,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None
) -> List[Resource]:
"""List resources owned by a specific user"""
pass
@abstractmethod
async def check_ownership(
self,
user_id: str,
resource_id: str
) -> bool:
"""Check if user owns the resource"""
pass
def transform_to_resource(
self,
raw_data: Dict[str, Any]
) -> Resource:
"""
Transform service-specific data to unified Resource format.
Can be overridden by subclasses for custom transformation.
"""
return Resource(
id=raw_data.get("id"),
type=self.get_resource_type(),
attributes=raw_data,
owner_id=raw_data.get("user_id"),
created_at=raw_data.get("created_at"),
updated_at=raw_data.get("updated_at")
)
2. Todo Adapter Implementation
# app/adapters/todo_adapter.py
from typing import Optional, List, Dict, Any
import asyncio
import structlog
from app.adapters.base import ResourceAdapter
from app.models.resource import Resource, ResourceType
from app.clients.todo_service import TodoServiceClient
from app.core.exceptions import ResourceNotFoundError
logger = structlog.get_logger()
class TodoAdapter(ResourceAdapter):
"""Adapter for todo/task resources"""
def __init__(self, todo_client: TodoServiceClient):
self.client = todo_client
def get_resource_type(self) -> ResourceType:
return ResourceType.TASK
async def get_single(
self,
resource_id: str,
include_details: bool = True
) -> Optional[Resource]:
"""Fetch a single task by ID"""
try:
# Get task without user context (service-to-service)
task_data = await self.client.get_task_by_id(
task_id=resource_id,
use_service_auth=True
)
if not task_data:
return None
return self._transform_task_to_resource(task_data)
except ResourceNotFoundError:
return None
except Exception as e:
logger.error(
"Failed to fetch task",
task_id=resource_id,
error=str(e)
)
raise
async def get_batch(
self,
resource_ids: List[str],
include_details: bool = True
) -> List[Resource]:
"""Fetch multiple tasks by IDs"""
if not resource_ids:
return []
try:
# Use batch endpoint if available
tasks_data = await self.client.get_tasks_batch(
task_ids=resource_ids,
use_service_auth=True
)
return [
self._transform_task_to_resource(task)
for task in tasks_data
]
except Exception as e:
logger.error(
"Failed to fetch tasks batch",
task_ids=resource_ids,
error=str(e)
)
# Fallback to individual fetches
tasks = await asyncio.gather(
*[self.get_single(task_id) for task_id in resource_ids],
return_exceptions=True
)
return [
task for task in tasks
if isinstance(task, Resource) and task is not None
]
async def list_owned_by_user(
self,
user_id: str,
limit: int = 50,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None
) -> List[Resource]:
"""List tasks owned by a user"""
try:
tasks_data = await self.client.list_user_tasks(
user_id=user_id,
limit=limit,
offset=offset,
filters=filters
)
return [
self._transform_task_to_resource(task)
for task in tasks_data
]
except Exception as e:
logger.error(
"Failed to list user tasks",
user_id=user_id,
error=str(e)
)
return []
async def check_ownership(
self,
user_id: str,
resource_id: str
) -> bool:
"""Check if user owns the task"""
try:
task = await self.get_single(resource_id, include_details=False)
return task is not None and task.owner_id == user_id
except Exception:
return False
def _transform_task_to_resource(self, task_data: Dict[str, Any]) -> Resource:
"""Transform todo service task to unified resource"""
# Extract core fields
resource = Resource(
id=task_data["id"],
type=ResourceType.TASK,
owner_id=task_data["user_id"],
created_at=task_data["created_at"],
updated_at=task_data["updated_at"],
attributes={
"title": task_data.get("title"),
"description": task_data.get("description"),
"status": task_data.get("status"),
"priority": task_data.get("priority"),
"due_date": task_data.get("due_date"),
"tags": task_data.get("tags", []),
"completed": task_data.get("completed", False),
# Include any custom fields
**{k: v for k, v in task_data.items()
if k not in ["id", "user_id", "created_at", "updated_at"]}
}
)
return resource
Unified Resource Endpoints
# app/api/v1/endpoints/resources.py
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from typing import List, Optional, Dict
import structlog
from app.models.resource import Resource, ResourceType, ResourceFilter
from app.models.auth import UserContext
from app.models.requests import BatchResourceRequest, ResourceResponse
from app.services.resource_manager import ResourceManager
from app.core.security import get_current_user
from app.core.exceptions import ResourceNotFoundError, UnauthorizedError
router = APIRouter(prefix="/resources", tags=["resources"])
logger = structlog.get_logger()
@router.get("/{resource_type}/{resource_id}", response_model=ResourceResponse)
async def get_resource(
resource_type: ResourceType,
resource_id: str,
include: Optional[List[str]] = Query(None, description="Additional data to include"),
user_context: UserContext = Depends(get_current_user),
resource_manager: ResourceManager = Depends(),
):
"""
Get a single resource with permission check.
Optional includes:
- permissions: Include user's permissions for this resource
- owner: Include owner information
- shares: Include sharing information
- delegation: Include AI delegation info (if applicable)
"""
logger.info(
"Getting resource",
resource_type=resource_type,
resource_id=resource_id,
user_id=user_context.user_id,
is_ai_agent=user_context.is_ai_agent
)
try:
resource = await resource_manager.get_resource_with_permissions(
user_context=user_context,
resource_type=resource_type,
resource_id=resource_id,
includes=include or []
)
if not resource:
raise HTTPException(
status_code=404,
detail=f"{resource_type.value.capitalize()} not found"
)
return ResourceResponse(
data=resource,
meta={
"api_version": "1.0",
"resource_type": resource_type.value,
"user_context": {
"is_ai_agent": user_context.is_ai_agent,
"has_delegation": bool(user_context.delegation_id)
}
}
)
except UnauthorizedError as e:
raise HTTPException(status_code=403, detail=str(e))
except ResourceNotFoundError:
raise HTTPException(status_code=404, detail="Resource not found")
except Exception as e:
logger.error("Failed to get resource", error=str(e))
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/{resource_type}/batch", response_model=List[ResourceResponse])
async def get_resources_batch(
resource_type: ResourceType,
request: BatchResourceRequest,
user_context: UserContext = Depends(get_current_user),
resource_manager: ResourceManager = Depends(),
):
"""
Get multiple resources with permission filtering.
Maximum batch size: 50 resources
"""
if len(request.resource_ids) > 50:
raise HTTPException(
status_code=400,
detail="Batch size exceeds maximum of 50 resources"
)
logger.info(
"Getting resources batch",
resource_type=resource_type,
batch_size=len(request.resource_ids),
user_id=user_context.user_id,
is_ai_agent=user_context.is_ai_agent
)
try:
resources = await resource_manager.get_resources_batch_with_permissions(
user_context=user_context,
resource_type=resource_type,
resource_ids=request.resource_ids,
includes=request.include or []
)
return [
ResourceResponse(
data=resource,
meta={
"api_version": "1.0",
"resource_type": resource_type.value
}
)
for resource in resources
]
except Exception as e:
logger.error("Failed to get resources batch", error=str(e))
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/{resource_type}", response_model=List[ResourceResponse])
async def list_resources(
resource_type: ResourceType,
filter: ResourceFilter = Query(ResourceFilter.ALL),
limit: int = Query(50, le=100, ge=1),
offset: int = Query(0, ge=0),
include: Optional[List[str]] = Query(None),
user_context: UserContext = Depends(get_current_user),
resource_manager: ResourceManager = Depends(),
):
"""
List accessible resources of a specific type.
Filters:
- owned: Only resources owned by the user
- shared: Only resources shared with the user
- all: Both owned and shared resources (default)
For AI agents, only resources within their delegation scope are returned.
"""
logger.info(
"Listing resources",
resource_type=resource_type,
filter=filter,
limit=limit,
offset=offset,
user_id=user_context.user_id,
is_ai_agent=user_context.is_ai_agent
)
try:
resources = await resource_manager.list_accessible_resources(
user_context=user_context,
resource_type=resource_type,
filter=filter,
limit=limit,
offset=offset,
includes=include or []
)
return [
ResourceResponse(
data=resource,
meta={
"api_version": "1.0",
"resource_type": resource_type.value,
"filter": filter.value,
"pagination": {
"limit": limit,
"offset": offset,
"count": len(resources)
}
}
)
for resource in resources
]
except Exception as e:
logger.error("Failed to list resources", error=str(e))
raise HTTPException(status_code=500, detail="Internal server error")
# Health check endpoint
@router.get("/health", tags=["health"])
async def health_check():
"""Basic health check for the AAS"""
return {
"status": "healthy",
"service": "Authorization & Access Service",
"version": "1.0.0"
}
1. AAS Module
# tf_infra/modules/AuthorizationAccessService/main.tf
locals {
service_name = "${var.environment}-authorization-access-service"
}
# ECR Repository
resource "aws_ecr_repository" "aas" {
name = local.service_name
image_tag_mutability = "MUTABLE"
image_scanning_configuration {
scan_on_push = true
}
tags = merge(var.common_tags, {
Name = local.service_name
})
}
# Lambda Function
module "aas_lambda" {
source = "../Lambda"
function_name = local.service_name
description = "Authorization and Access Service for shared resources"
image_uri = "${aws_ecr_repository.aas.repository_url}:latest"
environment_variables = {
ENVIRONMENT = var.environment
LOG_LEVEL = var.log_level
# Service URLs
SHARING_SERVICE_URL = var.sharing_service_url
TODO_SERVICE_URL = var.todo_service_url
DELEGATION_SERVICE_URL = var.delegation_service_url
# Redis Cache
REDIS_ENDPOINT = module.redis_cache.endpoint
REDIS_PORT = module.redis_cache.port
# Feature Flags
ENABLE_CACHE = "true"
CACHE_TTL_SECONDS = "300"
ENABLE_AI_DELEGATION = "true"
}
vpc_config = {
subnet_ids = var.private_subnet_ids
security_group_ids = [aws_security_group.aas.id]
}
timeout = 30
memory_size = 512
tags = var.common_tags
}
# Redis Cache
module "redis_cache" {
source = "../ElastiCache"
cluster_id = "${local.service_name}-cache"
node_type = var.cache_node_type
num_cache_nodes = 1
engine_version = "7.0"
port = 6379
subnet_group_name = var.cache_subnet_group_name
security_group_ids = [aws_security_group.redis.id]
tags = var.common_tags
}
# Security Groups
resource "aws_security_group" "aas" {
name_prefix = "${local.service_name}-lambda-"
vpc_id = var.vpc_id
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = merge(var.common_tags, {
Name = "${local.service_name}-lambda"
})
}
resource "aws_security_group" "redis" {
name_prefix = "${local.service_name}-redis-"
vpc_id = var.vpc_id
ingress {
from_port = 6379
to_port = 6379
protocol = "tcp"
security_groups = [aws_security_group.aas.id]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = merge(var.common_tags, {
Name = "${local.service_name}-redis"
})
}
# IAM Role Policies
resource "aws_iam_role_policy" "aas_permissions" {
name = "${local.service_name}-permissions"
role = module.aas_lambda.execution_role_id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"lambda:InvokeFunction"
]
Resource = [
var.sharing_service_arn,
var.todo_service_arn,
var.delegation_service_arn
]
},
{
Effect = "Allow"
Action = [
"elasticache:DescribeCacheClusters",
"elasticache:DescribeCacheNodes"
]
Resource = "*"
}
]
})
}
2. API Gateway Integration
# tf_infra/modules/AuthorizationAccessService/api_gateway.tf
# API Gateway Resources
resource "aws_api_gateway_resource" "aas_base" {
rest_api_id = var.api_gateway_id
parent_id = var.api_gateway_root_id
path_part = "aas"
}
resource "aws_api_gateway_resource" "api_version" {
rest_api_id = var.api_gateway_id
parent_id = aws_api_gateway_resource.aas_base.id
path_part = "api"
}
resource "aws_api_gateway_resource" "v1" {
rest_api_id = var.api_gateway_id
parent_id = aws_api_gateway_resource.api_version.id
path_part = "v1"
}
resource "aws_api_gateway_resource" "resources" {
rest_api_id = var.api_gateway_id
parent_id = aws_api_gateway_resource.v1.id
path_part = "resources"
}
# Proxy resource for all sub-paths
resource "aws_api_gateway_resource" "proxy" {
rest_api_id = var.api_gateway_id
parent_id = aws_api_gateway_resource.resources.id
path_part = "{proxy+}"
}
# Methods
resource "aws_api_gateway_method" "any" {
rest_api_id = var.api_gateway_id
resource_id = aws_api_gateway_resource.proxy.id
http_method = "ANY"
authorization = "CUSTOM"
authorizer_id = var.jwt_authorizer_id
}
# Lambda Integration
resource "aws_api_gateway_integration" "lambda" {
rest_api_id = var.api_gateway_id
resource_id = aws_api_gateway_resource.proxy.id
http_method = aws_api_gateway_method.any.http_method
integration_http_method = "POST"
type = "AWS_PROXY"
uri = module.aas_lambda.invoke_arn
}
# Lambda Permission for API Gateway
resource "aws_lambda_permission" "api_gateway" {
statement_id = "AllowAPIGatewayInvoke"
action = "lambda:InvokeFunction"
function_name = module.aas_lambda.function_name
principal = "apigateway.amazonaws.com"
source_arn = "${var.api_gateway_execution_arn}/*/*"
}
1. Unit Tests
# tests/unit/test_authorization_engine.py
import pytest
from unittest.mock import Mock, AsyncMock
from datetime import datetime
from app.services.authorization_engine import AuthorizationEngine
from app.models.permission import (
Permission, AuthorizationContext, AuthorizationResult
)
@pytest.mark.asyncio
async def test_authorize_owner_access():
"""Test that resource owners get full access"""
# Setup mocks
sharing_client = Mock()
delegation_client = Mock()
cache_manager = Mock()
cache_manager.get = AsyncMock(return_value=None)
cache_manager.set = AsyncMock()
audit_logger = Mock()
audit_logger.log_authorization_attempt = AsyncMock()
audit_logger.log_authorization_result = AsyncMock()
engine = AuthorizationEngine(
sharing_client, delegation_client, cache_manager, audit_logger
)
# Mock ownership check
engine._check_ownership = AsyncMock(
return_value=AuthorizationResult(
allowed=True,
reason="owner",
permission_level=Permission.OWNER
)
)
# Test
context = AuthorizationContext(
user_id="user123",
resource_type="task",
resource_id="task456",
required_permission=Permission.DELETE
)
result = await engine.authorize(context)
# Assertions
assert result.allowed is True
assert result.reason == "owner"
assert result.permission_level == Permission.OWNER
# Verify cache was set
cache_manager.set.assert_called_once()
# Verify audit logging
audit_logger.log_authorization_attempt.assert_called_once_with(context)
audit_logger.log_authorization_result.assert_called_once()
@pytest.mark.asyncio
async def test_authorize_ai_agent_with_delegation():
"""Test AI agent access with valid delegation"""
# Setup mocks
sharing_client = Mock()
delegation_client = Mock()
cache_manager = Mock()
cache_manager.get = AsyncMock(return_value=None)
cache_manager.set = AsyncMock()
audit_logger = Mock()
audit_logger.log_authorization_attempt = AsyncMock()
audit_logger.log_authorization_result = AsyncMock()
# Mock delegation check
delegation_client.get_delegation = AsyncMock(
return_value=Mock(
delegation_id="del123",
status="granted",
allowed_tags_or_contexts=["read:task", "write:task"],
user_id="user123",
grant_timestamp=datetime.utcnow()
)
)
engine = AuthorizationEngine(
sharing_client, delegation_client, cache_manager, audit_logger
)
# Test
context = AuthorizationContext(
user_id="ai-agent-123",
resource_type="task",
resource_id="task456",
required_permission=Permission.VIEW,
is_ai_agent=True,
delegation_id="del123"
)
result = await engine.authorize(context)
# Assertions
assert result.allowed is True
assert result.reason == "shared" # Or whatever the actual reason is
assert result.delegation_info is not None
assert result.delegation_info["delegation_id"] == "del123"
# More tests...
2. Integration Tests
# tests/integration/test_api_endpoints.py
import pytest
from httpx import AsyncClient
from fastapi.testclient import TestClient
from main import app
@pytest.mark.asyncio
async def test_get_shared_task():
"""Test getting a shared task through AAS"""
async with AsyncClient(app=app, base_url="http://test") as client:
# Setup test data
headers = {"Authorization": "Bearer test-token"}
# Test getting a resource
response = await client.get(
"/api/v1/resources/task/task123",
headers=headers
)
assert response.status_code == 200
data = response.json()
assert data["data"]["id"] == "task123"
assert data["data"]["type"] == "task"
@pytest.mark.asyncio
async def test_batch_resources_with_permissions():
"""Test batch endpoint filters by permissions"""
async with AsyncClient(app=app, base_url="http://test") as client:
headers = {"Authorization": "Bearer test-token"}
# Request batch of resources
response = await client.post(
"/api/v1/resources/task/batch",
headers=headers,
json={
"resource_ids": ["task1", "task2", "task3"],
"include": ["permissions", "shares"]
}
)
assert response.status_code == 200
data = response.json()
# Should only return accessible resources
assert len(data) <= 3
# Each resource should have permission info
for resource in data:
assert "permissions" in resource["data"]
Update Flutter to use AAS
// lib/services/authorization_access_service.dart
class AuthorizationAccessService {
final String baseUrl;
final AuthService authService;
AuthorizationAccessService({
required this.baseUrl,
required this.authService,
});
Future<Resource?> getResource({
required ResourceType type,
required String resourceId,
List<String> include = const [],
}) async {
final token = await authService.getAccessToken();
final response = await http.get(
Uri.parse('$baseUrl/api/v1/resources/${type.value}/$resourceId')
.replace(queryParameters: {
if (include.isNotEmpty) 'include': include,
}),
headers: {
'Authorization': 'Bearer $token',
'Content-Type': 'application/json',
},
);
if (response.statusCode == 200) {
final data = json.decode(response.body);
return Resource.fromJson(data['data']);
} else if (response.statusCode == 404) {
return null;
} else {
throw Exception('Failed to get resource: ${response.statusCode}');
}
}
Future<List<Resource>> getResourcesBatch({
required ResourceType type,
required List<String> resourceIds,
List<String> include = const [],
}) async {
final token = await authService.getAccessToken();
final response = await http.post(
Uri.parse('$baseUrl/api/v1/resources/${type.value}/batch'),
headers: {
'Authorization': 'Bearer $token',
'Content-Type': 'application/json',
},
body: json.encode({
'resource_ids': resourceIds,
'include': include,
}),
);
if (response.statusCode == 200) {
final data = json.decode(response.body) as List;
return data.map((item) => Resource.fromJson(item['data'])).toList();
} else {
throw Exception('Failed to get resources batch: ${response.statusCode}');
}
}
}
-
Security First
- Zero-trust model
- Single point of authorization
- Complete audit trail
- Support for AI agent delegation
-
Performance Optimized
- Multi-level caching
- Batch operations
- Circuit breakers
- Parallel authorization
-
Future Proof
- Easy to add new resource types
- Support for new permission models
- Ready for GraphQL/WebSocket
- AI/ML ready
-
Developer Friendly
- Consistent API
- Clear documentation
- Comprehensive testing
- Easy integration
- ✅ Shared tasks accessible in <200ms
- ✅ Support for 50+ item batches
- ✅ AI agents can access delegated resources
- ✅ Complete audit trail
- ✅ Zero security vulnerabilities
- ✅ 99.9% uptime
- Start Phase 1 implementation
- Set up development environment
- Create service structure
- Implement core components
- Deploy to development
- Begin integration testing