Skip to content

Instantly share code, notes, and snippets.

@johncoder
Last active June 27, 2018 14:38
Show Gist options
  • Save johncoder/fb1002336a5ffb05a7c315fafc8c3be8 to your computer and use it in GitHub Desktop.
Save johncoder/fb1002336a5ffb05a7c315fafc8c3be8 to your computer and use it in GitHub Desktop.
EventSourcing in Node.js
# Docker Compose definition for a single EventStore container.
version: '3'
services:
  eventstore:
    container_name: geteventstore
    image: eventstore/eventstore
    ports:
      # NOTE(review): 2113 is presumably the HTTP/admin port - confirm.
      - "2113:2113"
      # TCP port; matches the tcp://localhost:1113 default used by the Node client.
      - "1113:1113"
    volumes:
      # Mounts the current directory at /var/lib/eventstore inside the container.
      - .:/var/lib/eventstore
const R = require('ramda');
// TODO(john): Snapshotting
// TODO(john): Consider some form of optimistic concurrency? Requires versioning.
/**
 * Sentinel values used as command results.
 *
 * - `ok`: the only result `sendCommand` treats as success; anything else is
 *   passed to its `reject` callback.
 * - `unapplied`: what `processCommand` returns when no handler produced a
 *   different result.
 * - `rejected`, `failed`, `void`: declared for handlers to return; nothing in
 *   this module inspects them.
 *
 * Each symbol carries its own name as description so that logs and assertion
 * messages can tell them apart.
 */
const symbols = {
  ok: Symbol('ok'),
  rejected: Symbol('rejected'),
  failed: Symbol('failed'),
  void: Symbol('void'),
  unapplied: Symbol('unapplied'),
};
/**
 * Loads an aggregate by id and replays its stored events onto it.
 *
 * Curried: (resume, loadAggregate, id).
 *
 * @param {Function} resume - called as `resume(aggregate, null)`; its return
 *   value is ignored, so it is expected to update the aggregate in place.
 * @param {Function} loadAggregate - called as `loadAggregate(id)`.
 * @param {*} id - identifier handed to `loadAggregate`.
 * @returns {Object} the very object returned by `loadAggregate`.
 */
const getState = R.curry((resume, loadAggregate, id) => {
  const loaded = loadAggregate(id);
  // null means "no new events": resume falls back to the aggregate's own events.
  resume(loaded, null);
  return loaded;
});
/**
 * Loads an aggregate, runs a command against it and reports the outcome
 * through callbacks.
 *
 * Curried: (resume, mutate, loadAggregate, id, command, resolve, reject).
 *
 * @param {Function} resume - `resume(aggregate, events)`; replays stored events
 *   when `events` is null, otherwise applies the given events.
 * @param {Function} mutate - `mutate(apply, aggregate, command)`; emits events
 *   through `apply` and returns a result value.
 * @param {Function} loadAggregate - `loadAggregate(id)`.
 * @param {*} id - aggregate identifier.
 * @param {Object} command - the command to execute.
 * @param {Function} resolve - receives the updated aggregate when the result
 *   is `symbols.ok`.
 * @param {Function} reject - receives the result for any other outcome.
 * @returns {undefined}
 */
const sendCommand = R.curry((resume, mutate, loadAggregate, id, command, resolve, reject) => {
  const aggregate = loadAggregate(id);
  resume(aggregate, null);

  // Events emitted by the command handler are collected here.
  const pending = [];
  const collect = emitted => pending.push(emitted);
  const outcome = mutate(collect, aggregate, command);

  if (outcome === symbols.ok) {
    // TODO(john): It would be really nice to decouple aggregate from
    // its events at some point.
    aggregate.events = aggregate.events.concat(pending);
    // Apply only the newly emitted events on top of the replayed state.
    resume(aggregate, pending);
    resolve(aggregate);
    return;
  }

  reject(outcome);
});
/**
 * Selects a command handler by command type.
 *
 * Curried: ($type, next, command).
 *
 * @param {*} $type - the command type this handler is responsible for.
 * @param {Function} next - called as `next($type)` when the types match.
 * @param {Object} command - the command; only its `$type` is inspected.
 * @returns {*} `null` when `command.$type` differs from `$type`, otherwise
 *   whatever `next($type)` returns.
 */
const commandHandler = R.curry(($type, next, command) => (
  $type === command.$type ? next($type) : null
));
/**
 * Offers a command to each handler in order and returns the first result that
 * is not `symbols.unapplied`.
 *
 * Curried: (handlers, apply, aggregate, command).
 *
 * @param {Function[]} handlers - each is called with the command and returns
 *   either `null` (not applicable) or a function invoked as
 *   `(apply, aggregate, command)`.
 * @param {Function} apply - passed through to the selected handler.
 * @param {Object} aggregate - passed through to the selected handler.
 * @param {Object} command - the command being processed.
 * @returns {*} the first handler result other than `symbols.unapplied`, or
 *   `symbols.unapplied` when no handler produced one.
 */
const processCommand = R.curry((handlers, apply, aggregate, command) => {
  for (const select of handlers) {
    const handler = select(command);
    if (handler !== null) {
      const outcome = handler(apply, aggregate, command);
      if (outcome !== symbols.unapplied) {
        return outcome;
      }
    }
  }
  // Every handler either declined the command or left it unapplied.
  return symbols.unapplied;
});
/**
 * Wraps a state transition so that it only runs for events of one type.
 *
 * Curried: ($type, next, input).
 *
 * @param {*} $type - the event type this handler reacts to.
 * @param {Function} next - called as `next($type, state, event)` on a match.
 * @param {{state: *, event: Object}} input - current state and the event.
 * @returns {{state: *, event: Object}} the new state on a match, or the
 *   unchanged input state otherwise, always paired with the same event.
 */
const eventHandler = R.curry(($type, next, input) => {
  const { state, event } = input;
  const isMatch = event.$type === $type;
  return {
    state: isMatch ? next($type, state, event) : state,
    event,
  };
});
/**
 * Threads a single event through every handler, feeding each handler the state
 * produced by the previous one.
 *
 * Curried: (handlers, aggregate, event).
 *
 * @param {Function[]} handlers - each is called with `{state, event}` and must
 *   return an object with a `state` property.
 * @param {Object} aggregate - the starting state.
 * @param {Object} event - the event being dispatched.
 * @returns {*} the state returned by the last handler, or `aggregate` when
 *   `handlers` is empty.
 */
const dispatchEvents = R.curry((handlers, aggregate, event) => (
  handlers.reduce(
    (state, handler) => handler({ state, event }).state,
    aggregate
  )
));
/**
 * Folds a list of events into an aggregate and copies the resulting state back
 * onto the aggregate object.
 *
 * Curried: (handle, aggregate, newEvents).
 *
 * @param {Function} handle - called as `handle(state, event)`; returns the
 *   next state.
 * @param {Object} aggregate - mutated in place via `Object.assign`.
 * @param {Object[]|null} newEvents - events to apply; when falsy,
 *   `aggregate.events` is replayed instead.
 * @returns {Object} the same `aggregate` object, updated.
 */
const resume = R.curry((handle, aggregate, newEvents) => {
  // TODO(john): It would be really nice to decouple aggregate from
  // its events at some point.
  const events = newEvents || aggregate.events;

  // The fold starts from null, so the first event (and any event following a
  // falsy handler result) is applied to the aggregate itself.
  // TODO(john): try replacing with reduce(handle, events, {})
  const finalState = events.reduce(
    (state, event) => handle(state || aggregate, event),
    null
  );

  // NOTE(john): This might not work for Mongoose. Must try...
  return Object.assign(aggregate, finalState);
});
// Public API of the event-sourcing module: the result sentinels (`symbols`),
// the command-side helpers (`sendCommand`, `commandHandler`, `processCommand`),
// the event-side helpers (`dispatchEvents`, `eventHandler`, `resume`) and the
// read helper (`getState`).
module.exports = {
symbols,
sendCommand,
commandHandler,
processCommand,
dispatchEvents,
eventHandler,
resume,
getState,
};
const Buffer = require('buffer').Buffer;
const uuid = require('uuid/v4');
const eventstore = require('eventstore-node');
// EventStore endpoint; override with the EVENTSTORE_URL environment variable.
const eventstoreUrl = process.env.EVENTSTORE_URL || 'tcp://localhost:1113';
const connSettings = {};
const esConnection = eventstore.createConnection(connSettings, eventstoreUrl);

// Register the listener before connecting so the event cannot be missed.
esConnection.once('connected', endpoint => {
  console.log(`Connected to eventstore at ${endpoint.host}:${endpoint.port}`);
});

// connect() returns a promise; report a rejection instead of leaving it
// unhandled.
esConnection.connect().catch(error => {
  console.error(`Error connecting to eventstore at ${eventstoreUrl}: ${error}`);
});
/**
 * Creates a sender bound to one stream.
 *
 * @param {string} streamName - stream that every message is appended to.
 * @returns {Function} async `sendMessage(message, type)`; it wraps the message
 *   in a JSON event with a fresh uuid and appends it using
 *   `expectedVersion.any`. Progress is written to stdout ('.' before the
 *   append, ':' after it succeeds). Failures are logged to stderr and not
 *   rethrown, so the returned promise always fulfils with undefined.
 */
function stream(streamName) {
  const sendMessage = async (message, type) => {
    const id = uuid();
    const payload = eventstore.createJsonEventData(id, message, null, type);
    process.stdout.write('.');
    try {
      await esConnection.appendToStream(
        streamName,
        eventstore.expectedVersion.any,
        payload
      );
      process.stdout.write(':');
    } catch (exception) {
      console.error(`Error sending ${id}: ${exception}`);
    }
  };
  return sendMessage;
}
// Script entry point. With WRITING=true a 'pulse' event is appended to the
// 'hello-world' stream every 10ms; otherwise the stream is read once from the
// start (at most 4096 events) and the load time is reported.
const send = stream('hello-world');
const writing = process.env.WRITING === 'true';

if (!writing) {
  setImmediate(async function resume() {
    try {
      const start = new Date();
      const result = await esConnection.readStreamEventsForward('hello-world', 0, 4096);
      // Decode every event payload; a malformed JSON body lands in the catch below.
      const events = result.events.map(resolved => {
        const recorded = resolved.event;
        return {
          id: recorded.eventId,
          number: recorded.eventNumber,
          type: recorded.eventType,
          created: resolved.created,
          event: JSON.parse(Buffer.from(recorded.data).toString('utf8')),
        };
      });
      const loaded = events.length;
      const finish = new Date();
      console.log(`done loading ${loaded} events in ${finish - start}ms`);
    } catch (exception) {
      console.error(`Error resuming: ${exception}`);
    }
  });
} else {
  setInterval(() => {
    const pulse = {
      message: 'hello world!',
      value: uuid(),
      created: new Date().toISOString(),
    };
    send(pulse, 'pulse');
  }, 10);
}

console.log('started...');
const assert = require('assert');
const R = require('ramda');
const es = require('../event-sourcing');
// Command handler stub for tests: accepts the usual four curried arguments,
// emits nothing and returns undefined.
const noopCommandHandler = R.curry(($type, apply, aggregate, command) => undefined);
describe('event-sourcing', () => {
describe('commandHandler', () => {
  // Builds a handler registered for type 1 and offers it a command of the
  // given type.
  const selectFor = commandType =>
    es.commandHandler(1, noopCommandHandler, {$type: commandType});

  it('returns null for wrong type', () => {
    assert.strictEqual(selectFor(2), null, 'handler was not null');
  });

  it('returns a function for correct type', () => {
    assert.strictEqual(typeof selectFor(1), 'function', 'handler was not a Function');
  });
});
describe('processCommand', () => {
  it('returns symbols.unapplied if no matching command handlers', () => {
    // Named processCommand (not `process`) to avoid shadowing the Node global.
    const processCommand = es.processCommand([
      es.commandHandler(1, noopCommandHandler),
      es.commandHandler(2, noopCommandHandler),
      es.commandHandler(3, noopCommandHandler),
    ]);
    const events = [];
    const result = processCommand(f => events.push(f), {events: []}, {$type: 4});
    assert.strictEqual(events.length, 0, 'unexpected events');
    assert.strictEqual(result, es.symbols.unapplied);
  });

  it('returns a result if handled', () => {
    const processCommand = es.processCommand([
      es.commandHandler(1, noopCommandHandler),
      es.commandHandler(2, noopCommandHandler),
      es.commandHandler(3, R.curry(($type, apply, aggregate, command) => {
        apply({
          $type,
          message: command.message,
        });
        return es.symbols.ok;
      })),
    ]);
    const events = [];
    const result = processCommand(
      f => events.push(f),
      {events: []},
      {$type: 3, message: 'hi'}
    );
    // The matching handler returns symbols.ok, so pin that exact value.
    // String() is used because it is safe for both symbols and undefined.
    assert.strictEqual(
      result,
      es.symbols.ok,
      `expected symbols.ok but got ${String(result)}`
    );
    assert.strictEqual(events.length, 1, `expected one event but found ${events.length}`);
    assert.strictEqual(events[0].message, 'hi', 'event did not have command message');
  });
});
describe('eventHandler', () => {
  // Transition shared by both cases: copies state and event into a new object.
  const merge = (t, s, e) => Object.assign({}, s, e);
  const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key);

  it('returns the original state when event is wrong type', () => {
    const state = {foo: 5};
    const input = {state, event: {$type: 2, message: 'hi'}};

    const output = es.eventHandler(1, merge, input);

    assert.strictEqual(output.state, state, 'state was not the same');
    assert.strictEqual(output.state.foo, state.foo, 'property was different');
    assert.ok(!hasOwn(output.state, 'message'), 'had message property');
    assert.strictEqual(output.event, input.event);
  });

  it('returns new state when event is correct type', () => {
    const state = {foo: 5};
    const message = 'hi';
    const input = {state, event: {$type: 1, message}};

    const output = es.eventHandler(1, merge, input);

    assert.notStrictEqual(output.state, state, 'state was the same');
    assert.strictEqual(output.state.foo, state.foo, 'property was different');
    assert.strictEqual(output.state.message, message, 'message was not the same');
    assert.strictEqual(output.event, input.event);
  });
});
describe('dispatchEvents', () => {
  // Wraps one transition in an eventHandler per listed type.
  const handlersFor = (types, transition) =>
    types.map(type => es.eventHandler(type, transition));

  // Transition that merges state and event and stamps the handler's own $type.
  const stampingTransition = onCall => R.curry(($type, state, event) => {
    onCall();
    return Object.assign({}, state, event, {$type});
  });
  const noop = () => {};

  it('handler is invoked for matching type', () => {
    const aggregate = {events: []};
    const event = {$type: 5, message: 'hi'};
    const handlers = handlersFor([1, 2, 3, 4, 5], stampingTransition(noop));

    const result = es.dispatchEvents(handlers, aggregate, event);

    assert.strictEqual(result.$type, event.$type, 'unexpected $type');
  });

  it('handler is not invoked for no matching types', () => {
    const aggregate = {events: []};
    const event = {$type: 0, message: 'hi'};
    const handlers = handlersFor([1, 2, 3, 4, 5], stampingTransition(noop));

    const result = es.dispatchEvents(handlers, aggregate, event);

    assert.ok(
      !Object.prototype.hasOwnProperty.call(result, '$type'),
      'had property named $type'
    );
  });

  it('handler is invoked for all matching types', () => {
    const aggregate = {events: []};
    const event = {$type: 1, message: 'hi'};
    let count = 0;
    const handlers = handlersFor([1, 1, 1, 1, 1], stampingTransition(() => { count++; }));

    es.dispatchEvents(handlers, aggregate, event);

    assert.strictEqual(count, handlers.length, 'was not called for all handlers');
  });
});
describe('resume', () => {
  it('invokes each handler for each event on aggregate', () => {
    const aggregate = {foo: 'bar'};

    // Copies a single named property from the event; existing state wins.
    const copyProp = R.curry((prop, $type, state, event) =>
      Object.assign({[prop]: event[prop]}, state));

    // Property name at index i belongs to event type i + 1 with value String(i + 1).
    const props = ['one', 'two', 'three', 'four', 'five'];
    const handle = es.dispatchEvents(
      props.map((prop, index) => es.eventHandler(index + 1, copyProp(prop)))
    );
    const events = props.map((prop, index) => ({
      $type: index + 1,
      [prop]: String(index + 1),
    }));

    const result = es.resume(handle, aggregate, events);

    assert.strictEqual(result.foo, aggregate.foo);
    props.forEach((prop, index) => {
      assert.strictEqual(result[prop], String(index + 1));
    });
    assert.ok(!Object.prototype.hasOwnProperty.call(result, '$type'));
  });
});
describe('sendCommand', () => {
  it('is unapplied if no command handlers are called', () => {
    const handlers = es.dispatchEvents([]);
    const resume = es.resume(handlers);
    // Named processCommand (not `process`) to avoid shadowing the Node global.
    const processCommand = es.processCommand([
      es.commandHandler(1, noopCommandHandler),
      es.commandHandler(2, noopCommandHandler),
      es.commandHandler(3, R.curry(($type, apply, aggregate, command) => {
        apply({
          $type,
          message: command.message,
        });
        return es.symbols.ok;
      })),
    ]);
    const aggregate = {
      events: [
        {$type: 8, message: 'handled'},
      ],
    };
    const loadAggregate = () => aggregate;
    const sendCommand = es.sendCommand(resume, processCommand, loadAggregate);
    const id = 99;
    const command = {$type: 8, message: 'unused'};

    // Record the callback invocations so the test fails when neither callback
    // runs, instead of passing vacuously.
    const rejections = [];
    const resolve = () => {
      assert.fail('did not expect to get here');
    };
    const reject = result => {
      rejections.push(result);
    };

    sendCommand(id, command, resolve, reject);

    assert.strictEqual(rejections.length, 1, 'reject was not called exactly once');
    assert.strictEqual(rejections[0], es.symbols.unapplied, 'command was unexpectedly applied');
  });

  it('is applied if command handlers are called', () => {
    const handler = ($type, state, event) => Object.assign(state, event);
    const handlers = es.dispatchEvents([
      es.eventHandler(1, handler),
      es.eventHandler(2, handler),
      es.eventHandler(3, handler),
    ]);
    const resume = es.resume(handlers);
    const processCommand = es.processCommand([
      es.commandHandler(2, R.curry(($type, apply, aggregate, command) => {
        apply({
          $type,
          message: command.message,
        });
        return es.symbols.ok;
      })),
    ]);
    const aggregate = {
      events: [
        {$type: 1, message: 'handled', one: '1'},
      ],
    };
    const loadAggregate = () => aggregate;
    const sendCommand = es.sendCommand(resume, processCommand, loadAggregate);
    const id = 99;
    const command = {$type: 2, message: 'command'};

    const resolutions = [];
    const resolve = result => {
      resolutions.push(result);
    };
    const reject = result => {
      // String() keeps the message safe when the result is a symbol.
      assert.fail(`did not expect to get there: ${String(result)}`);
    };

    sendCommand(id, command, resolve, reject);

    assert.strictEqual(resolutions.length, 1, 'resolve was not called exactly once');
    const [result] = resolutions;
    assert.strictEqual(result.message, command.message, 'did not get message from command');
    assert.strictEqual(result.one, aggregate.events[0].one, 'did not get previous state');
  });
});
describe('getState', () => {
  const id = 99;

  // Builds a resume function whose handlers for types 1-3 share one transition.
  const resumeWith = transition => es.resume(es.dispatchEvents(
    [1, 2, 3].map(type => es.eventHandler(type, transition))
  ));

  it('returns original object if no handlers', () => {
    // The aggregate has no events, so no transition ever runs.
    const aggregate = {
      foo: 'bar',
      events: [],
    };
    const merge = ($type, state, event) => Object.assign({}, state, event);

    const result = es.getState(resumeWith(merge), () => aggregate, id);

    assert.deepEqual(result, {foo: aggregate.foo, events: []});
  });

  it('applies events', () => {
    const aggregate = {
      foo: 'bar',
      events: [
        {$type: 1, one: '1'},
        {$type: 2, two: '2'},
        {$type: 3, three: '3'},
      ],
    };
    // Merges the event into a copy of the state but keeps $type out of it.
    const mergeWithoutType = ($type, state, event) => {
      const merged = Object.assign({}, state, event);
      delete merged.$type;
      return merged;
    };

    const result = es.getState(resumeWith(mergeWithoutType), () => aggregate, id);

    const expected = {
      one: '1',
      two: '2',
      three: '3',
      foo: 'bar',
      events: aggregate.events,
    };
    assert.deepEqual(result, expected, 'result did not have all expected data');
  });
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment