Skip to content

Instantly share code, notes, and snippets.

@neilk
Created January 14, 2014 17:43
Show Gist options
  • Save neilk/8422454 to your computer and use it in GitHub Desktop.
Save neilk/8422454 to your computer and use it in GitHub Desktop.
python versus node for stateful upserting
/**
* postgres does not have replace into, so...
*
* @param {Object} tile specification {tx: ty: zoom:}
* @param {Object} quad properties of that tile
* @param {Knex.transaction} t database transaction
* @return {Promise}
*/
function insertOrUpdateTile(tile, quad, t) {
return db('tile').transacting(t).where(tile).count('*').then(
function(result) {
var count = parseInt(result[0].aggregate, 10);
if (count === 0) {
var tileRecord = merge(true, tile, quad);
return db('tile').transacting(t).insert(tileRecord);
} else if (count === 1) {
return db('tile').transacting(t).where(tile).update(quad);
} else {
throw 'unexpected database count = ' + count + ' result for tile ' + JSON.stringify(tile) + '; should be 0 or 1';
}
}
);
}
function quadLoop(pairs) {
console.log(" quadLoop", pairs.length);
return db.transaction(function(t) {
return promiseForEach(pairs, function(pair) {
var quadId = pair[0];
var quad = pair[1];
var tile = mt.getTileFromQuadTreePath(quadId);
return insertOrUpdateTile(tile, quad, t);
}).then(t.commit, t.rollback);
});
}
/**
* @param {Object} results array of zoomlevels, each zoomlevel is a dictionary of quadTreePaths (ids) to properties
* @return {Promise}
*/
function writeResultsToDb(results) {
// results is an array of zoom levels, in each zoom level is a hash of quadtreeid -> { various props, with same names as db columns }
// we need to turn each of those into a record, then insert or update them
console.log("---> write results to db");
var doZoomDict = function (zoomDict) {
var pairs = getKeyValPairs(zoomDict);
console.log("pairs", pairs.length);
var pairsGrouped = groupArray(pairs, UPSERTS_PER_TRANSACTION);
return promiseForEach(pairsGrouped, quadLoop);
};
// filtering out false values (empty zoomlevels) because promiseWhile terminates on false
results = results.filter(function(el) { return el; });
return promiseForEach(results, doZoomDict, "doZoomDict");
}
def upsertZooms(Session, zooms):
for zoom in zooms:
if zoom == None:
continue
session = Session()
db = Db(session)
count = 0
print len(zoom.items())
for quadId, quad in zoom.items():
upsertQuad(db, quadId, quad)
if count > 0 and count % MAX_UPSERTS_PER_COMMIT == 0:
session.commit()
count += 1
session.commit()
def upsertQuad(db, quadId, newQuadProps):
(tx, ty, zoom) = getTileFromQuadTreePath(quadId)
quad = db.getQuad(tx, ty, zoom)
if quad is None:
db.insertQuad(tx, ty, zoom, newQuadProps)
else:
db.updateQuad(tx, ty, zoom, newQuadProps)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment