Last active
August 29, 2015 14:09
-
-
Save rbirkby/78d6c952aec44ff6e43e to your computer and use it in GitHub Desktop.
C# port of Netflix Rx video example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
internal class VideoExample
{
    // Simulated latency of the "list of lists" service call.
    private static readonly TimeSpan ListOfListsLatency = TimeSpan.FromMilliseconds(180);

    /// <summary>
    /// Demonstrates how Rx is used to compose observables together, such as
    /// how a web service would generate a JSON response.
    /// </summary>
    /// <remarks>
    /// The simulated metadata, bookmark and rating methods represent different
    /// services that are often backed by network calls. For each video the three
    /// results are zipped into a single anonymous object such as:
    /// <code>
    /// { id = 1000, title = video-1000-title, length = 5428, bookmark = 0,
    ///   rating = { actual = 4, average = 3, predicted = 0 } }
    /// </code>
    /// </remarks>
    /// <param name="userId">User whose lists, bookmarks and ratings are requested.</param>
    /// <returns>
    /// One element per video, covering the first 10 videos of every list
    /// produced by <see cref="GetListOfLists"/>.
    /// </returns>
    public IObservable<dynamic> GetVideoGridForDisplay(int userId)
    {
        return GetListOfLists(userId).SelectMany(list =>
        {
            return list.Videos()
                .Take(10)
                .SelectMany(video =>
                {
                    // "length" in the output is taken from the "duration" metadata entry.
                    var m = video.Metadata().Select(md => new { title = md["title"], length = md["duration"] });

                    // The bookmark stream already carries the raw position; no projection needed.
                    var b = video.Bookmark(userId);

                    var r = video.Rating(userId).Select(rating => new
                    {
                        actual = rating.ActualStarRating(),
                        average = rating.AverageStarRating(),
                        predicted = rating.PredictedStarRating()
                    });

                    // Combine one metadata, one bookmark and one rating value into the row for this video.
                    return Observable.Zip(m, b, r, (metadata, bookmark, rating) =>
                    {
                        return new
                        {
                            id = video.VideoId,
                            metadata.title,
                            metadata.length,
                            bookmark,
                            rating
                        };
                    });
                });
        });
    }

    /// <summary>
    /// Simulates the network call that loads the lists of videos.
    /// </summary>
    /// <remarks>
    /// Each subscription schedules its own work item on the task pool scheduler.
    /// After the simulated latency it emits 15 lists (positions 0 through 14)
    /// and then completes.
    /// </remarks>
    /// <param name="userId">Requesting user; not used by this simulation.</param>
    /// <returns>An observable sequence of 15 <see cref="VideoList"/> instances.</returns>
    public IObservable<VideoList> GetListOfLists(int userId)
    {
        return Observable.Create<VideoList>(observer =>
        {
            // Schedule with a due time instead of sleeping inside the work item, and hand
            // the scheduler's disposable back to the subscriber so that unsubscribing
            // before the latency has elapsed cancels the pending emission.
            return TaskPoolScheduler.Default.Schedule(ListOfListsLatency, () =>
            {
                for (var position = 0; position < 15; position++)
                {
                    observer.OnNext(new VideoList(position));
                }
                observer.OnCompleted();
            });
        });
    }
}
/// <summary>
/// A simulated, already-loaded list of videos identified by its position in the grid.
/// </summary>
class VideoList
{
    private int _listPosition;

    /// <summary>Creates the list for the given grid position.</summary>
    /// <param name="position">Position of this list; also determines its video ids.</param>
    public VideoList(int position)
    {
        _listPosition = position;
    }

    /// <summary>Returns the name of the list: "ListName-" followed by its position.</summary>
    public string ListName()
    {
        return string.Format("ListName-{0}", _listPosition);
    }

    /// <summary>Returns the position this list was created with.</summary>
    public int ListPosition()
    {
        return _listPosition;
    }

    /// <summary>
    /// Pushes the 50 videos of this list to the subscriber, then completes.
    /// </summary>
    /// <remarks>
    /// The videos are emitted synchronously inside the subscribe call, with no
    /// scheduler involved. Video ids are <c>position * 1000 + index</c> for
    /// index 0 through 49.
    /// </remarks>
    public IObservable<Video> Videos()
    {
        // Typed as a delegate returning Action so the Action-based Create overload is used.
        Func<IObserver<Video>, Action> pushVideos = sink =>
        {
            for (var offset = 0; offset < 50; offset++)
            {
                sink.OnNext(new Video((_listPosition * 1000) + offset));
            }
            sink.OnCompleted();

            // Everything was already delivered, so there is nothing to undo on unsubscribe.
            return () => { };
        };

        return Observable.Create<Video>(pushVideos);
    }
}
/// <summary>
/// A simulated video whose metadata, bookmark and rating are exposed as observables.
/// </summary>
class Video
{
    // One shared generator for all bookmark lookups. A fresh time-seeded Random per
    // draw can repeat the same sample when created within one clock tick
    // (.NET Framework), which correlates the "watched?" draw with the position draw.
    private static readonly Random s_random = new Random();

    // Random is not thread-safe and bookmark lookups run on task pool threads.
    private static readonly object s_randomLock = new object();

    // Simulated latencies of the bookmark and rating service calls.
    private static readonly TimeSpan BookmarkLatency = TimeSpan.FromMilliseconds(4);
    private static readonly TimeSpan RatingLatency = TimeSpan.FromMilliseconds(10);

    private readonly int _videoId;

    /// <summary>Creates a video with the given id.</summary>
    public Video(int videoId)
    {
        _videoId = videoId;
    }

    /// <summary>Id this video was created with.</summary>
    public int VideoId
    {
        get { return _videoId; }
    }

    /// <summary>
    /// Simulates fetching metadata from an in-memory cache (synchronous).
    /// </summary>
    /// <remarks>
    /// Nothing is scheduled: the dictionary is pushed to the subscriber directly
    /// inside the subscribe call, followed by completion.
    /// </remarks>
    /// <returns>
    /// A single dictionary with the keys "title" ("video-{id}-title") and
    /// "duration" (always "5428").
    /// </returns>
    public IObservable<Dictionary<string, string>> Metadata()
    {
        return Observable.Create<Dictionary<string, string>>(observer =>
        {
            observer.OnNext(new Dictionary<string, string> {
                {"title", "video-" + _videoId + "-title"},
                {"duration", "5428"}
            });
            observer.OnCompleted();
            return () => { };
        });
    }

    /// <summary>
    /// Simulates fetching the bookmark for a user (asynchronous): the last played
    /// position if this video has been played before, otherwise 0.
    /// </summary>
    /// <param name="userId">Requesting user; not used by this simulation.</param>
    /// <returns>A single position, emitted from the task pool scheduler after a short delay.</returns>
    public IObservable<int> Bookmark(int userId)
    {
        return Observable.Create<int>(observer =>
        {
            // Delayed scheduling replaces sleeping on a pool thread; returning the
            // scheduler's disposable lets an unsubscribe cancel the pending work.
            return TaskPoolScheduler.Default.Schedule(BookmarkLatency, () =>
            {
                observer.OnNext(NextBookmarkPosition());
                observer.OnCompleted();
            });
        });
    }

    /// <summary>
    /// Simulates fetching the <see cref="VideoRating"/> for a user (asynchronous).
    /// </summary>
    /// <param name="userId">User the rating is created for.</param>
    /// <returns>A single rating, emitted from the task pool scheduler after a short delay.</returns>
    public IObservable<VideoRating> Rating(int userId)
    {
        return Observable.Create<VideoRating>(observer =>
        {
            // Same pattern as Bookmark: delayed work item, cancellable via the returned disposable.
            return TaskPoolScheduler.Default.Schedule(RatingLatency, () =>
            {
                observer.OnNext(new VideoRating(_videoId, userId));
                observer.OnCompleted();
            });
        });
    }

    // Draws a simulated bookmark: most of the time (4 out of 6 outcomes) the video
    // has not been watched and the position is 0; otherwise a position below 4000.
    private static int NextBookmarkPosition()
    {
        lock (s_randomLock)
        {
            if (s_random.Next(6) > 1)
            {
                return 0;
            }
            return s_random.Next(4000);
        }
    }
}
/// <summary>
/// Simulated star ratings of one video for one user. Every call draws a new random value.
/// </summary>
class VideoRating
{
    // One shared generator instead of a new Random per call: time-seeded instances
    // created within the same clock tick (.NET Framework) produce identical values,
    // which made back-to-back rating calls return correlated numbers.
    private static readonly Random s_random = new Random();

    // Random is not thread-safe; ratings may be requested from several threads at once.
    private static readonly object s_randomLock = new object();

    // Stored for completeness; the simulated ratings below do not read them.
    private readonly int _videoId;
    private readonly int _userId;

    /// <summary>Creates the rating of <paramref name="videoId"/> for <paramref name="userId"/>.</summary>
    public VideoRating(int videoId, int userId)
    {
        _videoId = videoId;
        _userId = userId;
    }

    /// <summary>Returns a random predicted rating in the range 0..4.</summary>
    public int PredictedStarRating()
    {
        return NextBelow(5);
    }

    /// <summary>Returns a random average rating in the range 0..3.</summary>
    public int AverageStarRating()
    {
        return NextBelow(4);
    }

    /// <summary>Returns a random actual rating in the range 0..4.</summary>
    public int ActualStarRating()
    {
        return NextBelow(5);
    }

    // Thread-safe draw of a non-negative integer strictly below exclusiveMax.
    private static int NextBelow(int exclusiveMax)
    {
        lock (s_randomLock)
        {
            return s_random.Next(exclusiveMax);
        }
    }
}
/// <summary>
/// Runs the example twice: once printing every video as it arrives, and once
/// collecting all videos into a single list before printing.
/// </summary>
static void Main()
{
    // VideoExample holds no state, so one instance serves both subscriptions.
    var v = new VideoExample();

    // Nothing to do on completion; typed explicitly so Subscribe receives an Action.
    Action onCompleted = () => { };

    Console.WriteLine("---- sequence of video dictionaries ----");
    var streamed = v.GetVideoGridForDisplay(1).Subscribe(
        videoDictionary =>
        {// onNext
            // this will print the dictionary for each video
            // and is a good representation of how progressive rendering could work
            Console.WriteLine(videoDictionary);
        },
        exception => Console.WriteLine("Error: " + exception),
        onCompleted);

    var collected = v.GetVideoGridForDisplay(1).ToList().Subscribe(
        videoDictionaryList =>
        {// onNext
            // this will be called once with a list
            // and demonstrates how a sequence can be combined
            // for document style responses (most webservices)
            //
            // Join the elements explicitly: concatenating the list itself would only
            // print its type name. A single WriteLine keeps the block in one piece
            // while the other subscription is printing.
            Console.WriteLine("\n ---- single list of video dictionaries ----\n"
                + string.Join<object>("\n", videoDictionaryList));
        },
        exception =>
        {// onError
            Console.WriteLine("Error: " + exception);
        },
        onCompleted);

    // Both pipelines deliver on background threads; keep the process alive until a key is pressed.
    Console.ReadKey();

    collected.Dispose();
    streamed.Dispose();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment