Node.js offers a great environment for building ETL scripts: Node is easy to program in and work with, and has interface libraries for almost everything under the sun.
We need a framework that makes writing ETL scripts easy:
- Supports creation and re-use of components
- Cleanly divides input/output concerns from transformation concerns
- Works identically in both event-driven and batch scenarios
- Supports an easy development/testing mode
- Provides good visibility when running in production
- Has a simple DSL for expressing transformation logic
- Supports SQL easily as a transformation/aggregation tool
- Provides a strong prescriptive framework to make future maintenance easier
The framework is built around the simple concept of a pipeline. Components are assembled into nodes in a DAG to construct the pipeline, and data passes through the pipeline from inputs to outputs. This is similar to the concept of UNIX pipes and to the pipelines in Node-RED.
Data is contained in messages and messages are passed down the pipeline. By default, messages passed as input to a component will be generated unchanged as output from the component. However, a component may choose to edit or remove messages from the pipeline, or a component may generate new messages to be added to the pipeline.
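To make this concrete, here is a sketch of two hypothetical components (the names and meta fields are illustrative, not part of the framework): one removes messages from the pipeline, the other generates an additional message.

```javascript
// Hypothetical filtering component: drops messages whose payload lacks
// an "email" field, illustrating that a component may remove messages
// from the pipeline rather than pass them through unchanged.
function requireEmail(msgs, context) {
  return msgs.filter(function (msg) {
    return msg.payload && msg.payload.email;
  });
}

// Hypothetical generating component: passes the original messages
// through and appends one new summary message to the pipeline.
function addCount(msgs, context) {
  return msgs.concat([{meta: {}, payload: {count: msgs.length}}]);
}
```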
Messages have a fixed "meta" schema, but may contain loosely-typed data.
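A message might look like the following sketch; the payload fields are free-form, while the particular meta fields shown here (id, source, timestamp) are assumptions for illustration rather than the framework's actual schema.

```javascript
// Hypothetical message shape: a fixed "meta" envelope plus a
// loosely-typed "payload".
var message = {
  meta: {
    id: 'a1b2c3',          // assumed meta fields; the exact schema
    source: 'dbquery',     // is defined by the framework
    timestamp: Date.now()
  },
  payload: {
    name: 'Ada Lovelace',  // payload contents are free-form
    email: 'ada@example.com'
  }
};
```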
In addition to messages, the pipeline contains a single global context. This dictionary contains values which may be set up when the pipeline is initialized, or set by components during processing. The context lives for the duration of the pipeline processing (until the program exits).
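For example, a hypothetical component might read a value placed in the context at initialization time and accumulate a running total there (the field names here are assumptions, not framework conventions):

```javascript
// Hypothetical component that reads context.multiplier (set at pipeline
// initialization) and accumulates a running total in context.total.
// Messages pass through unchanged.
function tallyCompensation(msgs, context) {
  context.total = context.total || 0;
  msgs.forEach(function (msg) {
    context.total += msg.payload.compensation * (context.multiplier || 1);
  });
  return msgs;
}
```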
The pipeline is expressed directly in code. For example:

```javascript
// Read table "roles":
//   firstname, lastname, email_address, salary, bonus
// Write to table "users":
//   name, email, compensation
var dbquery = require('dbquery');
var dbwriter = require('dbwriter');
var Pipeline = require('pipeline');
var Scheduler = require('scheduler');

var pipeline = Pipeline();

pipeline.use(dbquery('select * from roles'));

pipeline.use(function (msgs, context) {
  return msgs.map(function (role) {
    return {
      meta: role.meta,
      payload: {
        name: role.payload.firstname + ' ' + role.payload.lastname,
        email: role.payload.email_address,
        compensation: role.payload.salary + role.payload.bonus
      }
    };
  });
});

pipeline.use(dbwriter('users'));

Scheduler.run(pipeline, {interval: 60 * 60});
```
Components can be written as either synchronous or async functions, depending on the signature of the function:
f(msg_array, context)
- Indicates a synchronous function. The return value will be added to the pipeline. An error should be reported by throwing an exception.
f(msg_array, context, callback)
- Indicates an async function. Invoke the callback as callback(err, msgs).
Example
```javascript
function dbquery(msg_array, context, callback) {
  db.select('select * from users', function (err, rows) {
    if (err) {
      callback(err);
    } else {
      callback(null, rows.map(function (row) { return {payload: row}; }));
    }
  });
}
```
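By contrast, a synchronous component returns its messages directly and reports errors by throwing. A hypothetical transform that upper-cases a name field might look like:

```javascript
// Synchronous component: two-argument signature, returns the resulting
// messages directly; errors are reported by throwing an exception.
function uppercaseNames(msgs, context) {
  return msgs.map(function (msg) {
    if (!msg.payload.name) {
      throw new Error('missing name');
    }
    return {
      meta: msg.meta,
      payload: {name: msg.payload.name.toUpperCase()}
    };
  });
}
```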