Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.od.maprecord;
- import org.junit.Before;
- import org.junit.Test;
- import rx.Observable;
- import rx.subjects.BehaviorSubject;
- import rx.subjects.Subject;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- import static org.junit.Assert.assertEquals;
- /**
- * Show how a spread can be calculated from an observable stream of bid prices and an observable stream of offer prices
- */
- public class TestSpreadCalc {
- private Subject<Double, Double> bidStream;
- private Subject<Double, Double> offerStream;
- private List<Double> resultantSpreads = new ArrayList<>();
- @Before
- public void doSetUp() {
- bidStream = BehaviorSubject.create();
- offerStream = BehaviorSubject.create();
- resultantSpreads.clear();
- }
- @Test
- public void testCombineBidAndOfferStreamsToSpreadStream() {
- Observable<Double> spreadStream = Observable.combineLatest(bidStream, offerStream, (bid, offer) -> {
- return offer - bid;
- });
- //capture the output spreads
- spreadStream.subscribe(resultantSpreads::add);
- newBid(99);
- //this will be ignored since there is not yet an offer
- newBidAndOffer(99, 100);
- //spread is 1
- newOffer(101);
- //spread is 2
- newBid(98d);
- //spread is 3
- //here we get 3 spreads in the output, which is what we want because we receive the second bid and second offer independently
- assertEquals(Arrays.asList(1d, 2d, 3d), resultantSpreads);
- }
- @Test
- public void testCombineDoesNotPreserveAtomicityWhenFieldDeltasReceivedTogether() {
- Observable<Double> spreadStream = Observable.combineLatest(bidStream, offerStream, (bid, offer) -> {
- return offer - bid;
- });
- //capture the output spreads
- spreadStream.subscribe(resultantSpreads::add);
- newBidAndOffer(99, 100);
- //spread is 1
- newBidAndOffer(98, 101);
- //spread is 3
- //here we still get 1,2,3 in the output, which is not really what we want - we have lost the atomicity of the newBidAndOffer
- //we really want 1, 3 without the transitional 2
- //this example serves to demonstrate the problem of preserving atomicity when we splitting an update into separate streams for bid and offer
- assertEquals(Arrays.asList(1d, 2d, 3d), resultantSpreads);
- }
- private void newBidAndOffer(double newBid, double newOffer) {
- bidStream.onNext(newBid);
- offerStream.onNext(newOffer);
- }
- private void newBid(double newBid) {
- bidStream.onNext(newBid);
- }
- private void newOffer(double newOffer) {
- offerStream.onNext(newOffer);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement