Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created May 8, 2013 22:14
Show Gist options
  • Save anaisbetts/5544087 to your computer and use it in GitHub Desktop.
Save anaisbetts/5544087 to your computer and use it in GitHub Desktop.
An Rx-friendly Filesystem Watcher
using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using ReactiveUI;
namespace SaveAllTheTime.Models
{
/// <summary>
/// Contract for obtaining an observable stream of strings for a watched
/// directory. The <c>FilesystemWatchCache</c> implementation in this file
/// emits the full path of files that were changed, created or deleted.
/// </summary>
internal interface IFilesystemWatchCache
{
    /// <summary>
    /// Returns an observable associated with <paramref name="directory"/>
    /// and, optionally, <paramref name="filter"/>.
    /// </summary>
    /// <param name="directory">Directory to watch.</param>
    /// <param name="filter">
    /// Optional filter string; defaults to <c>null</c>. The implementation in
    /// this file forwards a non-null value to the two-argument
    /// <c>FileSystemWatcher</c> constructor and otherwise uses the
    /// one-argument constructor.
    /// </param>
    /// <returns>An observable sequence of strings.</returns>
    IObservable<string> Register(string directory, string filter = null);
}
/// <summary>
/// Caches one shared filesystem-notification observable per
/// (directory, filter) pair, so that repeated <see cref="Register"/> calls
/// with the same arguments reuse the same cached observable.
/// </summary>
public class FilesystemWatchCache : IFilesystemWatchCache
{
    // Size argument handed to MemoizingMRUCache for the (directory, filter) cache.
    const int MaxCachedWatchers = 25;

    // Throttle window applied to the merged Changed/Created/Deleted stream.
    static readonly TimeSpan QuietPeriod = TimeSpan.FromMilliseconds(250);

    // Dedicated lock target: never lock on an object that is also used for
    // something else (here, the cache itself).
    readonly object watchCacheGate = new object();

    readonly MemoizingMRUCache<Tuple<string, string>, IObservable<string>> watchCache =
        new MemoizingMRUCache<Tuple<string, string>, IObservable<string>>(
            (pair, _) => CreateWatcher(pair.Item1, pair.Item2),
            MaxCachedWatchers);

    /// <summary>
    /// Looks up (or creates and caches) the observable for the given
    /// directory and filter.
    /// </summary>
    /// <param name="directory">Directory handed to <see cref="FileSystemWatcher"/>.</param>
    /// <param name="filter">
    /// Optional filter handed to <see cref="FileSystemWatcher"/>; when
    /// <c>null</c> the watcher is built from the directory alone.
    /// </param>
    /// <returns>
    /// An observable of <c>FullPath</c> values taken from the watcher's
    /// Changed, Created and Deleted events. The watcher itself is only
    /// constructed when the observable is subscribed to.
    /// </returns>
    public IObservable<string> Register(string directory, string filter = null)
    {
        // All cache access is serialized through the gate.
        lock (watchCacheGate) {
            return watchCache.Get(Tuple.Create(directory, filter));
        }
    }

    // Builds the shared (Publish + RefCount) observable that wraps a FileSystemWatcher.
    static IObservable<string> CreateWatcher(string directory, string filter)
    {
        return Observable.Create<string>(subj => {
            var disp = new CompositeDisposable();

            try {
                var fsw = filter != null ?
                    new FileSystemWatcher(directory, filter) :
                    new FileSystemWatcher(directory);
                disp.Add(fsw);

                var allEvents = Observable.Merge(
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(x => fsw.Changed += x, x => fsw.Changed -= x),
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(x => fsw.Created += x, x => fsw.Created -= x),
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(x => fsw.Deleted += x, x => fsw.Deleted -= x));

                // NOTE(review): Throttle runs on the merged stream, so a burst
                // touching several files yields only the last event's path.
                // Confirm that this is the intended behavior.
                disp.Add(allEvents.Throttle(QuietPeriod, RxApp.TaskpoolScheduler)
                    .Select(x => x.EventArgs.FullPath)
                    .Synchronize(subj)
                    .Subscribe(subj));

                // Start raising events only after the handlers are attached.
                fsw.EnableRaisingEvents = true;
                return disp;
            } catch {
                // The caller never receives 'disp' when setup fails, so the
                // watcher and subscription must be released here.
                disp.Dispose();
                throw;
            }
        }).Publish().RefCount();
    }
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;
using ReactiveUI;
using System.Reactive.Linq;
using System.Reactive.Disposables;
using ReactiveUI.Testing;
using SaveAllTheTime.Models;
using Xunit;
namespace SaveAllTheTime.Tests.Models
{
/// <summary>
/// Tests for <see cref="FilesystemWatchCache"/> that touch a real file in
/// the temp directory while time is advanced through a
/// <see cref="TestScheduler"/>.
/// </summary>
public class FilesystemWatchCacheTests : IEnableLogger
{
    /// <summary>
    /// Writes and deletes a uniquely named temp file, then checks that its
    /// path only shows up after the scheduler has been advanced by 350 ms in
    /// total, and that nothing further is collected once the collection has
    /// been disposed.
    /// </summary>
    [Fact]
    public void FilesystemWatchCacheSmokeTest()
    {
        // NOTE(review): FileSystemWatcher notifications presumably arrive
        // asynchronously relative to the File.* calls below, which could make
        // these assertions racy. Confirm that this is stable on CI.
        (new TestScheduler()).With(sched => {
            var targetDir = Path.GetTempPath();
            var targetFile = Path.Combine(targetDir, Guid.NewGuid() + "__" + Guid.NewGuid());

            var fixture = (new FilesystemWatchCache()).Register(targetDir);
            var output = fixture.CreateCollection();

            File.WriteAllText(targetFile, "Foo");
            File.Delete(targetFile);

            // Nothing may be reported before the 250 ms throttle window elapses.
            Assert.Empty(output);
            sched.AdvanceByMs(100);
            Assert.Empty(output);

            sched.AdvanceByMs(250);
            Assert.NotEmpty(output);
            Assert.Contains(targetFile, output);

            // After disposing the collection, further file activity must not
            // add any items.
            var currentCount = output.Count;
            output.Dispose();

            File.WriteAllText(targetFile, "Foo");
            File.Delete(targetFile);
            sched.AdvanceByMs(10000);
            Assert.Equal(currentCount, output.Count);

            foreach (var v in output) {
                this.Log().Info(v);
            }
        });
    }
}
}
@bradwilson
Copy link

Line 13 of the tests is my favorite. :)

@anaisbetts
Copy link
Author

@bradwilson Always be XUniting, every time

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment