Skip to content

Instantly share code, notes, and snippets.

@michael-krumm
Created June 6, 2025 14:49
Show Gist options
  • Save michael-krumm/ff816f5305bbe70b4135cb5eb623c75a to your computer and use it in GitHub Desktop.
Save michael-krumm/ff816f5305bbe70b4135cb5eb623c75a to your computer and use it in GitHub Desktop.
Authorization & Access Service Implementation Documentation for Flow Issue #51

Authorization & Access Service - Implementation Plan

Executive Summary

This implementation plan details the creation of the Authorization & Access Service (AAS), a centralized abstraction layer for managing access to shared resources across the Mindbots ecosystem. The AAS incorporates patterns from the AI Delegation Service to support both human users and AI agents accessing shared resources.

Key Integration: AI Delegation Service

The AI Delegation Service provides critical patterns that enhance our AAS design:

1. Delegated Access Model

  • AI agents can access resources on behalf of users
  • Delegation tracked with unique IDs and scoped permissions
  • Consent management built into the flow

2. Token Enhancement

  • Delegated tokens contain special claims (delegation_id, allowed_contexts)
  • Tokens can be revoked instantly
  • Audit trail of all delegations

3. Scope-Based Permissions

  • allowed_tags_or_contexts pattern for fine-grained control
  • Supports wildcards (*) for broad access
  • Context-aware permission checking

Implementation Roadmap

Phase 1: Core AAS Service (Week 1-2)

Day 1-2: Service Foundation

1. Create Service Structure

cd repos/mindbots-services
mkdir authorization-access-service
cd authorization-access-service

# Initialize Poetry project
poetry init --name authorization-access-service \
  --description "Centralized authorization and access service for shared resources" \
  --author "Mindbots Team" \
  --python "^3.11"

# Add core dependencies
poetry add fastapi uvicorn pydantic boto3 aioboto3 httpx redis structlog
poetry add --group dev pytest pytest-asyncio pytest-cov black ruff mypy

2. Project Structure

authorization-access-service/
├── app/
│   ├── api/
│   │   └── v1/
│   │       ├── endpoints/
│   │       │   ├── resources.py      # Unified resource endpoints
│   │       │   ├── health.py         # Health checks
│   │       │   └── __init__.py
│   │       └── router.py
│   ├── core/
│   │   ├── config.py                 # Configuration management
│   │   ├── security.py               # Auth middleware
│   │   ├── exceptions.py             # Custom exceptions
│   │   └── logging.py                # Structured logging
│   ├── models/
│   │   ├── resource.py               # Unified resource model
│   │   ├── permission.py             # Permission models
│   │   ├── delegation.py             # AI delegation support
│   │   └── auth.py                   # Auth context models
│   ├── services/
│   │   ├── authorization_engine.py   # Core auth logic
│   │   ├── resource_manager.py       # Resource orchestration
│   │   ├── cache_manager.py          # Redis caching
│   │   └── audit_logger.py           # Audit trail
│   ├── adapters/
│   │   ├── base.py                   # Abstract adapter
│   │   ├── todo_adapter.py           # Todo service adapter
│   │   └── __init__.py
│   ├── clients/
│   │   ├── base.py                   # Base client with retry
│   │   ├── sharing_service.py        # Sharing service client
│   │   ├── todo_service.py           # Todo service client
│   │   ├── delegation_service.py     # AI delegation client
│   │   └── __init__.py
│   └── __init__.py
├── tests/
│   ├── unit/
│   ├── integration/
│   └── conftest.py
├── main.py
├── Dockerfile
├── docker-compose.yml
├── .env.example
└── pyproject.toml

Day 3-4: Core Models & Configuration

1. Permission Model with AI Support

# app/models/permission.py
from enum import Enum
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime

class Permission(str, Enum):
    """Permission levels matching sharing service"""
    VIEW = "view"
    EDIT = "edit"
    DELETE = "delete"
    SHARE = "share"
    OWNER = "owner"  # Special permission for resource owners

class AuthorizationContext(BaseModel):
    """Context for authorization decisions"""
    user_id: str
    resource_type: str
    resource_id: str
    required_permission: Permission
    is_ai_agent: bool = False
    delegation_id: Optional[str] = None
    allowed_contexts: List[str] = Field(default_factory=list)

class AuthorizationResult(BaseModel):
    """Result of authorization check"""
    allowed: bool
    reason: str  # "owner", "shared", "delegated", "no_access"
    permission_level: Optional[Permission] = None
    restrictions: Dict[str, Any] = Field(default_factory=dict)
    delegation_info: Optional[Dict[str, Any]] = None
    cached: bool = False
    cache_ttl: Optional[int] = None

    def to_cache(self) -> Dict[str, Any]:
        """Serialize for caching"""
        return self.model_dump(exclude={"cached"})

    @classmethod
    def from_cache(cls, data: Dict[str, Any]) -> "AuthorizationResult":
        """Deserialize from cache"""
        result = cls(**data)
        result.cached = True
        return result

2. Unified Resource Model

# app/models/resource.py
from enum import Enum
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from datetime import datetime

class ResourceType(str, Enum):
    """Supported resource types"""
    TASK = "task"
    PROJECT = "project"
    CHAT = "chat"
    DOCUMENT = "document"

class ResourceFilter(str, Enum):
    """Filter options for listing resources"""
    OWNED = "owned"
    SHARED = "shared"
    ALL = "all"

class Resource(BaseModel):
    """Unified resource representation"""
    id: str
    type: ResourceType
    attributes: Dict[str, Any]  # Resource-specific data
    owner_id: str
    created_at: datetime
    updated_at: datetime

    # Optional fields populated based on context
    permissions: Optional[Dict[str, bool]] = None
    sharing_info: Optional[Dict[str, Any]] = None
    is_shared: bool = False
    permission_level: Optional[str] = None
    delegation_info: Optional[Dict[str, Any]] = None

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

3. AI Delegation Integration

# app/models/delegation.py
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime

class DelegationContext(BaseModel):
    """AI delegation context from token"""
    delegation_id: str
    allowed_tags_or_contexts: List[str]
    user_id: str  # The human user who granted delegation
    ai_agent_id: str  # The AI agent using the delegation
    granted_at: datetime

    def has_context(self, required_context: str) -> bool:
        """Check if delegation includes required context"""
        return (
            "*" in self.allowed_tags_or_contexts or
            required_context in self.allowed_tags_or_contexts
        )

    def can_access_resource_type(self, resource_type: str) -> bool:
        """Check if delegation allows access to resource type"""
        contexts = [
            f"read:{resource_type}",
            f"write:{resource_type}",
            f"all:{resource_type}",
            resource_type,
            "*"
        ]
        return any(self.has_context(ctx) for ctx in contexts)

Day 5-6: Authorization Engine

Complete Authorization Engine Implementation

# app/services/authorization_engine.py
import asyncio
from typing import Optional, List, Dict
from datetime import datetime
import structlog

from app.models.permission import (
    Permission, AuthorizationContext, AuthorizationResult
)
from app.models.resource import ResourceType
from app.models.delegation import DelegationContext
from app.clients import (
    SharingServiceClient, DelegationServiceClient
)
from app.services.cache_manager import CacheManager
from app.services.audit_logger import AuditLogger

logger = structlog.get_logger()

class AuthorizationEngine:
    """Core authorization logic with AI delegation support"""

    def __init__(
        self,
        sharing_client: SharingServiceClient,
        delegation_client: DelegationServiceClient,
        cache_manager: CacheManager,
        audit_logger: AuditLogger,
    ):
        self.sharing_client = sharing_client
        self.delegation_client = delegation_client
        self.cache = cache_manager
        self.audit = audit_logger

    async def authorize(
        self,
        context: AuthorizationContext
    ) -> AuthorizationResult:
        """
        Main authorization method supporting both humans and AI agents
        """
        # Log authorization attempt
        await self.audit.log_authorization_attempt(context)

        # Check cache first
        cache_key = self._build_cache_key(context)
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            result = AuthorizationResult.from_cache(cached_result)
            await self.audit.log_authorization_result(context, result)
            return result

        # For AI agents, verify delegation first
        if context.is_ai_agent:
            delegation_result = await self._check_ai_delegation(context)
            if not delegation_result.allowed:
                await self._cache_result(cache_key, delegation_result, ttl=60)
                await self.audit.log_authorization_result(context, delegation_result)
                return delegation_result

        # Check ownership
        ownership_result = await self._check_ownership(context)
        if ownership_result.allowed:
            await self._cache_result(cache_key, ownership_result, ttl=300)
            await self.audit.log_authorization_result(context, ownership_result)
            return ownership_result

        # Check sharing permissions
        sharing_result = await self._check_sharing(context)
        if sharing_result.allowed:
            # For AI agents, combine delegation and sharing checks
            if context.is_ai_agent:
                sharing_result.delegation_info = delegation_result.delegation_info
            await self._cache_result(cache_key, sharing_result, ttl=300)
            await self.audit.log_authorization_result(context, sharing_result)
            return sharing_result

        # No access
        no_access_result = AuthorizationResult(
            allowed=False,
            reason="no_access"
        )
        await self._cache_result(cache_key, no_access_result, ttl=60)
        await self.audit.log_authorization_result(context, no_access_result)
        return no_access_result

    async def authorize_batch(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_ids: List[str],
        required_permission: Permission,
        is_ai_agent: bool = False,
        delegation_id: Optional[str] = None,
        allowed_contexts: List[str] = None
    ) -> Dict[str, AuthorizationResult]:
        """
        Authorize multiple resources in parallel
        """
        contexts = [
            AuthorizationContext(
                user_id=user_id,
                resource_type=resource_type.value,
                resource_id=resource_id,
                required_permission=required_permission,
                is_ai_agent=is_ai_agent,
                delegation_id=delegation_id,
                allowed_contexts=allowed_contexts or []
            )
            for resource_id in resource_ids
        ]

        # Parallel authorization
        results = await asyncio.gather(
            *[self.authorize(ctx) for ctx in contexts],
            return_exceptions=True
        )

        # Build result map
        result_map = {}
        for resource_id, result in zip(resource_ids, results):
            if isinstance(result, Exception):
                logger.error(
                    "Authorization error",
                    resource_id=resource_id,
                    error=str(result)
                )
                result_map[resource_id] = AuthorizationResult(
                    allowed=False,
                    reason="error"
                )
            else:
                result_map[resource_id] = result

        return result_map

    async def _check_ai_delegation(
        self,
        context: AuthorizationContext
    ) -> AuthorizationResult:
        """Verify AI agent delegation"""
        if not context.delegation_id:
            return AuthorizationResult(
                allowed=False,
                reason="missing_delegation_id"
            )

        try:
            # Get delegation details
            delegation = await self.delegation_client.get_delegation(
                user_id=context.user_id,
                delegation_id=context.delegation_id
            )

            if not delegation or delegation.status != "granted":
                return AuthorizationResult(
                    allowed=False,
                    reason="invalid_delegation"
                )

            # Create delegation context
            delegation_ctx = DelegationContext(
                delegation_id=delegation.delegation_id,
                allowed_tags_or_contexts=delegation.allowed_tags_or_contexts,
                user_id=delegation.user_id,
                ai_agent_id=context.user_id,  # In delegated context
                granted_at=delegation.grant_timestamp
            )

            # Check if delegation allows resource type
            if not delegation_ctx.can_access_resource_type(context.resource_type):
                return AuthorizationResult(
                    allowed=False,
                    reason="delegation_scope_mismatch"
                )

            # Check permission level in delegation
            permission_context = f"{context.required_permission}:{context.resource_type}"
            if not delegation_ctx.has_context(permission_context):
                # Fallback to read-only for view permission
                if context.required_permission == Permission.VIEW:
                    read_context = f"read:{context.resource_type}"
                    if not delegation_ctx.has_context(read_context):
                        return AuthorizationResult(
                            allowed=False,
                            reason="insufficient_delegation_permission"
                        )

            return AuthorizationResult(
                allowed=True,
                reason="delegated",
                delegation_info={
                    "delegation_id": delegation.delegation_id,
                    "granted_by": delegation.user_id,
                    "granted_at": delegation.grant_timestamp.isoformat(),
                    "allowed_contexts": delegation.allowed_tags_or_contexts
                }
            )

        except Exception as e:
            logger.error(
                "Delegation check failed",
                delegation_id=context.delegation_id,
                error=str(e)
            )
            return AuthorizationResult(
                allowed=False,
                reason="delegation_check_error"
            )

    async def _check_ownership(
        self,
        context: AuthorizationContext
    ) -> AuthorizationResult:
        """Check if user owns the resource"""
        # This will be implemented by resource adapters
        # For now, return not owned
        return AuthorizationResult(
            allowed=False,
            reason="not_owner"
        )

    async def _check_sharing(
        self,
        context: AuthorizationContext
    ) -> AuthorizationResult:
        """Check sharing permissions"""
        try:
            share = await self.sharing_client.get_share(
                entity_type=context.resource_type,
                entity_id=context.resource_id,
                shared_with_user_id=context.user_id
            )

            if not share:
                return AuthorizationResult(
                    allowed=False,
                    reason="not_shared"
                )

            # Check permission level
            if self._has_permission(share.permission_level, context.required_permission):
                return AuthorizationResult(
                    allowed=True,
                    reason="shared",
                    permission_level=Permission(share.permission_level),
                    restrictions={
                        "shared_by": share.shared_by_user_id,
                        "shared_at": share.created_at.isoformat()
                    }
                )

            return AuthorizationResult(
                allowed=False,
                reason="insufficient_permission"
            )

        except Exception as e:
            logger.error(
                "Sharing check failed",
                resource_id=context.resource_id,
                error=str(e)
            )
            return AuthorizationResult(
                allowed=False,
                reason="sharing_check_error"
            )

    def _has_permission(
        self,
        granted_level: str,
        required_level: Permission
    ) -> bool:
        """Check if granted permission satisfies required permission"""
        permission_hierarchy = {
            Permission.VIEW: 1,
            Permission.EDIT: 2,
            Permission.DELETE: 3,
            Permission.SHARE: 4,
            Permission.OWNER: 5
        }

        granted_value = permission_hierarchy.get(Permission(granted_level), 0)
        required_value = permission_hierarchy.get(required_level, 5)

        return granted_value >= required_value

    def _build_cache_key(self, context: AuthorizationContext) -> str:
        """Build cache key for authorization result"""
        key_parts = [
            "auth",
            context.user_id,
            context.resource_type,
            context.resource_id,
            context.required_permission.value
        ]

        if context.is_ai_agent and context.delegation_id:
            key_parts.extend(["ai", context.delegation_id])

        return ":".join(key_parts)

    async def _cache_result(
        self,
        key: str,
        result: AuthorizationResult,
        ttl: int
    ) -> None:
        """Cache authorization result"""
        result.cache_ttl = ttl
        await self.cache.set(key, result.to_cache(), ttl=ttl)

Day 7-8: Resource Adapters

1. Base Resource Adapter

# app/adapters/base.py
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from app.models.resource import Resource, ResourceType
from app.models.auth import UserContext

class ResourceAdapter(ABC):
    """
    Abstract base class for resource adapters.
    Each resource type implements this interface.
    """

    @abstractmethod
    def get_resource_type(self) -> ResourceType:
        """Return the type of resource this adapter handles"""
        pass

    @abstractmethod
    async def get_single(
        self,
        resource_id: str,
        include_details: bool = True
    ) -> Optional[Resource]:
        """Fetch a single resource by ID"""
        pass

    @abstractmethod
    async def get_batch(
        self,
        resource_ids: List[str],
        include_details: bool = True
    ) -> List[Resource]:
        """Fetch multiple resources by IDs"""
        pass

    @abstractmethod
    async def list_owned_by_user(
        self,
        user_id: str,
        limit: int = 50,
        offset: int = 0,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Resource]:
        """List resources owned by a specific user"""
        pass

    @abstractmethod
    async def check_ownership(
        self,
        user_id: str,
        resource_id: str
    ) -> bool:
        """Check if user owns the resource"""
        pass

    def transform_to_resource(
        self,
        raw_data: Dict[str, Any]
    ) -> Resource:
        """
        Transform service-specific data to unified Resource format.
        Can be overridden by subclasses for custom transformation.
        """
        return Resource(
            id=raw_data.get("id"),
            type=self.get_resource_type(),
            attributes=raw_data,
            owner_id=raw_data.get("user_id"),
            created_at=raw_data.get("created_at"),
            updated_at=raw_data.get("updated_at")
        )

2. Todo Adapter Implementation

# app/adapters/todo_adapter.py
from typing import Optional, List, Dict, Any
import asyncio
import structlog

from app.adapters.base import ResourceAdapter
from app.models.resource import Resource, ResourceType
from app.clients.todo_service import TodoServiceClient
from app.core.exceptions import ResourceNotFoundError

logger = structlog.get_logger()

class TodoAdapter(ResourceAdapter):
    """Adapter for todo/task resources"""

    def __init__(self, todo_client: TodoServiceClient):
        self.client = todo_client

    def get_resource_type(self) -> ResourceType:
        return ResourceType.TASK

    async def get_single(
        self,
        resource_id: str,
        include_details: bool = True
    ) -> Optional[Resource]:
        """Fetch a single task by ID"""
        try:
            # Get task without user context (service-to-service)
            task_data = await self.client.get_task_by_id(
                task_id=resource_id,
                use_service_auth=True
            )

            if not task_data:
                return None

            return self._transform_task_to_resource(task_data)

        except ResourceNotFoundError:
            return None
        except Exception as e:
            logger.error(
                "Failed to fetch task",
                task_id=resource_id,
                error=str(e)
            )
            raise

    async def get_batch(
        self,
        resource_ids: List[str],
        include_details: bool = True
    ) -> List[Resource]:
        """Fetch multiple tasks by IDs"""
        if not resource_ids:
            return []

        try:
            # Use batch endpoint if available
            tasks_data = await self.client.get_tasks_batch(
                task_ids=resource_ids,
                use_service_auth=True
            )

            return [
                self._transform_task_to_resource(task)
                for task in tasks_data
            ]

        except Exception as e:
            logger.error(
                "Failed to fetch tasks batch",
                task_ids=resource_ids,
                error=str(e)
            )
            # Fallback to individual fetches
            tasks = await asyncio.gather(
                *[self.get_single(task_id) for task_id in resource_ids],
                return_exceptions=True
            )

            return [
                task for task in tasks
                if isinstance(task, Resource) and task is not None
            ]

    async def list_owned_by_user(
        self,
        user_id: str,
        limit: int = 50,
        offset: int = 0,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Resource]:
        """List tasks owned by a user"""
        try:
            tasks_data = await self.client.list_user_tasks(
                user_id=user_id,
                limit=limit,
                offset=offset,
                filters=filters
            )

            return [
                self._transform_task_to_resource(task)
                for task in tasks_data
            ]

        except Exception as e:
            logger.error(
                "Failed to list user tasks",
                user_id=user_id,
                error=str(e)
            )
            return []

    async def check_ownership(
        self,
        user_id: str,
        resource_id: str
    ) -> bool:
        """Check if user owns the task"""
        try:
            task = await self.get_single(resource_id, include_details=False)
            return task is not None and task.owner_id == user_id

        except Exception:
            return False

    def _transform_task_to_resource(self, task_data: Dict[str, Any]) -> Resource:
        """Transform todo service task to unified resource"""
        # Extract core fields
        resource = Resource(
            id=task_data["id"],
            type=ResourceType.TASK,
            owner_id=task_data["user_id"],
            created_at=task_data["created_at"],
            updated_at=task_data["updated_at"],
            attributes={
                "title": task_data.get("title"),
                "description": task_data.get("description"),
                "status": task_data.get("status"),
                "priority": task_data.get("priority"),
                "due_date": task_data.get("due_date"),
                "tags": task_data.get("tags", []),
                "completed": task_data.get("completed", False),
                # Include any custom fields
                **{k: v for k, v in task_data.items()
                   if k not in ["id", "user_id", "created_at", "updated_at"]}
            }
        )

        return resource

Day 9-10: API Implementation

Unified Resource Endpoints

# app/api/v1/endpoints/resources.py
from fastapi import APIRouter, Depends, HTTPException, Query, Header
from typing import List, Optional, Dict
import structlog

from app.models.resource import Resource, ResourceType, ResourceFilter
from app.models.auth import UserContext
from app.models.requests import BatchResourceRequest, ResourceResponse
from app.services.resource_manager import ResourceManager
from app.core.security import get_current_user
from app.core.exceptions import ResourceNotFoundError, UnauthorizedError

router = APIRouter(prefix="/resources", tags=["resources"])
logger = structlog.get_logger()

@router.get("/{resource_type}/{resource_id}", response_model=ResourceResponse)
async def get_resource(
    resource_type: ResourceType,
    resource_id: str,
    include: Optional[List[str]] = Query(None, description="Additional data to include"),
    user_context: UserContext = Depends(get_current_user),
    resource_manager: ResourceManager = Depends(),
):
    """
    Get a single resource with permission check.

    Optional includes:
    - permissions: Include user's permissions for this resource
    - owner: Include owner information
    - shares: Include sharing information
    - delegation: Include AI delegation info (if applicable)
    """
    logger.info(
        "Getting resource",
        resource_type=resource_type,
        resource_id=resource_id,
        user_id=user_context.user_id,
        is_ai_agent=user_context.is_ai_agent
    )

    try:
        resource = await resource_manager.get_resource_with_permissions(
            user_context=user_context,
            resource_type=resource_type,
            resource_id=resource_id,
            includes=include or []
        )

        if not resource:
            raise HTTPException(
                status_code=404,
                detail=f"{resource_type.value.capitalize()} not found"
            )

        return ResourceResponse(
            data=resource,
            meta={
                "api_version": "1.0",
                "resource_type": resource_type.value,
                "user_context": {
                    "is_ai_agent": user_context.is_ai_agent,
                    "has_delegation": bool(user_context.delegation_id)
                }
            }
        )

    except UnauthorizedError as e:
        raise HTTPException(status_code=403, detail=str(e))
    except ResourceNotFoundError:
        raise HTTPException(status_code=404, detail="Resource not found")
    except Exception as e:
        logger.error("Failed to get resource", error=str(e))
        raise HTTPException(status_code=500, detail="Internal server error")

@router.post("/{resource_type}/batch", response_model=List[ResourceResponse])
async def get_resources_batch(
    resource_type: ResourceType,
    request: BatchResourceRequest,
    user_context: UserContext = Depends(get_current_user),
    resource_manager: ResourceManager = Depends(),
):
    """
    Get multiple resources with permission filtering.

    Maximum batch size: 50 resources
    """
    if len(request.resource_ids) > 50:
        raise HTTPException(
            status_code=400,
            detail="Batch size exceeds maximum of 50 resources"
        )

    logger.info(
        "Getting resources batch",
        resource_type=resource_type,
        batch_size=len(request.resource_ids),
        user_id=user_context.user_id,
        is_ai_agent=user_context.is_ai_agent
    )

    try:
        resources = await resource_manager.get_resources_batch_with_permissions(
            user_context=user_context,
            resource_type=resource_type,
            resource_ids=request.resource_ids,
            includes=request.include or []
        )

        return [
            ResourceResponse(
                data=resource,
                meta={
                    "api_version": "1.0",
                    "resource_type": resource_type.value
                }
            )
            for resource in resources
        ]

    except Exception as e:
        logger.error("Failed to get resources batch", error=str(e))
        raise HTTPException(status_code=500, detail="Internal server error")

@router.get("/{resource_type}", response_model=List[ResourceResponse])
async def list_resources(
    resource_type: ResourceType,
    filter: ResourceFilter = Query(ResourceFilter.ALL),
    limit: int = Query(50, le=100, ge=1),
    offset: int = Query(0, ge=0),
    include: Optional[List[str]] = Query(None),
    user_context: UserContext = Depends(get_current_user),
    resource_manager: ResourceManager = Depends(),
):
    """
    List accessible resources of a specific type.

    Filters:
    - owned: Only resources owned by the user
    - shared: Only resources shared with the user
    - all: Both owned and shared resources (default)

    For AI agents, only resources within their delegation scope are returned.
    """
    logger.info(
        "Listing resources",
        resource_type=resource_type,
        filter=filter,
        limit=limit,
        offset=offset,
        user_id=user_context.user_id,
        is_ai_agent=user_context.is_ai_agent
    )

    try:
        resources = await resource_manager.list_accessible_resources(
            user_context=user_context,
            resource_type=resource_type,
            filter=filter,
            limit=limit,
            offset=offset,
            includes=include or []
        )

        return [
            ResourceResponse(
                data=resource,
                meta={
                    "api_version": "1.0",
                    "resource_type": resource_type.value,
                    "filter": filter.value,
                    "pagination": {
                        "limit": limit,
                        "offset": offset,
                        "count": len(resources)
                    }
                }
            )
            for resource in resources
        ]

    except Exception as e:
        logger.error("Failed to list resources", error=str(e))
        raise HTTPException(status_code=500, detail="Internal server error")

# Health check endpoint
@router.get("/health", tags=["health"])
async def health_check():
    """Basic health check for the AAS"""
    return {
        "status": "healthy",
        "service": "Authorization & Access Service",
        "version": "1.0.0"
    }

Phase 2: Infrastructure & Testing (Week 2)

Day 11-12: Terraform Infrastructure

1. AAS Module

# tf_infra/modules/AuthorizationAccessService/main.tf

locals {
  service_name = "${var.environment}-authorization-access-service"
}

# ECR Repository
resource "aws_ecr_repository" "aas" {
  name                 = local.service_name
  image_tag_mutability = "MUTABLE"

  image_scanning_configuration {
    scan_on_push = true
  }

  tags = merge(var.common_tags, {
    Name = local.service_name
  })
}

# Lambda Function
module "aas_lambda" {
  source = "../Lambda"

  function_name = local.service_name
  description   = "Authorization and Access Service for shared resources"

  image_uri = "${aws_ecr_repository.aas.repository_url}:latest"

  environment_variables = {
    ENVIRONMENT              = var.environment
    LOG_LEVEL               = var.log_level

    # Service URLs
    SHARING_SERVICE_URL      = var.sharing_service_url
    TODO_SERVICE_URL        = var.todo_service_url
    DELEGATION_SERVICE_URL  = var.delegation_service_url

    # Redis Cache
    REDIS_ENDPOINT          = module.redis_cache.endpoint
    REDIS_PORT              = module.redis_cache.port

    # Feature Flags
    ENABLE_CACHE            = "true"
    CACHE_TTL_SECONDS       = "300"
    ENABLE_AI_DELEGATION    = "true"
  }

  vpc_config = {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.aas.id]
  }

  timeout = 30
  memory_size = 512

  tags = var.common_tags
}

# Redis Cache
module "redis_cache" {
  source = "../ElastiCache"

  cluster_id           = "${local.service_name}-cache"
  node_type           = var.cache_node_type
  num_cache_nodes     = 1
  engine_version      = "7.0"
  port                = 6379

  subnet_group_name   = var.cache_subnet_group_name
  security_group_ids  = [aws_security_group.redis.id]

  tags = var.common_tags
}

# Security Groups
resource "aws_security_group" "aas" {
  name_prefix = "${local.service_name}-lambda-"
  vpc_id      = var.vpc_id

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = merge(var.common_tags, {
    Name = "${local.service_name}-lambda"
  })
}

resource "aws_security_group" "redis" {
  name_prefix = "${local.service_name}-redis-"
  vpc_id      = var.vpc_id

  ingress {
    from_port       = 6379
    to_port         = 6379
    protocol        = "tcp"
    security_groups = [aws_security_group.aas.id]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = merge(var.common_tags, {
    Name = "${local.service_name}-redis"
  })
}

# IAM Role Policies
resource "aws_iam_role_policy" "aas_permissions" {
  name = "${local.service_name}-permissions"
  role = module.aas_lambda.execution_role_id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "lambda:InvokeFunction"
        ]
        Resource = [
          var.sharing_service_arn,
          var.todo_service_arn,
          var.delegation_service_arn
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "elasticache:DescribeCacheClusters",
          "elasticache:DescribeCacheNodes"
        ]
        Resource = "*"
      }
    ]
  })
}

2. API Gateway Integration

# tf_infra/modules/AuthorizationAccessService/api_gateway.tf

# API Gateway Resources
resource "aws_api_gateway_resource" "aas_base" {
  rest_api_id = var.api_gateway_id
  parent_id   = var.api_gateway_root_id
  path_part   = "aas"
}

resource "aws_api_gateway_resource" "api_version" {
  rest_api_id = var.api_gateway_id
  parent_id   = aws_api_gateway_resource.aas_base.id
  path_part   = "api"
}

resource "aws_api_gateway_resource" "v1" {
  rest_api_id = var.api_gateway_id
  parent_id   = aws_api_gateway_resource.api_version.id
  path_part   = "v1"
}

resource "aws_api_gateway_resource" "resources" {
  rest_api_id = var.api_gateway_id
  parent_id   = aws_api_gateway_resource.v1.id
  path_part   = "resources"
}

# Proxy resource for all sub-paths
resource "aws_api_gateway_resource" "proxy" {
  rest_api_id = var.api_gateway_id
  parent_id   = aws_api_gateway_resource.resources.id
  path_part   = "{proxy+}"
}

# Methods
resource "aws_api_gateway_method" "any" {
  rest_api_id   = var.api_gateway_id
  resource_id   = aws_api_gateway_resource.proxy.id
  http_method   = "ANY"
  authorization = "CUSTOM"
  authorizer_id = var.jwt_authorizer_id
}

# Lambda Integration
resource "aws_api_gateway_integration" "lambda" {
  rest_api_id = var.api_gateway_id
  resource_id = aws_api_gateway_resource.proxy.id
  http_method = aws_api_gateway_method.any.http_method

  integration_http_method = "POST"
  type                   = "AWS_PROXY"
  uri                    = module.aas_lambda.invoke_arn
}

# Lambda Permission for API Gateway
resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = module.aas_lambda.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${var.api_gateway_execution_arn}/*/*"
}

Day 13-14: Testing Implementation

1. Unit Tests

# tests/unit/test_authorization_engine.py
import pytest
from unittest.mock import Mock, AsyncMock
from datetime import datetime

from app.services.authorization_engine import AuthorizationEngine
from app.models.permission import (
    Permission, AuthorizationContext, AuthorizationResult
)

@pytest.mark.asyncio
async def test_authorize_owner_access():
    """Test that resource owners get full access"""
    # Setup mocks
    sharing_client = Mock()
    delegation_client = Mock()
    cache_manager = Mock()
    cache_manager.get = AsyncMock(return_value=None)
    cache_manager.set = AsyncMock()
    audit_logger = Mock()
    audit_logger.log_authorization_attempt = AsyncMock()
    audit_logger.log_authorization_result = AsyncMock()

    engine = AuthorizationEngine(
        sharing_client, delegation_client, cache_manager, audit_logger
    )

    # Mock ownership check
    engine._check_ownership = AsyncMock(
        return_value=AuthorizationResult(
            allowed=True,
            reason="owner",
            permission_level=Permission.OWNER
        )
    )

    # Test
    context = AuthorizationContext(
        user_id="user123",
        resource_type="task",
        resource_id="task456",
        required_permission=Permission.DELETE
    )

    result = await engine.authorize(context)

    # Assertions
    assert result.allowed is True
    assert result.reason == "owner"
    assert result.permission_level == Permission.OWNER

    # Verify cache was set
    cache_manager.set.assert_called_once()

    # Verify audit logging
    audit_logger.log_authorization_attempt.assert_called_once_with(context)
    audit_logger.log_authorization_result.assert_called_once()

@pytest.mark.asyncio
async def test_authorize_ai_agent_with_delegation():
    """Test AI agent access with valid delegation"""
    # Setup mocks
    sharing_client = Mock()
    delegation_client = Mock()
    cache_manager = Mock()
    cache_manager.get = AsyncMock(return_value=None)
    cache_manager.set = AsyncMock()
    audit_logger = Mock()
    audit_logger.log_authorization_attempt = AsyncMock()
    audit_logger.log_authorization_result = AsyncMock()

    # Mock delegation check
    delegation_client.get_delegation = AsyncMock(
        return_value=Mock(
            delegation_id="del123",
            status="granted",
            allowed_tags_or_contexts=["read:task", "write:task"],
            user_id="user123",
            grant_timestamp=datetime.utcnow()
        )
    )

    engine = AuthorizationEngine(
        sharing_client, delegation_client, cache_manager, audit_logger
    )

    # Test
    context = AuthorizationContext(
        user_id="ai-agent-123",
        resource_type="task",
        resource_id="task456",
        required_permission=Permission.VIEW,
        is_ai_agent=True,
        delegation_id="del123"
    )

    result = await engine.authorize(context)

    # Assertions
    assert result.allowed is True
    assert result.reason == "shared"  # Or whatever the actual reason is
    assert result.delegation_info is not None
    assert result.delegation_info["delegation_id"] == "del123"

# More tests...

2. Integration Tests

# tests/integration/test_api_endpoints.py
import pytest
from httpx import AsyncClient
from fastapi.testclient import TestClient

from main import app

@pytest.mark.asyncio
async def test_get_shared_task():
    """Test getting a shared task through AAS"""
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Setup test data
        headers = {"Authorization": "Bearer test-token"}

        # Test getting a resource
        response = await client.get(
            "/api/v1/resources/task/task123",
            headers=headers
        )

        assert response.status_code == 200
        data = response.json()
        assert data["data"]["id"] == "task123"
        assert data["data"]["type"] == "task"

@pytest.mark.asyncio
async def test_batch_resources_with_permissions():
    """Test batch endpoint filters by permissions"""
    async with AsyncClient(app=app, base_url="http://test") as client:
        headers = {"Authorization": "Bearer test-token"}

        # Request batch of resources
        response = await client.post(
            "/api/v1/resources/task/batch",
            headers=headers,
            json={
                "resource_ids": ["task1", "task2", "task3"],
                "include": ["permissions", "shares"]
            }
        )

        assert response.status_code == 200
        data = response.json()

        # Should only return accessible resources
        assert len(data) <= 3

        # Each resource should have permission info
        for resource in data:
            assert "permissions" in resource["data"]

Phase 3: Flutter Integration (Week 3)

Update Flutter to use AAS

// lib/services/authorization_access_service.dart
class AuthorizationAccessService {
  final String baseUrl;
  final AuthService authService;

  AuthorizationAccessService({
    required this.baseUrl,
    required this.authService,
  });

  Future<Resource?> getResource({
    required ResourceType type,
    required String resourceId,
    List<String> include = const [],
  }) async {
    final token = await authService.getAccessToken();

    final response = await http.get(
      Uri.parse('$baseUrl/api/v1/resources/${type.value}/$resourceId')
        .replace(queryParameters: {
          if (include.isNotEmpty) 'include': include,
        }),
      headers: {
        'Authorization': 'Bearer $token',
        'Content-Type': 'application/json',
      },
    );

    if (response.statusCode == 200) {
      final data = json.decode(response.body);
      return Resource.fromJson(data['data']);
    } else if (response.statusCode == 404) {
      return null;
    } else {
      throw Exception('Failed to get resource: ${response.statusCode}');
    }
  }

  Future<List<Resource>> getResourcesBatch({
    required ResourceType type,
    required List<String> resourceIds,
    List<String> include = const [],
  }) async {
    final token = await authService.getAccessToken();

    final response = await http.post(
      Uri.parse('$baseUrl/api/v1/resources/${type.value}/batch'),
      headers: {
        'Authorization': 'Bearer $token',
        'Content-Type': 'application/json',
      },
      body: json.encode({
        'resource_ids': resourceIds,
        'include': include,
      }),
    );

    if (response.statusCode == 200) {
      final data = json.decode(response.body) as List;
      return data.map((item) => Resource.fromJson(item['data'])).toList();
    } else {
      throw Exception('Failed to get resources batch: ${response.statusCode}');
    }
  }
}

Key Benefits of This Architecture

  1. Security First

    • Zero-trust model
    • Single point of authorization
    • Complete audit trail
    • Support for AI agent delegation
  2. Performance Optimized

    • Multi-level caching
    • Batch operations
    • Circuit breakers
    • Parallel authorization
  3. Future Proof

    • Easy to add new resource types
    • Support for new permission models
    • Ready for GraphQL/WebSocket
    • AI/ML ready
  4. Developer Friendly

    • Consistent API
    • Clear documentation
    • Comprehensive testing
    • Easy integration

Success Metrics

  • ✅ Shared tasks accessible in <200ms
  • ✅ Support for 50+ item batches
  • ✅ AI agents can access delegated resources
  • ✅ Complete audit trail
  • ✅ Zero security vulnerabilities
  • ✅ 99.9% uptime

Next Steps

  1. Start Phase 1 implementation
  2. Set up development environment
  3. Create service structure
  4. Implement core components
  5. Deploy to development
  6. Begin integration testing

Authorization & Access Service - Architectural Concept

Executive Summary

The Authorization & Access Service (AAS) is a centralized abstraction layer that handles all shared resource access across the Mindbots ecosystem. It acts as a secure proxy between clients and resource services, enforcing sharing permissions and providing a unified API for accessing shared resources.

Core Design Principles

1. Zero Trust Security Model

  • Never trust client-provided resource IDs
  • Always verify permissions at the service level
  • Implement defense in depth
  • Audit all access attempts

2. Resource Agnostic Design

  • Generic interface that works with any resource type
  • Pluggable resource adapters for different services
  • Consistent API regardless of underlying resource

3. Performance First

  • Intelligent caching with security boundaries
  • Parallel permission checks
  • Minimal latency overhead
  • Circuit breakers for downstream services

4. Developer Experience

  • Single API to learn for all shared resources
  • Consistent error handling
  • Clear permission models
  • Self-documenting endpoints

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                         Clients                                  │
│  (Flutter App, Web App, Third-party integrations)               │
└─────────────────────┬───────────────────────────────────────────┘
                      │ HTTPS + JWT
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│              API Gateway (AWS API Gateway)                       │
│  - Rate limiting                                                 │
│  - JWT validation                                                │
│  - Request routing                                               │
└─────────────────────┬───────────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│          Authorization & Access Service (AAS)                    │
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                   API Layer                              │   │
│  │  - Unified resource access endpoints                     │   │
│  │  - Request validation                                    │   │
│  │  - Response transformation                               │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                   │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │              Authorization Engine                        │   │
│  │  - Permission resolution                                 │   │
│  │  - Policy evaluation                                     │   │
│  │  - Access decision caching                              │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                   │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │              Resource Adapter Layer                      │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐  │   │
│  │  │   Todo   │ │ Project  │ │   Chat   │ │  Future  │  │   │
│  │  │ Adapter  │ │ Adapter  │ │ Adapter  │ │ Adapters │  │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘  │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                   │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                  Service Clients                         │   │
│  │  - Sharing Service Client                                │   │
│  │  - Resource Service Clients                              │   │
│  │  - Circuit breakers & retries                           │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────┬───────────────────────────────────────────┘
                      │ Internal service-to-service auth
                      ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Backend Services                              │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐              │
│  │   Sharing   │ │    Todo     │ │   Project   │              │
│  │   Service   │ │   Service   │ │   Service   │  ...         │
│  └─────────────┘ └─────────────┘ └─────────────┘              │
└─────────────────────────────────────────────────────────────────┘

API Design

Unified Resource Access Pattern

# Get a single resource (with permission check)
GET /api/v1/resources/{resource_type}/{resource_id}

# Get multiple resources (batch with permission filtering)
POST /api/v1/resources/{resource_type}/batch
{
  "resource_ids": ["id1", "id2", "id3"]
}

# List accessible resources of a type
GET /api/v1/resources/{resource_type}?filter=owned|shared|all

# Get resource with related data
GET /api/v1/resources/{resource_type}/{resource_id}?include=permissions,owner,shares

Resource Types

class ResourceType(str, Enum):
    TASK = "task"
    PROJECT = "project"
    CHAT = "chat"
    DOCUMENT = "document"
    # Easily extensible for future types

Response Format

{
  "data": {
    "id": "resource-id",
    "type": "task",
    "attributes": {
      // Resource-specific data
    },
    "permissions": {
      "can_read": true,
      "can_write": false,
      "can_delete": false,
      "can_share": false
    },
    "sharing": {
      "is_shared": true,
      "shared_by": "user-id",
      "shared_at": "2024-01-01T00:00:00Z"
    }
  },
  "meta": {
    "api_version": "1.0",
    "request_id": "req-123"
  }
}

Security Architecture

1. Authentication & Authorization Flow

async def authorize_resource_access(
    user_id: str,
    resource_type: ResourceType,
    resource_id: str,
    required_permission: Permission
) -> AuthorizationResult:
    # Step 1: Check if user owns the resource
    if await is_resource_owner(user_id, resource_type, resource_id):
        return AuthorizationResult(allowed=True, reason="owner")

    # Step 2: Check sharing permissions
    share = await sharing_service.get_share(
        resource_type=resource_type,
        resource_id=resource_id,
        shared_with_user_id=user_id
    )

    if not share:
        return AuthorizationResult(allowed=False, reason="no_access")

    # Step 3: Verify permission level
    if share.has_permission(required_permission):
        return AuthorizationResult(
            allowed=True,
            reason="shared",
            restrictions=share.restrictions
        )

    return AuthorizationResult(
        allowed=False,
        reason="insufficient_permissions"
    )

2. Defense in Depth

  • Layer 1: API Gateway JWT validation
  • Layer 2: AAS authentication middleware
  • Layer 3: Authorization engine permission checks
  • Layer 4: Resource service validation
  • Layer 5: Database row-level security

3. Security Headers

security_headers = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "X-XSS-Protection": "1; mode=block",
    "Strict-Transport-Security": "max-age=31536000",
    "Content-Security-Policy": "default-src 'self'",
    "X-Request-ID": generate_request_id()
}

Performance Optimization

1. Multi-Level Caching

class CacheStrategy:
    # L1: In-memory cache (5 minutes)
    permission_cache: TTLCache = TTLCache(maxsize=10000, ttl=300)

    # L2: Redis cache (15 minutes)
    redis_cache: Redis = Redis(
        decode_responses=True,
        socket_keepalive=True,
        health_check_interval=30
    )

    # L3: Conditional HTTP caching
    etag_support: bool = True

2. Batch Operations

async def get_resources_batch(
    user_id: str,
    resource_type: ResourceType,
    resource_ids: List[str]
) -> List[Resource]:
    # Parallel permission checks
    permission_tasks = [
        authorize_resource_access(user_id, resource_type, rid, Permission.READ)
        for rid in resource_ids
    ]
    permissions = await asyncio.gather(*permission_tasks)

    # Filter authorized resources
    authorized_ids = [
        rid for rid, perm in zip(resource_ids, permissions)
        if perm.allowed
    ]

    # Batch fetch from resource service
    if authorized_ids:
        return await resource_adapter.get_batch(authorized_ids)
    return []

3. Circuit Breaker Pattern

class ServiceCircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise ServiceUnavailableError()

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

Resource Adapter Pattern

Base Adapter Interface

class ResourceAdapter(ABC):
    @abstractmethod
    async def get_single(
        self,
        resource_id: str,
        user_context: UserContext
    ) -> Optional[Resource]:
        pass

    @abstractmethod
    async def get_batch(
        self,
        resource_ids: List[str],
        user_context: UserContext
    ) -> List[Resource]:
        pass

    @abstractmethod
    async def list_owned(
        self,
        user_id: str,
        filters: Dict[str, Any]
    ) -> List[Resource]:
        pass

    @abstractmethod
    def transform_to_unified_format(
        self,
        raw_resource: Any
    ) -> Resource:
        pass

Todo Adapter Implementation

class TodoAdapter(ResourceAdapter):
    def __init__(self, todo_service_client: TodoServiceClient):
        self.client = todo_service_client

    async def get_single(
        self,
        resource_id: str,
        user_context: UserContext
    ) -> Optional[Resource]:
        try:
            # Call todo service with service-to-service auth
            task = await self.client.get_task(
                task_id=resource_id,
                auth_token=get_service_token()
            )
            return self.transform_to_unified_format(task)
        except TaskNotFoundError:
            return None

    def transform_to_unified_format(self, task: Task) -> Resource:
        return Resource(
            id=task.id,
            type=ResourceType.TASK,
            attributes={
                "title": task.title,
                "description": task.description,
                "status": task.status,
                "created_at": task.created_at,
                "updated_at": task.updated_at,
                # Map all task fields
            },
            owner_id=task.user_id,
            created_at=task.created_at,
            updated_at=task.updated_at
        )

Infrastructure Integration

Terraform Module Structure

# modules/AuthorizationAccessService/main.tf

module "aas_lambda" {
  source = "../Lambda"

  function_name = "${var.environment}-authorization-access-service"
  runtime       = "python3.11"
  handler       = "main.handler"

  environment_variables = {
    SHARING_SERVICE_URL = var.sharing_service_url
    TODO_SERVICE_URL    = var.todo_service_url
    REDIS_URL          = module.redis_cache.endpoint
    LOG_LEVEL          = var.log_level
  }

  vpc_config = {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [module.aas_security_group.id]
  }
}

module "redis_cache" {
  source = "../ElastiCache"

  cluster_id         = "${var.environment}-aas-cache"
  node_type         = "cache.t3.micro"
  engine_version    = "7.0"
  subnet_group_name = var.cache_subnet_group
}

API Gateway Integration

resource "aws_api_gateway_resource" "aas_resources" {
  rest_api_id = var.api_gateway_id
  parent_id   = var.api_gateway_root_id
  path_part   = "resources"
}

resource "aws_api_gateway_method" "get_resource" {
  rest_api_id   = var.api_gateway_id
  resource_id   = aws_api_gateway_resource.aas_resources.id
  http_method   = "GET"
  authorization = "CUSTOM"
  authorizer_id = var.jwt_authorizer_id

  request_parameters = {
    "method.request.path.resource_type" = true
    "method.request.path.resource_id"   = true
  }
}

Migration Strategy

Phase 1: Setup (Week 1-2)

  1. Create AAS service infrastructure
  2. Implement core authorization engine
  3. Deploy with todo adapter only
  4. Add comprehensive monitoring

Phase 2: Integration (Week 3-4)

  1. Update Flutter app to use AAS for shared todos
  2. Monitor performance and fix issues
  3. Add caching layers
  4. Implement batch endpoints

Phase 3: Expansion (Week 5-6)

  1. Add project adapter
  2. Add chat adapter
  3. Update all clients
  4. Deprecate direct service calls for shared resources

Phase 4: Optimization (Week 7-8)

  1. Analyze performance metrics
  2. Optimize slow queries
  3. Implement advanced caching
  4. Add predictive prefetching

Monitoring & Observability

Key Metrics

class AASMetrics:
    # Performance metrics
    request_duration = Histogram(
        'aas_request_duration_seconds',
        'Request duration by endpoint and resource type',
        ['endpoint', 'resource_type', 'status']
    )

    # Authorization metrics
    authorization_decisions = Counter(
        'aas_authorization_decisions_total',
        'Authorization decisions by result',
        ['resource_type', 'decision', 'reason']
    )

    # Cache metrics
    cache_hits = Counter(
        'aas_cache_hits_total',
        'Cache hits by cache level',
        ['cache_level', 'resource_type']
    )

    # Error metrics
    downstream_errors = Counter(
        'aas_downstream_errors_total',
        'Errors from downstream services',
        ['service', 'error_type']
    )

Logging Strategy

@contextmanager
def request_context(request_id: str, user_id: str, resource_type: str):
    """Structured logging context for all operations"""
    logger = structlog.get_logger()
    logger = logger.bind(
        request_id=request_id,
        user_id=user_id,
        resource_type=resource_type,
        service="aas"
    )

    start_time = time.time()
    try:
        yield logger
    finally:
        duration = time.time() - start_time
        logger.info(
            "request_completed",
            duration_ms=duration * 1000
        )

Future Considerations

1. Advanced Features

  • GraphQL interface for flexible queries
  • Real-time updates via WebSocket
  • Bulk permission management
  • Permission inheritance hierarchies

2. AI Integration

  • Smart permission suggestions
  • Anomaly detection for suspicious access
  • Predictive caching based on usage patterns

3. Cross-Region Support

  • Multi-region deployment
  • Global permission cache
  • Edge computing for low latency

Conclusion

The Authorization & Access Service provides a secure, performant, and extensible solution for managing shared resource access across the Mindbots ecosystem. By centralizing authorization logic and providing a unified API, we can ensure consistent security policies while enabling rapid feature development.

Key benefits:

  • Security: Single point for permission enforcement
  • Performance: Optimized batch operations and caching
  • Extensibility: Easy to add new resource types
  • Maintainability: Centralized authorization logic
  • Developer Experience: Consistent API across all resources

Issue Implementation: Enable Secure Access to Shared Todos for Users with View Permissions

1. Original GitHub Issue

⚠️ CRITICAL: Language Requirement: This entire implementation document MUST be in English, even if the original issue or user communication is in another language. All sections below must be written in English.

GitHub Issue: https://github.com/mindbots-inc/flow/issues/51
Title: Enable secure access to shared todos for users with view permissions
Repository: https://github.com/mindbots-inc/flow
Related to: Issue #28
Assignee: @michaelkrumm

2. Research & Analysis

Understanding Current State

UI Implementation (Flow App):

  • ✅ Share button on task cards
  • ✅ Share status indicators showing who tasks are shared with
  • ✅ Permission badges (View, Edit, Share, etc.)
  • ✅ "Shared by" indicator for received shares
  • ✅ Share management modal for managing permissions
  • ✅ User search functionality
  • ✅ Sharing filters (All, My Tasks, Shared with Me, Shared by Me)
  • ✅ Offline support for pending shares

Backend Architecture:

  • Todo Service: AWS Lambda with DynamoDB, handles task CRUD operations
  • Sharing Service: Separate AWS Lambda, manages shares and permissions
  • API Gateway: JWT authentication for all endpoints
  • Current Issue: Todo service only returns tasks owned by the requesting user

Security Model:

  • JWT-based authentication at API Gateway
  • User context available in Lambda functions
  • DynamoDB row-level access based on user_id
  • No current cross-service permission checking

Research Findings

  1. Flutter App Implementation:

    • The sharingAwareTasksProvider fetches shared task IDs from sharing service
    • Then attempts to fetch task details individually using todoApiService.getTask()
    • This fails with 404 because todo service doesn't check sharing permissions
  2. Infrastructure:

    • Both services deployed as containerized Lambda functions
    • Services can communicate via AWS SDK or HTTP
    • IAM roles already configured for basic operations
  3. Abstraction Layer Consideration:

    • User wants to share multiple resource types (tasks, projects, chats)
    • Current implementation is task-specific
    • Need to design for extensibility

3. Requirements Clarification (95% Confidence)

Confirmed Requirements:

  1. GitHub Issue: Create new issue in flow repo, linked to #28
  2. UI: Already fully implemented and functional
  3. Performance: Real-time or near real-time, batch size of 50
  4. Security: Backend enforcement only, no audit log initially
  5. Future: Design with abstraction layer for multiple resource types

4. Security Analysis

Attack Vectors & Mitigations

  1. Unauthorized Access via Direct API Calls

    • Risk: Users could try to access any task by guessing IDs
    • Mitigation: Always verify permissions server-side
  2. Permission Elevation

    • Risk: Users with VIEW trying to perform EDIT operations
    • Mitigation: Check specific permission level for each operation
  3. Batch Request Exploitation

    • Risk: Users requesting thousands of task IDs to find valid ones
    • Mitigation: Limit batch size, rate limiting, only return authorized tasks
  4. Service-to-Service Spoofing

    • Risk: Malicious requests pretending to be from todo service
    • Mitigation: Use AWS IAM roles for service authentication
  5. Cache Poisoning

    • Risk: If caching permissions, stale data could grant unauthorized access
    • Mitigation: Short TTL (5 minutes), invalidate on permission changes

Security Implementation Strategy

# Pseudo-code for secure permission checking
async def get_task_with_permissions(task_id: str, user_id: str, auth_context: AuthContext):
    # 1. Try to get task as owner
    task = await get_task_by_owner(user_id, task_id)
    if task:
        return task

    # 2. Check sharing permissions
    if await sharing_service.check_permission(
        entity_type="task",
        entity_id=task_id,
        user_id=user_id,
        required_permission=PermissionLevel.VIEW
    ):
        # 3. Get task with different access pattern
        task = await get_task_by_id(task_id)
        if task:
            # 4. Add permission context for frontend
            task.shared_access = True
            task.permission_level = await sharing_service.get_user_permission(task_id, user_id)
            return task

    # 5. No access
    raise NotFoundException("Task not found")

5. Implementation Plan

Overall Strategy

After careful analysis and discussion, we've decided to implement Option B: Centralized Authorization & Access Service (AAS). This provides a secure, scalable abstraction layer for all shared resource access across the Mindbots ecosystem.

The AAS will incorporate key patterns from the AI Delegation Service:

  • Delegated Access: Support both human users and AI agents accessing shared resources
  • Consent Management: Track and audit all sharing/delegation decisions
  • Scoped Permissions: Fine-grained control over what can be accessed
  • Instant Revocation: Immediate permission changes across all services

Architecture Decision: Authorization & Access Service

The AAS will act as a centralized proxy between clients and resource services, enforcing sharing permissions and providing a unified API for accessing shared resources. This approach offers:

  • Security: Single point for permission enforcement with zero-trust model
  • Extensibility: Easy to add new resource types (projects, chats, documents)
  • Performance: Intelligent caching and batch operations
  • Consistency: Unified API regardless of resource type
  • AI-Ready: Built-in support for AI agent delegated access

For the complete architectural concept, see: Authorization & Access Service Concept

Implementation Phases

Phase 1: Core AAS Infrastructure (Current Sprint)

Week 1: Foundation

  1. Create AAS Service Structure

    cd repos/mindbots-services
    mkdir authorization-access-service
    cd authorization-access-service
    poetry init --name authorization-access-service --python "^3.11"
  2. Core Dependencies

    [tool.poetry.dependencies]
    python = "^3.11"
    fastapi = "^0.109.0"
    pydantic = "^2.5.0"
    boto3 = "^1.34.0"
    aioboto3 = "^12.0.0"
    httpx = "^0.25.0"
    redis = "^5.0.0"
    structlog = "^24.0.0"
  3. Service Structure

    authorization-access-service/
    ├── app/
    │   ├── api/
    │   │   └── v1/
    │   │       ├── endpoints/
    │   │       │   ├── resources.py
    │   │       │   └── health.py
    │   │       └── router.py
    │   ├── core/
    │   │   ├── config.py
    │   │   ├── security.py
    │   │   └── exceptions.py
    │   ├── models/
    │   │   ├── resource.py
    │   │   ├── permission.py
    │   │   └── delegation.py
    │   ├── services/
    │   │   ├── authorization_engine.py
    │   │   ├── cache_manager.py
    │   │   └── audit_logger.py
    │   ├── adapters/
    │   │   ├── base.py
    │   │   ├── todo_adapter.py
    │   │   └── __init__.py
    │   └── clients/
    │       ├── sharing_service.py
    │       ├── todo_service.py
    │       └── delegation_service.py
    ├── main.py
    ├── Dockerfile
    └── pyproject.toml
    
  4. Implementation Steps

    Day 1-2: Core Service Setup

    • Create FastAPI application structure
    • Implement authentication middleware
    • Set up configuration management
    • Create base models and schemas
    • Add structured logging

    Day 3-4: Authorization Engine

    • Implement permission resolver
    • Create caching layer (Redis)
    • Add audit logging
    • Implement circuit breakers
    • Add comprehensive error handling

    Day 5: Service Clients

    • Create sharing service client
    • Create todo service client
    • Add AI delegation service client
    • Implement retry logic
    • Add service health checks

Week 2: Resource Adapters & Integration

Day 6-7: Todo Adapter

  • Implement ResourceAdapter interface
  • Create todo-specific transformation
  • Add batch operations
  • Implement error handling
  • Add comprehensive tests

Day 8-9: API Implementation

  • Create unified resource endpoints
  • Implement batch endpoint
  • Add OpenAPI documentation
  • Create request/response models
  • Add rate limiting

Day 10: Testing & Documentation

  • Unit tests for all components
  • Integration tests with mocked services
  • Load testing
  • API documentation
  • Deployment guide

Phase 2: Infrastructure & Deployment (Week 3-4)

  1. Terraform Infrastructure

    # tf_infra/modules/AuthorizationAccessService/
    module "aas" {
      source = "../AuthorizationAccessService"
    
      environment         = var.environment
      vpc_id             = var.vpc_id
      private_subnet_ids = var.private_subnet_ids
    
      # Service URLs
      sharing_service_url    = var.sharing_service_url
      todo_service_url      = var.todo_service_url
      delegation_service_url = var.delegation_service_url
    
      # Redis cache
      enable_cache = true
      cache_node_type = "cache.t3.micro"
    }
  2. API Gateway Integration

    • Add new routes for AAS
    • Configure JWT authorizer
    • Set up request validation
    • Add CloudWatch logging
  3. Monitoring Setup

    • CloudWatch dashboards
    • X-Ray tracing
    • Custom metrics
    • Alerting rules

Phase 3: Flutter Integration (Week 4)

  1. Update Flutter Services

    // New unified resource service
    class UnifiedResourceService {
      Future<List<Resource>> getResources({
        required ResourceType type,
        ResourceFilter filter = ResourceFilter.all,
      });
    
      Future<Resource?> getResource({
        required ResourceType type,
        required String resourceId,
      });
    }
  2. Migration Strategy

    • Feature flag for gradual rollout
    • Fallback to direct service calls
    • Error handling for migration
    • Performance monitoring

Detailed Implementation: Core Components

1. Authorization Engine

# app/services/authorization_engine.py
from typing import Optional, List
from enum import Enum
import asyncio

from app.models.permission import Permission, AuthorizationResult
from app.models.resource import ResourceType
from app.clients import SharingServiceClient, DelegationServiceClient

class AuthorizationEngine:
    def __init__(
        self,
        sharing_client: SharingServiceClient,
        delegation_client: DelegationServiceClient,
        cache_manager: CacheManager,
    ):
        self.sharing_client = sharing_client
        self.delegation_client = delegation_client
        self.cache = cache_manager

    async def authorize_access(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        required_permission: Permission,
        is_ai_agent: bool = False,
        delegation_id: Optional[str] = None,
    ) -> AuthorizationResult:
        """
        Authorize access to a resource, supporting both human users and AI agents.
        """
        # Check cache first
        cache_key = f"auth:{user_id}:{resource_type}:{resource_id}:{required_permission}"
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            return AuthorizationResult.from_cache(cached_result)

        # For AI agents, verify delegation first
        if is_ai_agent and delegation_id:
            delegation_valid = await self._verify_ai_delegation(
                user_id, delegation_id, resource_type
            )
            if not delegation_valid:
                return AuthorizationResult(
                    allowed=False,
                    reason="invalid_delegation"
                )

        # Check ownership
        is_owner = await self._check_ownership(user_id, resource_type, resource_id)
        if is_owner:
            result = AuthorizationResult(
                allowed=True,
                reason="owner",
                permission_level=Permission.OWNER
            )
            await self.cache.set(cache_key, result.to_cache(), ttl=300)
            return result

        # Check sharing permissions
        share = await self.sharing_client.get_share(
            resource_type=resource_type.value,
            resource_id=resource_id,
            shared_with_user_id=user_id
        )

        if share and share.has_permission(required_permission):
            result = AuthorizationResult(
                allowed=True,
                reason="shared",
                permission_level=share.permission_level,
                restrictions=share.restrictions
            )
            await self.cache.set(cache_key, result.to_cache(), ttl=300)
            return result

        # No access
        result = AuthorizationResult(
            allowed=False,
            reason="no_access"
        )
        await self.cache.set(cache_key, result.to_cache(), ttl=60)
        return result

    async def authorize_batch(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_ids: List[str],
        required_permission: Permission,
    ) -> dict[str, AuthorizationResult]:
        """
        Authorize access to multiple resources in parallel.
        """
        tasks = [
            self.authorize_access(
                user_id, resource_type, resource_id, required_permission
            )
            for resource_id in resource_ids
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        return {
            resource_id: result if isinstance(result, AuthorizationResult)
            else AuthorizationResult(allowed=False, reason="error")
            for resource_id, result in zip(resource_ids, results)
        }

    async def _verify_ai_delegation(
        self,
        user_id: str,
        delegation_id: str,
        resource_type: ResourceType
    ) -> bool:
        """
        Verify AI agent has valid delegation for resource type.
        """
        delegation = await self.delegation_client.get_delegation(
            user_id, delegation_id
        )

        if not delegation or delegation.status != "granted":
            return False

        # Check if resource type is in allowed contexts
        allowed_contexts = delegation.allowed_tags_or_contexts
        return (
            "*" in allowed_contexts or
            f"read:{resource_type.value}" in allowed_contexts or
            f"write:{resource_type.value}" in allowed_contexts
        )

2. Resource Adapter Base

# app/adapters/base.py
from abc import ABC, abstractmethod
from typing import Optional, List, Any, Dict
from app.models.resource import Resource, ResourceType
from app.models.auth import UserContext

class ResourceAdapter(ABC):
    """
    Abstract base class for resource adapters.
    Each resource type (todo, project, chat) implements this interface.
    """

    @abstractmethod
    def get_resource_type(self) -> ResourceType:
        """Return the type of resource this adapter handles."""
        pass

    @abstractmethod
    async def get_single(
        self,
        resource_id: str,
        user_context: Optional[UserContext] = None
    ) -> Optional[Resource]:
        """Fetch a single resource by ID."""
        pass

    @abstractmethod
    async def get_batch(
        self,
        resource_ids: List[str],
        user_context: Optional[UserContext] = None
    ) -> List[Resource]:
        """Fetch multiple resources by IDs."""
        pass

    @abstractmethod
    async def list_owned(
        self,
        user_id: str,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 50,
        offset: int = 0
    ) -> List[Resource]:
        """List resources owned by a user."""
        pass

    @abstractmethod
    async def check_ownership(
        self,
        user_id: str,
        resource_id: str
    ) -> bool:
        """Check if user owns the resource."""
        pass

3. Unified API Endpoints

# app/api/v1/endpoints/resources.py
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional

from app.models.resource import Resource, ResourceType, ResourceFilter
from app.models.requests import BatchResourceRequest
from app.services.resource_manager import ResourceManager
from app.core.auth import get_current_user, UserContext

router = APIRouter(prefix="/resources", tags=["resources"])

@router.get("/{resource_type}/{resource_id}", response_model=Resource)
async def get_resource(
    resource_type: ResourceType,
    resource_id: str,
    include: Optional[List[str]] = Query(None),
    user_context: UserContext = Depends(get_current_user),
    resource_manager: ResourceManager = Depends(get_resource_manager),
):
    """
    Get a single resource with permission check.

    Optional includes:
    - permissions: Include user's permissions
    - owner: Include owner information
    - shares: Include sharing information
    """
    resource = await resource_manager.get_resource_with_permissions(
        user_id=user_context.user_id,
        resource_type=resource_type,
        resource_id=resource_id,
        include=include or [],
        is_ai_agent=user_context.is_ai_agent,
        delegation_id=user_context.delegation_id
    )

    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")

    return resource

@router.post("/{resource_type}/batch", response_model=List[Resource])
async def get_resources_batch(
    resource_type: ResourceType,
    request: BatchResourceRequest,
    user_context: UserContext = Depends(get_current_user),
    resource_manager: ResourceManager = Depends(get_resource_manager),
):
    """
    Get multiple resources with permission filtering.
    """
    if len(request.resource_ids) > 50:
        raise HTTPException(
            status_code=400,
            detail="Batch size exceeds limit of 50"
        )

    resources = await resource_manager.get_resources_batch_with_permissions(
        user_id=user_context.user_id,
        resource_type=resource_type,
        resource_ids=request.resource_ids,
        include=request.include or [],
        is_ai_agent=user_context.is_ai_agent,
        delegation_id=user_context.delegation_id
    )

    return resources

@router.get("/{resource_type}", response_model=List[Resource])
async def list_resources(
    resource_type: ResourceType,
    filter: ResourceFilter = Query(ResourceFilter.ALL),
    limit: int = Query(50, le=100),
    offset: int = Query(0, ge=0),
    user_context: UserContext = Depends(get_current_user),
    resource_manager: ResourceManager = Depends(get_resource_manager),
):
    """
    List accessible resources of a specific type.

    Filters:
    - owned: Only resources owned by user
    - shared: Only resources shared with user
    - all: Both owned and shared resources
    """
    resources = await resource_manager.list_accessible_resources(
        user_id=user_context.user_id,
        resource_type=resource_type,
        filter=filter,
        limit=limit,
        offset=offset,
        is_ai_agent=user_context.is_ai_agent,
        delegation_id=user_context.delegation_id
    )

    return resources

Security Enhancements with AI Delegation

  1. AI Agent Authentication

    async def get_current_user(
        authorization: str = Header(...),
        auth_service: AuthService = Depends(get_auth_service)
    ) -> UserContext:
        # Decode JWT
        token_data = await auth_service.decode_token(authorization)
    
        # Check if this is an AI agent with delegation
        delegation_id = token_data.get("custom:delegation_id")
        is_ai_agent = bool(delegation_id)
    
        return UserContext(
            user_id=token_data["sub"],
            is_ai_agent=is_ai_agent,
            delegation_id=delegation_id,
            allowed_contexts=token_data.get("custom:allowed_contexts", [])
        )
  2. Consent Tracking

    # Track all access attempts for audit
    await audit_logger.log_access_attempt(
        user_id=user_context.user_id,
        resource_type=resource_type,
        resource_id=resource_id,
        action="read",
        result="allowed" if resource else "denied",
        is_ai_agent=user_context.is_ai_agent,
        delegation_id=user_context.delegation_id
    )
  3. Instant Revocation

    • Cache invalidation on share revocation
    • Real-time permission updates
    • WebSocket notifications for active sessions

Success Metrics

  • ✅ Users can access shared todos through unified API
  • ✅ AI agents can access resources with proper delegation
  • ✅ Batch operations complete in <500ms for 50 items
  • ✅ Zero unauthorized access incidents
  • ✅ Complete audit trail of all access attempts
  • ✅ Seamless migration from direct service access

Next Steps

  1. Begin Phase 1 Implementation

    • Set up service structure
    • Implement core components
    • Create todo adapter
    • Deploy to development environment
  2. Testing Strategy

    • Unit tests for all components
    • Integration tests with real services
    • Security penetration testing
    • Performance benchmarking
  3. Documentation

    • API documentation
    • Integration guide
    • Security best practices
    • Migration guide for clients

6. Testing Strategy

Unit Tests

  • Test permission checking logic
  • Test batch request validation
  • Test service client error handling
  • Test caching behavior

Integration Tests

  • Test end-to-end flow with sharing service
  • Test permission changes and cache invalidation
  • Test rate limiting
  • Test error scenarios

Security Tests

  • Attempt unauthorized access
  • Test permission elevation attempts
  • Test batch request limits
  • Verify audit logging

Performance Tests

  • Measure single task fetch latency
  • Measure batch fetch performance
  • Test cache hit rates
  • Load test with concurrent users

7. Deployment Plan

  1. Deploy Updated Todo Service

    • Build and push new Docker image
    • Update Lambda function
    • Verify health checks
  2. Update API Gateway

    • Add new batch endpoint route
    • Update request validators
    • Deploy API changes
  3. Monitor and Rollback Plan

    • Monitor error rates and latencies
    • Have rollback procedure ready
    • Gradual rollout with feature flags

8. Documentation Updates

  • Update API documentation with new endpoints
  • Document permission model
  • Add security best practices
  • Create developer guide for extending to new resource types

9. Success Metrics

  • Users can successfully view shared tasks through AAS
  • AI agents can access resources with proper delegation
  • No unauthorized access incidents
  • Batch fetch latency < 200ms for 50 tasks
  • Zero security vulnerabilities found in testing
  • Complete audit trail of all access attempts
  • Existing functionality continues working
  • Seamless migration from direct service access

10. Future Enhancements

  1. Extended Resource Types: Add project, chat, and document adapters
  2. GraphQL Interface: Add GraphQL endpoint for flexible queries
  3. Real-time Updates: WebSocket support for live permission changes
  4. ML-Based Permissions: Predictive permission suggestions
  5. Global Scale: Multi-region deployment with edge caching

11. AI Delegation Service Integration

The AAS incorporates key patterns from the AI Delegation Service:

Delegated Access for AI Agents

  • AI agents use delegated tokens with delegation_id and allowed_contexts
  • Permissions are scoped to specific resource types and actions
  • Delegation can be revoked instantly, cutting off AI agent access

Enhanced Security Model

# AI Agent makes request with delegated token
Authorization: Bearer <delegated-token>

# Token contains special claims:
{
  "sub": "ai-agent-id",
  "custom:delegation_id": "del-123",
  "custom:allowed_contexts": ["read:task", "write:task"],
  "custom:delegated_by": "user-123"
}

# AAS verifies delegation before checking resource permissions

Consent & Audit Trail

  • Every AI agent access is logged with delegation context
  • Users can see which AI agents accessed their resources
  • Complete audit trail for compliance and security

Implementation Details

For the complete AAS implementation plan with AI delegation support, see: Authorization & Access Service Implementation Plan

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment