Skip to content

Instantly share code, notes, and snippets.

@JoeRobich
Created January 27, 2013 19:32
Show Gist options
  • Save JoeRobich/4649963 to your computer and use it in GitHub Desktop.
Save JoeRobich/4649963 to your computer and use it in GitHub Desktop.
Quick stab at an architecture for running processes over data.
public interface IRunProcesses<T>
{
void RegisterProcess(IProcessItems<T> process);
}
public interface IProcessItems<T>
{
void Process(IEnumerable<T> items);
}
public interface IQueryItems<T>
{
IEnumerable<T> Query();
}
public interface IFilterItems<T>
{
IEnumerable<T> Filter(IEnumerable<T> items);
}
public class AllItemFilter<T> : IFilterItems<T>
{
public IEnumerable<T> Filter(IEnumerable<T> items)
{
return items;
}
}
public class EmptyQuery<T> : IQueryItems<T>
{
public IEnumerable<T> Query()
{
return new List<T>();
}
}
public class Processor<T> : IRunProcesses<T>, IDisposable
{
protected readonly List<IProcessItems<T>> _processes;
private readonly IQueryItems<T> _query;
protected bool _disposed = false;
public Processor()
: this(new EmptyQuery<T>())
{
}
public Processor(IQueryItems<T> query)
{
_processes = new List<IProcessItems<T>>();
_query = query;
}
~Processor()
{
Dispose(false);
}
public virtual void RegisterProcess(IProcessItems<T> process)
{
if (!_processes.Contains(process))
_processes.Add(process);
}
public virtual void Process()
{
foreach (var process in _processes)
process.Process(_query.Query());
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
var disposableQuery = _query as IDisposable;
if (disposableQuery != null)
disposableQuery.Dispose();
}
_disposed = true;
}
}
public class PeriodicProcessor<T> : Processor<T>
{
protected long Interval { get; set;}
protected Timer _timer;
public PeriodicProcessor(long interval)
: base()
{
Interval = interval;
}
public PeriodicProcessor(long interval, IQueryItems<T> query)
: base(query)
{
Interval = interval;
}
public void Start()
{
if (_timer == null)
_timer = new Timer(StartProcessing, null, Interval, Timeout.Infinite);
}
public void Stop()
{
if (_timer != null)
{
_timer.Dispose();
_timer = null;
}
}
protected virtual void StartProcessing(object state)
{
Process();
_timer.Change(Interval, Timeout.Infinite);
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
if (_timer != null)
{
_timer.Dispose();
_timer = null;
}
}
base.Dispose(disposing);
}
}
public abstract class FilteredProcess<T> : IProcessItems<T>
{
protected readonly IFilterItems<T> _filter;
public FilteredProcess()
: this(new AllItemFilter<T>())
{
}
public FilteredProcess(IFilterItems<T> filter)
{
_filter = filter;
}
public void Process(IEnumerable<T> items)
{
ProcessItems(_filter.Filter(items));
}
public abstract void ProcessItems(IEnumerable<T> items);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment