Last active
June 27, 2018 14:38
-
-
Save johncoder/fb1002336a5ffb05a7c315fafc8c3be8 to your computer and use it in GitHub Desktop.
EventSourcing in Node.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '3'
services:
  eventstore:
    container_name: geteventstore
    image: eventstore/eventstore
    ports:
      - "2113:2113"
      - "1113:1113"
    volumes:
      - .:/var/lib/eventstore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const R = require('ramda'); | |
// TODO(john): Snapshotting | |
// TODO(john): Consider some form of optimistic concurrency? Requires versioning. | |
/**
 * Sentinel values used to report the outcome of handling a command.
 *
 * Every entry is a distinct Symbol and is compared by identity
 * (e.g. `result !== symbols.ok`). The description passed to `Symbol()` only
 * shows up in debugging output such as `result.toString()`, so each one
 * matches its key to keep log and assertion messages unambiguous.
 */
const symbols = {
  ok: Symbol('ok'),
  rejected: Symbol('rejected'),
  failed: Symbol('failed'),
  void: Symbol('void'),
  unapplied: Symbol('unapplied'),
};
/**
 * Loads an aggregate and brings it up to date by resuming it.
 *
 * Curried: `getState(resume, loadAggregate)` yields a function of `id`.
 *
 * @param {Function} resume - Called as `resume(aggregate, null)`; its return
 *   value is ignored, so it is expected to update the aggregate in place.
 * @param {Function} loadAggregate - Called with `id`; returns the aggregate.
 * @param {*} id - Identifier handed to `loadAggregate`.
 * @returns {Object} The object returned by `loadAggregate`.
 */
const getState = R.curry((resume, loadAggregate, id) => {
  const loaded = loadAggregate(id);
  // No new events are supplied here; `resume` receives null in their place.
  resume(loaded, null);
  return loaded;
});
/**
 * Loads an aggregate, runs a command against it and reports the outcome
 * through the `resolve` / `reject` callbacks.
 *
 * Curried, so the first arguments can be bound ahead of time, e.g.
 * `sendCommand(resume, mutate, loadAggregate)`.
 *
 * @param {Function} resume - Called as `resume(aggregate, events)`; first with
 *   null, then with the events emitted by the command.
 * @param {Function} mutate - Called as `mutate(apply, aggregate, command)`;
 *   it emits events through `apply` and returns an outcome.
 * @param {Function} loadAggregate - Called with `id`; returns the aggregate.
 * @param {*} id - Identifier handed to `loadAggregate`.
 * @param {Object} command - Command passed through to `mutate`.
 * @param {Function} resolve - Called with the aggregate when the outcome is
 *   `symbols.ok`.
 * @param {Function} reject - Called with the outcome otherwise.
 * @returns {undefined}
 */
const sendCommand = R.curry((
  resume,
  mutate,
  loadAggregate,
  id,
  command,
  resolve,
  reject
) => {
  const aggregate = loadAggregate(id);
  resume(aggregate, null);

  // Events emitted by the command are collected here instead of being
  // applied immediately.
  const emitted = [];
  const collect = f => emitted.push(f);
  const outcome = mutate(collect, aggregate, command);

  if (outcome === symbols.ok) {
    // TODO(john): It would be really nice to decouple aggregate from
    // its events at some point.
    aggregate.events = aggregate.events.concat(emitted);
    resume(aggregate, emitted);
    resolve(aggregate);
    return;
  }

  reject(outcome);
});
/**
 * Selects a command handler by type.
 *
 * @param {*} $type - Command type this handler is responsible for.
 * @param {Function} next - Handler; called with `$type` when the command
 *   matches, and whatever it returns is handed back to the caller.
 * @param {Object} command - Command whose `$type` is compared strictly.
 * @returns {*} `next($type)` on a match, otherwise null.
 */
const commandHandler = R.curry(($type, next, command) => (
  command.$type === $type ? next($type) : null
));
/**
 * Offers a command to each handler selector in order and returns the first
 * outcome that is not `symbols.unapplied`.
 *
 * @param {Function[]} handlers - Selectors called with `command`; each
 *   returns either null (skip) or a function taking
 *   `(apply, aggregate, command)`.
 * @param {Function} apply - Passed through to the selected handler.
 * @param {Object} aggregate - Passed through to the selected handler.
 * @param {Object} command - Command being processed.
 * @returns {*} The first outcome other than `symbols.unapplied`, or
 *   `symbols.unapplied` when no handler produced one.
 */
const processCommand = R.curry((handlers, apply, aggregate, command) => {
  for (const select of handlers) {
    const handler = select(command);
    if (handler !== null) {
      const outcome = handler(apply, aggregate, command);
      if (outcome !== symbols.unapplied) {
        return outcome;
      }
    }
  }
  // Either nothing matched or every matching handler reported unapplied.
  return symbols.unapplied;
});
/**
 * Wraps a state reducer so that it only runs for events of one type.
 *
 * @param {*} $type - Event type this handler reacts to.
 * @param {Function} next - Reducer called as `next($type, state, event)`.
 * @param {{state: *, event: Object}} input - Current state and the event.
 * @returns {{state: *, event: Object}} The reducer's result as `state` when
 *   `event.$type` strictly equals `$type`, otherwise the incoming state; the
 *   event is passed along unchanged.
 */
const eventHandler = R.curry(($type, next, input) => {
  const {event, state} = input;
  if (event.$type !== $type) {
    return {state, event};
  }
  return {state: next($type, state, event), event};
});
/**
 * Threads one event through every handler, feeding each handler the state
 * produced by the previous one.
 *
 * @param {Function[]} handlers - Each is called with `{state, event}` and
 *   must return an object exposing the next `state`.
 * @param {*} aggregate - Initial state.
 * @param {Object} event - Event handed to every handler.
 * @returns {*} The state after the last handler, or `aggregate` when
 *   `handlers` is empty.
 */
const dispatchEvents = R.curry((handlers, aggregate, event) => handlers.reduce(
  (state, handler) => handler({state, event}).state,
  aggregate
));
/**
 * Replays events through `handle` and merges the resulting state into the
 * aggregate.
 *
 * @param {Function} handle - Reducer called as `handle(state, event)`.
 * @param {Object} aggregate - Starting state; also the merge target.
 * @param {Object[]|null} newEvents - Events to replay; when falsy,
 *   `aggregate.events` is replayed instead.
 * @returns {Object} `aggregate`, after `Object.assign(aggregate, state)`.
 */
const resume = R.curry((handle, aggregate, newEvents) => {
  // TODO(john): It would be really nice to decouple aggregate from
  // its events at some point.
  const replayed = newEvents ? newEvents : aggregate.events;

  // TODO(john): try replacing with reduce(handle, events, {})
  let folded = null;
  for (const event of replayed) {
    // Until a handler has produced a truthy state, start from the aggregate.
    folded = handle(folded ? folded : aggregate, event);
  }

  // NOTE(john): This might not work for Mongoose. Must try...
  return Object.assign(aggregate, folded);
});
module.exports = { | |
symbols, | |
sendCommand, | |
commandHandler, | |
processCommand, | |
dispatchEvents, | |
eventHandler, | |
resume, | |
getState, | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Buffer = require('buffer').Buffer; | |
const uuid = require('uuid/v4'); | |
const eventstore = require('eventstore-node'); | |
// Endpoint of the EventStore server; EVENTSTORE_URL overrides the local default.
const eventstoreUrl = process.env.EVENTSTORE_URL || 'tcp://localhost:1113';
const connSettings = {};
const esConnection = eventstore.createConnection(connSettings, eventstoreUrl);

// Start connecting, then log the endpoint the first time 'connected' fires.
esConnection.connect();
esConnection.once('connected', ({host, port}) => {
  console.log(`Connected to eventstore at ${host}:${port}`);
});
/**
 * Builds a sender bound to one stream.
 *
 * @param {string} streamName - Stream every message is appended to.
 * @returns {Function} `sendMessage(message, type)`: an async function that
 *   wraps `message` in a JSON event with a fresh uuid and appends it to the
 *   stream. It prints '.' before the append and ':' after it succeeds;
 *   failures are logged with console.error and not rethrown.
 */
function stream(streamName) {
  return async function sendMessage(message, type) {
    const eventId = uuid();
    const payload = eventstore.createJsonEventData(eventId, message, null, type);

    process.stdout.write('.');
    try {
      await esConnection.appendToStream(
        streamName,
        eventstore.expectedVersion.any,
        payload
      );
      process.stdout.write(':');
    } catch (err) {
      console.error(`Error sending ${eventId}: ${err}`);
    }
  };
}
const send = stream('hello-world');
// WRITING=true selects writer mode; anything else selects the one-shot reader.
const writing = process.env.WRITING === 'true';

if (writing) {
  // Writer mode: append a 'pulse' event every 10ms. `send` handles its own
  // errors, so the returned promise is intentionally not awaited.
  setInterval(() => {
    send({
      message: 'hello world!',
      value: uuid(),
      created: (new Date()).toISOString(),
    }, 'pulse');
  }, 10);
} else {
  // Reader mode: read one slice of the stream (start 0, count 4096), decode
  // it, and report how many events were loaded and how long it took.
  setImmediate(async function resume() {
    try {
      const start = new Date();
      const result = await esConnection
        .readStreamEventsForward('hello-world', 0, 4096);
      // Every field, including the creation timestamp, is taken from the
      // recorded event (`event`) rather than from the wrapper around it.
      const events = result.events.map(({event}) => ({
        id: event.eventId,
        number: event.eventNumber,
        type: event.eventType,
        created: event.created,
        event: JSON.parse(Buffer.from(event.data).toString('utf8')),
      }));
      const finish = new Date();
      console.log(`done loading ${events.length} events in ${finish - start}ms`);
    } catch (exception) {
      console.error(`Error resuming: ${exception}`);
    }
  });
}
console.log('started...');
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const assert = require('assert'); | |
const R = require('ramda'); | |
const es = require('../event-sourcing'); | |
// Command handler stub for tests: takes the four curried handler arguments,
// does nothing, and yields undefined once fully applied.
const noopCommandHandler = R.curry(($type, apply, aggregate, command) => undefined);
describe('event-sourcing', () => { | |
describe('commandHandler', () => {
  // A handler registered for type 1 must ignore a type-2 command.
  it('returns null for wrong type', () => {
    const mismatched = {$type: 2};
    const selected = es.commandHandler(1, noopCommandHandler, mismatched);
    assert.strictEqual(selected, null, 'handler was not null');
  });

  // For a matching command the partially applied handler is returned.
  it('returns a function for correct type', () => {
    const matching = {$type: 1};
    const selected = es.commandHandler(1, noopCommandHandler, matching);
    assert.equal(typeof selected, 'function', 'handler was not a Function');
  });
});
describe('processCommand', () => {
  // No handler is registered for type 4, so nothing may be emitted and the
  // outcome must be the unapplied sentinel.
  it('returns symbols.unapplied if no matching command handlers', () => {
    // Named `dispatch` rather than `process` to avoid shadowing Node's global.
    const dispatch = es.processCommand([
      es.commandHandler(1, noopCommandHandler),
      es.commandHandler(2, noopCommandHandler),
      es.commandHandler(3, noopCommandHandler),
    ]);
    const events = [];
    const result = dispatch(f => events.push(f), {events: []}, {$type: 4});
    assert.strictEqual(events.length, 0, 'unexpected events');
    assert.strictEqual(result, es.symbols.unapplied);
  });

  // The type-3 handler emits one event and returns ok; both must be observed.
  it('returns a result if handled', () => {
    const dispatch = es.processCommand([
      es.commandHandler(1, noopCommandHandler),
      es.commandHandler(2, noopCommandHandler),
      es.commandHandler(3, R.curry(($type, apply, aggregate, command) => {
        apply({
          $type,
          message: command.message,
        });
        return es.symbols.ok;
      })),
    ]);
    const events = [];
    const result = dispatch(f => events.push(f), {events: []}, {$type: 3, message: 'hi'});
    // String() is safe for symbols and for undefined, unlike result.toString().
    assert.notStrictEqual(result, es.symbols.unapplied, `expected command to be applied but got ${String(result)}`);
    assert.strictEqual(result, es.symbols.ok, `expected ok but got ${String(result)}`);
    assert.strictEqual(events.length, 1, `expected one event but found ${events.length}`);
    assert.strictEqual(events[0].message, 'hi', 'event did not have command message');
  });
});
describe('eventHandler', () => {
  // Reducer shared by both cases: copies state and event fields into a new object.
  const merge = (t, s, e) => Object.assign({}, s, e);
  const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key);

  it('returns the original state when event is wrong type', () => {
    const state = {foo: 5};
    const event = {$type: 2, message: 'hi'};
    const input = {state, event};

    const output = es.eventHandler(1, merge, input);

    assert.equal(output.state, state, 'state was not the same');
    assert.equal(output.state.foo, state.foo, 'property was different');
    assert.ok(!hasOwn(output.state, 'message'), 'had message property');
    assert.equal(output.event, input.event);
  });

  it('returns new state when event is correct type', () => {
    const message = 'hi';
    const state = {foo: 5};
    const event = {$type: 1, message};
    const input = {state, event};

    const output = es.eventHandler(1, merge, input);

    assert.notEqual(output.state, state, 'state was the same');
    assert.equal(output.state.foo, state.foo, 'property was different');
    assert.equal(output.state.message, message, 'message was not the same');
    assert.equal(output.event, input.event);
  });
});
describe('dispatchEvents', () => {
  // Reducer that merges state and event and stamps the handler's own $type.
  const stamp = ($type, current, incoming) => Object.assign({}, current, incoming, {$type});
  const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key);

  it('handler is invoked for matching type', () => {
    const makeTestHandler = R.curry(stamp);
    const handlers = [1, 2, 3, 4, 5].map($type => es.eventHandler($type, makeTestHandler));
    const event = {$type: 5, message: 'hi'};

    const result = es.dispatchEvents(handlers, {events: []}, event);

    assert.equal(result.$type, event.$type, 'unexpected $type');
  });

  it('handler is not invoked for no matching types', () => {
    const makeTestHandler = R.curry(stamp);
    const handlers = [1, 2, 3, 4, 5].map($type => es.eventHandler($type, makeTestHandler));
    const event = {$type: 0, message: 'hi'};

    const result = es.dispatchEvents(handlers, {events: []}, event);

    assert.ok(!hasOwn(result, '$type'), 'had property named $type');
  });

  it('handler is invoked for all matching types', () => {
    let count = 0;
    const makeTestHandler = R.curry(($type, current, incoming) => {
      count++;
      return stamp($type, current, incoming);
    });
    // Five handlers registered for the same type; each must run once.
    const handlers = Array.from({length: 5}, () => es.eventHandler(1, makeTestHandler));
    const event = {$type: 1, message: 'hi'};

    es.dispatchEvents(handlers, {events: []}, event);

    assert.equal(count, handlers.length, 'was not called for all handlers');
  });
});
describe('resume', () => {
  it('invokes each handler for each event on aggregate', () => {
    const aggregate = {foo: 'bar'};
    // Event type N (1-based position) carries the property named props[N - 1].
    const props = ['one', 'two', 'three', 'four', 'five'];

    // Copies a single property from the event onto the state.
    const handler = R.curry((prop, $type, state, event) => (
      Object.assign({[prop]: event[prop]}, state)
    ));
    const dispatch = es.dispatchEvents(
      props.map((prop, i) => es.eventHandler(i + 1, handler(prop)))
    );
    const events = props.map((prop, i) => ({$type: i + 1, [prop]: String(i + 1)}));

    const result = es.resume(dispatch, aggregate, events);

    assert.equal(result.foo, aggregate.foo);
    props.forEach((prop, i) => {
      assert.equal(result[prop], String(i + 1));
    });
    assert.ok(!Object.prototype.hasOwnProperty.call(result, '$type'));
  });
});
describe('sendCommand', () => {
  // The command's type (8) has no registered command handler, so the command
  // must be rejected with the unapplied sentinel.
  it('is unapplied if no command handlers are called', () => {
    const handlers = es.dispatchEvents([]);
    const resume = es.resume(handlers);
    // Named `processCommand` rather than `process` to avoid shadowing Node's global.
    const processCommand = es.processCommand([
      es.commandHandler(1, noopCommandHandler),
      es.commandHandler(2, noopCommandHandler),
      es.commandHandler(3, R.curry(($type, apply, aggregate, command) => {
        apply({
          $type,
          message: command.message,
        });
        return es.symbols.ok;
      })),
    ]);
    const aggregate = {
      events: [
        {$type: 8, message: 'handled'},
      ],
    };
    const loadAggregate = () => aggregate;
    const sendCommand = es.sendCommand(resume, processCommand, loadAggregate);
    const id = 99;
    const command = {$type: 8, message: 'unused'};

    // Record the rejection so the test fails if neither callback ever runs.
    let rejection = null;
    const resolve = () => {
      assert.fail('did not expect to get here');
    };
    const reject = result => {
      rejection = {result};
    };

    sendCommand(id, command, resolve, reject);

    assert.ok(rejection !== null, 'reject was never called');
    assert.strictEqual(rejection.result, es.symbols.unapplied, 'command was unexpectedly applied');
  });

  // The type-2 command handler emits an event that the type-2 event handler
  // merges into the aggregate, on top of the state replayed from history.
  it('is applied if command handlers are called', () => {
    const handler = ($type, state, event) => Object.assign(state, event);
    const handlers = es.dispatchEvents([
      es.eventHandler(1, handler),
      es.eventHandler(2, handler),
      es.eventHandler(3, handler),
    ]);
    const resume = es.resume(handlers);
    const processCommand = es.processCommand([
      es.commandHandler(2, R.curry(($type, apply, aggregate, command) => {
        apply({
          $type,
          message: command.message,
        });
        return es.symbols.ok;
      })),
    ]);
    const aggregate = {
      events: [
        {$type: 1, message: 'handled', one: '1'},
      ],
    };
    const loadAggregate = () => aggregate;
    const sendCommand = es.sendCommand(resume, processCommand, loadAggregate);
    const id = 99;
    const command = {$type: 2, message: 'command'};

    // Record the resolution so the test fails if neither callback ever runs.
    let resolution = null;
    const resolve = result => {
      resolution = {result};
    };
    const reject = () => {
      assert.fail('did not expect to get here');
    };

    sendCommand(id, command, resolve, reject);

    assert.ok(resolution !== null, 'resolve was never called');
    assert.strictEqual(resolution.result.message, command.message, 'did not get message from command');
    assert.strictEqual(resolution.result.one, aggregate.events[0].one, 'did not get previous state');
  });
});
describe('getState', () => {
  // Handlers are registered, but the aggregate has no events to replay, so
  // the very same object must come back untouched.
  it('returns original object if no events', () => {
    const aggregate = {
      foo: 'bar',
      events: [],
    };
    const loadAggregate = () => aggregate;
    const handler = ($type, state, event) => Object.assign({}, state, event);
    const resume = es.resume(es.dispatchEvents([
      es.eventHandler(1, handler),
      es.eventHandler(2, handler),
      es.eventHandler(3, handler),
    ]));
    const id = 99;

    const result = es.getState(resume, loadAggregate, id);

    assert.strictEqual(result, aggregate, 'did not return the loaded aggregate');
    assert.deepStrictEqual(result, {foo: 'bar', events: []});
  });

  // Each stored event contributes its own property; $type is stripped by the
  // handler and must not leak onto the aggregate.
  it('applies events', () => {
    const aggregate = {
      foo: 'bar',
      events: [
        {$type: 1, one: '1'},
        {$type: 2, two: '2'},
        {$type: 3, three: '3'},
      ],
    };
    const loadAggregate = () => aggregate;
    const handler = ($type, state, event) => {
      const merged = Object.assign({}, state, event);
      delete merged.$type;
      return merged;
    };
    const resume = es.resume(es.dispatchEvents([
      es.eventHandler(1, handler),
      es.eventHandler(2, handler),
      es.eventHandler(3, handler),
    ]));
    const id = 99;

    const result = es.getState(resume, loadAggregate, id);

    assert.deepStrictEqual(result, {
      one: '1',
      two: '2',
      three: '3',
      foo: 'bar',
      events: aggregate.events,
    }, 'result did not have all expected data');
  });
});
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment