```mermaid
graph TD
    User[Main / Scheduler] -->|Job Payload| Engine[Ingestion Engine]
    subgraph Core Logic
        Engine -->|Check Limit| RateLimiter[Rate Limiter]
        Engine -->|Request Class| Registry[Connector Registry]
    end
    subgraph Plugins
        Registry -->|Loads| Weather[Weather Connector]
    end
    subgraph External
        Weather -->|HTTP| OpenMeteo[OpenMeteo API]
    end
    Weather -->|Yields DataRecord| Engine
    Engine -->|Saves| DB[(Database / Warehouse)]
```

| Path | Component | Responsibility |
|---|---|---|
| `app/interfaces/` | The Contract | Abstract Base Classes (ABCs). Defines rules that all Connectors must follow. |
| `app/connectors/registry.py` | The Factory | Scans directories and dynamically loads connector plugins at runtime. |
| `app/connectors/sources/` | The Plugins | Specific implementations (e.g., `weather.py`). |
| `app/services/engine.py` | The Orchestrator | Coordinates the workflow: Load Connector → Check Rate Limit → Run Pipeline. |
| `app/services/resilience.py` | The Safety Net | Decorators for retries, exponential backoff, and error handling. |
| `app/schemas/` | The Envelopes | Pydantic models (DTOs) ensuring data moves in a strict, validated format. |

When you run `python main.py`, the following sequence occurs:

- Discovery Phase (Startup):
  - The `ConnectorRegistry` walks through the `app/connectors/sources/` directory.
  - It imports every file it finds and looks for classes inheriting from `BaseConnector`.
  - It registers them in a dictionary: `{"weather": WeatherConnectorClass}`.
- Job Trigger:
  - `main.py` sends a Job Payload to the Engine: `{"source": "weather"}`.
- Instantiation:
  - The Engine asks the Registry: "Do you have a connector named 'weather'?"
  - The Registry returns the class. The Engine initializes it.
- Guard Rails:
  - The Engine calls `RateLimiter.check_limit("weather")`.
  - If valid: proceed. If limited: raise an error or re-queue.
- Extraction (The "Yield" Loop):
  - The Engine calls `connector.run()`.
  - The Connector hits the external API (e.g., OpenMeteo).
  - Resilience: If the API fails (500/Timeout), the `@resilient_request` decorator automatically waits and retries (Exponential Backoff).
- Transformation & Load:
  - The Connector converts raw JSON into a standardized `DataRecord` object.
  - It `yield`s the record back to the Engine.
  - The Engine saves the record to the Database/Storage.

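The sequence above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the project's actual code: `FakeWeatherConnector`, `SimpleRateLimiter`, `run_job`, and the in-memory `sink` list are hypothetical stand-ins. Only `check_limit`, `run()`, and the `{"source": "weather"}` payload are taken from the description.

```python
from typing import Dict, Iterator, List


class FakeWeatherConnector:
    """Hypothetical stand-in for a real connector; yields two canned records."""

    name = "weather"

    def run(self) -> Iterator[dict]:
        for temp in (21.5, 19.0):
            yield {"source": self.name, "temperature_c": temp}


class SimpleRateLimiter:
    """Hypothetical limiter: allows at most `max_calls` jobs per source."""

    def __init__(self, max_calls: int) -> None:
        self.max_calls = max_calls
        self.calls: Dict[str, int] = {}

    def check_limit(self, source: str) -> bool:
        used = self.calls.get(source, 0)
        if used >= self.max_calls:
            return False
        self.calls[source] = used + 1
        return True


def run_job(payload: dict, registry: Dict[str, type],
            limiter: SimpleRateLimiter, sink: List[dict]) -> int:
    """Look up the connector, check the limit, then stream records to `sink`."""
    source = payload["source"]
    connector_cls = registry.get(source)
    if connector_cls is None:
        raise KeyError(f"no connector registered for {source!r}")
    if not limiter.check_limit(source):
        raise RuntimeError(f"rate limit reached for {source!r}")

    saved = 0
    for record in connector_cls().run():  # records arrive one at a time
        sink.append(record)               # stand-in for the database write
        saved += 1
    return saved


registry = {"weather": FakeWeatherConnector}
limiter = SimpleRateLimiter(max_calls=1)
sink: List[dict] = []
saved_count = run_job({"source": "weather"}, registry, limiter, sink)
```

A second call with the same limiter would raise `RuntimeError`, which corresponds to the "raise error" branch of the Guard Rails step; a re-queue strategy would replace that `raise`.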
- Concept: The Engine does not know how to fetch weather data. It only knows how to run a `BaseConnector`.
- Benefit: You can add a `TikTokConnector` or `SalesforceConnector` by simply dropping a file into the folder. No "if/else" statements need to be added to the Engine.
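
One way the "drop a file into the folder" behavior could work is sketched below, using only the standard library. The `discover` function and the stand-in `BaseConnector` are assumptions for illustration; the real registry in `app/connectors/registry.py` may be implemented differently. The sketch writes a throwaway plugin into a temporary directory so that it runs on its own.

```python
import importlib.util
import inspect
import tempfile
from pathlib import Path
from typing import Dict


class BaseConnector:
    """Stand-in base class; the real one lives under app/interfaces/."""

    name = "base"


def discover(sources_dir: Path) -> Dict[str, type]:
    """Import every .py file in `sources_dir` and collect BaseConnector subclasses."""
    found: Dict[str, type] = {}
    for path in sorted(sources_dir.glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, str(path))
        module = importlib.util.module_from_spec(spec)
        # Sketch-only shortcut: hand the base class to the plugin so the temp
        # file needs no import line; a real plugin would import it itself.
        module.BaseConnector = BaseConnector
        spec.loader.exec_module(module)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            if issubclass(obj, BaseConnector) and obj is not BaseConnector:
                found[obj.name] = obj
    return found


with tempfile.TemporaryDirectory() as tmp:
    # "Drop a file into the folder": no registry or engine code changes.
    (Path(tmp) / "news.py").write_text(
        "class NewsConnector(BaseConnector):\n"
        "    name = 'news_api'\n"
    )
    registry = discover(Path(tmp))

registered_names = sorted(registry)
```

The lookup table is built from whatever files exist at scan time, which is why adding a source requires no new branch in the Engine.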
- Concept: High-level modules (`Engine`) do not depend on low-level modules (`weather.py`). Both depend on abstractions (`BaseConnector`).
- Benefit: Changing the Weather API logic never breaks the Engine.
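
A rough sketch of this dependency direction, using Python's `abc` module. The exact members of `BaseConnector` are an assumption here (a `name` property and a `run()` generator), and `collect` is a hypothetical engine-side helper; the real contract in `app/interfaces/` may differ.

```python
from abc import ABC, abstractmethod
from typing import Iterator, List


class BaseConnector(ABC):
    """Assumed shape of the contract; the real ABC may differ."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def run(self) -> Iterator[dict]: ...


class WeatherConnector(BaseConnector):
    @property
    def name(self) -> str:
        return "weather"

    def run(self) -> Iterator[dict]:
        yield {"source": self.name, "temperature_c": 21.5}


class BrokenConnector(BaseConnector):
    """Forgets to implement run(), so it cannot be instantiated."""

    @property
    def name(self) -> str:
        return "broken"


def collect(connector: BaseConnector) -> List[dict]:
    """Engine-side code: depends only on the abstraction, not on weather.py."""
    return list(connector.run())


records = collect(WeatherConnector())

try:
    BrokenConnector()
    contract_enforced = False
except TypeError:
    # ABCs refuse to instantiate classes with unimplemented abstract methods.
    contract_enforced = True
```

Because `collect` is typed against `BaseConnector` only, the weather-specific code can change freely as long as it keeps honoring the abstract methods.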
- Concept: We wrap HTTP calls with `@resilient_request`.
- Benefit: Retry logic is separated from business logic. The connector code stays clean and readable.
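
A decorator of this kind might look like the sketch below. It is an assumption-laden illustration, not the contents of `app/services/resilience.py`: the parameters `max_attempts`, `base_delay`, and `sleep` are hypothetical, the sketch is applied with parentheses (unlike the bare `@resilient_request` in the text), and it retries only on the built-in `ConnectionError` and `TimeoutError`. The `sleep` hook is injected so the example can record delays instead of actually waiting.

```python
import functools
import time
from typing import Callable, List


def resilient_request(max_attempts: int = 3, base_delay: float = 0.01,
                      sleep: Callable[[float], None] = time.sleep):
    """Retry the wrapped call on connection/timeout errors with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except (ConnectionError, TimeoutError):
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the error
                    sleep(base_delay * 2 ** attempt)  # 1x, 2x, 4x, ...
        return wrapper
    return decorator


delays: List[float] = []   # records the waits instead of sleeping
attempts = {"count": 0}


@resilient_request(max_attempts=4, base_delay=0.5, sleep=delays.append)
def fetch_forecast() -> dict:
    """Fake HTTP call that fails twice before succeeding."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated timeout")
    return {"temperature_c": 21.5}


result = fetch_forecast()
```

The function body contains no retry logic at all, which is the separation the bullet above describes. Note that a decorator like this retries only errors raised by the call itself; if it wrapped a generator function, errors raised during iteration would not be retried, so it belongs on the HTTP call rather than on the generator.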
- Concept: Connectors use `yield` instead of `return list`.
- Benefit: The system streams data item by item, so memory use stays roughly constant whether a run produces one record or a million, provided the Engine does not accumulate the records.
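
The difference can be illustrated with a small comparison. The function names are hypothetical, and `sys.getsizeof` measures only the container or generator object itself (not the records), so treat this as a rough demonstration on CPython rather than a memory benchmark.

```python
import sys
from typing import Iterable, Iterator, List


def extract_as_list(n: int) -> List[dict]:
    """Builds every record before returning: memory grows with n."""
    return [{"id": i} for i in range(n)]


def extract_as_stream(n: int) -> Iterator[dict]:
    """Yields one record at a time: earlier records can be freed."""
    for i in range(n):
        yield {"id": i}


def count_records(records: Iterable[dict]) -> int:
    """Consumer that handles each record and then lets it go."""
    total = 0
    for _ in records:
        total += 1
    return total


small_stream = extract_as_stream(10)
large_stream = extract_as_stream(1_000_000)

# The generator object has a fixed size, independent of how much it will yield.
same_footprint = sys.getsizeof(small_stream) == sys.getsizeof(large_stream)

# The list container alone already grows with the number of records.
list_grows = sys.getsizeof(extract_as_list(10_000)) > sys.getsizeof(extract_as_list(10))

streamed = count_records(large_stream)
```

The benefit holds only while the consumer releases each record after handling it; appending every record to a list on the Engine side would reintroduce the memory growth.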
To add a new data source:

- Create a file in `app/connectors/sources/` (e.g., `news.py`).
- Inherit from `BaseConnector`.
- Implement `extract()` (get data) and `transform()` (clean data).
- Done. The Registry will auto-discover it on the next run.

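```python
# Example: app/connectors/sources/news.py
# BaseConnector is the ABC defined under app/interfaces/; import it from there.

class NewsConnector(BaseConnector):
    @property
    def name(self):
        # Name that identifies this connector.
        return "news_api"

    def extract(self, since=None):
        # ... logic ...
        # `item` is one record produced by the logic elided above.
        yield item
```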