Skip to content

Instantly share code, notes, and snippets.

@azyobuzin
Created October 3, 2011 14:29
Show Gist options
  • Save azyobuzin/1259214 to your computer and use it in GitHub Desktop.
Save azyobuzin/1259214 to your computer and use it in GitHub Desktop.
using System;
using System.IO;
using System.Net;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;
namespace Azyobuzi.TaskingTwLib
{
class RawStreamingObservable : IObservable<string>
{
public RawStreamingObservable(WebRequest request)
{
Scheduler.TaskPool.Schedule(() =>
{
while (!this.disposable.IsDisposed)
{
Thread.Sleep(100);
}
this.subject.Dispose();
request.Abort();
});
Scheduler.TaskPool.Schedule(() =>
{
try
{
using (var res = request.GetResponse())
using (var sr = new StreamReader(res.GetResponseStream()))
{
while (!sr.EndOfStream)
{
var line = sr.ReadLine();
if (!string.IsNullOrWhiteSpace(line))
this.subject.OnNext(line);
}
this.subject.OnCompleted();//一応
}
}
catch (Exception ex)
{
try
{
this.subject.OnError(ex);
}
catch (ObjectDisposedException)
{
//ごめんなさい許してください
}
}
});
}
private Subject<string> subject = new Subject<string>();
private BooleanDisposable disposable = new BooleanDisposable();
public IDisposable Subscribe(IObserver<string> observer)
{
this.subject.Subscribe(observer);
return this.disposable;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment