Created
December 1, 2014 16:09
-
-
Save rafinskipg/537a26e3d3b779c5d7dc to your computer and use it in GitHub Desktop.
Transform CSV or Excel files with streams for batch upload processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

// Third-party dependencies used by the batch router below.
var request = require('request'),
  q = require('q'),
  _s = require('underscore.string'),
  _ = require('lodash'),
  streamify = require('stream-array'),
  log = require('config').apiLogger;
var csv = require("csv-streamify");
var JSONStream = require("JSONStream");
var es = require("event-stream");
var excel = require('excel-stream');
//Express initialization of these routers
/**
 * Batch-upload router. Constructing it registers its routes immediately.
 *
 * @param {Object} express - Object exposing an Express-style
 *   `post(path, handler)` method.
 */
var batchRouters = function(express) {
  this.initialize(express);
};

/**
 * Converts one half of a "lat,lng" pair into a number.
 *
 * @param {string} text - Plain decimal text, whitespace already removed.
 * @param {number} limit - Largest absolute value allowed (90 for latitude,
 *   180 for longitude).
 * @returns {?number} The parsed coordinate, or null when `text` is not a
 *   plain decimal number or lies outside [-limit, limit].
 */
function parseCoordinate(text, limit) {
  // Reject empty strings, words, hex, exponents, etc. before parsing.
  if (!/^[+-]?(\d+\.?\d*|\.\d+)$/.test(text)) {
    return null;
  }
  var value = parseFloat(text);
  return Math.abs(value) <= limit ? value : null;
}

batchRouters.prototype = {
  /**
   * Registers the routes handled by this router.
   *
   * @param {Object} express - Object exposing `post(path, handler)`.
   */
  initialize: function(express) {
    express.post('/batch/shops/:userId', this.batchProcessShops.bind(this));
  },

  /**
   * Streams an uploaded CSV/Excel request body through the normalisation
   * pipeline and answers with a JSON array string of shop objects.
   *
   * Non-XHR requests are answered with a 400. Any synchronous or stream
   * error is logged and answered once with a 500.
   *
   * @param {Object} req - Incoming request; read as a stream. `req.xhr` and
   *   `req.headers.filetype` select the behaviour.
   * @param {Object} res - Response; the final JSON string is piped into it.
   */
  batchProcessShops: function(req, res) {
    var failed = false;

    // Single failure path shared by the try/catch and every stream stage.
    function fail(err) {
      if (failed) {
        return;
      }
      failed = true;
      log.error('Batch: Error parsing csv : ' + err);
      if (!res.headersSent) {
        res.send(500, 'Error parsing CSV');
      }
    }

    // Direct async xhr stream data upload is the only supported mode;
    // answer anything else instead of leaving the request hanging.
    if (!req.xhr) {
      res.send(400, 'Batch upload must be sent as an XHR stream');
      return;
    }

    try {
      // Parse file
      var parser = req.headers.filetype === 'text/csv'
        ? csv({objectMode: true, columns: true})
        : excel();

      var stages = [
        parser,
        //Normalize field names
        this.normalizeFieldNames(),
        //Transform to shop normalized model
        this.transformToShopModel(),
        //Parse latlng
        this.parseLatlong(),
        //Obtain google latlng
        this.validateLatLong(),
        // Turn Into JSON Array String
        JSONStream.stringify(),
        // Concat Strings
        es.wait()
      ];

      // pipe() does not forward errors downstream, so every stage needs
      // its own listener; a try/catch cannot see asynchronous stream errors.
      req.on('error', fail);
      var tail = stages.reduce(function(upstream, stage) {
        stage.on('error', fail);
        return upstream.pipe(stage);
      }, req);

      //Send to client. pipe() returns the destination, and a writable
      //signals completion with 'finish' (it never emits 'end').
      tail.pipe(res).on('finish', function() {
        if (!failed) {
          log.info('Batch: Succesfully parsed CSV for shops');
        }
      });
    } catch (e) {
      fail(e);
    }
  },

  /**
   * Builds a map stream that flags each shop as valid or not.
   * If it has latlng it is valid; if not, the latlng is requested from
   * Google and the shop is valid only when that lookup succeeds.
   *
   * @returns {Object} Stream created by `es.map`.
   */
  validateLatLong: function() {
    var self = this;
    return es.map(function(line, cb) {
      if (line.latlng) {
        line.valid = true;
        cb(null, line);
        return;
      }
      self.getGoogleLatLng(line)
        .then(function(latlng) {
          line.latlng = latlng;
          line.valid = true;
          cb(null, line);
        })
        .catch(function() {
          // A failed lookup is not a stream error: the row is kept and
          // only marked invalid (the cause is logged by getGoogleLatLng).
          line.valid = false;
          cb(null, line);
        });
    });
  },

  /**
   * Gets latlng from Google for the address/city/country of a shop.
   *
   * @param {Object} parsedObj - Shop with optional `address`, `city` and
   *   `country` properties.
   * @returns {Object} Promise (from `q.defer()`) resolved with the
   *   `geometry.location` of the first result, or rejected when there is
   *   nothing to geocode, the request fails, the body is not JSON, or no
   *   result is returned.
   */
  getGoogleLatLng: function(parsedObj) {
    var dfd = q.defer();

    // Only the parts that are present take part in the query, so a missing
    // address is never sent as the literal text "undefined".
    var address = [parsedObj.address, parsedObj.city, parsedObj.country]
      .filter(Boolean)
      .join(',');

    if (!address) {
      log.error('Google geocode: No address to geocode');
      dfd.reject(new Error('No address to geocode'));
      return dfd.promise;
    }

    var _GOOGLE_API_KEY = 'KEY_FAKE';
    var _GOOGLE_GEOCODER_URL = 'https://maps.googleapis.com/maps/api/geocode/json?sensor=false&';

    // User-supplied text must be percent-encoded or '&', '#', spaces, etc.
    // would corrupt the query string.
    var geocoderUrl = _GOOGLE_GEOCODER_URL + 'address=' + encodeURIComponent(address);
    if (parsedObj.country) {
      geocoderUrl += '&region=' + encodeURIComponent(parsedObj.country);
    }
    geocoderUrl += '&key=' + _GOOGLE_API_KEY;

    request(geocoderUrl, function(err, response, body) {
      if (err) {
        log.error('Google geocode: error calling Google: ' + err);
        dfd.reject(err);
        return;
      }

      var parsed;
      try {
        parsed = JSON.parse(body);
      } catch (parseErr) {
        // A non-JSON body must reject the promise instead of throwing
        // inside the callback and leaving the promise pending forever.
        log.error('Google geocode: Invalid response for address: ' + address);
        dfd.reject(parseErr);
        return;
      }

      var first = parsed && parsed.results && parsed.results[0];
      if (!first || !first.geometry || !first.geometry.location) {
        log.error('Google geocode: Error geocoding address: ' + address);
        dfd.reject(new Error('No results found'));
        return;
      }

      log.info('Google geocode: Address succesfully geocoded : ' + address);
      dfd.resolve(first.geometry.location);
    });

    return dfd.promise;
  },

  /**
   * Builds a map stream that normalizes field names to an underscored
   * version, e.g. 'Shop Name ey' -> 'shop_name_ey'.
   *
   * @returns {Object} Stream created by `es.map`.
   */
  normalizeFieldNames: function() {
    return es.map(function(line, cb) {
      var obj = {};
      _.keys(line).forEach(function(key) {
        var newKey = _s.clean(key.toLowerCase());
        newKey = _s.escapeHTML(newKey);
        // Strip every slash, not just the first one.
        newKey = newKey.replace(/\//g, '');
        newKey = _s.underscored(newKey);
        obj[newKey] = line[key];
      });
      cb(null, obj);
    });
  },

  /**
   * Builds a map stream that parses a textual "lat,lng" value.
   * A valid pair becomes `{lat: Number, lng: Number}`; anything that is
   * not exactly two in-range decimal numbers is reset to null so the shop
   * gets geocoded later. Non-string values are passed through untouched.
   *
   * @returns {Object} Stream created by `es.map`.
   */
  parseLatlong: function() {
    return es.map(function(shop, cb) {
      if (typeof shop.latlng === 'string') {
        //Remove whitespaces and get the elements
        var parts = shop.latlng.replace(/\s+/g, '').split(',');
        var lat = parts.length === 2 ? parseCoordinate(parts[0], 90) : null;
        var lng = parts.length === 2 ? parseCoordinate(parts[1], 180) : null;
        shop.latlng = (lat !== null && lng !== null) ? {lat: lat, lng: lng} : null;
      }
      cb(null, shop);
    });
  },

  /**
   * Builds a map stream that returns a shop-model compatible object.
   * The excel/csv may have the columns named differently, so several
   * alternative column names are accepted for each field.
   *
   * @returns {Object} Stream created by `es.map`.
   */
  transformToShopModel: function() {
    return es.map(function(line, cb) {
      var shop = {
        'name': line.shop_name || line.name || line.venue_name || line.store_name || line.store,
        'latlng': line.latlng,
        'address': line.address || line.street_address || line.venue_address || line.store_address,
        'city': line.city,
        'country': line.country,
        'phone': line.phone || line.phone_number,
        'state': line.state,
        'postalCode': line.zip_code || line.zip || line.postalcode || line.postal_code,
        'valid': line.valid
      };
      cb(null, shop);
    });
  }
};
// Expose the router constructor; callers instantiate it with their Express app.
module.exports = batchRouters;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment