Created
June 17, 2012 07:12
-
-
Save safwank/2943770 to your computer and use it in GitHub Desktop.
Streaming tweets in 'real-time' using Rx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public partial class TweetStreamer : Form | |
{ | |
private List<Tweet> tweets; | |
private readonly IObservable<EventPattern<EventArgs>> stopClicked; | |
private readonly DataContractJsonSerializer jsonSerializer; | |
public TweetStreamer() | |
{ | |
InitializeComponent(); | |
jsonSerializer = new DataContractJsonSerializer(typeof (TweetSearchResult)); | |
stopClicked = Observable.FromEventPattern<EventArgs>(stopButton, "Click"); | |
} | |
private void startButton_Click(object sender, EventArgs e) | |
{ | |
tweets = new List<Tweet>(); | |
tweetSource.DataSource = tweets; | |
DisplayProgress("Loading tweets..."); | |
tweetSource.ResetBindings(false); | |
GetTweetsOn(keyword.Text, 0).ToObservable(Scheduler.ThreadPool) | |
.Buffer(TimeSpan.FromSeconds(5)) | |
.TakeUntil(stopClicked) | |
.ObserveOn(SynchronizationContext.Current) | |
.Subscribe(LoadNewTweets(), DisplayError()); | |
} | |
private IEnumerable<Tweet> GetTweetsOn(string searchKeyword, long sinceId) | |
{ | |
var searchUrl = string.Format("http://search.twitter.com/search.json?q={0}&rpp=100&include_entities=true&result_type=mixed&since_id={1}", | |
searchKeyword, | |
sinceId); | |
var request = (HttpWebRequest) WebRequest.Create(searchUrl); | |
TweetSearchResult tweetSearchResult; | |
using (var response = (HttpWebResponse) request.GetResponse()) | |
{ | |
if (response.StatusCode != HttpStatusCode.OK) | |
{ | |
var status = string.Format("Server error (HTTP {0}: {1}).", response.StatusCode, response.StatusDescription); | |
DisplayStatus(status); | |
} | |
using (var responseStream = response.GetResponseStream()) | |
{ | |
tweetSearchResult = (TweetSearchResult) jsonSerializer.ReadObject(responseStream); | |
foreach (var tweet in tweetSearchResult.Tweets) | |
{ | |
yield return tweet; | |
} | |
} | |
} | |
foreach (var tweet in GetTweetsOn(searchKeyword, tweetSearchResult.MaxId)) | |
{ | |
yield return tweet; | |
} | |
} | |
private Action<IList<Tweet>> LoadNewTweets() | |
{ | |
return newTweets => | |
{ | |
var firstDisplayedRowIndex = Math.Max(tweetStreamGrid.FirstDisplayedScrollingRowIndex, 0); | |
tweets.AddRange(newTweets); | |
tweetSource.ResetBindings(false); | |
tweetStreamGrid.FirstDisplayedScrollingRowIndex = firstDisplayedRowIndex; | |
var progressMessage = string.Format("Loaded {0} tweets", tweets.Count.ToString("#,#")); | |
DisplayProgress(progressMessage); | |
}; | |
} | |
private Action<Exception> DisplayError() | |
{ | |
return exception => | |
{ | |
var message = exception.Message; | |
DisplayStatus(message.Contains("503") ? "Throttled by Twitter, retrying in 2 minutes..." : message); | |
}; | |
} | |
private void DisplayProgress(string progressMessage) | |
{ | |
progressLabel.Text = progressMessage; | |
} | |
private void DisplayStatus(string status) | |
{ | |
statusLabel.Text = status; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment