Created
February 13, 2012 16:33
-
-
Save LeeCampbell/1818029 to your computer and use it in GitHub Desktop.
Extension methods to help with converting APM pattern to Observable.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class ObservableAPMExtensions | |
{ | |
public static IObservable<byte> ToObservable(this FileStream source) | |
{ | |
return source.ToObservable(4096, Scheduler.CurrentThread); | |
} | |
public static IObservable<byte> ToObservable(this FileStream source, int buffersize, IScheduler scheduler) | |
{ | |
var bytes = Observable.Create<byte>(o=> | |
{ | |
var initialState = new StreamReaderState(source, buffersize); | |
Action<StreamReaderState, Action<StreamReaderState>> action = | |
(state, self)=> | |
{ | |
//Todo dispose of subscription. | |
state.ReadNext().Subscribe( | |
bytesRead=> | |
{ | |
for (int i = 0; i < bytesRead; i++) | |
{ | |
o.OnNext(state.Buffer[i]); | |
} | |
if(bytesRead>0) | |
self(state); | |
else | |
o.OnCompleted(); | |
}, | |
ex=>{ | |
o.OnError(ex); | |
}); | |
}; | |
var disposal = scheduler.Schedule(initialState, action); | |
return disposal; | |
}); | |
return Observable.Using(()=>source, _=>bytes); | |
} | |
private sealed class StreamReaderState | |
{ | |
private readonly int _bufferSize; | |
private readonly Func<byte[], int, int, IObservable<int>> _factory; | |
public StreamReaderState(FileStream source, int bufferSize) | |
{ | |
_bufferSize = bufferSize; | |
_factory = Observable.FromAsyncPattern<byte[], int, int, int>(source.BeginRead, source.EndRead); | |
Buffer = new byte[bufferSize]; | |
} | |
public IObservable<int> ReadNext() | |
{ | |
return _factory(Buffer, 0, _bufferSize); | |
} | |
public byte[] Buffer {get;set;} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment