Last active
November 7, 2017 14:38
-
-
Save allupaku/4139d4a366983568206a to your computer and use it in GitHub Desktop.
This is a modified version for createChangeStream - because the default implementation as of today is not supporting where filter. This is using the same approach from persisted model and using loopback-filters for parsing the data which is been passed in. This may not be a perfect solution but a good workaround till we get the filters in core c…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Angular Live Set should be added as a dependency, on the | |
// inject the change stream library in your main app script. | |
var module = angular.module("lbServices", ['ngResource','ls.ChangeStream']); | |
// ... | |
// To the end of auto generated code | |
// .... | |
this.$get = ['$resource','createChangeStream','LoopBackAuth', function($resource,createChangeStream,LoopBackAuth) { | |
return function(url, params, actions) { | |
var resource = $resource(url, params, actions); | |
actions.find.cache = true; | |
//work around for fixing the change stream and filters | |
// This will create a change stream and return teh change stream in the call back. | |
// there was a problem with the nodejs/express server disconnecting the streams after two minutes by default. | |
// fixed that issue by modifying and adding a middle ware as shown in the next file gist. | |
resource.buildChangeStream = function(options,cb,listener){ | |
var src = new EventSource(actions.createChangeStream.url+'?_format=event-source&access_token='+LoopBackAuth.accessTokenId+'&options='+JSON.stringify(options)); | |
if(listener && typeof listener === "function"){ | |
src.addEventListener('data',listener); | |
} | |
var changes = createChangeStream(src); | |
cb(changes); | |
} | |
// Angular always calls POST on $save() | |
// This hack is based on | |
// http://kirkbushell.me/angular-js-using-ng-resource-in-a-more-restful-manner/ | |
resource.prototype.$save = function(success, error) { | |
// Fortunately, LoopBack provides a convenient `upsert` method | |
// that exactly fits our needs. | |
var result = resource.upsert.call(this, {}, this, success, error); | |
return result.$promise || result; | |
}; | |
// console.log("Actions when getting the resource"); | |
// console.log(urlBase); | |
return resource; | |
}; | |
}]; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var applyFilter = require('loopback-filters'); | |
var loopback = require('loopback'); | |
var PassThrough = require('stream').PassThrough; | |
module.exports = function(Order) { | |
Order.createChangeStream = function(options,cb) { | |
/* Based on persisted-model#createChangeStream | |
* | |
* currentUser is being populated in server.js using tips from here: | |
* https://github.com/strongloop/loopback/issues/569 | |
* | |
* Ignoring paramter userId, and defaulting to logged in user | |
* future improvement will check if user has role 'admin', and if so | |
* allow the userId to not match the logged in user | |
* | |
*/ | |
var idName = this.getIdName(); | |
var Model = this; | |
var changes = new PassThrough({objectMode: true}); | |
var writeable = true; | |
changes.destroy = function() { | |
changes.removeAllListeners('error'); | |
changes.removeAllListeners('end'); | |
writeable = false; | |
changes = null; | |
}; | |
changes.on('error', function() { | |
writeable = false; | |
}); | |
changes.on('end', function() { | |
writeable = false; | |
}); | |
process.nextTick(function() { | |
cb(null, changes); | |
}); | |
Model.observe('after save', createChangeHandler('save',options)); | |
Model.observe('after delete', createChangeHandler('delete',options)); | |
function createChangeHandler(type,options) { | |
return function(ctx, next) { | |
// since it might have set to null via destroy | |
if (!changes) { | |
return next(); | |
} | |
var where = ctx.where; | |
var data = ctx.instance || ctx.data; | |
var filtered = applyFilter([data], options); | |
if(filtered.length<1){ | |
return next(); | |
} | |
var whereId = where && where[idName]; | |
// the data includes the id | |
// or the where includes the id | |
var target; | |
if (data && (data[idName] || data[idName] === 0)) { | |
target = data[idName]; | |
} else if (where && (where[idName] || where[idName] === 0)) { | |
target = where[idName]; | |
} | |
var hasTarget = target === 0 || !!target; | |
var change = { | |
target: target, | |
where: where, | |
data: data | |
}; | |
switch (type) { | |
case 'save': | |
if (ctx.isNewInstance === undefined) { | |
change.type = hasTarget ? 'update' : 'create'; | |
} else { | |
change.type = ctx.isNewInstance ? 'create' : 'update'; | |
} | |
break; | |
case 'delete': | |
change.type = 'remove'; | |
break; | |
} | |
// TODO(ritch) this is ugly... maybe a ReadableStream would be better | |
if (writeable) { | |
changes.write(change); | |
} | |
next(); | |
}; | |
} | |
}; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ... | |
// ... | |
app.middleware('routes:before', function (req, res, next) { | |
if (req.path.indexOf('change-stream') !== -1) { | |
res.setTimeout(24*3600*1000); | |
res.set('X-Accel-Buffering', 'no'); | |
return next(); | |
} else { | |
return next(); | |
} | |
}); | |
// ... | |
// ... |
Have you had any troubles with delivering data to event stream on instance change?
Sometimes i takes 3-4 model changes to get 1 record in EventStream on the client...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
how are you using this from the Angular controller? I have tried and it's not filtering per the options I pass it.