Skip to content

Instantly share code, notes, and snippets.

@cefn
Last active August 29, 2015 14:23
Show Gist options
  • Select an option

  • Save cefn/e6595f7abf20a58d1d51 to your computer and use it in GitHub Desktop.

Select an option

Save cefn/e6595f7abf20a58d1d51 to your computer and use it in GitHub Desktop.
RxJS zip to test networked event sequences
/** Verifies that the over-the-wire MQTT messages arising from IdeaTree operations are as expected. */
describe("Tree Protocol", function(){
/**
 * Sniffs every MQTT topic ('#') while `writer.setItem` stores a four-leaf
 * object, and checks that the sniffed packets match `targetArray` pairwise,
 * in order.
 *
 * Returns a promise so the test runner can wait for the whole exchange.
 */
it("Sends branch keys then leaf values", function(){
    var sniffer = mqtt.connect(conf.mqttWsAddr);

    // Only write once the wildcard subscription is in place, so no packet is missed.
    var subscribeAndSendPromise = util.createSubscriptionPromise(sniffer, '#')
        .then(function(){
            writer.setItem('oldmacdonald/farm', {
                sheep:"Baa",
                pig:"Oink",
                cow:"Moo",
                duck:"Quack",
            });
        });

    // Attach the decoded payload to the packet so topic and message travel together.
    var packetExtractor = function(topic, payload, packet){
        packet.message = payload.toString();
        return packet;
    };

    // Expected [topic, message] pairs, in the order they should appear on the wire.
    var targetArray = [
        ["the_workshop/oldmacdonald", "[]" ] , //originally empty
        ["the_workshop/oldmacdonald/farm", JSON.stringify(['sheep','pig','cow','duck']) ] , //n.b. JSON array
        ["the_workshop/oldmacdonald/farm" + "/sheep", JSON.stringify("Baa")] , //n.b. quoted JSON strings
        ["the_workshop/oldmacdonald/farm" + "/pig", JSON.stringify("Oink")] ,
        ["the_workshop/oldmacdonald/farm" + "/cow", JSON.stringify("Moo")] ,
        ["the_workshop/oldmacdonald/farm" + "/duck", JSON.stringify("Quack")] ,
    ];
    var targetStream = Rx.Observable.fromArray(targetArray);

    // An event stream never completes by itself, and zip does not finish just
    // because the finite target stream has run out. Bounding the sniffed
    // stream to the expected packet count lets the zipped stream complete,
    // which in turn lets toPromise() settle.
    var actualStream = Rx.Observable.fromEvent(sniffer, "message", function(args){
        return packetExtractor.apply(null, args);
    }).take(targetArray.length);

    // Zip selector: compares one expected pair against one sniffed packet.
    // A failed assertion throws, which surfaces as an error on the zipped stream.
    var deepEqual = function(targetValues, actualPacket){
        var actualTopic = actualPacket.topic;
        var actualMessage = actualPacket.message;
        var targetTopic = targetValues[0];
        var targetMessage = targetValues[1];
        assert(actualTopic === targetTopic,
            "Expected topic '" + targetTopic + "' but received '" + actualTopic + "'");
        assert(actualMessage === targetMessage,
            "Expected message '" + targetMessage + "' on '" + targetTopic + "' but received '" + actualMessage + "'");
        return true;
    };

    var zippedStream = Rx.Observable.zip(targetStream, actualStream, deepEqual);
    var receivePromise = zippedStream.toPromise();

    // Clean up on the combined Q promise: the sniffer is closed whether the
    // send or the receive side fails, and `finally` does not depend on the
    // promise implementation that toPromise() happens to return.
    return Q.all([subscribeAndSendPromise, receivePromise])
        .finally(function(){
            sniffer.end();
        });
})
@cefn
Copy link
Copy Markdown
Author

cefn commented Jun 23, 2015

Struggling to understand why the receivePromise never seems to complete. Looks like zippedStream doesn't terminate when the targetStream terminates, and I don't know how to force this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment