@saga
Created May 16, 2011 05:19
Learning Reactive Extensions (Rx)
http://blogs.msdn.com/b/rxteam/
http://www.codeproject.com/KB/Parallel_Programming/RxByExample.aspx
http://msdn.microsoft.com/en-us/data/gg577609
http://stackoverflow.com/questions/1969036/reactive-extensions-rx-and-asynchronous-class
Running these samples on .NET 4.0 requires some modifications; the notes below walk through them.
1. The Observer pattern
http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx
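Implement IObserver<T> and IObservable<T> yourself.
The IObservable<T> implementation keeps a private List<IObserver<T>>; Subscribe() calls list.Add(observer).
The IDisposable returned by Subscribe() calls list.Remove(observer) in its Dispose().
When an event occurs, call OnNext(value) on each observer in the list.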
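public void TrackLocation(Nullable<Location> loc)
{
    // Notify every subscribed observer: an error if the location is unknown,
    // otherwise the new location value.
    foreach (var observer in observers) {
        if (! loc.HasValue)
            observer.OnError(new LocationUnknownException());
        else
            observer.OnNext(loc.Value);
    }
}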
In the observer class that implements IObserver<T>, implement OnCompleted, OnError, and OnNext.
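The steps above can be sketched without Rx, using only the IObservable<T> and IObserver<T> interfaces that ship with .NET 4.0. This is a minimal illustration, not the MSDN sample: the names NumberSource, Unsubscriber, CollectingObserver, and ObserverDemo are hypothetical and were chosen only for this example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that keeps its observers in a private list.
public class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    // Subscribe() adds the observer and returns a token that removes it again.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (!observers.Contains(observer))
            observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // When an event occurs, push the value to every observer.
    public void Publish(int value)
    {
        // Iterate over a copy so an observer may unsubscribe inside OnNext.
        foreach (var observer in observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // Dispose() removes the observer from the list.
        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Hypothetical observer that records every notification it receives.
public class CollectingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

public static class ObserverDemo
{
    public static List<string> Run()
    {
        var source = new NumberSource();
        var observer = new CollectingObserver();

        IDisposable subscription = source.Subscribe(observer);
        source.Publish(1);
        source.Publish(2);

        subscription.Dispose();
        source.Publish(3); // no longer delivered: the observer was removed

        return observer.Log;
    }
}
```

Calling ObserverDemo.Run() returns the two entries "next:1" and "next:2"; the third value is published after Dispose() and never reaches the observer.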
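2. Using Rx, example 1
.NET 3.5: add references to System.Reactive, System.CoreEx, and System.Threading.
.NET 4.0: add a reference to System.Reactive.
Note that the static class Observable is System.Linq.Observable in the 3.5 build
but System.Reactive.Linq.Observable in the 4.0 build, which is a significant difference.
You can use ILSpy to inspect the two versions of System.Reactive.dll.
Also, the 4.0 build of Rx has no ForkJoin; it seems Join() can be used directly instead.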
////////////////////////////
var result = GetAllDirectories(@"c:\aaa\");
var observableResult = result.ToObservable();
observableResult.Subscribe((line) =>
{
    Console.WriteLine(line);
});
////////////////////////////
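// Requires: using System.Collections.Generic; using System.IO;
// Lazily yields every directory below path, depth first.
static IEnumerable<string> GetAllDirectories(string path)
{
    // GetDirectories returns an empty array (never null) when there are no subdirectories.
    string[] subdirs = Directory.GetDirectories(path);
    foreach (var subdir in subdirs)
    {
        yield return subdir;
        foreach (var grandchild in GetAllDirectories(subdir))
        {
            yield return grandchild;
        }
    }
}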
///////////////////////////
ConsoleOutputter writes to stdout at regular intervals.
The code below redirects the process's StandardOutput, then passes it through GetLineReader to obtain an IEnumerable<string>.
var info = new ProcessStartInfo("./ConsoleOutputter.exe");
info.RedirectStandardError = true;
info.RedirectStandardOutput = true;
info.UseShellExecute = false;
var process = Process.Start(info);
var childStdOut = GetLineReader(process.StandardOutput).ToObservable();
///////////////////////////////////////
private static IEnumerable<string> GetLineReader(StreamReader reader)
{
    while (reader.BaseStream.CanRead)
    {
        var l = reader.ReadLine();
        if (l == null)
        {
            break;
        }
        yield return l;
    }
}
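To try the line-reader idea without launching a child process, the same iterator can be driven by a StringReader. This is only a sketch: ReadLines is a hypothetical variant of GetLineReader that accepts a TextReader (the base class of StreamReader) instead of a StreamReader, and LineReaderDemo is a made-up name.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class LineReaderDemo
{
    // Hypothetical variant of GetLineReader: yields lines until ReadLine() returns null.
    // Lines are read lazily, one per iteration step of the consumer.
    public static IEnumerable<string> ReadLines(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            yield return line;
        }
    }

    public static List<string> Run()
    {
        var collected = new List<string>();
        // StringReader stands in for process.StandardOutput here.
        using (var reader = new StringReader("first\nsecond\nthird"))
        {
            foreach (var line in ReadLines(reader))
            {
                collected.Add(line);
            }
        }
        return collected;
    }
}
```

LineReaderDemo.Run() returns the three lines "first", "second", and "third". Because StreamReader derives from TextReader, process.StandardOutput could be passed to ReadLines in the same way.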
3. Using Rx, example 2
Here the output function subscribed to bufferedDirectories takes an IEnumerable<string> parameter.
IObservable<string> directories;
IObservable<IEnumerable<string>> bufferedDirectories;
IDisposable observer;
////// In the constructor, create the two IObservable members.
public DirectoriesForm()
{
    directories = System.Reactive.Linq.Observable.ToObservable(GetAllDirectories(@"c:\ov\source\B2bAuthentication Service\"));
    bufferedDirectories = System.Reactive.Linq.Observable.Buffer<string>(directories, TimeSpan.FromSeconds(1));
    ......
}
private void butStop_Click(object sender, EventArgs e)
{
    if (this.observer != null)
    {
        this.observer.Dispose();
        this.observer = null;
        this.butStop.Enabled = false;
        this.butObserverSingle.Enabled = true;
        this.butObserveBuffered.Enabled = true;
    }
}
private void butObserverSingle_Click(object sender, EventArgs e)
{
    this.treeViewDirectories.Nodes.Clear();
    if (this.observer == null)
    {
        // Establish the subscription: one callback per directory.
        this.observer = this.directories.Subscribe(outputDirectory);
        this.butStop.Enabled = true;
        this.butObserverSingle.Enabled = false;
        this.butObserveBuffered.Enabled = false;
    }
}
private void butObserveBuffered_Click(object sender, EventArgs e)
{
    this.treeViewDirectories.Nodes.Clear();
    if (this.observer == null)
    {
        // Establish the subscription: one callback per one-second batch of directories.
        this.observer = this.bufferedDirectories.Subscribe(outputDirectories);
        this.butStop.Enabled = true;
        this.butObserverSingle.Enabled = false;
        this.butObserveBuffered.Enabled = false;
    }
}
private void outputDirectory(string path)
{
    // We check to see if the handle is created because when
    // the form is disposing this may still be trying to observe.
    if (this.treeViewDirectories.IsHandleCreated)
    {
        this.treeViewDirectories.Nodes.Add(path);
    }
}
private void outputDirectories(IEnumerable<string> paths)
{
    // We check to see if the handle is created because when
    // the form is disposing this may still be trying to observe.
    if (this.treeViewDirectories.IsHandleCreated)
    {
        try
        {
            this.treeViewDirectories.BeginUpdate();
            foreach (var path in paths)
            {
                this.treeViewDirectories.Nodes.Add(path);
            }
        }
        finally
        {
            this.treeViewDirectories.EndUpdate();
        }
    }
}