Skip to content

Instantly share code, notes, and snippets.

@JamesTryand
Created November 16, 2015 10:10
Show Gist options
  • Save JamesTryand/040e1024a571b5a78a43 to your computer and use it in GitHub Desktop.
Save JamesTryand/040e1024a571b5a78a43 to your computer and use it in GitHub Desktop.
Stream Listener - C# variable rate logstash equivalent.
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
// Please Note this has a dependency on Rx-Main
namespace The.Infrastructure.Streams
{
public class SchedulerPageHandle<T>
{
public readonly T Head;
public readonly int PageSize;
public readonly int CurrentCount;
public readonly bool LastWasEmpty;
public SchedulerPageHandle(T head, int pageSize, int currentCount, bool lastWasEmpty)
{
Head = head;
PageSize = pageSize;
CurrentCount = currentCount;
LastWasEmpty = lastWasEmpty;
}
}
/// <summary>
/// The actual implmentation and scheduling to adjust the rate of polling
/// </summary>
public class SchedulerPollerRate : IStateEnums
{
public static string Immediate = "Immediate";
public static string Fast = "Fast";
public static string Quick = "Quick";
public static string Slow = "Slow";
public static string Hybernate = "Hybernate";
public static SchedulerPollerRate ImmediateInstance = new SchedulerPollerRate(
StateName: SchedulerPollerRate.Immediate,
MaxValue: -1, MaxState: SchedulerPollerRate.Immediate,
MinValue: 2, MinState: SchedulerPollerRate.Fast,
RateDuration: TimeSpan.Zero);
public static SchedulerPollerRate FastInstance = new SchedulerPollerRate(
StateName: SchedulerPollerRate.Fast,
MaxValue: 2, MaxState: SchedulerPollerRate.Immediate,
MinValue: 10, MinState: SchedulerPollerRate.Quick,
RateDuration: TimeSpan.FromSeconds(30));
public static SchedulerPollerRate QuickInstance = new SchedulerPollerRate(
StateName: SchedulerPollerRate.Quick,
MaxValue: 2, MaxState: SchedulerPollerRate.Fast,
MinValue: 12, MinState: SchedulerPollerRate.Slow,
RateDuration: TimeSpan.FromMinutes(5));
public static SchedulerPollerRate SlowInstance = new SchedulerPollerRate(
StateName: SchedulerPollerRate.Slow,
MaxValue: 2, MaxState: SchedulerPollerRate.Quick,
MinValue: 24, MinState: SchedulerPollerRate.Hybernate,
RateDuration: TimeSpan.FromHours(1));
public static SchedulerPollerRate HybernateInstance = new SchedulerPollerRate(
StateName: SchedulerPollerRate.Hybernate,
MaxValue: 1, MaxState: SchedulerPollerRate.Slow,
MinValue: -1, MinState: SchedulerPollerRate.Hybernate,
RateDuration: TimeSpan.FromDays(1));
public SchedulerPollerRate(string StateName, int MaxValue, string MaxState, int MinValue, string MinState, TimeSpan RateDuration)
{
this.stateName = StateName;
this.maxValue = MaxValue;
this.maxState = MaxState;
this.minValue = MinValue;
this.minState = MinState;
this.rateDuration = RateDuration;
}
private readonly string stateName;
private readonly int maxValue;
private readonly int minValue;
private readonly string maxState;
private readonly string minState;
private readonly TimeSpan rateDuration;
public string StateName { get { return stateName; } }
public int MaxValue { get { return maxValue; } }
public int MinValue { get { return minValue; } }
public string MaxState { get { return maxState; } }
public string MinState { get { return minState; } }
public TimeSpan RateDuration { get { return rateDuration; } }
}
public delegate KeyValuePair<StreamState<TState, TStateEnums, TResult, TResultSource>, IObservable<TResult>> StateAction<TState, TStateEnums, TResult, TResultSource>(StreamState<TState, TStateEnums, TResult, TResultSource> priorState) where TStateEnums : IStateEnums;
/// <summary>
/// This service is for the specific scenario where you have something that produces an enumerable, and you need
/// to repeatedly call this, but need to vary the schedule depending upon the results.
///
/// To use this, it has an internal state object wrapped by it's own scheduled state wrapper
/// The initial state along with what determines how to change the state & schedule is provided along with
/// the rx scheduler that determines where this is run
/// and of course the method that lets you do the work (recursively).
/// So to use it specify
/// Where (to run this);
/// How (its configured);
/// What (to run)
/// </summary>
public class StreamingService
{
public class PubSubScheduler
{
/// <summary>
/// Returns a method for instantiating a subscription based pull, ready to subscribe to.
/// Basic : interface specifying only the return type and assuming a position indicator of long.
/// </summary>
/// <typeparam name="T">The Type</typeparam>
/// <param name="pubpull">The method to return the next set of values</param>
/// <param name="newHead">A method to determine the next new head value</param>
/// <param name="sourceCannotProgress">Method used to determine if the scheduler should start altering its request rate.</param>
/// <returns></returns>
public Func<long, int, IObservable<IObservable<T>>> PubSubSchedulerFactory<T>(Func<long, int, IEnumerable<T>> pubpull, Func<IEnumerable<T>, long> newHead, Func<IEnumerable<T>, bool> sourceCannotProgress)
{
return PubSubSchedulerFactory<T, IEnumerable<T>, long>(pubpull, newHead, sourceCannotProgress);
}
/// <summary>
/// Returns a method for instantiating a subscription based pull, ready to subscribe to.
/// Basic : interface specifying only the return type and assuming a position indicator of long.
/// </summary>
/// <typeparam name="TResult">The Return Type</typeparam>
/// <typeparam name="THead">The Type of the Head</typeparam>
/// <param name="pubpull">The method to return the next set of values</param>
/// <param name="newHead">A method to determine the next new head value</param>
/// <param name="sourceCannotProgress">Method used to determine if the scheduler should start altering its request rate.</param>
/// <returns></returns>
public Func<THead, int, IObservable<IObservable<TResult>>> PubSubSchedulerFactory<TResult, THead>(Func<THead, int, IEnumerable<TResult>> pubpull, Func<IEnumerable<TResult>, THead> newHead, Func<IEnumerable<TResult>, bool> sourceCannotProgress)
{
return PubSubSchedulerFactory<TResult, IEnumerable<TResult>, THead>(pubpull, newHead, sourceCannotProgress);
}
/// <summary>
/// Returns a method for instantiating a subscription based pull, ready to subscribe to.
/// Basic++ : same as basic but can specify distinct states involved.
/// </summary>
/// <typeparam name="TResult">The Event Container Return Type</typeparam>
/// <typeparam name="TResultGenerator">The type for returning the result of the return type TResult</typeparam>
/// <typeparam name="TPosition">The Type used to refer to position</typeparam>
/// <param name="pubpull">The method to return the next set of values</param>
/// <param name="newHead">A method to determine the next new head value</param>
/// <param name="sourceCannotProgress">Method used to determine if the scheduler should start altering its request rate.</param>
/// <returns></returns>
public Func<TPosition, int, IObservable<IObservable<TResult>>> PubSubSchedulerFactory<TResult, TResultGenerator, TPosition>(Func<TPosition, int, TResultGenerator> pubpull, Func<TResultGenerator, TPosition> newHead, Func<TResultGenerator, bool> sourceCannotProgress) where TResultGenerator : IEnumerable<TResult>
{
return PubSubAdvancedSchedulerFactory<TResult, TResultGenerator, TPosition>(newHead, sourceCannotProgress,
streamstate => // This is the actual job being used.
{
var enumerableValues = pubpull(streamstate.Of.Head, streamstate.Of.PageSize);
return new KeyValuePair<StreamState<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>, IObservable<TResult>>(
streamstate.Following(enumerableValues),
(enumerableValues as IEnumerable<TResult>).ToObservable(NewThreadScheduler.Default)); // the actual loop
});
}
/// <summary>
/// Returns a method for instantiating a subscription based pull, ready to subscribe to.
/// Advanced : providing access to the stateAction used for the transition.
/// </summary>
/// <typeparam name="TResult">The Event Container Return Type</typeparam>
/// <typeparam name="TResultGenerator">The type for returning the result of the return type TResult</typeparam>
/// <typeparam name="TPosition">The Type used to refer to position</typeparam>
/// <param name="newHead">A method to determine the next new head valu</param>
/// <param name="sourceCannotProgress">Method used to determine if the scheduler should start altering its request rate.</param>
/// <param name="stateAction">The method used in transitioning state.</param>
/// <returns></returns>
public Func<TPosition, int, IObservable<IObservable<TResult>>> PubSubAdvancedSchedulerFactory<TResult, TResultGenerator, TPosition>(Func<TResultGenerator, TPosition> newHead, Func<TResultGenerator, bool> sourceCannotProgress, StateAction<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator> stateAction)
{
var SS = new StreamingService();
return (head, batchSize) => SS.StreamEnumerableWithSchedule<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>(
Scheduler.CurrentThread,
// Here is the initialization
() => new StreamState<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>(
new SchedulerPollerRate[] {
SchedulerPollerRate.ImmediateInstance,
SchedulerPollerRate.FastInstance,
SchedulerPollerRate.QuickInstance,
SchedulerPollerRate.SlowInstance,
SchedulerPollerRate.HybernateInstance
}.ToList(),
SchedulerPollerRate.ImmediateInstance,
new SchedulerPageHandle<TPosition>(head, batchSize, 0, false),
(value, state) =>
{
/// so what are the rules?
/// Well it's either going to go up or down
/// if the current one is less than the max carry on ( just increase the value )
/// else swap to the new one ( and reset the values )
// bool isEmpty = !value.Any();
bool isEmpty = sourceCannotProgress(value);
int comparitorValue = isEmpty
? state.CurrentPollRate.MinValue
: state.CurrentPollRate.MaxValue;
string potentialNextState = isEmpty
? state.CurrentPollRate.MinState
: state.CurrentPollRate.MaxState;
var nextCount = state.Of.LastWasEmpty == isEmpty // if it was the same as the last one
? state.Of.CurrentCount + 1
: 0;
var nextHead = isEmpty
? state.Of.Head
: newHead(value);
return state.Of.CurrentCount >= comparitorValue && isEmpty
? new StreamState<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>(
pollRateStates: state.PollRateStates,
currentPollRate: state.PollRateStates.Where(p => p.StateName == potentialNextState).FirstOrDefault(),
of: new SchedulerPageHandle<TPosition>(
head: nextHead,
pageSize: state.Of.PageSize,
currentCount: nextCount,
lastWasEmpty: isEmpty),
nextState: state.NextState)
: new StreamState<SchedulerPageHandle<TPosition>, SchedulerPollerRate, TResult, TResultGenerator>(
pollRateStates: state.PollRateStates,
currentPollRate: state.CurrentPollRate,
of: new SchedulerPageHandle<TPosition>(
head: nextHead,
pageSize: state.Of.PageSize,
currentCount: nextCount,
lastWasEmpty: isEmpty),
nextState: state.NextState);
}),
stateAction);
}
}
public IObservable<IObservable<TResult>> StreamEnumerableWithSchedule<TMember, TStateEnums, TResult, TResultSource>(
IScheduler scheduler,
Func<StreamState<TMember, TStateEnums, TResult, TResultSource>> initialisation,
StateAction<TMember, TStateEnums, TResult, TResultSource> stateAction) where TStateEnums : IStateEnums
{
return Observable.Create<IObservable<TResult>>(observer =>
{
var state = initialisation();
return scheduler.Schedule<StreamState<TMember, TStateEnums, TResult, TResultSource>>(state, state.CurrentPollRate.RateDuration, (sched, streamstate) =>
{
return sched.Schedule<StreamState<TMember, TStateEnums, TResult, TResultSource>>(streamstate, streamstate.CurrentPollRate.RateDuration, (sstate, action) =>
{
var stateresult = stateAction(sstate);
observer.OnNext(stateresult.Value);
var nextstate = stateresult.Key;
action(nextstate, nextstate.CurrentPollRate.RateDuration);
});
});
});
}
}
public class StreamState<TState, TStateEnums, TResult, TResultSource> where TStateEnums : IStateEnums
{
public readonly ReadOnlyCollection<TStateEnums> PollRateStates;
public readonly TStateEnums CurrentPollRate;
public readonly TState Of;
public readonly Func<TResultSource, StreamState<TState, TStateEnums, TResult, TResultSource>, StreamState<TState, TStateEnums, TResult, TResultSource>> NextState;
public StreamState(IList<TStateEnums> pollRateStates, TStateEnums currentPollRate, TState of, Func<TResultSource, StreamState<TState, TStateEnums, TResult, TResultSource>, StreamState<TState, TStateEnums, TResult, TResultSource>> nextState)
{
this.PollRateStates = new ReadOnlyCollection<TStateEnums>(pollRateStates);
this.CurrentPollRate = currentPollRate;
this.Of = of;
this.NextState = nextState;
}
public StreamState<TState, TStateEnums, TResult, TResultSource> Following(TResultSource result)
{
return NextState(result, this);
}
}
/// <summary>
/// This interface is to enable the different states to switch between.
/// The state machine envisaged here is to be a sequential set of ranges, rather than a true arbitary state machine.
/// The reason for this is that the states are designed to be moving between different rate.
/// </summary>
public interface IStateEnums
{
string StateName { get; }
int MaxValue { get; }
int MinValue { get; }
string MaxState { get; }
string MinState { get; }
TimeSpan RateDuration { get; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment