@evolvingsam
Last active December 8, 2025 19:49
Modular Python Ingestion Pipeline

High-Level Architecture

graph TD
    User[Main / Scheduler] -->|Job Payload| Engine[Ingestion Engine]
    
    subgraph Core Logic
    Engine -->|Check Limit| RateLimiter[Rate Limiter]
    Engine -->|Request Class| Registry[Connector Registry]
    end
    
    subgraph Plugins
    Registry -->|Loads| Weather[Weather Connector]
    end
    
    subgraph External
    Weather -->|HTTP| OpenMeteo[OpenMeteo API]
    end
    
    Weather -->|Yields DataRecord| Engine
    Engine -->|Saves| DB[(Database / Warehouse)]

Directory Structure & Responsibilities

| Path | Component | Responsibility |
| --- | --- | --- |
| app/interfaces/ | The Contract | Abstract Base Classes (ABCs). Defines rules that all Connectors must follow. |
| app/connectors/registry.py | The Factory | Scans directories and dynamically loads connector plugins at runtime. |
| app/connectors/sources/ | The Plugins | Specific implementations (e.g., weather.py). |
| app/services/engine.py | The Orchestrator | Coordinates the workflow: Load Connector → Check Rate Limit → Run Pipeline. |
| app/services/resilience.py | The Safety Net | Decorators for retries, exponential backoff, and error handling. |
| app/schemas/ | The Envelopes | Pydantic models (DTOs) ensuring data moves in a strict, validated format. |

Execution Flow

When you run python main.py, the following sequence occurs:

  1. Discovery Phase (Startup):

    • The ConnectorRegistry walks through the app/connectors/sources/ directory.
    • It imports every file it finds and looks for classes inheriting from BaseConnector.
    • It registers them in a dictionary: {"weather": WeatherConnectorClass}.
  2. Job Trigger:

    • main.py sends a Job Payload to the Engine: {"source": "weather"}.
  3. Instantiation:

    • The Engine asks the Registry: "Do you have a connector named 'weather'?"
    • The Registry returns the class. The Engine initializes it.
  4. Guard Rails:

    • The Engine calls RateLimiter.check_limit("weather").
    • If the source is under its limit: proceed. If it is rate-limited: raise an error or re-queue the job.
  5. Extraction (The "Yield" Loop):

    • The Engine calls connector.run().
    • The Connector hits the external API (e.g., OpenMeteo).
    • Resilience: If the API call fails (HTTP 500 or a timeout), the @resilient_request decorator waits and retries automatically, using exponential backoff.
  6. Transformation & Load:

    • The Connector converts raw JSON into a standardized DataRecord object.
    • It yields the record back to the Engine.
    • The Engine saves the record to the Database/Storage.
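
The six steps above can be sketched end to end in a few dozen lines. This is a minimal illustration, not the repository's actual code: the fixed-window RateLimiter, the RateLimitExceeded exception, the run_job method name, the dataclass stand-in for the Pydantic DataRecord, and the in-memory sink list are all assumptions made so the example runs on its own.

```python
import time
from dataclasses import dataclass


@dataclass
class DataRecord:
    """Stand-in for the Pydantic DataRecord model (fields are assumptions)."""
    source: str
    payload: dict


class RateLimitExceeded(Exception):
    """Hypothetical error raised when a source is over its limit."""


class RateLimiter:
    """Fixed-window limiter: at most max_calls per window, tracked per source."""

    def __init__(self, max_calls, window_seconds, clock=time.monotonic):
        self._max_calls = max_calls
        self._window = window_seconds
        self._clock = clock
        self._windows = {}  # source -> (window_start, calls_in_window)

    def check_limit(self, source):
        now = self._clock()
        start, count = self._windows.get(source, (now, 0))
        if now - start >= self._window:
            start, count = now, 0  # the previous window has expired
        if count >= self._max_calls:
            return False
        self._windows[source] = (start, count + 1)
        return True


class FakeWeatherConnector:
    """Yields canned records so the sketch needs no network access."""

    def run(self):
        for temperature in (18.5, 19.1):
            yield DataRecord(source="weather", payload={"temperature_c": temperature})


class IngestionEngine:
    def __init__(self, registry, rate_limiter, sink):
        self.registry = registry          # e.g. {"weather": FakeWeatherConnector}
        self.rate_limiter = rate_limiter
        self.sink = sink                  # a list standing in for the database

    def run_job(self, job):
        source = job["source"]
        connector_cls = self.registry.get(source)        # step 3: ask the registry
        if connector_cls is None:
            raise KeyError(f"No connector registered for {source!r}")
        connector = connector_cls()
        if not self.rate_limiter.check_limit(source):    # step 4: guard rails
            raise RateLimitExceeded(source)
        saved = 0
        for record in connector.run():                   # steps 5-6: stream and save
            self.sink.append(record)
            saved += 1
        return saved
```

Calling `IngestionEngine({"weather": FakeWeatherConnector}, RateLimiter(1, 60), sink).run_job({"source": "weather"})` appends two records to `sink`; a second call inside the same window raises RateLimitExceeded, which a scheduler could catch to re-queue the job.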

Key Design Patterns Used

1. Strategy Pattern & Factory Pattern (The Registry)

  • Concept: The Engine does not know how to fetch weather data. It only knows how to run a BaseConnector.
  • Benefit: You can add a TikTokConnector or SalesforceConnector by dropping a new file into app/connectors/sources/. No "if/else" branches need to be added to the Engine.
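
One way the Registry's discovery step could work is sketched below, using only the standard library. The function name discover_connectors and the choice to key each connector by its module file name (so weather.py registers as "weather") are assumptions for illustration; the real registry may derive keys differently, for example from each connector's name property.

```python
import importlib
import inspect
import pkgutil


def discover_connectors(package_name, base_cls):
    """Import every module in a package and collect subclasses of base_cls.

    Returns a dict mapping module name -> connector class.
    """
    package = importlib.import_module(package_name)
    registry = {}
    for module_info in pkgutil.iter_modules(package.__path__):
        module = importlib.import_module(f"{package_name}.{module_info.name}")
        for _, obj in inspect.getmembers(module, inspect.isclass):
            # Skip the base class itself and unrelated classes.
            if obj is base_cls or not issubclass(obj, base_cls):
                continue
            # Skip connectors that were merely imported into this module.
            if obj.__module__ != module.__name__:
                continue
            registry[module_info.name] = obj
    return registry
```

With the layout from the table above, a call such as `discover_connectors("app.connectors.sources", BaseConnector)` would return something like `{"weather": WeatherConnector}`, and the Engine never needs to name a concrete connector class.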

2. Dependency Inversion (SOLID)

  • Concept: High-level modules (Engine) do not depend on low-level modules (weather.py). Both depend on abstractions (BaseConnector).
  • Benefit: Changing the Weather API logic does not break the Engine, as long as the connector still honors the BaseConnector contract.
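
A minimal sketch of what such an abstraction might look like follows. The name property and the extract() and transform() methods come from the Extension Guide; the exact signatures, the body of run(), and the FakeWeatherConnector and run_job helpers are assumptions for illustration.

```python
from abc import ABC, abstractmethod


class BaseConnector(ABC):
    """The contract: every connector must provide these members."""

    @property
    @abstractmethod
    def name(self):
        """Short identifier for the source, e.g. "weather"."""

    @abstractmethod
    def extract(self, since=None):
        """Yield raw items from the source."""

    @abstractmethod
    def transform(self, raw):
        """Turn one raw item into a cleaned record."""

    def run(self, since=None):
        # Shared pipeline: the Engine only ever calls this method.
        for raw in self.extract(since=since):
            yield self.transform(raw)


class FakeWeatherConnector(BaseConnector):
    """Low-level detail: depends on the abstraction, not on the Engine."""

    @property
    def name(self):
        return "weather"

    def extract(self, since=None):
        yield {"temp": 18.5}

    def transform(self, raw):
        return {"source": self.name, "temperature_c": raw["temp"]}


def run_job(connector: BaseConnector):
    """High-level code: knows only the BaseConnector interface."""
    return list(connector.run())
```

Because run_job is typed against BaseConnector, swapping in a different connector changes nothing in the high-level code, and instantiating a subclass that forgets one of the abstract members fails immediately with a TypeError.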

3. Decorator Pattern (Resilience)

  • Concept: We wrap HTTP calls with @resilient_request.
  • Benefit: Retry logic is separated from business logic. The connector code stays clean and readable.
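
As a rough sketch of the idea, a retry decorator with exponential backoff could look like the code below. The parameters (max_attempts, base_delay, retry_on, sleep) are assumptions chosen for this example, and the decorator is written in its parameterized form; the project's own @resilient_request may be applied without arguments and use different defaults.

```python
import functools
import time


def resilient_request(max_attempts=3, base_delay=0.5,
                      retry_on=(ConnectionError, TimeoutError),
                      sleep=time.sleep):
    """Retry the wrapped call, doubling the wait after each failure."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    # Waits base_delay, then 2x, then 4x, and so on.
                    sleep(base_delay * 2 ** (attempt - 1))

        return wrapper

    return decorator
```

A connector would then decorate only the function that performs the HTTP call, for example `@resilient_request(max_attempts=5)` above a hypothetical `fetch_page()` method, leaving the parsing code free of retry loops. Passing a custom `sleep` makes the backoff easy to test without real delays.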

4. Generator Pattern (Memory Management)

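  • Concept: Connectors yield records one at a time instead of building and returning a list.
  • Benefit: The system streams data item by item, so peak memory stays roughly constant whether a job produces one record or a million, provided the Engine does not accumulate the records itself.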
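
The laziness is easy to demonstrate. In the sketch below, extract_records and count_saved are hypothetical names; the point is that a generator creates each record only when the consumer asks for it.

```python
import itertools


def extract_records(total):
    """Yield records one at a time instead of building a list of `total` items."""
    for record_id in range(total):
        yield {"id": record_id}


def count_saved(records):
    """Consume a stream record by record, as an Engine-style loop would."""
    saved = 0
    for _ in records:
        saved += 1  # a real Engine would write the record to storage here
    return saved


# Asking for a million records costs nothing until they are consumed.
stream = extract_records(1_000_000)
first_three = list(itertools.islice(stream, 3))  # only three records are created
```

Only one record is alive at a time inside count_saved, which is what keeps memory flat; collecting the stream with list(...) would give that advantage away.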

Extension Guide

To add a new data source:

  1. Create a file in app/connectors/sources/ (e.g., news.py).
  2. Inherit from BaseConnector.
  3. Implement extract() (get data) and transform() (clean data).
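  4. Done. The Registry will auto-discover it on the next run.

# Example: app/connectors/sources/news.py
from app.interfaces.base import BaseConnector  # assumed module path; adjust to your layout

class NewsConnector(BaseConnector):
    @property
    def name(self):
        return "news_api"

    def extract(self, since=None):
        raw_items = []  # ... fetch raw items here, optionally filtered by `since` ...
        for item in raw_items:
            yield item

    def transform(self, item):
        return item  # ... clean the raw item here; this signature is an assumption ...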

http://github.com/evolvingsam/ingestion-platform
