Created
March 11, 2015 15:39
-
-
Save bboyle1234/11b7f0f47e76e070d969 to your computer and use it in GitHub Desktop.
TickDataContext
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using ApexSolid; | |
using ApexSolid.Data; | |
using ApexInvesting.Platform; | |
using NinjaTrader.Data; | |
using NinjaTrader.Indicator; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using NinjaTrader.Cbi; | |
using ApexInvesting.Platform.GlobalModels.Market; | |
namespace ApexTools.DataProvider { | |
/// <summary> | |
/// Create this object to get a source of tick data for asynchronous processing | |
/// </summary> | |
public class TickDataContext : IDataContext { | |
/// <summary> | |
/// Fires when a dataseries has been obtained, just before processing begins. | |
/// Note that at the time this event fires, the first ticks have not yet been added | |
/// to the processing queue. | |
/// </summary> | |
public event Action<IDataContext> Ready; | |
/// <summary> | |
/// Fires when a error puts this object into an unusable state | |
/// </summary> | |
public event Action<IDataContext> Error; | |
/// <summary> | |
/// The instrument this object supplies tick data for | |
/// </summary> | |
readonly Instrument Instrument; | |
/// <summary> | |
/// The thread that pumps ticks out of the NinjaTrader bars series into the TickDataQueue | |
/// </summary> | |
readonly Thread TickPumpThread; | |
/// <summary> | |
/// The SolidTickDataQueue used to provide asynchronous access to ninjatrader tick data | |
/// </summary> | |
readonly SolidTickDataQueue TickQueue = new SolidTickDataQueue(); | |
/// <summary> | |
/// A reference to the ninjatrader bar series. The tick pump reads ticks from this. | |
/// </summary> | |
Bars bars; | |
/// <summary> | |
/// Stores the tick processing progress. This variable holds the index of the NinjaTrader 1-tick | |
/// bar that was last read. | |
/// </summary> | |
int indexOfLastTickRead = -1; | |
public DataContextStates State { | |
get; | |
private set; | |
} | |
public TimeSpan TimeSinceLastUpdate { | |
get { return DateTime.Now.Subtract(LastUpdateAtLocal); } | |
} | |
public DateTime LastUpdateAtLocal { | |
get; | |
private set; | |
} | |
public DateTime LastTickAtLocal { | |
get; | |
private set; | |
} | |
public double ProcessingCompletionRatio { | |
get { | |
if (null != LastError) | |
return 0; | |
if (null == bars || bars.Count <= 1 || indexOfLastTickRead < 0) | |
return 0; | |
return indexOfLastTickRead / bars.Count - 1; | |
} | |
} | |
public int NumUnprocessedTicks { | |
get { | |
if (null != LastError) | |
return 0; | |
if (null == bars || bars.Count == 0) | |
return 0; | |
return bars.Count - 1 - indexOfLastTickRead; | |
} | |
} | |
public Exception LastError { | |
get; | |
private set; | |
} | |
public bool IsDisposed { | |
get; | |
private set; | |
} | |
public TickDataContext(Instrument instrument) { | |
Instrument = instrument; | |
State = DataContextStates.Loading; | |
TickPumpThread = new Thread(RunTickPump); | |
TickPumpThread.IsBackground = true; | |
TickPumpThread.Start(); | |
} | |
/// <summary> | |
/// Main method for the tick pumping thread. | |
/// This method loads the ninjatrader bars and pumps them into a tick pool. | |
/// </summary> | |
void RunTickPump() { | |
try { | |
GetBars(); | |
Ready.TryInvoke(this); | |
PumpTicks(); | |
} catch (ThreadAbortException x) { // don't do OnError if the thread is simply aborting due to Disposal | |
} catch (Exception x) { | |
OnError(x); | |
} | |
} | |
void GetBars() { | |
// actually get the bars ... this can take a while | |
bars = TickDataProvider.GetBars(Instrument); | |
// if an exception has not been thrown, we were succesful getting the bars | |
// ... execution can continue | |
// since we have the bars, we need to create a hook so that | |
// this object goes into error state if the bars are invalidated | |
// by a reload event | |
TickDataProvider.BarsReload += TickDataProvider_BarsReload; | |
} | |
void PumpTicks() { | |
while (!IsDisposed) { | |
// get the index of the first ninjatrader tick bar containing an unprocessed tick | |
var indexStart = indexOfLastTickRead + 1; | |
// get the index of the last ninjatrader tick bar containing an unprocessed tick | |
var indexEnd = bars.Count - 1; | |
// only proceed if there are unprocessed ticks | |
if (indexEnd >= indexStart) { | |
// create the index variable that will be incremented with each tick we process | |
var index = indexStart; | |
// create storage for the ticks we process | |
var newTicks = new List<SolidTick>(indexEnd - indexStart + 1); | |
// now extract all the ticks we can | |
for (; index <= indexEnd; index++) { | |
// sometimes the ninjatrader bars has storage added (increasing the bars.Count property) before the bar is | |
// actually ready to be processed. If we try to read the new bar while it's in this state, NinjaTrader | |
// will throw an exception. If this happens, we need to understand that NinjaTrader needs a little more time | |
// to get the new bar into a state where it can be read. Therefore we need to stop trying to read right up | |
// to bars.Count-1 | |
try { | |
// get data out of the bar object and store it in a SolidTick object | |
newTicks.Add(new SolidTick { Price = bars.GetClose(index), Volume = bars.GetVolume(index), TimestampLocal = bars.GetTime(index) }); | |
} catch { | |
// NinjaTrader threw an exception indicating that the bar is not yet ready for us to read it | |
break; | |
} | |
} | |
// because ninjatrader may have thrown that exception, there may be no ticks to process | |
// therefore we check first | |
if (newTicks.Count > 0) { | |
// push the new ticks into the SolidTickQueue so they can be asynchronously processed | |
TickQueue.PushTicks(newTicks); | |
// set the index of the last bar read ... NOTE this is not always equal to bars.Count - 1 due | |
// to the ninjatrader exception. That's why we had to store the "index" variable outside of the | |
// for loop above | |
indexOfLastTickRead = index - 1; | |
// set some other status properties | |
LastUpdateAtLocal = DateTime.Now; | |
LastTickAtLocal = newTicks[newTicks.Count - 1].TimestampLocal; | |
} | |
} | |
// rest and give more ticks time to arrive | |
Thread.Sleep(50); | |
} | |
} | |
/// <summary> | |
/// Called when the bars being processed are invalidated by a bars reloading event. | |
/// Causes this object to go into an error state | |
/// </summary> | |
void TickDataProvider_BarsReload(string instrumentId) { | |
if (instrumentId == Instrument.Id) { | |
OnError(new BarsWereReloadedException()); | |
} | |
} | |
/// <summary> | |
/// Called when some sort of error causes this object to become unusable | |
/// </summary> | |
void OnError(Exception x) { | |
// set some status properties | |
LastError = x; | |
State = DataContextStates.Error; | |
// hopefully whoever listens to this event won't take forever to finish | |
Error.TryInvoke(this); | |
// finish up | |
Dispose(); | |
} | |
public void Dispose() { | |
if (!IsDisposed) { | |
IsDisposed = true; | |
TickDataProvider.BarsReload -= TickDataProvider_BarsReload; | |
TickPumpThread.Abort(); | |
} | |
} | |
#region class TickDataProvider | |
/// <summary> | |
/// A central provider for 1-tick dataseries, opening just one dataseries per instrument to keep NinjaTrader | |
/// running efficiently and providing instant access to any dataseries that has already been loaded once. | |
/// </summary> | |
/// <remarks> | |
/// Designed to be accessed only by the TickDataContext objects. | |
/// Public methods are not to be accessed by any other code | |
/// </remarks> | |
static class TickDataProvider { | |
/// <summary> | |
/// Notifies all TickDataContext objects that a bars reload event has occurred. | |
/// The TickDataContext objects will need to dispose themselves and register their "Error" state | |
/// </summary> | |
public static event Action<string> BarsReload; | |
/// <summary> | |
/// The start time of all dataseries to be loaded | |
/// </summary> | |
static readonly DateTime from = DateTime.Now.Date.AddDays(-120); | |
/// <summary> | |
/// The to time of all dataseries to be loaded | |
/// </summary> | |
static readonly DateTime to = DateTime.Now.Date.AddDays(10); | |
/// <summary> | |
/// The period of all dataseries to be loaded | |
/// </summary> | |
static readonly Period period = new Period(PeriodType.Tick, 1, MarketDataType.Last); | |
/// <summary> | |
/// The session of all dataseries to be loaded | |
/// </summary> | |
static readonly Session session = Session.String2Session("Default 24/7"); | |
/// <summary> | |
/// Synchronization locks for accessing dataseries entries | |
/// </summary> | |
static readonly Dictionary<string, object> entryLocks = new Dictionary<string, object>(); | |
/// <summary> | |
/// Synchronization locks for loading dataseries entries | |
/// </summary> | |
static readonly Dictionary<string, object> entryLoadingLocks = new Dictionary<string, object>(); | |
/// <summary> | |
/// All the dataseries entries | |
/// </summary> | |
static readonly Dictionary<string, TickDataEntry> entries = new Dictionary<string, TickDataEntry>(); | |
/// <summary> | |
/// Stores the bar reloads that have occured recently | |
/// </summary> | |
static readonly Dictionary<string, bool> recentBarReloads = new Dictionary<string, bool>(); | |
/// <summary> | |
/// Gets an entry synchronization lock object for the given instrument id. | |
/// Creates the lock object if it is not already stored in the EntryLocks dictionary. | |
/// </summary> | |
static object GetEntryLock(string instrumentId) { | |
// a bit of syntactical magic using IDictionary extension methods to create the object | |
// if it doesn't already exist in the dictionary | |
return entryLocks.GetWithConstructor(instrumentId, (key) => new object()); | |
} | |
/// <summary> | |
/// Gets an entry loading synchronization lock object for the given instrument id. | |
/// Creates the lock object if it is not already stored in the EntryLoadingLocks dictionary. | |
/// </summary> | |
static object GetEntryLoadingLock(string instrumentId) { | |
// a bit of syntactical magic using IDictionary extension methods to create the object | |
// if it doesn't already exist in the dictionary | |
return entryLoadingLocks.GetWithConstructor(instrumentId, (key) => new object()); | |
} | |
/// <summary> | |
/// Sets a flag that indicates whether a bar reload has recently occured for the given instrument | |
/// </summary> | |
static void SetBarReloadFlag(string instrumentId) { | |
recentBarReloads[instrumentId] = true; | |
} | |
/// <summary> | |
/// Clears the flag that indicates whether a bar reload has recently occured for the given instrument | |
/// </summary> | |
static void ClearBarReloadFlag(string instrumentId) { | |
recentBarReloads.Remove(instrumentId); | |
} | |
/// <summary> | |
/// Checks the flag that indicates whether a bar reload has recently occured for the given instrument | |
/// </summary> | |
static bool IsBarReloadFlagSet(string instrumentId) { | |
return recentBarReloads.ContainsKey(instrumentId); | |
} | |
static TickDataProvider() { | |
// subscribe to BarsReload event so that we can | |
// invalidate all the dataseries that are no longer attached to live market data | |
Bars.BarsReload += Bars_BarsReload; | |
} | |
/// <summary> | |
/// Responds to an event indicating that the dataseries has been detached from live market data. | |
/// Disposes the dataseries and removes its entry. | |
/// Notifies TickDataContext objects that their data is invalidated. | |
/// </summary> | |
static void Bars_BarsReload(object sender, BarsReloadEventArgs e) { | |
// start by recording the fact that this instrument has been reloaded. | |
// we need to do this because if bars are currently being loaded, for this instrument, | |
// we need to dispose those bars and reload again. | |
SetBarReloadFlag(e.Instrument.Id); | |
// wait for access to the entry object - this should be very fast, maximum 100ns | |
lock (GetEntryLock(e.Instrument.Id)) { | |
TickDataEntry entry; | |
if (entries.TryGetValue(e.Instrument.Id, out entry)) { | |
// remove the entry from our store | |
entries.Remove(e.Instrument.Id); | |
// dispose the dataseries so ninjatrader will no longer maintain it | |
entry.Bars.Dispose(); | |
} | |
} | |
// notify all TickDataContext objects that their dataseries has just been invalidated | |
BarsReload.TryInvoke(e.Instrument.Id); | |
} | |
/// <summary> | |
/// Gets a 1-tick dataseries for the requested instrument. | |
/// Returns an existing dataseries if it exists. Otherwise loads a new dataseries. | |
/// If a new dataseries must be loaded, the calling thread will be blocked for some time (perhaps up to a minute) | |
/// while NinjaTrader loads the data. | |
/// </summary> | |
/// <remarks> | |
/// Notice how this function takes care not to lock on the EntryLock for more than a few nanoseconds. | |
/// This is so that we never block NinjaTrader when it raises the Bar.BarsReload event, which also | |
/// waits on the EntryLock. | |
/// The desire to not block NinjaTrader from raising events while we were loading bars | |
/// is the reason for the use of the EntryLoadingLocks. | |
/// This method is ONLY for use by the TickDataContext object ... if you're using it | |
/// in your code, you're probably breaking something. | |
/// </remarks> | |
public static Bars GetBars(Instrument instrument) { | |
TickDataEntry entry; | |
// wait for access to the entry object - this should be very fast, maximum 100ns | |
lock (GetEntryLock(instrument.Id)) { | |
// if the entry exists, return the existing bar series. | |
if (entries.TryGetValue(instrument.Id, out entry)) { | |
return entry.Bars; | |
} | |
} | |
// entry does not exist - we have to load it. | |
// make sure the loading is only done once by preventing simultaneous access | |
lock (GetEntryLoadingLock(instrument.Id)) { | |
// if we had to wait on another thread before entering, it would have been | |
// because the other thread was loading a dataseries. Let's check once more | |
// to see if the entry has already been created by the thread we were waiting on | |
lock (GetEntryLock(instrument.Id)) { | |
// if the entry exists, return the existing bar series. | |
if (entries.TryGetValue(instrument.Id, out entry)) { | |
return entry.Bars; | |
} | |
} | |
ClearBarReloadFlag(instrument.Id); | |
// this will take a while - it's going to block the calling thread | |
var bars = Bars.GetBars(instrument, period, from, to, session, true, true); | |
// check if NinjaTrader has raised the Bars.BarsLoad event while the bars were loading | |
// in the line of code above. | |
if (IsBarReloadFlagSet(instrument.Id)) { | |
// get rid of these bars as we can't use them | |
// I'm actually not sure what happens in the Bars.GetBars function in this situation | |
// Perhaps it returns null, perhaps it throws an exception ... testing could provide | |
// more information | |
if (null != bars) | |
bars.Dispose(); | |
// throw an exception that will result in the calling TickDataContext object having an Error state | |
throw new BarsWereReloadedDuringLoadException(); | |
} | |
// create an entry object | |
entry = new TickDataEntry { | |
InstrumentId = instrument.Id, | |
Bars = bars, | |
}; | |
// now add the entry to the entry store and return the bars | |
lock (GetEntryLock(instrument.Id)) { | |
entries[instrument.Id] = entry; | |
return entry.Bars; | |
} | |
} | |
} | |
} | |
#endregion | |
#region class TickDataEntry | |
/// <summary> | |
/// Contains all related data for a given instance of 1-tick dataseries stored in TickDataProvider | |
/// </summary> | |
class TickDataEntry { | |
public string InstrumentId; | |
public Bars Bars; | |
} | |
#endregion | |
#region class BarsWereReloadedDuringLoadException | |
/// <summary> | |
/// Exception indicates that NinjaTrader raised the Bars.BarsReload event while the | |
/// TickDataContext was loading bars. | |
/// </summary> | |
class BarsWereReloadedDuringLoadException : Exception { | |
public BarsWereReloadedDuringLoadException() : base("NinjaTrader raised a BarLoad event while these bars were being loaded.") { } | |
} | |
#endregion | |
#region class BarsWereReloadedException | |
/// <summary> | |
/// Exception indicatores that NinjaTrader rasied the Bars.BarsReload event while the | |
/// TickDataContext was operating normally. | |
/// </summary> | |
class BarsWereReloadedException : Exception { | |
public BarsWereReloadedException() : base("Bars were reloaded.") { } | |
} | |
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment