Last active
August 29, 2015 14:12
-
-
Save nickebbutt/62942e2ac5f124fdeb26 to your computer and use it in GitHub Desktop.
Spread Calculation from independent Bid and Offer streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.od.maprecord; | |
import org.junit.Before; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.subjects.BehaviorSubject; | |
import rx.subjects.Subject; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import static org.junit.Assert.assertEquals; | |
/** | |
* Show how a spread can be calculated from an observable stream of bid prices and an observable stream of offer prices | |
*/ | |
public class TestSpreadCalc { | |
private Subject<Double, Double> bidStream; | |
private Subject<Double, Double> offerStream; | |
private List<Double> resultantSpreads = new ArrayList<>(); | |
@Before | |
public void doSetUp() { | |
bidStream = BehaviorSubject.create(); | |
offerStream = BehaviorSubject.create(); | |
resultantSpreads.clear(); | |
} | |
@Test | |
public void testCombineBidAndOfferStreamsToSpreadStream() { | |
Observable<Double> spreadStream = Observable.combineLatest(bidStream, offerStream, (bid, offer) -> { | |
return offer - bid; | |
}); | |
//capture the output spreads | |
spreadStream.subscribe(resultantSpreads::add); | |
newBid(99); | |
//this will be ignored since there is not yet an offer | |
newBidAndOffer(99, 100); | |
//spread is 1 | |
newOffer(101); | |
//spread is 2 | |
newBid(98d); | |
//spread is 3 | |
//here we get 3 spreads in the output, which is what we want because we receive the second bid and second offer independently | |
assertEquals(Arrays.asList(1d, 2d, 3d), resultantSpreads); | |
} | |
@Test | |
public void testCombineDoesNotPreserveAtomicityWhenFieldDeltasReceivedTogether() { | |
Observable<Double> spreadStream = Observable.combineLatest(bidStream, offerStream, (bid, offer) -> { | |
return offer - bid; | |
}); | |
//capture the output spreads | |
spreadStream.subscribe(resultantSpreads::add); | |
newBidAndOffer(99, 100); | |
//spread is 1 | |
newBidAndOffer(98, 101); | |
//spread is 3 | |
//here we still get 1,2,3 in the output, which is not really what we want - we have lost the atomicity of the newBidAndOffer | |
//we really want 1, 3 without the transitional 2 | |
//this example serves to demonstrate the problem of preserving atomicity when we splitting an update into separate streams for bid and offer | |
assertEquals(Arrays.asList(1d, 2d, 3d), resultantSpreads); | |
} | |
//simluate receiving a new bid and offer atomically | |
private void newBidAndOffer(double newBid, double newOffer) { | |
bidStream.onNext(newBid); | |
offerStream.onNext(newOffer); | |
} | |
private void newBid(double newBid) { | |
bidStream.onNext(newBid); | |
} | |
private void newOffer(double newOffer) { | |
offerStream.onNext(newOffer); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.od.maprecord; | |
import org.junit.Before; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.subjects.BehaviorSubject; | |
import rx.subjects.Subject; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import static org.junit.Assert.assertEquals; | |
/** | |
* Fix the problem of atomicity where we split an atomic update into separate streams | |
*/ | |
public class TestSpreadCalcPreservingAtomicity { | |
private Subject<Double, Double> bidStream; | |
private Subject<Double, Double> offerStream; | |
private Subject<Long, Long> completionStream; | |
private long updateCount = 0; | |
private List<Double> resultantSpreads = new ArrayList<Double>(); | |
@Before | |
public void doSetUp() { | |
bidStream = BehaviorSubject.create(); | |
offerStream = BehaviorSubject.create(); | |
completionStream = BehaviorSubject.create(); | |
updateCount = 0; | |
resultantSpreads.clear(); | |
} | |
@Test | |
public void testCombineLatestWithSamplingOnCompletionOfUpdateToPreserveAtomicity() { | |
Observable<Double> spreadStream = Observable.combineLatest(bidStream, offerStream, (bid, offer) -> { | |
return offer - bid; | |
}); | |
//sample the spread stream to take the spread value only when each atomic set of deltas has been completely processed | |
spreadStream.sample(completionStream).subscribe(resultantSpreads::add); | |
newBidAndOffer(99, 100); | |
//spread is 1 | |
newBidAndOffer(98, 101); | |
//spread is 3 | |
//here we get 2 spreads in the output - we have solved the atomicity problem by sampling the latest | |
//value from the spread stream only when the neBidAndOffer message is fully complete | |
assertEquals(Arrays.asList(1d, 3d), resultantSpreads); | |
} | |
//simluate receiving a new bid and offer atomically | |
private void newBidAndOffer(double newBid, double newOffer) { | |
bidStream.onNext(newBid); | |
offerStream.onNext(newOffer); | |
notifyUpdateCompleted(); | |
} | |
private void newBid(double newBid) { | |
bidStream.onNext(newBid); | |
notifyUpdateCompleted(); | |
} | |
private void newOffer(double newOffer) { | |
offerStream.onNext(newOffer); | |
notifyUpdateCompleted(); | |
} | |
private void notifyUpdateCompleted() { | |
//now notify update complete, using a system timestamp | |
completionStream.onNext(updateCount++); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment