Last active
December 16, 2020 05:02
-
-
Save patter001/6a04bdc3cd9290e5e5bd9d3a6f424031 to your computer and use it in GitHub Desktop.
Python method for synchronizing the startup of IBC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, | |
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A | |
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT | |
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF | |
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE | |
# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
""" | |
To use, import in to your file, and the main thing you want is run the *safe_launch_ibc* function | |
pass in your path to your start script (code assumes LOG directory is in the same directory), pass | |
in "ibg" or "tws" based on IB gateway or TWS, and True/False as to whether you are starting | |
a paper account. | |
safe_launch_ibc | |
**Be sure to call disconnect your client before running this. If you client stays connected it can chew up CPU | |
cycles and cause slow down on small VMs | |
""" | |
import psutil | |
import multiprocessing | |
import subprocess | |
import io | |
import re | |
import os | |
import sys | |
import time | |
import datetime | |
from socketserver import ThreadingMixIn | |
from xmlrpc.server import SimpleXMLRPCServer | |
import xmlrpc.client | |
# need to do this because default server seems to block and handle | |
# only one request at a time | |
class ThreadedServer(ThreadingMixIn, SimpleXMLRPCServer): | |
pass | |
# time we wait for IBC to have finished launching | |
# this should be nearly instantanously typically though | |
_IBC_LAUNCH_TIMEOUT=90 | |
# time we will wait while looking at IBC log files to see if the gateway is ready for us to connect | |
_IBG_LAUNCH_TIMEOUT=90 | |
# i admit, this might/probably just needs to be be a threaded lock... | |
_restart_lock = multiprocessing.Lock() | |
# holds a running server instance | |
_ibc_manager = None | |
def is_ib_gateway_running(): | |
## keep in mind, ibcontroller launches gateway, | |
## this returns true if gateway is running, but if you kill this proc | |
## it also kills controller | |
for proc in psutil.process_iter(): | |
try: | |
if proc.name() == "java.exe": | |
for arg in proc.cmdline(): | |
if arg.count("ibgateway"): | |
return proc | |
except: | |
# need better try/except, but sometimes a permissions error occurs getting the name or cmdline | |
raise | |
# proc not found... | |
return None | |
def is_ib_controller_running(): | |
for proc in psutil.process_iter(): | |
try: | |
if proc.name() == "java.exe": | |
try: | |
for arg in proc.cmdline(): | |
if arg.count("IBC"): | |
return proc | |
except psutil._exceptions.AccessDenied: | |
pass | |
except: | |
# need better try/except, but sometimes a permissions error occurs getting the name or cmdline | |
pass | |
# proc not found... | |
return None | |
# these should never be called directly | |
# but they need to be in the global space due to python pickling when launching a subprocess | |
def _start_ibc(*args): | |
# see if we acquire the lock, if we do, it is safe to restart | |
#print("Checking lock...") | |
owner = _restart_lock.acquire(False) | |
#print("Lock owner: %s"%owner) | |
# if we are not the owner, someone else has started a restart | |
# wait till we are the owner -- which means they are done | |
# and then return | |
#print("Acquired lock for restart") | |
if not owner: | |
# block this time | |
#print("Not the owner, waiting for start/restart to finish") | |
_restart_lock.acquire() | |
_restart_lock.release() | |
return | |
# we own the lock, no contentions.... | |
ibc_proc = is_ib_controller_running() | |
if ibc_proc is not None: | |
parent = ibc_proc.parent() | |
if parent is not None: | |
parent.kill() | |
ibc_proc.kill() | |
time.sleep(2) | |
try: | |
launch_ibc(*args) # what to do on failure??? | |
finally: | |
_restart_lock.release() | |
# use to confirm connection to rpc server | |
def _check_connection(): | |
return True | |
# has to be in global space due to pickling constratints... | |
def _run_ibc_manager(): | |
manager = ThreadedServer(("127.0.0.1",2244), | |
allow_none=True, | |
) | |
manager.register_function(_start_ibc, "start_ibc") | |
manager.register_function(_check_connection, "check_connection") | |
print("Starting IBC Managing server") | |
manager.serve_forever() | |
def get_ibc_manager(): | |
global _ibc_manager | |
if _ibc_manager is not None: | |
return _ibc_manager | |
try: | |
#print("Connecting to xmlrpc client.") | |
_ibc_manager = xmlrpc.client.ServerProxy("http://127.0.0.1:2244") | |
# run this to check connection | |
_ibc_manager.check_connection() | |
except ConnectionRefusedError: | |
# if connection refused, server is not started...start it | |
# save a reference since we own this and need to kill it on exit | |
#print("Connecting failed starting manager...") | |
proc = multiprocessing.Process(target=_run_ibc_manager) | |
proc.daemon = True | |
proc.start() | |
time.sleep(1) # not sure if this is really needed | |
# not try to connect again, should be there this time | |
_ibc_manager = xmlrpc.client.ServerProxy("http://127.0.0.1:2244") | |
return _ibc_manager | |
def safe_launch_ibc(*args): | |
"""launches IBC with multi-process manager""" | |
mgr = get_ibc_manager() | |
mgr.start_ibc(*args) | |
def launch_ibc(start_ibc_script, tws_software, paper_account): | |
""" | |
Args: | |
start_ibc_script - script to use to launch the gateway | |
tws_software - whether it is "tws" or "ibg" | |
paper_account - true/false whether account is a paper account | |
""" | |
# remove old log files -- this assume someone else already kill any running ibc | |
logdir = os.path.join(os.path.dirname(start_ibc_script),"LOGS") | |
for fname in list(os.listdir(logdir)): | |
if fname != "README.txt": | |
print(os.path.join(logdir,fname)) | |
os.remove(os.path.join(logdir,fname)) | |
# launch ibc and return when it is ready... | |
# if we have a script to start ibgateway, then use it here... | |
launch_time = datetime.datetime.now() | |
print("Starting IB Controller...") | |
_ib_proc = subprocess.Popen(start_ibc_script, shell=True) | |
# give it a second to start | |
ibc = None | |
starttime = datetime.datetime.now() | |
# wait for logfiles to get oupen | |
log_files = [] | |
found = 0 | |
while not found: | |
# uses same timeout as waiting for IBC to start.... | |
if (datetime.datetime.now() - starttime)> datetime.timedelta(_IBC_LAUNCH_TIMEOUT): | |
raise RuntimeError("ERROR starting up IBC") | |
for fname in list(os.listdir(logdir)): | |
if fname != "README.txt": | |
log_filepath = os.path.join(logdir,fname) | |
found = 1 | |
break | |
else: | |
# yield processor for those low cost VMs | |
time.sleep(5) | |
found = False | |
if tws_software=="tws": | |
required_lines = [ | |
re.compile(".*Interactive Brokers.*event=Activated") | |
] | |
if tws_software == "ibg": | |
required_lines = [ | |
re.compile(".*Starting application.*event=Closed") | |
] | |
if paper_account: | |
required_lines.append(re.compile("Warning.*event=Closed")) | |
with io.open(log_filepath) as myfile: | |
while found < len(required_lines): | |
if datetime.datetime.now()-launch_time > datetime.timedelta(seconds=_IBG_LAUNCH_TIMEOUT): | |
raise RuntimeError("Error starting IBC, timeout waiting for IBC to complete the launch of the gateway") | |
line = myfile.readline() | |
# waiting for more data... | |
if line == "": | |
time.sleep(5) | |
sys.stdout.write('.') | |
sys.stdout.flush() | |
continue | |
for i, rl in enumerate(required_lines): | |
if rl.search(line): | |
#print("Found %d - %s"%(i,line)) | |
found += 1 | |
break | |
# and now we are ready to connect... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment