Created
May 16, 2011 05:19
-
-
Save saga/973970 to your computer and use it in GitHub Desktop.
Reactive Extension 学习
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
http://blogs.msdn.com/b/rxteam/ | |
http://www.codeproject.com/KB/Parallel_Programming/RxByExample.aspx | |
http://msdn.microsoft.com/en-us/data/gg577609 | |
http://stackoverflow.com/questions/1969036/reactive-extensions-rx-and-asynchronous-class | |
Using .NET 4.0 requires some modifications; follow the notes below.
1,Observer模式 | |
http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx | |
自己实现IObserver<T>和IObservable<T>。
其中IObservable<T>的实现类持有一个private List<IObserver<T>>,在Subscribe()中调用list.Add(observer)。
在Subscribe()返回的IDisposable的Dispose()中调用list.Remove(observer)。
在event发生时,对list中的每个observer调用OnNext(xx)。
/// <summary>
/// Pushes a location reading to every observer in <c>observers</c>.
/// </summary>
/// <param name="loc">
/// The latest location. When it has no value, each observer receives
/// <c>OnError</c> with a <c>LocationUnknownException</c> instead of <c>OnNext</c>.
/// </param>
public void TrackLocation(Nullable<Location> loc)
{
    foreach (var subscriber in observers)
    {
        if (loc.HasValue)
        {
            subscriber.OnNext(loc.Value);
        }
        else
        {
            // A separate exception instance is created for each observer.
            subscriber.OnError(new LocationUnknownException());
        }
    }
}
在IObserver观察者class中,实现OnCompleted, OnError, OnNext
2, 使用Rx, example1 | |
.NET 3.5: add references to System.Reactive, System.CoreEx and System.Threading.
.NET 4.0: add a reference to System.Reactive.
注意Observable这个静态类,在3.5中位于System.Linq命名空间(System.Linq.Observable),
在4.0中位于System.Reactive.Linq命名空间(System.Reactive.Linq.Observable),差别很大。
可使用ILSpy观察这两个版本的system.reactive.dll | |
另外4.0版本的reactive没有forkjoin,貌似可以直接使用join(). | |
//////////////////////////// | |
// Turn the lazily produced directory listing into an observable sequence and
// print every path that is pushed to the subscriber.
GetAllDirectories(@"c:\aaa\")
    .ToObservable()
    .Subscribe(line => Console.WriteLine(line));
//////////////////////////// | |
/// <summary>
/// Lazily lists every directory below <paramref name="path"/> in depth-first
/// pre-order: a directory is yielded before any of its own descendants.
/// </summary>
/// <param name="path">Root directory to expand. The root itself is not yielded.</param>
/// <returns>The paths exactly as returned by <c>Directory.GetDirectories</c>.</returns>
/// <remarks>
/// Any exception thrown by <c>Directory.GetDirectories</c> is not caught here; it
/// surfaces to the consumer when the affected directory is expanded.
/// </remarks>
static IEnumerable<string> GetAllDirectories(string path)
{
    // An explicit stack replaces the recursive iterator: the recursive form
    // re-yields every path through one nested enumerator per depth level.
    var pending = new Stack<string>();
    string current = path;

    while (true)
    {
        // Directory.GetDirectories returns an empty array (never null) when
        // there are no subdirectories, so no null check is needed.
        string[] children = Directory.GetDirectories(current);

        // Push in reverse so the first child is popped first, which keeps
        // the same visiting order as the recursive version.
        for (int i = children.Length - 1; i >= 0; i--)
        {
            pending.Push(children[i]);
        }

        if (pending.Count == 0)
        {
            yield break;
        }

        current = pending.Pop();
        yield return current;
    }
}
/////////////////////////// | |
在ConsoleOutputter中,间隔一段时间向stdout写内容。 | |
下面的代码重定向子进程的StandardOutput,然后通过GetLineReader把它包装成IEnumerable<string>。
// Launch the child process with its output streams redirected, then expose
// its standard output, line by line, as an observable sequence.
var info = new ProcessStartInfo("./ConsoleOutputter.exe")
{
    // NOTE(review): stderr is redirected but nothing in this snippet reads it;
    // confirm it is drained elsewhere.
    RedirectStandardError = true,
    RedirectStandardOutput = true,
    // Set to false together with the redirections above.
    UseShellExecute = false
};
var process = Process.Start(info);
var childStdOut = GetLineReader(process.StandardOutput).ToObservable();
/////////////////////////////////////// | |
/// <summary>
/// Lazily yields the lines read from <paramref name="reader"/>.
/// </summary>
/// <param name="reader">Reader to pull lines from; it is not closed by this method.</param>
/// <returns>
/// Each line returned by <c>ReadLine</c>, ending when <c>ReadLine</c> returns
/// <c>null</c> or the underlying stream reports it can no longer be read.
/// </returns>
private static IEnumerable<string> GetLineReader(StreamReader reader)
{
    string line;

    // CanRead is evaluated before every read, exactly once per iteration.
    while (reader.BaseStream.CanRead && (line = reader.ReadLine()) != null)
    {
        yield return line;
    }
}
3, 使用Rx, example2 | |
其中bufferedDirectories订阅输出函数的参数为IEnumerable<string> | |
/// <summary>Directory paths, pushed one at a time; assigned in the constructor.</summary>
private IObservable<string> directories;

/// <summary>The same paths grouped into one-second batches; assigned in the constructor.</summary>
private IObservable<IEnumerable<string>> bufferedDirectories;

/// <summary>
/// Handle returned by <c>Subscribe</c> for the active subscription, or <c>null</c>
/// when nothing is subscribed. Disposing it ends the subscription.
/// </summary>
private IDisposable observer;
////// 在构造函数中,创建两个IObservable成员。
/// <summary>
/// Creates the two observable sequences the form's buttons subscribe to:
/// the per-directory sequence and its one-second buffered counterpart.
/// </summary>
public DirectoriesForm()
{
    // Calling the iterator does not touch the disk yet; enumeration starts on subscription.
    var allDirectories = GetAllDirectories(@"c:\ov\source\B2bAuthentication Service\");

    directories = System.Reactive.Linq.Observable.ToObservable(allDirectories);

    // Batch the paths into one-second windows.
    bufferedDirectories = System.Reactive.Linq.Observable.Buffer(directories, TimeSpan.FromSeconds(1));

    // ...... (the rest of the constructor is elided in the original notes)
}
/// <summary>
/// Ends the active subscription, if any, and re-enables the two observe buttons.
/// </summary>
private void butStop_Click(object sender, EventArgs e)
{
    if (this.observer == null)
    {
        // Nothing is subscribed, so there is nothing to stop.
        return;
    }

    // Disposing the handle returned by Subscribe ends the subscription.
    this.observer.Dispose();
    this.observer = null;

    this.butStop.Enabled = false;
    this.butObserverSingle.Enabled = true;
    this.butObserveBuffered.Enabled = true;
}
/// <summary>
/// Clears the tree and, when no subscription is active, subscribes
/// <c>outputDirectory</c> to the per-directory sequence.
/// </summary>
private void butObserverSingle_Click(object sender, EventArgs e)
{
    // The tree is cleared whether or not a new subscription is created.
    this.treeViewDirectories.Nodes.Clear();

    if (this.observer != null)
    {
        return;
    }

    // Establish the subscription; the returned handle is kept so it can be disposed later.
    this.observer = this.directories.Subscribe(outputDirectory);

    this.butStop.Enabled = true;
    this.butObserverSingle.Enabled = false;
    this.butObserveBuffered.Enabled = false;
}
/// <summary>
/// Clears the tree and, when no subscription is active, subscribes
/// <c>outputDirectories</c> to the buffered sequence.
/// </summary>
private void butObserveBuffered_Click(object sender, EventArgs e)
{
    // The tree is cleared whether or not a new subscription is created.
    this.treeViewDirectories.Nodes.Clear();

    if (this.observer != null)
    {
        return;
    }

    // Establish the subscription; the returned handle is kept so it can be disposed later.
    this.observer = this.bufferedDirectories.Subscribe(outputDirectories);

    this.butStop.Enabled = true;
    this.butObserverSingle.Enabled = false;
    this.butObserveBuffered.Enabled = false;
}
/// <summary>
/// Adds a single directory path as a node of the tree view.
/// </summary>
/// <param name="path">Path to display.</param>
private void outputDirectory(string path)
{
    // Notifications can still arrive while the form is being disposed,
    // so skip the update once the control no longer has a handle.
    // NOTE(review): this assumes the callback runs on the UI thread - confirm
    // which scheduler the subscribed sequence uses.
    if (!this.treeViewDirectories.IsHandleCreated)
    {
        return;
    }

    this.treeViewDirectories.Nodes.Add(path);
}
/// <summary>
/// Adds a batch of directory paths to the tree view in a single
/// <c>BeginUpdate</c>/<c>EndUpdate</c> bracket.
/// </summary>
/// <param name="paths">Paths to display, in the order they should be added.</param>
/// <remarks>
/// Time-based buffering delivers its batches from a timer-driven scheduler
/// rather than the UI thread, so the call is re-posted to the thread that
/// owns the control whenever <c>InvokeRequired</c> reports it is needed.
/// </remarks>
private void outputDirectories(IEnumerable<string> paths)
{
    var tree = this.treeViewDirectories;

    // Notifications can still arrive while the form is being disposed,
    // so skip the update once the control no longer has a handle.
    if (!tree.IsHandleCreated)
    {
        return;
    }

    if (tree.InvokeRequired)
    {
        // Re-enter this method on the UI thread. BeginInvoke is used so the
        // notifying thread is not blocked; posted calls run in posting order,
        // and the handle check above is repeated when the call runs.
        tree.BeginInvoke(new Action<IEnumerable<string>>(outputDirectories), new object[] { paths });
        return;
    }

    // BeginUpdate stays outside the try so EndUpdate only runs when it was paired.
    tree.BeginUpdate();
    try
    {
        foreach (var path in paths)
        {
            tree.Nodes.Add(path);
        }
    }
    finally
    {
        tree.EndUpdate();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment