Skip to content

Instantly share code, notes, and snippets.

@LeeCampbell
Created February 13, 2012 16:33
Show Gist options
  • Save LeeCampbell/1818029 to your computer and use it in GitHub Desktop.
Save LeeCampbell/1818029 to your computer and use it in GitHub Desktop.
Extension methods that help convert the APM (Begin/End) pattern into an Observable sequence.
/// <summary>
/// Extension methods that expose APM-style (<c>BeginRead</c>/<c>EndRead</c>) reads of a
/// <see cref="FileStream"/> as an observable sequence of bytes.
/// </summary>
public static class ObservableAPMExtensions
{
    /// <summary>
    /// Reads <paramref name="source"/> as a sequence of bytes using a 4096-byte buffer and
    /// <c>Scheduler.CurrentThread</c> to start each read.
    /// </summary>
    /// <param name="source">The stream to read from.</param>
    /// <returns>A sequence that yields every byte read from the stream, then completes.</returns>
    public static IObservable<byte> ToObservable(this FileStream source)
    {
        return source.ToObservable(4096, Scheduler.CurrentThread);
    }

    /// <summary>
    /// Reads <paramref name="source"/> as a sequence of bytes, one buffer at a time, using the
    /// stream's asynchronous <c>BeginRead</c>/<c>EndRead</c> pair.
    /// </summary>
    /// <remarks>
    /// The stream is handed to <c>Observable.Using</c>, so it is disposed when a subscription
    /// terminates or is disposed. All subscriptions share the same stream instance.
    /// </remarks>
    /// <param name="source">The stream to read from.</param>
    /// <param name="buffersize">Number of bytes requested per read; must be at least 1.</param>
    /// <param name="scheduler">Scheduler used to start each read.</param>
    /// <returns>
    /// A sequence that yields every byte read, completes when a read returns zero bytes, and
    /// forwards any read error to <c>OnError</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="buffersize"/> is less than 1.
    /// </exception>
    public static IObservable<byte> ToObservable(this FileStream source, int buffersize, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (buffersize <= 0)
        {
            // A zero-length buffer makes the first read return 0, which would be
            // indistinguishable from end-of-stream.
            throw new ArgumentOutOfRangeException("buffersize", "Buffer size must be greater than zero.");
        }
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        var bytes = Observable.Create<byte>(o =>
        {
            var initialState = new StreamReaderState(source, buffersize);

            // Holds the subscription to whichever read is currently in flight, so that
            // unsubscribing detaches its callback.
            var currentRead = new System.Reactive.Disposables.SerialDisposable();

            Action<StreamReaderState, Action<StreamReaderState>> action =
                (state, self) =>
                {
                    // The subscriber may have gone away between scheduling and running;
                    // do not start another read (the stream may already be disposed).
                    if (currentRead.IsDisposed)
                    {
                        return;
                    }

                    // Register a placeholder before subscribing: if the read callback runs
                    // synchronously and schedules the next read inline, that nested read
                    // must not be disposed when this assignment completes.
                    var read = new System.Reactive.Disposables.SingleAssignmentDisposable();
                    currentRead.Disposable = read;

                    read.Disposable = state.ReadNext().Subscribe(
                        bytesRead =>
                        {
                            // Stop pushing as soon as the subscriber unsubscribes mid-buffer.
                            for (int i = 0; i < bytesRead && !currentRead.IsDisposed; i++)
                            {
                                o.OnNext(state.Buffer[i]);
                            }

                            if (currentRead.IsDisposed)
                            {
                                return;
                            }

                            if (bytesRead > 0)
                            {
                                self(state);
                            }
                            else
                            {
                                // A read of zero bytes signals end of stream.
                                o.OnCompleted();
                            }
                        },
                        ex =>
                        {
                            o.OnError(ex);
                        });
                };

            var scheduled = scheduler.Schedule(initialState, action);

            // Dispose both the pending scheduled work and the in-flight read together.
            IDisposable subscription =
                new System.Reactive.Disposables.CompositeDisposable(scheduled, currentRead);
            return subscription;
        });

        return Observable.Using(() => source, _ => bytes);
    }

    /// <summary>
    /// Carries the reusable read buffer and the APM-to-observable read factory for one
    /// subscription.
    /// </summary>
    private sealed class StreamReaderState
    {
        private readonly int _bufferSize;
        private readonly Func<byte[], int, int, IObservable<int>> _factory;

        /// <summary>
        /// Wraps the stream's <c>BeginRead</c>/<c>EndRead</c> pair and allocates the buffer.
        /// </summary>
        /// <param name="source">The stream whose reads are wrapped.</param>
        /// <param name="bufferSize">Size in bytes of the buffer and of each read request.</param>
        public StreamReaderState(FileStream source, int bufferSize)
        {
            _bufferSize = bufferSize;
            _factory = Observable.FromAsyncPattern<byte[], int, int, int>(source.BeginRead, source.EndRead);
            Buffer = new byte[bufferSize];
        }

        /// <summary>
        /// Requests up to the configured number of bytes into <see cref="Buffer"/>, starting
        /// at offset 0.
        /// </summary>
        /// <returns>A sequence yielding the value returned by <c>EndRead</c>.</returns>
        public IObservable<int> ReadNext()
        {
            return _factory(Buffer, 0, _bufferSize);
        }

        /// <summary>The buffer every read writes into; overwritten by each read.</summary>
        public byte[] Buffer { get; private set; }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment