Created
August 3, 2022 19:53
-
-
Save rmolinamir/b269420e93c63aede94d01384ff0bc0b to your computer and use it in GitHub Desktop.
Projector.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { ArgumentInvalidException, DomainEvent } from '@ddd-framework/core'; | |
import { | |
CheckpointStore, | |
Projection, | |
WritableEventStream | |
} from '@ddd-framework/eventsourcing'; | |
import { | |
AllStreamResolvedEvent, | |
AllStreamSubscription, | |
EventStoreDBClient, | |
excludeSystemEvents, | |
Position, | |
START | |
} from '@eventstore/db-client'; | |
import EsStoredEvent from '../../../../write_model/infrastructure/adapters/event_store/EsStoredEvent'; | |
import CheckpointId from '../adapters/event_store/CheckpointId'; | |
import EsCheckpoint from '../adapters/event_store/EsCheckpoint'; | |
/**
 * Base class that drives a {@link Projection} from the EventStoreDB `$all`
 * stream (system events excluded).
 *
 * Subclasses provide the projection to feed and the checkpoint store used to
 * persist the last processed position, so that a later subscription can pick
 * up from the stored checkpoint instead of the start of the stream.
 */
export default abstract class Projector {
  public static readonly PROJECTIONS_TOKEN = 'PROJECTIONS_TOKEN';

  /**
   * Connection string used when the constructor is called without arguments.
   * TODO: parameterize through config manager provider
   */
  private static readonly DEFAULT_CONNECTION_STRING =
    'esdb://eventstore-db:2113?tls=false&keepAliveTimeout=10000&keepAliveInterval=10000';

  private static readonly OPTIONS = {
    /**
     * Sets how often the checkpointReached callback is called.
     * Must be greater than 0.
     * TODO: Increase to something bigger, default is 32
     */
    checkpointInterval: 16
  };

  /** Projection that receives every deserialized event. */
  protected abstract projection: Projection;

  /** Store used to read, persist and delete this projector's checkpoint. */
  protected abstract checkpointStore: CheckpointStore<EsCheckpoint>;

  private client: EventStoreDBClient;

  /** Currently active subscription, or `null` when none is running. */
  private subscription: AllStreamSubscription | null = null;

  /**
   * @param connectionString EventStoreDB connection string. Optional; when
   * omitted the previous hard-coded value is used, so existing subclasses
   * calling `super()` keep working unchanged.
   */
  constructor(connectionString: string = Projector.DEFAULT_CONNECTION_STRING) {
    this.client = EventStoreDBClient.connectionString(connectionString);
  }

  /**
   * Pause the EventStream subscription. No-op when nothing is subscribed.
   */
  public async pause(): Promise<void> {
    this.subscription?.pause();
  }

  /**
   * Resume the EventStream subscription. No-op when nothing is subscribed.
   */
  public async resume(): Promise<void> {
    this.subscription?.resume();
  }

  /**
   * Restart the EventStream subscription.
   *
   * Like {@link start}, the returned promise stays pending while the new
   * subscription is being consumed.
   */
  public async restart(): Promise<void> {
    await this.stop();
    await this.start();
  }

  /**
   * Rewind the EventStream subscription: drop the persisted checkpoint and
   * subscribe again.
   *
   * The subscription is stopped BEFORE the checkpoint is deleted; otherwise a
   * `checkpointReached` callback of the still-running subscription could
   * store the checkpoint again right after it was removed.
   */
  public async rewind(): Promise<void> {
    await this.stop();
    const checkpointId = Projector.getCheckpointIdOf(this.projection);
    await this.checkpointStore.delete(checkpointId);
    await this.start();
  }

  /**
   * Start the EventStream subscription.
   *
   * The returned promise awaits the event loop in {@link subscribe}, so it
   * only settles once the event stream ends or fails.
   */
  public async start(): Promise<void> {
    await this.subscribe();
  }

  /**
   * Stop the EventStream subscription. No-op when nothing is subscribed.
   */
  public async stop(): Promise<void> {
    const activeSubscription = this.subscription;
    // Clear the handle first: pause()/resume() must not act on a subscription
    // that is being torn down, and the event loop in subscribe() uses the
    // handle to tell whether it still owns the clean-up.
    this.subscription = null;
    await activeSubscription?.unsubscribe();
  }

  /**
   * Create an EventStore Subscription using the EventStoreDBClient, starting
   * at the persisted checkpoint if any, and project every received event
   * until the stream ends.
   *
   * @throws ArgumentInvalidException (raised by the stream transform) when a
   * resolved event is missing or is not JSON.
   */
  private async subscribe(): Promise<void> {
    const checkpointId = Projector.getCheckpointIdOf(this.projection);
    const aStoredCheckpoint = await this.checkpointStore.get(checkpointId);
    const startingPosition = aStoredCheckpoint
      ? EsCheckpoint.deserialize(aStoredCheckpoint)
      : START;

    const subscription = this.client.subscribeToAll({
      fromPosition: startingPosition,
      filter: excludeSystemEvents({
        checkpointInterval: Projector.OPTIONS.checkpointInterval,
        checkpointReached: (_, aPosition) => this.onCheckpointReached(aPosition)
      })
    });

    const eventStream = subscription.pipe(
      new WritableEventStream<DomainEvent, AllStreamResolvedEvent>(
        ({ event }) => {
          if (!event?.isJson)
            throw new ArgumentInvalidException(
              'chunk',
              `Invalid EventStore resolved event:\n${Projector.stringifyEvent(
                event
              )}`
            );

          return EsStoredEvent.deserialize(event);
        }
      )
    );

    this.subscription = subscription;

    try {
      for await (const anEvent of eventStream)
        await this.projection.project(anEvent);
    } finally {
      // If the loop exits on its own (stream ended or an error was thrown),
      // release the subscription so it does not stay open unconsumed. When
      // stop() caused the exit, or a newer subscription has replaced this
      // one, the handle no longer matches and nothing is done here.
      if (this.subscription === subscription) {
        this.subscription = null;
        await subscription.unsubscribe();
      }
    }
  }

  /**
   * Persist the position reported by the subscription as this projector's
   * checkpoint.
   */
  private async onCheckpointReached(aPosition: Position): Promise<void> {
    const checkpointId = Projector.getCheckpointIdOf(this.projection);

    // The subscription will wait until the promise is resolved
    await this.checkpointStore.store(
      EsCheckpoint.serialize(checkpointId, aPosition)
    );
  }

  /**
   * Build the checkpoint identifier of a projection from the name of its
   * constructor.
   */
  private static getCheckpointIdOf(aProjection: Projection): CheckpointId {
    return new CheckpointId(aProjection.constructor.name);
  }

  /**
   * Pretty-print a value for diagnostics.
   *
   * Plain `JSON.stringify` throws a `TypeError` on `bigint` values, which
   * would replace the intended exception with an unrelated one; bigints are
   * therefore rendered as decimal strings.
   *
   * @returns the indented JSON text, or `'undefined'` when the value has no
   * JSON representation.
   */
  private static stringifyEvent(anEvent: unknown): string {
    const text: string | undefined = JSON.stringify(
      anEvent,
      (_key: string, value: unknown) =>
        typeof value === 'bigint' ? value.toString() : value,
      2
    );

    return text ?? 'undefined';
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment