I'm still very new to Kafka, event sourcing, stream processing, etc. I'm in the middle of building my first production system with this stuff and am writing this at the request of a few folks on Twitter. So if you do have experience, please do me and anyone else reading this a favor by pointing out things I get wrong :)

Some things that have influenced my thinking:
- The Log — http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
- Turning the database inside out — http://www.confluent.io/blog/2015/03/04/turning-the-database-inside-out-with-apache-samza/
- Why local state is a fundamental primitive in stream processing — http://radar.oreilly.com/2014/07/why-local-state-is-a-fundamental-primitive-in-stream-processing.html
- Samza
- Various functional systems I've been exposed to over the past year or so: React, Flux, RX, etc.
I'm the technical co-founder at RelateRocket. We're a fairly early-stage startup based in SF building an algorithmic social proof product. We hook into marketing automation and CRM tools to auto-produce custom personalized pages, and we're planning on doing this on a massive scale, e.g. a company would create and email out 100,000 pages with custom, relatable content.
So the systems I'm building need to a) handle this sort of scale and b) slice and dice the data dozens of ways for internal analytics, customer-facing analytics, alerting, and pushing data back to the various other tools our customers are using.
So while studying this problem, I discovered event sourcing. I'd read The Log blog post when it came out a few years ago and was very attracted to the idea, but I didn't really connect the dots on how to turn it into a working system until I discovered event sourcing.
Event sourcing has two very attractive properties. A) You don't throw data away. You just record everything you can think of and then figure out how to make use of it. This is very attractive for an analytics-heavy product, and in a startup, where you're really not sure upfront which data is useful and which isn't. B) Separating reads and writes. In school and in the few backend systems I've written (I've mostly done frontend work), I've never enjoyed designing database schemas. They always felt hacky and ungainly. I can now see that most of the ugliness came from the unnatural coupling of the read and write schemas. Writing events feels very natural. You just declare what happened: a user was updated, by this person, and this is what was changed. And that's the end of it. You don't have to awkwardly mutate a user object and then, if you're feeling ambitious, record who made the change in an audit log table. And for reads, as I'll get to in a bit, you have unlimited freedom to mold the raw event data into whatever form makes sense for your application, which is very easy and actually rather fun.
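To make the read/write split concrete, here's a minimal sketch in plain JavaScript (no Kafka involved). The event shape ({ type, entity_id, actor_id, timestamp, changes }) is a hypothetical one chosen for illustration. Writes only ever append events; both the current user and the "who changed this?" audit question are just queries over the same log.

```javascript
// Hypothetical event shape: { type, entity_id, actor_id, timestamp, changes }.
// Events are only ever appended; nothing is mutated in place.
const events = [];

function record(event) {
  events.push(Object.freeze({ ...event }));
}

record({ type: 'userCreated', entity_id: 'u1', actor_id: 'admin', timestamp: 1, changes: { name: 'Ada', email: 'ada@old.example' } });
record({ type: 'userUpdated', entity_id: 'u1', actor_id: 'u1', timestamp: 2, changes: { email: 'ada@new.example' } });

// Read side: derive the current user by replaying its events in order.
function currentUser(id) {
  return events
    .filter((e) => e.entity_id === id)
    .reduce((user, e) => ({ ...user, ...e.changes, updated_at: e.timestamp }), { id });
}

// Audit side: "who changed this field, and when?" is just another query.
function whoChanged(id, field) {
  return events
    .filter((e) => e.entity_id === id && field in e.changes)
    .map((e) => ({ actor: e.actor_id, at: e.timestamp, value: e.changes[field] }));
}

console.log(currentUser('u1').email); // ada@new.example
console.log(whoChanged('u1', 'email').map((c) => c.actor)); // [ 'admin', 'u1' ]
```

No audit table was designed up front; the history falls out of the write model for free.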
Somewhere around when I discovered event sourcing, I also discovered Kafka. I won't write much about it, as there's tons of info on the internet, but Kafka is a beautiful piece of software: highly performant, durable, replayable pub-sub. The perfect tool for so many data tasks.
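To illustrate what "replayable" means, here's a toy, in-memory model of a single Kafka partition: an append-only array where each consumer group tracks its own offset. ToyLog and its methods are made up for this sketch and are not Kafka's or kafka-node's API; real Kafka adds partitioning, durability, and retention on top of this idea.

```javascript
// A toy, in-memory stand-in for one Kafka partition: an append-only array.
// This is NOT the Kafka API; it only models the offset idea.
class ToyLog {
  constructor() {
    this.entries = [];
    this.committed = new Map(); // consumer group -> next offset to read
  }

  append(value) {
    this.entries.push(value);
    return this.entries.length - 1; // offset of the new entry
  }

  // Return everything this group hasn't seen yet and advance its offset.
  poll(group) {
    const from = this.committed.get(group) ?? 0;
    const batch = this.entries.slice(from);
    this.committed.set(group, this.entries.length);
    return batch;
  }

  // Replay: rewind a group to an earlier offset (0 = the beginning).
  seek(group, offset) {
    this.committed.set(group, offset);
  }
}

const log = new ToyLog();
log.append('userCreated');
log.append('userLoggedIn');

console.log(log.poll('dashboard')); // [ 'userCreated', 'userLoggedIn' ]
console.log(log.poll('dashboard')); // []
log.seek('dashboard', 0);
console.log(log.poll('dashboard').length); // 2
console.log(log.poll('analytics').length); // 2, an independent group
```

Because reading doesn't delete anything, a new consumer (or a fixed, redeployed one) can start from offset 0 and rebuild its state from the full history.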
So event sourcing is super cool, but how do you turn your low-level raw events into usable, queryable objects? Stream processing is the normal answer (there's also batch processing with, say, Hadoop, but that's so 3 years ago).
Basically, as new events flow through your system, you "process" them into some sort of higher-level form. E.g. a userCreated event is the start of a new user. A userUpdated event flows by and is used to update an existing user. A userLoggedIn event happens and we increment the times_logged_in field on the user.
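In code, that kind of processing can be written as a reducer: a function from (current state, event) to new state, folded over the event stream. This is a minimal sketch assuming a hypothetical event shape ({ type, userId, data }); it copies the Map on every event for clarity rather than speed.

```javascript
// Hypothetical event shape: { type, userId, data }.
// processEvent takes the current users map and one event and returns a new map.
function processEvent(users, event) {
  const next = new Map(users);
  const user = users.get(event.userId);
  switch (event.type) {
    case 'userCreated':
      next.set(event.userId, { id: event.userId, ...event.data, times_logged_in: 0 });
      break;
    case 'userUpdated':
      if (user) next.set(event.userId, { ...user, ...event.data });
      break;
    case 'userLoggedIn':
      if (user) next.set(event.userId, { ...user, times_logged_in: user.times_logged_in + 1 });
      break;
    default:
      // Events this processor doesn't care about leave the state unchanged.
      break;
  }
  return next;
}

const stream = [
  { type: 'userCreated', userId: 'u1', data: { name: 'Ada' } },
  { type: 'userLoggedIn', userId: 'u1' },
  { type: 'userUpdated', userId: 'u1', data: { name: 'Ada L.' } },
  { type: 'userLoggedIn', userId: 'u1' },
];

const users = stream.reduce(processEvent, new Map());
console.log(users.get('u1')); // { id: 'u1', name: 'Ada L.', times_logged_in: 2 }
```

The same function works whether the events come from an array, as here, or arrive one at a time from a Kafka consumer.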
For React.js peeps reading out there, this should sound exactly like the Flux architecture.
There's a variety of stream processing tools out there, e.g. Spark and Samza. I've chosen (for now, anyway) to forgo those and instead do stream processing with Node.js. Both tools sound great and we'll probably use them someday, but they don't seem necessary given that we're still small-data, not big-data, and as the sole developer I really need to limit the number of tools I'm using to keep the complexity of the product within bounds. Since I'm already using Node.js extensively, it makes sense to keep using it.
Samza, at its heart, is actually very simple. You expose a function (ignoring that Java makes you wrap functions in ugly classes) that Samza calls whenever a new message arrives on the Kafka topic it's subscribed to. You do something to the message and then generally re-emit the processed message onto a new topic.
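Stripped of Kafka and Java, that model can be sketched with an in-memory bus. The publish/runTask helpers and the topic names below are hypothetical, written only to show the shape of a task: a function called once per input message, plus an emit callback that writes to an output topic.

```javascript
// A toy in-memory message bus; the helpers and the task signature are
// illustrative, not Samza's or kafka-node's API.
const topics = new Map(); // topic name -> array of messages
const subscribers = new Map(); // topic name -> array of handler functions

function publish(topic, message) {
  if (!topics.has(topic)) topics.set(topic, []);
  topics.get(topic).push(message);
  for (const handler of subscribers.get(topic) || []) handler(message);
}

// Wire a task: call `task` for every message on `input`, and give it an
// `emit` function that publishes to `output`.
function runTask(input, output, task) {
  if (!subscribers.has(input)) subscribers.set(input, []);
  subscribers.get(input).push((message) => task(message, (out) => publish(output, out)));
}

// The task itself: turn raw userCreated events into user records.
runTask('events', 'user', (event, emit) => {
  if (event.type === 'userCreated') {
    emit({ id: event.entityId, name: event.name });
  }
});

publish('events', { type: 'userCreated', entityId: 'u1', name: 'Ada' });
publish('events', { type: 'pageViewed', entityId: 'p9' });
console.log(topics.get('user')); // [ { id: 'u1', name: 'Ada' } ]
```

Events the task doesn't care about are simply not re-emitted, so the output topic only carries the higher-level objects.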
So, going back to my earlier example, a userCreated event comes in, you process it into the user schema, and then you publish that new object to the user topic. Another system that's responsible for answering queries about users would then listen to that topic and use the changes there to update its store.
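The query side can be as simple as a map from user id to the latest version of that user, since each message on the user topic carries the full object. This sketch assumes messages shaped like { key, value } with value as a JSON string (roughly what a keyed Kafka message looks like once consumed); UserView and byOrganization are illustrative names, not part of any library.

```javascript
// Query side: keep only the newest version of each user, keyed by id.
// Messages are assumed to look like { key, value } with value a JSON string.
class UserView {
  constructor() {
    this.byId = new Map();
  }

  // Apply one message from the user topic. Later versions overwrite earlier ones.
  apply(message) {
    this.byId.set(message.key, JSON.parse(message.value));
  }

  get(id) {
    return this.byId.get(id);
  }

  // An example read query the write side never had to plan for.
  byOrganization(orgId) {
    return [...this.byId.values()].filter((u) => u.organization_id === orgId);
  }
}

const view = new UserView();
view.apply({ key: 'u1', value: JSON.stringify({ id: 'u1', name: 'Ada', organization_id: 'o1' }) });
view.apply({ key: 'u2', value: JSON.stringify({ id: 'u2', name: 'Bob', organization_id: 'o2' }) });
view.apply({ key: 'u1', value: JSON.stringify({ id: 'u1', name: 'Ada L.', organization_id: 'o1' }) });

console.log(view.get('u1').name); // Ada L.
console.log(view.byOrganization('o1').length); // 1
```

Keying messages by user id is what makes "last write wins" a correct way to build this view.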
This is how it'd look in Node.js, using the kafka-node and Immutable.js libraries:
```javascript
// User aggregator: folds raw user events from the `events` topic into user
// objects and publishes each updated user to the `user` topic.
// Uses kafka-node's older ZooKeeper-based Client; newer kafka-node
// releases use KafkaClient instead.
var kafka = require('kafka-node');
var Immutable = require('immutable');

var fromJS = Immutable.fromJS;
var Map = Immutable.Map;
var KeyedMessage = kafka.KeyedMessage;

// Set up our Kafka producer.
var producer = new kafka.HighLevelProducer(new kafka.Client());
producer.on('error', console.log);

// Create the user topic once the producer is connected.
producer.on('ready', function () {
  producer.createTopics(['user'], false, function (err) {
    if (err) {
      console.log(err);
    }
  });
});

// In-memory state: user id -> user object.
var users = Map();

// Publish the latest version of a user, keyed by the user's id.
function emit(user) {
  if (!user) {
    return;
  }
  var message = new KeyedMessage(user.get('id'), JSON.stringify(user));
  producer.send([{ topic: 'user', messages: message }], function (err) {
    if (err) {
      console.log(err);
    }
  });
}

// Copy an event's payload and stamp it with the event's timestamp.
function withTimestamp(event) {
  return fromJS(Object.assign({}, event.event, { timestamp: event.timestamp }));
}

// Set up our Kafka consumer and listen to new events.
var consumer = new kafka.HighLevelConsumer(new kafka.Client(), [{ topic: 'events' }], {
  groupId: 'user-aggregator'
});
consumer.on('error', console.log);

consumer.on('message', function (message) {
  // kafka-node hands us the raw message; the event itself is JSON in `value`.
  var event = JSON.parse(message.value);
  switch (event.event_type) {
    case 'userCreated':
      users = users.set(event.entity_id, fromJS({
        id: event.entity_id,
        name: event.event.name,
        email: event.event.email,
        roles: event.event.roles,
        organization_id: event.event.organization_id,
        created_at: event.timestamp,
        updated_at: event.timestamp,
        logins: [],
        logins_failed: []
      }));
      return emit(users.get(event.entity_id));
    case 'userUpdated':
      users = users.mergeDeepIn([event.entity_id], fromJS({
        name: event.event.name,
        email: event.event.email,
        roles: event.event.roles,
        updated_at: event.timestamp
      }));
      return emit(users.get(event.entity_id));
    case 'userLoggedIn':
      // The user who logged in is the event's actor; ignore unknown users.
      if (!users.has(event.actor_id)) {
        return;
      }
      users = users.updateIn([event.actor_id, 'logins'], function (logins) {
        return logins.push(withTimestamp(event));
      });
      return emit(users.get(event.actor_id));
    case 'userFailedLogin':
      if (!users.has(event.entity_id)) {
        return;
      }
      users = users.updateIn([event.entity_id, 'logins_failed'], function (failed) {
        return failed.push(withTimestamp(event));
      });
      return emit(users.get(event.entity_id));
  }
});
```
Things I'm using stream processing for, or planning to:
- aggregating objects from raw events
- real-time updates: it's easy to connect socket.io or some other websocket/push tech to the stream of new objects and push them to the dashboard. We have an activity stream page working this way and will be adding more soon.
- campaigns: we let you create campaigns, arbitrary groupings of the pages you're sending out. Soon I'll be writing a campaign aggregator that watches for new page analytics related to a campaign and groups them together to drive campaign-specific analytics dashboards.
- integration with other tools: each marketing/sales tool has its own way of viewing the world. To drive integrations, I'll write a stream processor for each one that translates our events into updates that tool understands.
- enrichments: we record the IP address of everyone who visits one of our custom pages. A natural thing to do is look up geo information for that IP to "enrich" the event.
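The campaign aggregator from the list above could look roughly like this. It's only a sketch: the campaignPageAdded, pageViewed, and linkClicked event types are assumptions, and since campaigns are arbitrary groupings, a page may belong to several of them, so each analytics event is counted once per campaign containing that page.

```javascript
// Hypothetical event types: campaignPageAdded { campaign_id, page_id },
// pageViewed { page_id }, linkClicked { page_id }.
class CampaignAggregator {
  constructor() {
    this.campaignsByPage = new Map(); // page_id -> Set of campaign_ids
    this.stats = new Map(); // campaign_id -> { views, clicks }
  }

  apply(event) {
    if (event.type === 'campaignPageAdded') {
      if (!this.campaignsByPage.has(event.page_id)) this.campaignsByPage.set(event.page_id, new Set());
      this.campaignsByPage.get(event.page_id).add(event.campaign_id);
      if (!this.stats.has(event.campaign_id)) this.stats.set(event.campaign_id, { views: 0, clicks: 0 });
      return;
    }
    const field = { pageViewed: 'views', linkClicked: 'clicks' }[event.type];
    if (!field) return; // not an analytics event this aggregator tracks
    // Pages outside every campaign have no entry, so nothing is counted.
    for (const campaignId of this.campaignsByPage.get(event.page_id) || []) {
      this.stats.get(campaignId)[field] += 1;
    }
  }
}

const agg = new CampaignAggregator();
[
  { type: 'campaignPageAdded', campaign_id: 'spring', page_id: 'p1' },
  { type: 'campaignPageAdded', campaign_id: 'spring', page_id: 'p2' },
  { type: 'pageViewed', page_id: 'p1' },
  { type: 'pageViewed', page_id: 'p2' },
  { type: 'linkClicked', page_id: 'p2' },
  { type: 'pageViewed', page_id: 'p3' },
].forEach((e) => agg.apply(e));

console.log(agg.stats.get('spring')); // { views: 2, clicks: 1 }
```

In a real deployment each stats update would also be emitted onto a campaign topic for the dashboards to consume.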
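For integrations, one option is a translator per tool that maps our events into that tool's vocabulary and drops events it has no concept for. The tool name exampleCrm and every output field here are hypothetical; a real integration would follow that tool's actual API.

```javascript
// Each integration gets its own translator from our events to that tool's
// vocabulary. "exampleCrm" and its field names are entirely hypothetical.
const translators = {
  exampleCrm(event) {
    switch (event.type) {
      case 'pageViewed':
        return { action: 'log_activity', contact_email: event.visitor_email, activity: 'Viewed personalized page', page_id: event.page_id };
      case 'linkClicked':
        return { action: 'log_activity', contact_email: event.visitor_email, activity: `Clicked ${event.url}` };
      default:
        return null; // this tool has no concept for the event, so drop it
    }
  },
};

function translate(tool, event) {
  const fn = translators[tool];
  if (!fn) throw new Error(`no translator for ${tool}`);
  return fn(event);
}

console.log(translate('exampleCrm', { type: 'linkClicked', visitor_email: 'ada@example.com', url: '/pricing' }).activity); // Clicked /pricing
console.log(translate('exampleCrm', { type: 'userCreated' })); // null
```

Keeping each translator a pure function makes it easy to test against recorded events before pointing it at a customer's account.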
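An enrichment step can be a small function that returns a copy of the event with extra fields. The lookup table below is made up and uses documentation-only IP ranges; in practice lookupGeo would call a real geolocation database or service. Leaving the raw event unchanged means it can be re-enriched later if the lookup data improves.

```javascript
// Hypothetical geo table standing in for a real IP-geolocation lookup.
// These entries are made up and use documentation-only IP ranges.
const GEO_BY_PREFIX = {
  '203.0.113.': { country: 'US', city: 'San Francisco' },
  '198.51.100.': { country: 'GB', city: 'London' },
};

function lookupGeo(ip) {
  const prefix = Object.keys(GEO_BY_PREFIX).find((p) => ip.startsWith(p));
  return prefix ? GEO_BY_PREFIX[prefix] : null;
}

// Enrich a page-view event: return a copy with a `geo` field added.
// The raw event itself is not modified.
function enrich(event) {
  return { ...event, geo: lookupGeo(event.ip) };
}

const raw = { type: 'pageViewed', page_id: 'p1', ip: '203.0.113.7' };
console.log(enrich(raw).geo); // { country: 'US', city: 'San Francisco' }
console.log('geo' in raw); // false
```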