# AquaForge Utility Data Lake (`data_lake`)
> **Hexagonally-architected telemetry platform for municipal & industrial utilities.**
> Extract, cleanse, transform, and surface multi-modal data (water, power, gas, waste, steam) at utility scale—without coupling business rules to infrastructure.



---
## Table of Contents
1. [Why AquaForge?](#why-aquaforge)
2. [Features](#features)
3. [Architecture](#architecture)
4. [Folder Structure](#folder-structure)
5. [Getting Started](#getting-started)
6. [Usage Examples](#usage-examples)
7. [Extending the Platform](#extending-the-platform)
8. [Testing & Quality](#testing--quality)
9. [Production Deployment](#production-deployment)
10. [FAQ](#faq)
11. [Contributing](#contributing)
12. [License](#license)
---
## Why AquaForge?
Traditional data-lake stacks tend to do at least one of the following:
* Couple domain logic to external infrastructure (making it difficult to swap cloud providers or field gateways).
* Focus on a single telemetry modality (e.g., **just** water or **just** power) and break when regulatory rules change.
* Trade stream-processing performance for nightly batch compliance, or vice versa.

AquaForge addresses these shortcomings by applying **Hexagonal Architecture** to the utility space:
* **Core domain** (metering, SCADA, regulatory data-quality) lives in *pure TypeScript* modules—no direct Kafka/S3/etc. imports.
* **Adapters** (Kafka, S3, OPC-UA, MQTT, TimescaleDB, Grafana, …) wrap infrastructure and are hot-swappable.
* **Pipelines** manage ingestion → validation → transformation → storage via composable strategy chains.
* **Observer** side-cars publish metrics, lineage, and quality outcomes to Prometheus and OpenTelemetry.
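
To make the split concrete, here is a minimal sketch of how a core port and an adapter could relate under this layout. The `MeterReadingSourcePort` shape and the in-memory adapter below are illustrative assumptions, not the project's actual API; a real adapter would wrap a Kafka or OPC-UA client behind the same interface.

```ts
// Hypothetical sketch: the core declares a port (no infrastructure imports),
// and an adapter on the outside implements it.

// --- core/metering (sketch) ---
export interface MeterReading {
  meterId: string;
  takenAt: Date;
  valueKwh: number;
}

export interface MeterReadingSourcePort {
  // Pull the next batch of readings from whatever transport backs this port.
  nextBatch(maxRecords: number): Promise<MeterReading[]>;
}

// --- adapters (sketch) ---
// Stand-in adapter that drains an in-memory buffer; a Kafka or OPC-UA adapter
// would implement the same port and keep the client library out of the core.
export class InMemoryMeterSource implements MeterReadingSourcePort {
  constructor(private readonly buffered: MeterReading[]) {}

  async nextBatch(maxRecords: number): Promise<MeterReading[]> {
    return this.buffered.splice(0, maxRecords);
  }
}
```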
---
## Features
| Category | Highlights |
|-----------------------|------------------------------------------------------------------------------------|
| Ingestion | High-throughput Kafka consumer, OPC-UA SCADA listener, batch CSV/SFTP importer |
| Validation | Declarative rule engine: schema, range, temporal consistency, regulatory thresholds|
| Transformation | Strategy-pattern ETL steps, versioned and hot-deployable |
| Storage | Iceberg/S3 lakehouse, TimescaleDB, Parquet partitioning, auto-compaction |
| Visualization | Grafana dashboards, Prometheus metrics, data-lineage explorer |
| Scheduling | cron/Quartz powered—supports pipelined & event-driven schedules |
| Observability | Distributed tracing with OpenTelemetry, per-record audit trail |
| Resilience | Exactly-once semantics, message replay, quorum-based error recovery |
---
## Architecture
```text
                    ┌──────────────┐
                    │ External IoT │
                    │   Gateways   │
                    └──────┬───────┘
                           │
               (Adapters: OPC-UA, MQTT)
                           │
┌───────────┐   ┌──────────▼─────────────────┐   ┌─────────────┐
│ Schedulers│   │  Ingestion Ports (hexagon) │   │ Batch Files │
│ cron/evt  │   │  ──────────────────        │   └─────────────┘
└─────┬─────┘   │  KafkaTopicPort            │
      │         │  FileImportPort            │
      │         └──────────┬─────────────────┘
      │                    │
      ▼                    ▼
┌────────────┐      ┌─────────────┐
│ Validator  │      │ Transformer │    (Pipeline Pattern)
└─────┬──────┘      └──────┬──────┘
      │                    │
      ▼                    ▼
┌─────────────────────────────────────┐
│     Lake Storage (S3 / Iceberg)     │    (Infrastructure Adapter)
└─────────────────────────────────────┘
          ▲               ▲
          │               │
    ┌─────┴─────┐   ┌─────┴──────┐
    │  Grafana  │   │ OTEL/Prom  │
    └───────────┘   └────────────┘
```
Key Principles:
1. **Dependency-inversion**: core never imports external libs (Kafka, AWS SDK, etc.).
2. **Strategy + Pipeline**: each ETL phase chooses a concrete strategy at runtime.
3. **Event Sourcing**: every state mutation generates an immutable event for replay.
4. **Fail-Fast & Recover**: poisoned messages routed to a dead-letter queue with retry policies.
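
As a rough illustration of principle 2, an ETL phase can be expressed as a strategy interface whose concrete steps are selected at runtime from configuration. The names below (`TransformStrategy`, the registry, the sample steps) are hypothetical, not the core API:

```ts
// Hypothetical sketch of Strategy + Pipeline: each step is an interchangeable
// strategy, and the phase runs whichever steps the deployment configures.

interface TransformStrategy {
  readonly name: string;
  apply(record: Record<string, unknown>): Record<string, unknown>;
}

const celsiusToKelvin: TransformStrategy = {
  name: 'celsius-to-kelvin',
  apply: (r) => ({ ...r, temp_k: (r.temp_c as number) + 273.15 }),
};

const dropNulls: TransformStrategy = {
  name: 'drop-nulls',
  apply: (r) => Object.fromEntries(Object.entries(r).filter(([, v]) => v != null)),
};

// Strategies are looked up by name at runtime (e.g. from pipeline config).
const registry = new Map<string, TransformStrategy>();
for (const s of [celsiusToKelvin, dropNulls]) registry.set(s.name, s);

function runTransformPhase(steps: string[], record: Record<string, unknown>) {
  return steps
    .map((name) => registry.get(name))
    .filter((s): s is TransformStrategy => s !== undefined)
    .reduce((acc, strategy) => strategy.apply(acc), record);
}

// The same phase code runs different strategies per deployment.
runTransformPhase(['drop-nulls', 'celsius-to-kelvin'], {
  meter: 'M-17',
  temp_c: 21.4,
  note: null,
});
```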
---
## Folder Structure
```text
data_lake/
├── apps/          # runnable micro-services (ingest, transform, visualize, …)
│   └── ingest-kafka/
├── core/          # pure domain logic (no frameworks)
│   ├── metering/
│   ├── validation/
│   └── transformation/
├── adapters/      # infrastructure (Kafka, S3, Timescale, …)
│   └── s3/
├── packages/      # shared libraries
│   ├── config/
│   ├── logger/
│   └── utils/
├── scripts/       # one-off operational scripts
├── docs/          # ADRs, diagrams
├── tests/         # unit + e2e tests
├── docker/        # Dockerfiles & compose
└── README.md
```
---
## Getting Started
### 1. Prerequisites
* Node.js ≥ 18.x (LTS)
* Docker ≥ 24.x (for local Kafka, S3, Timescale)
* `pnpm` (preferred), `npm`, or `yarn`
### 2. Clone & Install
```bash
git clone https://github.com/aquaforge/data_lake.git
cd data_lake
pnpm install
# spins up Kafka, MinIO (S3-compatible), TimescaleDB, Grafana
docker compose -f docker/compose.local.yml up -d

# publish mock SCADA & metering messages to Kafka
pnpm dlake:seed

# start the Kafka ingestion service in dev mode
pnpm --filter @aquaforge/ingest-kafka dev
```

Logs should show messages flowing to the lakehouse and Grafana dashboards lighting up.

## Usage Examples

Compose and run an ingestion pipeline in code:

```ts
import { IngestionPipeline } from '@aquaforge/core/pipeline';
import { SchemaValidator } from '@aquaforge/core/validation';
import { RegulatoryValidator } from '@aquaforge/validation-regulator';
import { IcebergSink } from '@aquaforge/adapters-iceberg';
const pipeline = new IngestionPipeline()
.use(new SchemaValidator('meter_reading_v2.json'))
.use(new RegulatoryValidator({ region: 'EU' }))
.use(new IcebergSink({
bucket: 'utility-lake',
table: 'meter_readings',
partitionBy: ['utility_id', 'reading_date'],
}));
pipeline.on('error', (err) => {
console.error('💥 Pipeline failed', err);
/* potential hook: alerting, DLQ publish, etc. */
});
await pipeline.process(batchOfRecords);
```

Validate a batch file from the CLI:

```bash
pnpm dlake:validate ./samples/eu_meters_2023Q1.csv --region EU --schema meter_reading_v2.json
```

## Extending the Platform

### Custom Validation Rules

Implement the `ValidationRule` interface:

```ts
import { ValidationContext, ValidationRule } from '@aquaforge/core/validation';
export class PressureRangeRule implements ValidationRule {
public readonly name = 'pressure-range';
public readonly description = 'Ensures pressure (kPa) within safe operating limits';
async validate(ctx: ValidationContext): Promise<void> {
const pressure: number = ctx.record.get('pressure_kpa');
if (pressure < 200 || pressure > 500) {
ctx.reject(`Pressure out of bounds: ${pressure} kPa`);
}
}
}
```

Register your rule in `core/validation/rules/index.ts`:

```ts
export * from './pressure-range-rule';
```

Then add it to a pipeline:

```ts
pipeline.use(new PressureRangeRule());
```

### Custom Sinks

All sinks must implement the `SinkPort` interface:

```ts
export interface SinkPort {
write(records: DomainRecord[]): Promise<void>;
flush?(): Promise<void>;
close?(): Promise<void>;
}
```

An example TimescaleDB sink lives in `adapters/timescaledb/timescale-sink.ts`.
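
For orientation, here is a minimal, purely illustrative sink that satisfies this contract by buffering records and handing batches to a callback; the real TimescaleDB adapter writes batched inserts through a connection pool instead. The `DomainRecord` alias below is an assumption made to keep the sketch self-contained:

```ts
// Illustrative only: a buffering sink that satisfies the SinkPort contract.
// Type shapes are assumed; the real DomainRecord lives in the core package.

type DomainRecord = Record<string, unknown>;

interface SinkPort {
  write(records: DomainRecord[]): Promise<void>;
  flush?(): Promise<void>;
  close?(): Promise<void>;
}

export class BufferingSink implements SinkPort {
  private buffer: DomainRecord[] = [];

  constructor(private readonly onFlush: (batch: DomainRecord[]) => Promise<void>) {}

  async write(records: DomainRecord[]): Promise<void> {
    // A real sink (e.g. the TimescaleDB adapter) would issue batched INSERTs here.
    this.buffer.push(...records);
    if (this.buffer.length >= 500) await this.flush();
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    await this.onFlush(batch);
  }

  async close(): Promise<void> {
    await this.flush();
  }
}
```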
## Testing & Quality

- Unit tests: `pnpm test:unit` (vitest + ts-mockito)
- Integration tests: `pnpm test:int` (spawns dockerized infra)
- Coverage: `pnpm coverage`
- Linting: `pnpm lint`
- Conventional commits enforced via Husky + Commitlint
- Code style: ESLint + Prettier (auto-fixed on commit)
- Automated CI/CD: GitHub Actions → SonarCloud → container registry
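
As an example of the unit-test style, a vitest spec for the `PressureRangeRule` from the previous section might look roughly like this; the `ValidationContext` stub and the import path are simplifying assumptions, not the project's exact types:

```ts
import { describe, expect, it, vi } from 'vitest';
// Path is an assumption; adjust to wherever the rule actually lives.
import { PressureRangeRule } from '../core/validation/rules/pressure-range-rule';

// Minimal stand-in for ValidationContext: a record map plus a reject spy.
function makeCtx(pressureKpa: number) {
  return {
    record: new Map<string, number>([['pressure_kpa', pressureKpa]]),
    reject: vi.fn(),
  };
}

describe('PressureRangeRule', () => {
  it('accepts pressure inside the safe band', async () => {
    const ctx = makeCtx(350);
    await new PressureRangeRule().validate(ctx as any);
    expect(ctx.reject).not.toHaveBeenCalled();
  });

  it('rejects pressure outside the safe band', async () => {
    const ctx = makeCtx(700);
    await new PressureRangeRule().validate(ctx as any);
    expect(ctx.reject).toHaveBeenCalledWith(expect.stringContaining('700'));
  });
});
```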
## Production Deployment

- Build & push container images:

  ```bash
  pnpm release:build
  docker push ghcr.io/aquaforge/ingest-kafka:$(git rev-parse --short HEAD)
  ```

- Apply the Helm chart:

  ```bash
  helm upgrade --install aqua-ingest ./helm/ingest \
    --set image.tag=$(git rev-parse --short HEAD) \
    -f ./helm/values/prod.yaml
  ```

- Observe:
  - Grafana → `Ingestion Overview` dashboard
  - Prometheus → `job="aqua_ingest"`
  - Loki → aggregated logs

Zero downtime is achieved via rolling deployments and consumer-group rebalancing.
## FAQ

**Q: Does AquaForge support Azure Blob or Google Cloud Storage?**
A: Yes. Bring your own adapter implementing `ObjectStoragePort`; see `adapters/gcs` for reference.
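
For a sense of what such an adapter involves, here is a rough sketch that hides storage behind a port. The `ObjectStoragePort` shape shown is an assumption (check the core package for the real interface), and the filesystem stand-in would be replaced by the Azure/GCS SDK in a real adapter:

```ts
import { mkdir, readFile, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';

// Assumed shape of the port; the real interface is defined in the core package.
interface ObjectStoragePort {
  put(key: string, body: Uint8Array): Promise<void>;
  get(key: string): Promise<Uint8Array>;
}

// Local-filesystem stand-in: a GCS/Azure adapter would delegate to the vendor
// SDK instead of the filesystem while keeping the same port contract.
export class LocalObjectStorage implements ObjectStoragePort {
  constructor(private readonly rootDir: string) {}

  async put(key: string, body: Uint8Array): Promise<void> {
    const path = join(this.rootDir, key);
    await mkdir(dirname(path), { recursive: true });
    await writeFile(path, body);
  }

  async get(key: string): Promise<Uint8Array> {
    return readFile(join(this.rootDir, key));
  }
}
```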
**Q: How do I replay failed messages?**
A: Use the built-in DLQ CLI: `pnpm dlake:replay --from 2023-08-01 --to 2023-08-02`.

**Q: Is the core domain library framework-agnostic?**
A: 100%. It compiles to pure ESM with zero runtime dependencies beyond the TypeScript standard library.
## Contributing

- Fork & clone the repo.
- Create a feature branch: `feat/<ticket-id>-brief-description`.
- Commit using conventional style (`feat:`, `fix:`, `docs:`, …).
- Run `pnpm ci` (ensures tests, lint, and types pass).
- Open a PR; CI must pass before review.

All contributions must follow the Community Code of Conduct (see `CODE_OF_CONDUCT.md`).

## License

Apache 2.0 © AquaForge Maintainers