Skip to content

Instantly share code, notes, and snippets.

Last active June 27, 2018 14:38
Show Gist options
  • Save johncoder/fb1002336a5ffb05a7c315fafc8c3be8 to your computer and use it in GitHub Desktop.
Save johncoder/fb1002336a5ffb05a7c315fafc8c3be8 to your computer and use it in GitHub Desktop.
EventSourcing in Node.js
# Single-node EventStore for local development.
# NOTE(review): the service key below is a reconstruction — the original key was
# lost from this copy of the file; any valid service name works for Compose.
version: '3'
services:
  eventstore:
    container_name: geteventstore
    image: eventstore/eventstore
    ports:
      - "2113:2113"
      - "1113:1113"
    volumes:
      - .:/var/lib/eventstore
const R = require('ramda');
// TODO(john): Snapshotting
// TODO(john): Consider some form of optimistic concurrency? Requires versioning.
// Sentinel values returned by command handlers and by processCommand.
// Each description matches its key so that logging a result (e.g. via
// toString()) identifies which sentinel it is; previously `rejected` and
// `failed` both printed as Symbol(ok).
const symbols = {
  ok: Symbol('ok'),
  rejected: Symbol('rejected'),
  failed: Symbol('failed'),
  void: Symbol('void'),
  unapplied: Symbol('unapplied'),
};
// Loads the aggregate identified by `id` and lets the supplied `resume`
// function bring it up to date, then returns that same aggregate object.
//
//   resume        - function (aggregate, newEvents); called here with null
//                   for newEvents, i.e. no new events are supplied.
//   loadAggregate - function (id) returning the aggregate.
//   id            - passed through to loadAggregate untouched.
//
// Curried, so the first two arguments can be bound once and reused.
const getState = R.curry((resume, loadAggregate, id) => {
  const aggregate = loadAggregate(id);
  resume(aggregate, null);
  return aggregate;
});
// Runs `command` against the aggregate identified by `id`.
//
//   resume        - function (aggregate, newEvents) that folds events into
//                   the aggregate.
//   mutate        - function (apply, aggregate, command) returning one of
//                   `symbols`; it records new events by calling apply(event).
//   loadAggregate - function (id) returning the aggregate.
//   resolve       - called with the updated aggregate when mutate returns
//                   symbols.ok.
//   reject        - called with mutate's result for any other outcome.
//
// Curried, so resume/mutate/loadAggregate can be bound once.
const sendCommand = R.curry((
  resume,
  mutate,
  loadAggregate,
  id,
  command,
  resolve,
  reject
) => {
  const aggregate = loadAggregate(id);
  // Bring the aggregate up to date before the command inspects it.
  resume(aggregate, null);

  const events = [];
  const result = mutate(f => events.push(f), aggregate, command);
  if (result !== symbols.ok) {
    // Anything other than ok means the collected events must not be applied.
    reject(result);
    return;
  }

  // TODO(john): It would be really nice to decouple aggregate from
  // its events at some point.
  resume(aggregate, events);
  resolve(aggregate);
});
// Type gate for command handlers: yields next($type) when the command's
// $type is strictly equal to $type, and null otherwise.
const commandHandler = R.curry(($type, next, command) => (
  command.$type === $type ? next($type) : null
));
// Offers `command` to each entry of `handlers` in order and returns the first
// result that is not symbols.unapplied.
//
//   handlers  - array of functions (command) => handler | null; null means
//               the entry does not accept this command and is skipped.
//   apply     - passed through to the selected handler.
//   aggregate - passed through to the selected handler.
//   command   - the command being processed.
//
// Returns symbols.unapplied when no handler produced another result.
const processCommand = R.curry((handlers, apply, aggregate, command) => {
  for (const select of handlers) {
    const handler = select(command);
    if (handler === null) {
      // Not a handler for this command; calling it would throw.
      continue;
    }
    const result = handler(apply, aggregate, command);
    if (result !== symbols.unapplied) {
      return result;
    }
  }
  return symbols.unapplied;
});
// Wraps a reducer so it only runs for events of one type. Takes and returns a
// {state, event} pair: state becomes next($type, state, event) when the
// event's $type matches, and is passed through unchanged otherwise.
const eventHandler = R.curry(($type, next, input) => {
  const { state, event } = input;
  const isMatch = event.$type === $type;
  return {
    state: isMatch ? next($type, state, event) : state,
    event,
  };
});
// Threads one event through every handler in order, starting from
// `aggregate`; each handler receives {state, event} and its `.state` becomes
// the input state of the next one. Returns the final state.
const dispatchEvents = R.curry((handlers, aggregate, event) => {
  let current = aggregate;
  for (const handler of handlers) {
    const { state } = handler({ state: current, event });
    current = state;
  }
  return current;
});
// Folds events into `aggregate` and returns the (mutated) aggregate.
//
//   handle    - function (state, event) returning the next state.
//   aggregate - object to update; also the initial state for the fold.
//   newEvents - events to apply; when falsy, aggregate.events is used, and
//               when that is absent too nothing is applied.
const resume = R.curry((handle, aggregate, newEvents) => {
  // TODO(john): It would be really nice to decouple aggregate from
  // its events at some point.
  const events = newEvents || aggregate.events || [];
  let state = null;
  // TODO(john): try replacing with reduce(handle, events, {})
  for (const event of events) {
    // The first iteration starts from the aggregate itself.
    state = handle(state || aggregate, event);
  }
  // NOTE(john): This might not work for Mongoose. Must try...
  // Object.assign ignores a null source, so an empty event list leaves the
  // aggregate untouched.
  return Object.assign(aggregate, state);
});
module.exports = {
const Buffer = require('buffer').Buffer;
const uuid = require('uuid/v4');
const eventstore = require('eventstore-node');
const eventstoreUrl = process.env.EVENTSTORE_URL || 'tcp://localhost:1113';
const connSettings = {};
const esConnection = eventstore.createConnection(connSettings, eventstoreUrl);
// Log the endpoint once, the first time the connection reports 'connected'.
esConnection.once('connected', (endpoint) => {
  console.log(`Connected to eventstore at ${endpoint.host}:${endpoint.port}`);
});
// Builds a sender bound to one stream. The returned async function wraps
// `message` in a JSON event with a fresh uuid and the given event type, then
// appends it with expectedVersion.any. Append failures are logged to stderr
// and not rethrown.
function stream(streamName) {
  const appendEvent = event => esConnection.appendToStream(
    streamName,
    eventstore.expectedVersion.any,
    event
  );

  return async function sendMessage(message, type) {
    const id = uuid();
    const event = eventstore.createJsonEventData(id, message, null, type);
    try {
      await appendEvent(event);
    } catch (exception) {
      console.error(`Error sending ${id}: ${exception}`);
    }
  };
}
// Demo driver. With WRITING=true in the environment it publishes to the
// 'hello-world' stream on a timer; otherwise it reads that stream back once
// and logs how many events were loaded and how long it took.
// NOTE(review): closing braces and several tokens are missing from this copy
// of the file; the notes below mark the gaps.
const send = stream('hello-world');
const writing = process.env.WRITING === 'true';
if (writing) {
// Timer callback runs every 10 ms.
setInterval(() => {
// NOTE(review): the call that opens this payload (presumably `send({`) is
// missing here — confirm against the original.
message: 'hello world!',
value: uuid(),
created: (new Date()).toISOString(),
}, 'pulse');
}, 10);
} else {
setImmediate(async function resume() {
try {
// Wall-clock start for the timing reported below.
const start = new Date();
// Requests up to 4096 events beginning at event number 0.
const result = await esConnection
.readStreamEventsForward('hello-world', 0, 4096);
// console.log(JSON.stringify(result, null, ' '));
// NOTE(review): the expression before `=>` is missing (presumably a map over
// the events in `result`, binding each to `e`) — confirm.
const events = => ({
id: e.event.eventId,
number: e.event.eventNumber,
type: e.event.eventType,
created: e.created,
// NOTE(review): Buffer.from('utf8') looks truncated — presumably the event's
// data was decoded as UTF-8 before parsing; confirm.
event: JSON.parse(Buffer.from('utf8')),
// events.forEach(e => console.log(JSON.stringify(e)));
// Counts the mapped events one at a time.
let c = 0;
events.forEach(() => c++);
const finish = new Date();
console.log(`done loading ${c} events in ${finish - start}ms`);
} catch (exception) {
// Read/parse failures are logged and not rethrown.
console.error(`Error resuming: ${exception}`);
const assert = require('assert');
const R = require('ramda');
const es = require('../event-sourcing');
// Curried four-argument command handler stub for the specs: ignores its
// arguments and returns undefined.
const noopCommandHandler = R.curry(($type, apply, aggregate, command) => undefined);
describe('event-sourcing', () => {
// Spec for es.commandHandler.
// NOTE(review): the `});` lines closing each `it`/`describe` are absent from
// this copy of the file.
describe('commandHandler', () => {
// Handler registered for $type 1, command has $type 2: expect null.
it('returns null for wrong type', () => {
const handler = es.commandHandler(1, noopCommandHandler, {$type: 2});
assert.ok(handler === null, 'handler was not null');
// Matching $type: the curried noopCommandHandler receives only $type, so a
// function (still awaiting its remaining arguments) is returned.
it('returns a function for correct type', () => {
const handler = es.commandHandler(1, noopCommandHandler, {$type: 1});
assert.equal(typeof handler, 'function', 'handler was not a Function');
// Spec for es.processCommand.
describe('processCommand', () => {
// Handlers exist for $type 1-3 only; a $type 4 command must come back as
// symbols.unapplied with no events recorded.
it('returns symbols.unapplied if no matching command handlers', () => {
const process = es.processCommand([
es.commandHandler(1, noopCommandHandler),
es.commandHandler(2, noopCommandHandler),
es.commandHandler(3, noopCommandHandler),
// NOTE(review): the `]);` closing the handler list appears to be missing here.
const events = [];
const result = process(f => events.push(f), {events:[]}, {$type:4});
assert.equal(events.length, 0, 'unexpected events');
assert.equal(result, es.symbols.unapplied);
// The $type 3 handler is expected to record one event carrying the command's
// message and to return symbols.ok.
it('returns a result if handled', () => {
const process = es.processCommand([
es.commandHandler(1, noopCommandHandler),
es.commandHandler(2, noopCommandHandler),
es.commandHandler(3, R.curry(($type, apply, aggregate, command) => {
// NOTE(review): presumably an `apply({ ... })` call wrapped the next line —
// its opening and closing lines are missing; confirm.
message: command.message,
return es.symbols.ok;
const events = [];
const result = process(f => events.push(f), {events:[]}, {$type:3,message:'hi'});
// NOTE(review): the failure message reads inverted — this assertion requires
// the result to differ from symbols.unapplied.
assert.notEqual(result, es.symbols.unapplied, `expected unapplied but got ${result.toString()}`);
assert.equal(events.length, 1, `expected one event but found ${events.length}`);
assert.equal(events[0].message, 'hi', 'event did not have command message');
// Spec for es.eventHandler.
describe('eventHandler', () => {
// Handler is for $type 1, event has $type 2: the very same state object must
// come back and the event must be passed through.
it('returns the original state when event is wrong type', () => {
const state = {foo:5};
const input = {state, event: {$type:2, message: 'hi'}};
const result = es.eventHandler(1,
(t, s, e) => Object.assign({}, s, e),
// NOTE(review): the final argument and closing of the call above (presumably
// `input);`) are missing — confirm.
assert.equal(result.state, state, 'state was not the same');
// NOTE(review): both compared values are missing below — presumably
// result.state.foo and state.foo; confirm.
assert.equal(,, 'property was different');
assert.ok(!result.state.hasOwnProperty('message'), 'had message property');
assert.equal(result.event, input.event);
// Handler and event are both $type 1: the reducer's merged copy replaces the
// state, so it is a different object that also carries the event's message.
it('returns new state when event is correct type', () => {
const state = {foo:5};
const message = 'hi';
const input = {state, event: {$type:1, message}};
const result = es.eventHandler(1,
(t, s, e) => Object.assign({}, s, e),
// NOTE(review): same gap as above — presumably `input);` is missing.
assert.notEqual(result.state, state, 'state was the same');
// NOTE(review): compared values missing — presumably result.state.foo and
// state.foo; confirm.
assert.equal(,, 'property was different');
assert.equal(result.state.message, message, 'message was not the same');
assert.equal(result.event, input.event);
// Spec for es.dispatchEvents. Each case builds five eventHandler-wrapped
// reducers and dispatches a single event through them.
// NOTE(review): the `});` / `];` closer lines are absent throughout this copy.
describe('dispatchEvents', () => {
// Only the $type 5 handler matches; its reducer stamps $type onto the state.
it('handler is invoked for matching type', () => {
const aggregate = {events:[]};
const event = {$type:5, message: 'hi'};
const makeTestHandler = R.curry(($type, state, event) => {
return Object.assign({}, state, event, {$type});
const handlers = [
es.eventHandler(1, makeTestHandler),
es.eventHandler(2, makeTestHandler),
es.eventHandler(3, makeTestHandler),
es.eventHandler(4, makeTestHandler),
es.eventHandler(5, makeTestHandler),
const result = es.dispatchEvents(handlers, aggregate, event);
assert.equal(result.$type, event.$type, 'unexpected $type');
// $type 0 matches none of the handlers, so the state never gains a $type.
it('handler is not invoked for no matching types', () => {
const aggregate = {events:[]};
const event = {$type:0, message: 'hi'};
const makeTestHandler = R.curry(($type, state, event) => {
return Object.assign({}, state, event, {$type});
const handlers = [
es.eventHandler(1, makeTestHandler),
es.eventHandler(2, makeTestHandler),
es.eventHandler(3, makeTestHandler),
es.eventHandler(4, makeTestHandler),
es.eventHandler(5, makeTestHandler),
const result = es.dispatchEvents(handlers, aggregate, event);
assert.ok(!result.hasOwnProperty('$type'), 'had property named $type');
// All five handlers are registered for $type 1 and should each run once.
it('handler is invoked for all matching types', () => {
const aggregate = {events:[]};
const event = {$type:1, message: 'hi'};
let count = 0;
// NOTE(review): nothing visible increments `count`, so the final assertion
// cannot pass as shown — presumably a `count++` line inside this reducer is
// missing; confirm.
const makeTestHandler = R.curry(($type, state, event) => {
return Object.assign({}, state, event, {$type});
const handlers = [
es.eventHandler(1, makeTestHandler),
es.eventHandler(1, makeTestHandler),
es.eventHandler(1, makeTestHandler),
es.eventHandler(1, makeTestHandler),
es.eventHandler(1, makeTestHandler),
const result = es.dispatchEvents(handlers, aggregate, event);
assert.equal(count, handlers.length, 'was not called for all handlers');
// Spec for es.resume: five events of distinct types are replayed and each
// one's type-specific property must end up on the result.
describe('resume', () => {
it('invokes each handler for each event on aggregate', () => {
const aggregate = {
foo: 'bar',
// Reducer factory: copies event[prop] into a new object, then overlays the
// existing state on top of it.
const handler = R.curry((prop, $type, state, event) => {
const obj = {};
obj[prop] = event[prop];
return Object.assign(obj, state);
// dispatchEvents is curried, so supplying only the handler list yields the
// (state, event) function that resume expects.
const handlers = es.dispatchEvents([
es.eventHandler(1, handler('one')),
es.eventHandler(2, handler('two')),
es.eventHandler(3, handler('three')),
es.eventHandler(4, handler('four')),
es.eventHandler(5, handler('five')),
// Events are passed explicitly as the third argument.
const result = es.resume(handlers, aggregate, [
{$type: 1, one: '1'},
{$type: 2, two: '2'},
{$type: 3, three: '3'},
{$type: 4, four: '4'},
{$type: 5, five: '5'},
// NOTE(review): the first argument is missing below — presumably result.one;
// confirm.
assert.equal(, '1');
assert.equal(result.two, '2');
assert.equal(result.three, '3');
assert.equal(result.four, '4');
assert.equal(result.five, '5');
// Spec for es.sendCommand. It is first bound with (resume, process,
// loadAggregate) and then invoked with (id, command, resolve, reject).
// NOTE(review): closer lines (`});`, `]);`, `};`) are absent throughout this copy.
describe('sendCommand', () => {
// Command $type 8 has no command handler (only 1-3 are registered), so reject
// must receive symbols.unapplied and resolve must not run.
it('is unapplied if no command handlers are called', () => {
const handlers = es.dispatchEvents([]);
const resume = es.resume(handlers);
const process = es.processCommand([
es.commandHandler(1, noopCommandHandler),
es.commandHandler(2, noopCommandHandler),
es.commandHandler(3, R.curry(($type, apply, aggregate, command) => {
// NOTE(review): presumably an `apply({ ... })` call wrapped the next line —
// its opening and closing lines are missing; confirm.
message: command.message,
return es.symbols.ok;
const aggregate = {
events: [
{$type: 8, message: 'handled'},
const loadAggregate = () => {
return aggregate;
const sendCommand = es.sendCommand(resume, process, loadAggregate);
const id = 99;
const command = {$type: 8, message: 'unused'};
// NOTE(review): the call before the message is missing — presumably a failing
// assertion such as assert.fail(...); confirm.
const resolve = result => {, 'did not expect to get here');
const reject = result => {
assert.equal(result, es.symbols.unapplied, 'command was unexpectedly applied');
sendCommand(id, command, resolve, reject);
// Command $type 2 has a handler that returns symbols.ok, so resolve must
// receive state reflecting both the stored event and the command.
it('is applied if command handlers are called', () => {
// This reducer mutates and returns the state it is given.
const handler = ($type, state, event) => {
return Object.assign(state, event);
const handlers = es.dispatchEvents([
es.eventHandler(1, handler),
es.eventHandler(2, handler),
es.eventHandler(3, handler),
const resume = es.resume(handlers);
const process = es.processCommand([
es.commandHandler(2, R.curry(($type, apply, aggregate, command) => {
// NOTE(review): same gap as in the first case — presumably `apply({` is
// missing before the next line; confirm.
message: command.message,
return es.symbols.ok;
const aggregate = {
events: [
{$type: 1, message: 'handled', one: '1'},
const loadAggregate = () => {
return aggregate;
const sendCommand = es.sendCommand(resume, process, loadAggregate);
const id = 99;
const command = {$type: 2, message: 'command'};
const resolve = result => {
assert.equal(result.message, command.message, 'did not get message from command');
// NOTE(review): both compared expressions are truncated below — presumably
// result.one against aggregate.events[0].one; confirm.
assert.equal(,[0].one, 'did not get previous state');
// NOTE(review): the call before the message is missing — presumably a failing
// assertion such as assert.fail(...); confirm.
const reject = result => {, 'did not expect to ge there');
sendCommand(id, command, resolve, reject);
// Spec for es.getState, called as getState(resume, loadAggregate, id).
// NOTE(review): closer lines (`});`, `]));`, `};`) are absent throughout this copy.
describe('getState', () => {
// With an empty events array there is nothing to replay, so the aggregate
// should come back with its original contents.
it('returns original object if no handlers', () => {
const aggregate = {
foo: 'bar',
events: []
const loadAggregate = () => aggregate;
const handler = ($type, state, event) => {
return Object.assign({}, state, event);
const resume = es.resume(es.dispatchEvents([
es.eventHandler(1, handler),
es.eventHandler(2, handler),
es.eventHandler(3, handler),
const id = 99;
const result = es.getState(resume, loadAggregate, id);
// NOTE(review): the value for `foo` is missing below — presumably
// aggregate.foo (i.e. 'bar'); confirm.
assert.deepEqual(result, {foo:, events: []});
// Three stored events, one per handler type; each contributes one property.
it('applies events', () => {
const aggregate = {
foo: 'bar',
events: [
{$type: 1, one: '1'},
{$type: 2, two: '2'},
{$type: 3, three: '3'},
const loadAggregate = () => aggregate;
// Merges the event into a copy of the state and strips $type from the copy.
const handler = ($type, state, event) => {
let result = Object.assign({}, state, event);
delete result.$type;
return result;
const resume = es.resume(es.dispatchEvents([
es.eventHandler(1, handler),
es.eventHandler(2, handler),
es.eventHandler(3, handler),
const id = 99;
const result = es.getState(resume, loadAggregate, id);
assert.deepEqual(result, {
one: '1',
two: '2',
three: '3',
foo: 'bar',
}, 'result did not have all expected data');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment