Last active
December 16, 2015 02:38
-
-
Save pierre3/5363558 to your computer and use it in GitHub Desktop.
async /await で複数の値を返す場合は IObservable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 複数のファイルをまとめて非同期に読む | |
// Observable.Create がいいらしい | |
// IObservable<T> Create<T>(Func<IObserver<T>,CancellationToken, Task> subscribe) | |
// このオーバーロードがない(Rx2.0?)ので自作してみる | |
private IObservable<Tuple<string, string>> OpenFiles(string[] fileNames) | |
{ | |
return ObservableEx.Create<Tuple<string, string>>(async (observer, ct) => | |
{ | |
try | |
{ | |
foreach (var name in fileNames) | |
{ | |
if (ct.IsCancellationRequested) | |
{ return; } | |
using (var reader = new System.IO.StreamReader(name)) | |
{ | |
var text = await reader.ReadToEndAsync().ConfigureAwait(false); | |
observer.OnNext(Tuple.Create(System.IO.Path.GetFileName(name), text)); | |
} | |
await Task.Delay(2000);//動作確認用 | |
} | |
observer.OnCompleted(); | |
} | |
catch (Exception e) | |
{ | |
observer.OnError(e); | |
} | |
}); | |
} | |
private async void FileOpened(string[] fileNames) | |
{ | |
StatusMessage = "*** Opened Files ***" + Environment.NewLine; | |
var disposable = OpenFiles(fileNames).Subscribe(file => | |
{ | |
StatusMessage += file.Item1 + Environment.NewLine; | |
FileContentCollection.Add(file); | |
}, | |
e=> StatusMessage += "Error: " + e.Message, | |
() => StatusMessage += "Completed."); | |
//キャンセルしてみる | |
await Task.Delay(4000); | |
disposable.Dispose(); | |
} | |
//非同期処理用のObservable.Create | |
public static class ObservableEx | |
{ | |
public static IObservable<T> Create<T>(Func<IObserver<T>,CancellationToken, Task> subscribe) | |
{ | |
return Observable.Create<T>(observer => | |
{ | |
var cts = new CancellationTokenSource(); | |
var subscription = subscribe(observer,cts.Token).ToObservable() | |
.Subscribe(_ => { }, observer.OnError, observer.OnCompleted); | |
return new CompositeDisposable(subscription,new CancellationDisposable(cts)); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment