Created
April 8, 2012 23:38
-
-
Save bobbychopra/2340392 to your computer and use it in GitHub Desktop.
Take Initial Messages until Timeout Received (rx framework)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Reactive.Linq; | |
namespace ScratchApp | |
{ | |
enum MessageType { Initial, Update } | |
class Message | |
{ | |
public MessageType MessageType { get; set; } | |
public int SequenceNo { get; set; } | |
public object Data { get; set; } | |
public Message(MessageType messageType, int seqNo, object data) | |
{ | |
MessageType = messageType; | |
SequenceNo = seqNo; | |
Data = data; | |
} | |
} | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
var messages = Observable.Interval(TimeSpan.FromSeconds(1)) | |
.Select(i => new Message(i < 5 ? MessageType.Initial : MessageType.Update, (int)i, null)); | |
//all initial messages | |
messages.TakeWhile(m => m.MessageType == MessageType.Initial) | |
.Subscribe(m => Console.WriteLine(" InitialData: " + m.SequenceNo)); | |
//all update messages | |
messages.SkipWhile(m => m.MessageType == MessageType.Initial) | |
.Subscribe(m => Console.WriteLine(" update: " + m.SequenceNo)); | |
//create a timeout after 3 seconds | |
var timeout = Observable.Timer(TimeSpan.FromSeconds(3)); | |
timeout.Subscribe(t => Console.WriteLine(" >>> TIMEOUT REACHED!")); | |
//all initial messages until timeout received | |
messages.TakeWhile(m => m.MessageType == MessageType.Initial) | |
.TakeUntil(timeout) | |
.Subscribe(m => Console.WriteLine(" initial data (no timeout): " + m.SequenceNo)); | |
Console.ReadLine(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment