Skip to content

Instantly share code, notes, and snippets.

@elken
Created May 24, 2017 18:32
Show Gist options
  • Save elken/b70c1511d90a474035b16409f4476e64 to your computer and use it in GitHub Desktop.
Save elken/b70c1511d90a474035b16409f4476e64 to your computer and use it in GitHub Desktop.
Line 338 to 343 shows implementation, 287 256 and 225 for the individual functions
package daytrader.api;
import com.ib.client.Contract;
import com.ib.client.Types;
import com.ib.controller.ApiConnection;
import com.ib.controller.Bar;
import com.ib.controller.Formats;
import daytrader.EntityHandler;
import daytrader.api.exceptions.TWSException;
import daytrader.api.exceptions.login.AlreadyLoggedInException;
import daytrader.api.exceptions.login.ApiDisabledException;
import daytrader.api.exceptions.login.NotLoggedInException;
import daytrader.api.exceptions.request.BadContractException;
import daytrader.api.exceptions.request.MissingPointsException;
import daytrader.api.exceptions.request.NoPointsException;
import daytrader.cacheTypes.Request;
import daytrader.cacheTypes.RequestCache;
import daytrader.datamodel.*;
import daytrader.hibernate.SessionHelper;
import daytradertasks.HistoricDataTask;
import javafx.beans.property.SimpleBooleanProperty;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.SessionFactory;
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by elken on 27/06/2016.
*/
public class DaytraderController implements com.ib.controller.ApiController.IConnectionHandler {
/**
* The Instance.
*/
static DaytraderController INSTANCE;
private final Logger logger = LogManager.getLogger();
private final boolean debug = true;
private final boolean showIn = false;
private final boolean showOut = false;
private final ApiConnection.ILogger inLogger = s -> {
if (showIn) System.out.println("In: " + s);
};
private final ApiConnection.ILogger outLogger = s -> {
if (showOut) System.out.println("Out: " + s);
};
private final Object connectLock = new Object();
private final Object dataReady = new Object();
private final Object timerLock = new Object();
private boolean isConnected = false;
private com.ib.controller.ApiController apiController;
private int count;
private int lastMessage = 0;
private List<String> accountList = new ArrayList<>();
private LruQueue lruQueue;
private ReentrantLock lock;
private RequestCache requestCache;
private static List<Request> requestList;
private Contract createContract(Putup putup) {
Contract contract = new Contract();
contract.symbol(putup.getTickerCode());
contract.exchange(putup.getMarket().toString());
contract.secType("STK");
contract.currency("USD");
contract.primaryExch(putup.getMarket().toString());
contract.localSymbol(putup.getTickerCode());
return contract;
}
/**
* Instantiates a new Daytrader controller.
*/
public DaytraderController() {
SessionFactory sessionHelper = new SessionHelper().getSessionFactory();
lock = new ReentrantLock();
requestCache = new RequestCache().load();
requestList = requestCache.getRequests();
lruQueue = requestCache.getLruQueue();
count = requestCache.getCount();
}
/**
* Connect.
*
* @throws TWSException the tws exception
*/
public void connect() throws TWSException {
if (!isConnected) {
try {
controller().connect("127.0.0.1", 7496, 0, null);
} catch (Exception ex) {
logger.error(ex);
}
synchronized (connectLock) {
try {
connectLock.wait(1000);
} catch (InterruptedException e) {
logger.error(e);
}
}
if (lastMessage == 502) {
throw new ApiDisabledException();
} else if (lastMessage == 501 || lastMessage == 507) {
throw new AlreadyLoggedInException();
}
}
}
private com.ib.controller.ApiController controller() {
if (apiController == null) {
apiController = new com.ib.controller.ApiController(this, inLogger, outLogger);
}
return apiController;
}
/**
* Gets instance.
*
* @return the instance
*/
public static synchronized DaytraderController getInstance() {
if (INSTANCE == null) {
INSTANCE = new DaytraderController();
}
return INSTANCE;
}
/**
* TWS method invoked when an API client connects
*/
@Override
public void connected() {
controller().reqCurrentTime(l -> show("Server date/time is: " + Formats.fmtDate(l * 1000)));
show("Connected");
isConnected = true;
// Dirty trick because IB are bad
synchronized (connectLock) {
connectLock.notify();
}
}
/**
* TWS method invoked when an API client disconnects
*/
@Override
public void disconnected() {
show("Disconnected");
isConnected = false;
}
/**
* TWS method invoked when an account list is returned
* @param arrayList The account list to set
*/
@Override
public void accountList(ArrayList<String> arrayList) {
show("Got account list");
accountList.clear();
accountList.addAll(arrayList);
}
/**
* TWS method called when an error is encountered
* @param e The error
*/
@Override
public void error(Exception e) {
logger.error(e);
}
/**
* TWS method called when any message needs to be printed
* @param id The ID of the message
* @param messageCode The message code (https://interactivebrokers.github.io/tws-api/message_codes.html)
* @param message The text of the message
*/
@Override
public void message(int id, int messageCode, String message) {
if (debug) {
if (lastMessage == 2105 && messageCode == 2106) {
// TODO: Add a cache entry for this time
show("Restarted historic data limit");
synchronized (timerLock) {
timerLock.notify();
}
count = 0;
requestList.clear();
}
}
lastMessage = messageCode;
if (messageCode == 162) {
synchronized (dataReady) {
dataReady.notify();
}
}
show(String.format("\"%s\" %s(%s)", message, id > 0 ? "" : String.format("for %s ", id), messageCode));
}
/**
* TWS method invoke when something happens (not exactly sure of the difference between this and message)
* @see #message(int, int, String)
* @param s The string
*/
@Override
public void show(String s) {
logger.info(s);
}
/**
* Check that the request count is less than 60.
*
* @return the boolean
*/
public boolean checkIsInLimit() {
boolean hasWaited = false;
if (count >= 59) {
try {
long duration = 600000 - Duration.between(requestList.get(0).getTimeRequested(), LocalTime.now()).toMillis();
if (duration > 0 && duration <= 600000) {
synchronized (timerLock) {
logger.error(String.format("60 requests have been tried recently, retry in %d minutes.", TimeUnit.MILLISECONDS.toMinutes(duration)));
timerLock.wait(duration);
}
}
hasWaited = true;
} catch (InterruptedException e) {
logger.error(e);
}
count = 0;
requestList.clear();
} else {
count++;
}
return hasWaited;
}
/**
* Check that the current request hasn't recently been requested
* TODO: Fix an issue where this triggers too often
*
* @param request the request
* @return the boolean
*/
public boolean checkLruQueue(Request request) {
boolean hasWaited = false;
if (!lruQueue.containsValues(request) || lruQueue.size() == 0) {
lruQueue.push(request);
}
if (lruQueue.size() == 5) {
try {
long duration = 2000 - Duration.between(lruQueue.get(0).getTimeRequested(), LocalTime.now()).toMillis();
if (duration > 0 && duration <= 2000) {
synchronized (timerLock) {
logger.error(String.format("Recently tried 5 or more of this contract and exchange, waiting %d seconds.", TimeUnit.MILLISECONDS.toSeconds(duration)));
timerLock.wait(duration);
}
}
hasWaited = true;
lruQueue.clear();
} catch (InterruptedException e) {
logger.error(e);
}
}
return hasWaited;
}
/**
* Check that the current request isn't on the last 60 request list.
* TODO: Fix an issue where this triggers too often
*
* @param request the request
* @return the boolean
*/
public boolean checkRequestList(Request request) {
boolean hasWaited = false;
if (requestList.contains(request)) {
try {
long duration = Duration.between(LocalTime.now(), requestList.get(0).getTimeRequested().plusSeconds(15)).toMillis();
if (duration > 0 && duration <= 15000) {
synchronized (timerLock) {
logger.error(String.format("Recently tried this request, waiting %d seconds.", TimeUnit.MILLISECONDS.toSeconds(duration)));
timerLock.wait(duration);
}
}
hasWaited = true;
requestList.clear();
} catch (InterruptedException e) {
logger.error(e);
}
} else {
requestList.add(request);
}
return hasWaited;
}
/**
* Attempt the historic request as safely as possible (for now).
* TODO: Add adjustable retry count
* TODO: Cleanup
*
* @param putup the putup
* @param historicDataTask the historic data task
* @param length the length
* @param useCache flag to check if cache should be used (mostly a debugging tool)
* @return the bar point graph
* @throws TWSException the tws exception
*/
public BarPointGraph run(Putup putup, HistoricDataTask historicDataTask, int length, boolean useCache) throws TWSException {
if (!isConnected) {
throw new NotLoggedInException();
}
BarPointGraph points = null;
Contract contract = createContract(putup);
Request request = new Request(contract, historicDataTask);
EntityHandler entityHandler = new EntityHandler();
if (useCache) {
BarPointGraph pointGraph = entityHandler.getPointsByRequest(request);
if (pointGraph != null && pointGraph.size() > 0) {
return pointGraph;
}
}
lock.lock();
while (true) {
if (!checkRequestList(request)) {
if (!checkLruQueue(request)) {
checkIsInLimit();
}
}
ExecutorService service = Executors.newFixedThreadPool(1);
Future<BarPointGraph> future = service.submit(() -> {
SimpleBooleanProperty isDataReady = new SimpleBooleanProperty(false);
String endTime = historicDataTask.getEndDateTime().format(DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss"));
BarHandler handler = new BarHandler(putup, isDataReady);
isDataReady.addListener(((observable, oldValue, newValue) -> {
synchronized (dataReady) {
dataReady.notify();
}
}));
controller().reqHistoricalData(contract, endTime + " EST", length, historicDataTask.getDurationToUse(), historicDataTask.getBarSize(), Types.WhatToShow.TRADES, true, handler);
synchronized (dataReady) {
dataReady.wait(2000);
}
return handler.getBars();
});
try {
points = future.get();
service.shutdownNow();
if (lastMessage == 200) {
throw new BadContractException();
}
if (points.size() == 0) {
throw new NoPointsException();
}
Integer expectedLength = length / Integer.valueOf(historicDataTask.getBarSize().toString().substring(0, 1));
if (points.first().getTime().compareTo(LocalTime.of(9, 30, 5)) == 0) {
expectedLength--;
}
if (points.size() == expectedLength) {
logger.info(String.format("Got %s/%s points.%n", points.size(), expectedLength));
break;
} else {
throw new MissingPointsException(points.size(), expectedLength);
}
} catch (BadContractException e) {
logger.error(e);
break;
} catch (Exception e) {
logger.error(e);
}
}
lock.unlock();
request.setPoints(points);
requestCache.setCount(count);
requestCache.setLruQueue(lruQueue);
requestCache.setRequests(requestList);
requestCache.save();
if (useCache) entityHandler.insertRequest(request);
return points;
}
/**
* Attempt the realtime request (not very safe yet)
*
* @param putup the putup
* @return the bar point graph
* @throws TWSException the tws exception
*/
public BarPointGraph run(Putup putup) throws TWSException {
if (!isConnected) {
throw new NotLoggedInException();
}
BarHandler handler = new BarHandler(putup);
Contract contract = createContract(putup);
controller().reqRealTimeBars(contract, Types.WhatToShow.TRADES, true, handler);
lock.lock();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ex) {
logger.error(ex);
} finally {
lock.unlock();
}
return handler.getBars();
}
/**
* Wrapper for the various bar handler interfaces
*/
public class BarHandler implements com.ib.controller.ApiController.IHistoricalDataHandler, com.ib.controller.ApiController.IRealTimeBarHandler {
/**
* Gets bars.
*
* @return the bars
*/
public BarPointGraph getBars() {
return bars;
}
/**
* The Bars.
*/
BarPointGraph bars;
/**
* The Putup.
*/
Putup putup;
/**
* Dirty trick to handle the bars reactively
*/
SimpleBooleanProperty isDataReady;
/**
* Instantiates a new Bar handler.
*
* @param putup the putup
*/
public BarHandler(Putup putup) {
this.putup = putup;
this.bars = new BarPointGraph();
}
/**
* Instantiates a new Bar handler.
*
* @param putup the putup
* @param isDataReady the is data ready
*/
public BarHandler(Putup putup, SimpleBooleanProperty isDataReady) {
this(putup);
this.isDataReady = isDataReady;
}
/**
* TWS method called when a historic bar is returned
* @param bar The bar
* @param b Not sure tbh
*/
@Override
public void historicalData(Bar bar, boolean b) {
bars.add(new BarPoint(bar));
}
/**
* TWS method called when all the bars have been returned
*/
@Override
public void historicalDataEnd() {
isDataReady.setValue(Boolean.TRUE);
}
/**
* TWS method called when a realtime bar is returned
* @param bar The bar
*/
@Override
public void realtimeBar(Bar bar) {
show(bar.toString() + " " + putup.getTickerCode());
GlobalPoints.getInstance().getGraphByTicker(putup.getTickerCode()).add(new BarPoint(bar));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment