Created
November 16, 2015 10:10
-
-
Save JamesTryand/040e1024a571b5a78a43 to your computer and use it in GitHub Desktop.
Stream Listener - C# variable rate logstash equivalent.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Collections.ObjectModel; | |
using System.Linq; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
// Please Note this has a dependency on Rx-Main | |
namespace The.Infrastructure.Streams | |
{ | |
public class SchedulerPageHandle<T> | |
{ | |
public readonly T Head; | |
public readonly int PageSize; | |
public readonly int CurrentCount; | |
public readonly bool LastWasEmpty; | |
public SchedulerPageHandle(T head, int pageSize, int currentCount, bool lastWasEmpty) | |
{ | |
Head = head; | |
PageSize = pageSize; | |
CurrentCount = currentCount; | |
LastWasEmpty = lastWasEmpty; | |
} | |
} | |
/// <summary> | |
/// The actual implmentation and scheduling to adjust the rate of polling | |
/// </summary> | |
public class SchedulerPollerRate : IStateEnums | |
{ | |
public static string Immediate = "Immediate"; | |
public static string Fast = "Fast"; | |
public static string Quick = "Quick"; | |
public static string Slow = "Slow"; | |
public static string Hybernate = "Hybernate"; | |
public static SchedulerPollerRate ImmediateInstance = new SchedulerPollerRate( | |
StateName: SchedulerPollerRate.Immediate, | |
MaxValue: -1, MaxState: SchedulerPollerRate.Immediate, | |
MinValue: 2, MinState: SchedulerPollerRate.Fast, | |
RateDuration: TimeSpan.Zero); | |
public static SchedulerPollerRate FastInstance = new SchedulerPollerRate( | |
StateName: SchedulerPollerRate.Fast, | |
MaxValue: 2, MaxState: SchedulerPollerRate.Immediate, | |
MinValue: 10, MinState: SchedulerPollerRate.Quick, | |
RateDuration: TimeSpan.FromSeconds(30)); | |
public static SchedulerPollerRate QuickInstance = new SchedulerPollerRate( | |
StateName: SchedulerPollerRate.Quick, | |
MaxValue: 2, MaxState: SchedulerPollerRate.Fast, | |
MinValue: 12, MinState: SchedulerPollerRate.Slow, | |
RateDuration: TimeSpan.FromMinutes(5)); | |
public static SchedulerPollerRate SlowInstance = new SchedulerPollerRate( | |
StateName: SchedulerPollerRate.Slow, | |
MaxValue: 2, MaxState: SchedulerPollerRate.Quick, | |
MinValue: 24, MinState: SchedulerPollerRate.Hybernate, | |
RateDuration: TimeSpan.FromHours(1)); | |
public static SchedulerPollerRate HybernateInstance = new SchedulerPollerRate( | |
StateName: SchedulerPollerRate.Hybernate, | |
MaxValue: 1, MaxState: SchedulerPollerRate.Slow, | |
MinValue: -1, MinState: SchedulerPollerRate.Hybernate, | |
RateDuration: TimeSpan.FromDays(1)); | |
public SchedulerPollerRate(string StateName, int MaxValue, string MaxState, int MinValue, string MinState, TimeSpan RateDuration) | |
{ | |
this.stateName = StateName; | |
this.maxValue = MaxValue; | |
this.maxState = MaxState; | |
this.minValue = MinValue; | |
this.minState = MinState; | |
this.rateDuration = RateDuration; | |
} | |
private readonly string stateName; | |
private readonly int maxValue; | |
private readonly int minValue; | |
private readonly string maxState; | |
private readonly string minState; | |
private readonly TimeSpan rateDuration; | |
public string StateName { get { return stateName; } } | |
public int MaxValue { get { return maxValue; } } | |
public int MinValue { get { return minValue; } } | |
public string MaxState { get { return maxState; } } | |
public string MinState { get { return minState; } } | |
public TimeSpan RateDuration { get { return rateDuration; } } | |
} | |
public delegate KeyValuePair<StreamState<TState, TStateEnums, TResult, TResultSource>, IObservable<TResult>> StateAction<TState, TStateEnums, TResult, TResultSource>(StreamState<TState, TStateEnums, TResult, TResultSource> priorState) where TStateEnums : IStateEnums; | |
/// <summary> | |
/// This service is for the specific scenario where you have something that produces an enumerable, and you need | |
/// to repeatedly call this, but need to vary the schedule depending upon the results. | |
/// | |
/// To use this, it has an internal state object wrapped by it's own scheduled state wrapper | |
/// The initial state along with what determines how to change the state & schedule is provided along with | |
/// the rx scheduler that determines where this is run | |
/// and of course the method that lets you do the work (recursively). | |
/// So to use it specify | |
/// Where (to run this); | |
/// How (its configured); | |
/// What (to run) | |
/// </summary> | |
public class StreamingService | |
{ | |
public class PubSubScheduler | |
{ | |
/// <summary> | |
/// Returns a method for instantiating a subscription based pull, ready to subscribe to. | |
/// Basic : interface specifying only the return type and assuming a position indicator of long. | |
/// </summary> | |
/// <typeparam name="T">The Type</typeparam> | |
/// <param name="pubpull">The method to return the next set of values</param> | |
/// <param name="newHead">A method to determine the next new head value</param> | |
/// <param name="sourceCannotProgress">Method used to determine if the scheduler should start altering its request rate.</param> | |
/// <returns></returns> | |
public Func<long, int, IObservable<IObservable<T>>> PubSubSchedulerFactory<T>(Func<long, int, IEnumerable<T>> pubpull, Func<IEnumerable<T>, long> newHead, Func<IEnumerable<T>, bool> sourceCannotProgress) | |
{ | |
return PubSubSchedulerFactory<T, IEnumerable<T>, long>(pubpull, newHead, sourceCannotProgress); | |
} | |
/// <summary> | |
/// Returns a method for instantiating a subscription based pull, ready to subscribe to. | |
/// Basic : interface specifying only the return type and assuming a position indicator of long. | |
/// </summary> | |
/// <typeparam name="TResult">The Return Type</typeparam> | |
/// <typeparam name="THead">The Type of the Head</typeparam> | |
/// <param name="pubpull">The method to return the next set of values</param> | |
/// <param name="newHead">A method to determine the next new head value</param> | |
/// <param name="sourceCannotProgress">Method used to determine if the scheduler should start altering its request rate.</param> | |
/// <returns></returns> | |
public Func<THead, int, IObservable<IObservable<TResult>>> PubSubSchedulerFactory<TResult, THead>(Func<THead, int, IEnumerable<TResult>> pubpull, Func<IEnumerable<TResult>, THead> newHead, Func<IEnumerable<TResult>, bool> sourceCannotProgress) | |
{ | |
return PubSubSchedulerFactory<TResult, IEnumerable<TResult>, THead>(pubpull, newHead, sourceCannotProgress); | |
} | |
/// <summary> | |
/// Returns a method for instantiating a subscription based pull, ready to subscribe to. | |
/// Basic++ : same as basic but can specify distinct states involved. | |
/// </summary> | |
/// <typeparam name="TResult">The Event Container Return Type</typeparam> | |
/// <typeparam name="TResultGenerator">The type for returning the result of the return type TResult</typeparam> | |
/// <typeparam name="TPosition">The Type used to refer to position</typeparam> | |
/// <param name="pubpull">The method to return the next set of values</param> | |
/// <param name="newHead">A method to determine the next new head value</param> | |
/// <param name="sourceCannotProgress">Method used to determine if the scheduler should start altering its request rate.</param> | |
/// <returns></returns> | |
public Func<TPosition, int, IObservable<IObservable<TResult>>> PubSubSchedulerFactory<TResult, TResultGenerator, TPosition>(Func<TPosition, int, TResultGenerator> pubpull, Func<TResultGenerator, TPosition> newHead, Func<TResultGenerator, bool> sourceCannotProgress) where TResultGenerator : IEnumerable<TResult> | |
{ | |
return PubSubAdvancedSchedulerFactory<TResult, TResultGenerator, TPosition>(newHead, sourceCannotProgress, | |
streamstate => // This is the actual job being used. | |
{ | |
var enumerableValues = pubpull(streamstate.Of.Head, streamstate.Of.PageSize); | |
return new KeyValuePair<StreamState<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>, IObservable<TResult>>( | |
streamstate.Following(enumerableValues), | |
(enumerableValues as IEnumerable<TResult>).ToObservable(NewThreadScheduler.Default)); // the actual loop | |
}); | |
} | |
/// <summary> | |
/// Returns a method for instantiating a subscription based pull, ready to subscribe to. | |
/// Advanced : providing access to the stateAction used for the transition. | |
/// </summary> | |
/// <typeparam name="TResult">The Event Container Return Type</typeparam> | |
/// <typeparam name="TResultGenerator">The type for returning the result of the return type TResult</typeparam> | |
/// <typeparam name="TPosition">The Type used to refer to position</typeparam> | |
/// <param name="newHead">A method to determine the next new head valu</param> | |
/// <param name="sourceCannotProgress">Method used to determine if the scheduler should start altering its request rate.</param> | |
/// <param name="stateAction">The method used in transitioning state.</param> | |
/// <returns></returns> | |
public Func<TPosition, int, IObservable<IObservable<TResult>>> PubSubAdvancedSchedulerFactory<TResult, TResultGenerator, TPosition>(Func<TResultGenerator, TPosition> newHead, Func<TResultGenerator, bool> sourceCannotProgress, StateAction<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator> stateAction) | |
{ | |
var SS = new StreamingService(); | |
return (head, batchSize) => SS.StreamEnumerableWithSchedule<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>( | |
Scheduler.CurrentThread, | |
// Here is the initialization | |
() => new StreamState<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>( | |
new SchedulerPollerRate[] { | |
SchedulerPollerRate.ImmediateInstance, | |
SchedulerPollerRate.FastInstance, | |
SchedulerPollerRate.QuickInstance, | |
SchedulerPollerRate.SlowInstance, | |
SchedulerPollerRate.HybernateInstance | |
}.ToList(), | |
SchedulerPollerRate.ImmediateInstance, | |
new SchedulerPageHandle<TPosition>(head, batchSize, 0, false), | |
(value, state) => | |
{ | |
/// so what are the rules? | |
/// Well it's either going to go up or down | |
/// if the current one is less than the max carry on ( just increase the value ) | |
/// else swap to the new one ( and reset the values ) | |
// bool isEmpty = !value.Any(); | |
bool isEmpty = sourceCannotProgress(value); | |
int comparitorValue = isEmpty | |
? state.CurrentPollRate.MinValue | |
: state.CurrentPollRate.MaxValue; | |
string potentialNextState = isEmpty | |
? state.CurrentPollRate.MinState | |
: state.CurrentPollRate.MaxState; | |
var nextCount = state.Of.LastWasEmpty == isEmpty // if it was the same as the last one | |
? state.Of.CurrentCount + 1 | |
: 0; | |
var nextHead = isEmpty | |
? state.Of.Head | |
: newHead(value); | |
return state.Of.CurrentCount >= comparitorValue && isEmpty | |
? new StreamState<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>( | |
pollRateStates: state.PollRateStates, | |
currentPollRate: state.PollRateStates.Where(p => p.StateName == potentialNextState).FirstOrDefault(), | |
of: new SchedulerPageHandle<TPosition>( | |
head: nextHead, | |
pageSize: state.Of.PageSize, | |
currentCount: nextCount, | |
lastWasEmpty: isEmpty), | |
nextState: state.NextState) | |
: new StreamState<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>( | |
pollRateStates: state.PollRateStates, | |
currentPollRate: state.CurrentPollRate, | |
of: new SchedulerPageHandle<TPosition>( | |
head: nextHead, | |
pageSize: state.Of.PageSize, | |
currentCount: nextCount, | |
lastWasEmpty: isEmpty), | |
nextState: state.NextState); | |
}), | |
stateAction); | |
} | |
} | |
public IObservable<IObservable<TResult>> StreamEnumerableWithSchedule<TMember, TStateEnums, TResult, TResultSource>( | |
IScheduler scheduler, | |
Func<StreamState<TMember, TStateEnums, TResult, TResultSource>> initialisation, | |
StateAction<TMember, TStateEnums, TResult, TResultSource> stateAction) where TStateEnums : IStateEnums | |
{ | |
return Observable.Create<IObservable<TResult>>(observer => | |
{ | |
var state = initialisation(); | |
return scheduler.Schedule<StreamState<TMember, TStateEnums, TResult, TResultSource>>(state, state.CurrentPollRate.RateDuration, (sched, streamstate) => | |
{ | |
return sched.Schedule<StreamState<TMember, TStateEnums, TResult, TResultSource>>(streamstate, streamstate.CurrentPollRate.RateDuration, (sstate, action) => | |
{ | |
var stateresult = stateAction(sstate); | |
observer.OnNext(stateresult.Value); | |
var nextstate = stateresult.Key; | |
action(nextstate, nextstate.CurrentPollRate.RateDuration); | |
}); | |
}); | |
}); | |
} | |
} | |
public class StreamState<TState, TStateEnums, TResult, TResultSource> where TStateEnums : IStateEnums | |
{ | |
public readonly ReadOnlyCollection<TStateEnums> PollRateStates; | |
public readonly TStateEnums CurrentPollRate; | |
public readonly TState Of; | |
public readonly Func<TResultSource, StreamState<TState, TStateEnums, TResult, TResultSource>, StreamState<TState, TStateEnums, TResult, TResultSource>> NextState; | |
public StreamState(IList<TStateEnums> pollRateStates, TStateEnums currentPollRate, TState of, Func<TResultSource, StreamState<TState, TStateEnums, TResult, TResultSource>, StreamState<TState, TStateEnums, TResult, TResultSource>> nextState) | |
{ | |
this.PollRateStates = new ReadOnlyCollection<TStateEnums>(pollRateStates); | |
this.CurrentPollRate = currentPollRate; | |
this.Of = of; | |
this.NextState = nextState; | |
} | |
public StreamState<TState, TStateEnums, TResult, TResultSource> Following(TResultSource result) | |
{ | |
return NextState(result, this); | |
} | |
} | |
/// <summary> | |
/// This interface is to enable the different states to switch between. | |
/// The state machine envisaged here is to be a sequential set of ranges, rather than a true arbitary state machine. | |
/// The reason for this is that the states are designed to be moving between different rate. | |
/// </summary> | |
public interface IStateEnums | |
{ | |
string StateName { get; } | |
int MaxValue { get; } | |
int MinValue { get; } | |
string MaxState { get; } | |
string MinState { get; } | |
TimeSpan RateDuration { get; } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment