Skip to content

Instantly share code, notes, and snippets.

@gdbassett
Last active August 24, 2024 17:53
Show Gist options
  • Save gdbassett/5ee80a57c4eb80d9c5a481cbdb819a2c to your computer and use it in GitHub Desktop.
Save gdbassett/5ee80a57c4eb80d9c5a481cbdb819a2c to your computer and use it in GitHub Desktop.
#!/usr/bin/python
############################################################################
# #
# Copyright (c)2008, 2009, Digi International (Digi). All Rights Reserved. #
# #
# Permission to use, copy, modify, and distribute this software and its #
# documentation, without fee and without a signed licensing agreement, is #
# hereby granted, provided that the software is used on Digi products only #
# and that the software contain this copyright notice, and the following #
# two paragraphs appear in all copies, modifications, and distributions as #
# well. Contact Product Management, Digi International, Inc., 11001 Bren #
# Road East, Minnetonka, MN, +1 952-912-3444, for commercial licensing #
# opportunities for non-Digi products. #
# #
# DIGI SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED #
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A #
# PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION, IF ANY, #
# PROVIDED HEREUNDER IS PROVIDED "AS IS" AND WITHOUT WARRANTY OF ANY KIND. #
# DIGI HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, #
# ENHANCEMENTS, OR MODIFICATIONS. #
# #
# IN NO EVENT SHALL DIGI BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, #
# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS, #
# ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF #
# DIGI HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. #
#
############################################################################
# change log - v0 DRIVEN-4 CODE #
# Initial code to point channel data to TW cloud #
# Includes retrieving settings from cloud in response #
# Ryan S #
############################################################################
# change log - v1 DRIVEN-4 CODE #
# Changed the timing where the system should now deliver a heartbeat #
# every 2 seconds, and do a data dump (and retrieval of changes #
# from the server) every 3 seconds. Ryan S #
############################################################################
# change log - v2 DRIVEN-4 CODE #
# added reset_unit feature - if true on TW, then unit reboots 3-24-21 #
############################################################################
# change log - v3 DRIVEN-4 CODE 26May2021 #
# Modified dia.py to receive settings from cloud and sync device control #
# before changing #
# Modified dia.py to send all channels up at each time of cloud connect #
# (removed ignoring of set_ items and sending all be they changed or not) # #
############################################################################
# change log - v4 DRIVEN-4 CODE 16June2021 #
# Modified dia.py to send channels that have changed every 3 sec #
# Modified dia.py to send all channels every 3 minutes #
############################################################################
# change log - PROD_1.0 DRIVEN-4 CODE 5Jan2023 #
# Changed BETA_1.0 to PROD_1.0 for prod release #
############################################################################
# change log - PROD_1.1 DRIVEN-4 CODE 20Jan2023 #
# Added Python version checking for verification of ssl cert implemented #
# after python 2.7.9 #
############################################################################
"""\
To run this, use command line: python dia.py [config.yml]
"""
# imports
import sys
try:
import os.path
except ImportError:
# os comes from python.zip on our gateways.
print """\
Unable to import 'os' module. Check that your system has a 'python.zip' file,\r
and that it is the correct one for your device.\r
"""
sys.exit(1)
import os
import gc
import time
import httplib
import json
import re
import ssl
import csv # to write CSV log
import copy # for deepcopy when outputting settings as jsonl
ENTITY_MAP = {'<': '&lt;', '>': '&gt;', '&': '&amp;'}
# constants
BOOTSTRAP_VERSION = "2.1.0.8"
GC_COLLECTION_INTERVAL = 1 # seconds per loop
stime = int(round(time.time()))
json_setup = '{"status":"Success","set_light_state":"0","set_audio_ack":"False","set_audio_data":"0", "set_blower":"0","set_clean_cycle":"0","set_clean_lock_state":"0","set_heat_pump":"0","set_mz_ack":"False","set_mz_light":"0","set_pump1_speed":"0","set_pump2_speed":"0","set_pump3_speed":"0","set_spa_lock_state":"0","set_stm_state":"0","set_system_reset":"0","set_tanas_menu_entry":"0","set_tanas_menu_entry_ack":"False","set_temp_lock_state":"0","set_temperature":"100","set_dumpdelay":"1","set_temperature_ack":"False","reset_unit":"false"}'
json_last = json.loads(json_setup)
delay_time = 1
iter_number = 0
first = True
### gabe constants
gabe_thinkworks_logfile = "./gabe_thinkworx.csv" # "./gabe_thinkworx.jsonl"
csv_columns = ["time_in_service", "output_configuration_2_pump_system", "pcb_temperature", "set_clean_lock_state", "pump2_speed", "g3_level2_errors",
"peripheral_status_wireless_audio", "feature_configuration_24_hour_operation", "peripheral_status_free_pad", "lost_line_counter",
"mz_light_configuration_zone_1", "control_box_error_flow_error", "mz_light_configuration_zone_3", "mz_light_configuration_zone_4",
"spa_state", "rtc_hours", "spa_size", "mz_model", "salline_test", "pump3_timeout", "output_configuration_blower_service_mode",
"output_configuration_multi_zone_exterior", "peripheral_status_i_cast", "mz_Accent_loop_speed", "jet_1_low_operation_seconds",
"mz_POL_loop_speed", "temp_lock_state", "audio_bass", "rtc_seconds", "set_pump1_speed", "lls_power_flash", "mz_Exterior_light_intensity",
"mz_Main_color_state", "output_configuration_multi_zone_themes", "feature_configuration_display_blanking", "hawk_status_circ", "lls_power_on",
"current_jet3", "control_box_error_config_error", "output_configuration_three_pump_concurrency", "jet_2_low_operation_seconds",
"peripheral_status_chlorine", "set_system_reset", "rtc_status", "lls_power_and_ready_flash", "sys_state_spa_lock", "mz_Exterior_loop_speed",
"spa_lock_state", "rtc_date", "circ_pump_operation_seconds", "g3_chlor_test_data", "set_stm_state", "set_mz_ack", "power_jet3", "audio_treble",
"peripheral_status_two_way_audio", "set_blower", "output_configuration_heater_service_mode", "set_tanas_menu_entry_ack", "peripheral_current",
"output_configuration_blower", "peripheral_status_rheodyne", "hawk_status_filter", "pump1_operation_seconds", "sys_state_blower", "lls_set_bits",
"mz_version", "sys_state_water_care", "pump1_timeout", "light_operation_seconds", "audio_volume", "mz_POL_color_state", "sys_state_light_intensity",
"mz_24_hour_light_status", "heat_pump_current_mode_setting", "set_temp_lock_state", "peripheral_status_cd_player", "mz_light_configuration_zone_2",
"compatibility_byte", "set_light_state", "audio_source_selection", "set_clean_cycle", "mz_POL_light_intensity", "sys_state_jet_1_high", "rtc_year",
"pump3_speed", "sys_state_circ", "filter_time_2", "filter_time_1", "audio_power", "peripheral_status_networking", "mz_Exterior_wheel_status",
"audio_subwoofer_volume", "feature_configuration_pump_1_speed", "blower_operation_seconds", "daily_clean_cycle", "dosing_state", "spa_usage",
"peripheral_status_adv_lite", "rtc_month", "sys_state_clean", "ctrl_head_water_temperature", "heater_operation_seconds", "lls_ready_flash",
"g3_clrmtr_test_data", "set_heat_pump", "mz_Main_light_intensity", "peripheral_status_ace_gen_3", "set_audio_data", "sys_state_jet_3_high",
"peripheral_status_hpc", "feature_configuration_pump_2_speed", "version_box", "version_checksum", "pump_configuration", "hawk_status_econ",
"control_head_type", "high_limit_temperature", "peripheral_status_mp3", "peripheral_status_sp_audio_2", "heat_pump_current_operational_status",
"pump3_operation_seconds", "pump1_speed", "set_temperature", "voltage_heater", "sys_state_jet_2_low", "control_box_error_probe_error",
"control_box_error_zc_signal", "mz_Accent_wheel_status", "lls_power_and_ready_alter", "clean_lock_state", "power_l2", "version_head",
"power_l1", "peripheral_status_frog", "time", "sys_state_jet_1_low", "model_type", "rtc_minutes", "power_heater", "set_mz_light", "mz_Main_loop_speed",
"audio_radio_signal_strength", "mz_Accent_light_intensity", "peripheral_status_water_feature", "output_configuration_3_pump_system",
"current_heater", "pump2_operation_seconds", "blower_speed", "audio_bluetooth_pairing", "g3_sensor_status", "peripheral_status_mz_light",
"feature_configuration_extended_auxillary_key", "lls_power_and_ready_ace_err", "mz_POL_wheel_status", "audio_play_pause_status",
"sys_state_summer_time_mode", "mz_Main_wheel_status", "audio_balance", "audio_wireless_channel", "mz_system_status", "stm_state",
"sys_state_temp_lock", "hawk_filter_reset", "lls_power_and_ready_on", "voltage_l2", "voltage_l1", "set_tanas_menu_entry", "set_audio_ack",
"sys_state_jet_2_high", "sys_state_navigator_sync", "mz_Accent_color_state", "g3_sensor_data", "set_pump2_speed", "set_pump3_speed",
"feature_configuration_temp_dsp_set", "pump2_timeout", "blower_timeout", "ctrl_head_set_temperature", "voltage_jet3", "peripheral_status_video",
"sys_state_heater", "set_spa_lock_state", "g3_ph_test_data", "version_hardware", "current_l1", "peripheral_status_sp_audio",
"control_box_error_flow_signal_open", "mz_Exterior_color_state", "spa_light_state", "feature_configuration_degree_celcius", "light_timeout",
"set_temperature_ack", "current_l2"]
### Try and correct a python serializing bug
def convert(o):
return "%s" % o
raise TypeError
# internal functions & classes
class Sample(object):
# Using slots saves memory by keeping __dict__ undefined.
__slots__ = ["timestamp", "value", "unit"]
def __init__(self, timestamp=0, value=0, unit=""):
self.timestamp = timestamp
self.value = value
self.unit = unit
def __repr__(self):
try:
return '<Sample: "%s" "%s" at "%s">' % (self.value, self.unit,
iso_date(self.timestamp))
except:
return '<Sample: "%s" "%s" at "%s">' % (self.value, self.unit,
self.timestamp)
def spin_forever(core):
""" This routine prevents the main thread from exiting when the
framework is run directly from __main__.
"""
hb_loop_count = 0
ds_loop_count = 0
garbage_collect_time = 0
try:
while not core.shutdown_requested():
if garbage_collect_time >= 60:
garbage_collect_time = 0
collected_items = gc.collect()
if collected_items:
# only print if something accomplished
print ("GarbageCollector: collected %d objects."
% collected_items)
if ds_loop_count >= 3:
ds_loop_count = 0
Thingworx_con(core)
if hb_loop_count >= 2:
hb_loop_count = 0
Heartbeat(core)
#collected_items = gc.collect()
hb_loop_count = hb_loop_count + 1
ds_loop_count = ds_loop_count + 1
garbage_collect_time = garbage_collect_time + 1
core.wait_for_shutdown(GC_COLLECTION_INTERVAL)
finally:
core._shutdown()
print "dia.py is exiting...\n"
def cleanhtml(raw_html):
""" This routine takes raw html return and cleans out html encoding. - Copyright (c)2020 R.DRIVEN-4 - 030520
"""
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
def cleanup(tobeclean):
""" This routine takes out encoding to leave json text structure. - Copyright (c)2020 R.DRIVEN-4 - 030520
"""
tobeclean = tobeclean.replace("#x7b;", "{")
tobeclean = tobeclean.replace("&quot;", '"')
tobeclean = tobeclean.replace("#x3a;", ":")
tobeclean = tobeclean.replace("#x7d;", "}")
tobeclean = tobeclean.replace("&", "")
tobeclean = cleanhtml(tobeclean)
return tobeclean
def Heartbeat(core):
""" This routine produces a heartbeat every 3 seconds to the ThingWorx cloud. The heartbeat data to the
cloud is epoch time in seconds. The response from the ThingWorx cloud is are the user setting attributes.
- Copyright (c)2020 R.DRIVEN-4 - 030520
"""
#keep the settings from last time to see if they changed
global json_last
global ds_loop_count
global delay_time
global first
#time for heartbeat
htime = int(round(time.time()))
#begin building payload
payload = "{\n\t\"Heartbeat\": \"%s\"\n}" % htime
#connection settings
conn = httplib.HTTPSConnection("connectedspa.watkinsmfg.com")
if (sys.version_info > (2,7,9)):
conn = httplib.HTTPSConnection("connectedspa.watkinsmfg.com", context = ssl._create_unverified_context())
headers = {"Content-Type": "application/json", "appkey": "4c53253f-efac-46a8-a7df-6d8c5a63df26"}
url = "/Thingworx/Things/00000000000000000004F3FFFF559D0A/Services/GetHeart"
#try to connect to cloud and push heartbeat, retrieve settings back
try:
conn.request("POST", url, payload, headers)
heartbeat_response = conn.getresponse()
read_data = heartbeat_response.read()
clean_data = cleanup(read_data)
clean_data = "{" + clean_data.split('{')[1]
json_rec = json.loads(clean_data)
#If there was an error, use the last settings that were good
except:
json_rec = json_last.copy()
#if there is a settings change, apply to appropriate channel
if first == True:
json_last = json_rec.copy()
first = False
if sorted(json_rec.items()) != sorted(json_last.items()):
payload = "{\n\t\"Param\": {\n\t\"command_ack\": \"true\","
sdb = core.get_service('channel_manager').channel_database_get()
chan_list = sdb.__dict__['_ChannelDatabase__channels'].keys()
first_element = next(iter(chan_list))
pre_name = first_element.split('.')[0]
with open("./gabe_heartbeat.log", 'w') as filehandle:
filehandle.write("<gabe>\n")
filehandle.write("<gabe>\n")
filehandle.write("pre_name:\n")
filehandle.write(pre_name)
filehandle.write("\n")
filehandle.write("Settings:\n")
filehandle.write(json.dumps(json_rec))
filehandle.write("\n")
filehandle.write("</gabe>\n")
for key in json_rec:
if json_rec[key] != json_last[key]:
if json_rec["reset_unit"] == "true":
os.system('reboot')
if key != "set_dumpdelay":
channel_name = pre_name + "." + key
channel = sdb.channel_get(channel_name)
setting_name = str(key)
setting_set = str(json_rec[key])
#try to set the channel
try:
sample = Sample(int(time.time()), channel.type()(str(json_rec[key])))
channel.set(sample)
except:
setting_set = str(json_last[key])
time.sleep(0.05)
iter_payload = "\n\t\"%s\": \"%s\"," % (setting_name, setting_set)
payload = payload + iter_payload
elif key == "set_dumpdelay":
setting_name = str(key)
setting_set = str(json_rec[key])
delay_time = int(setting_set)
iter_payload = "\n\t\"%s\": \"%s\"," % (setting_name, setting_set)
payload = payload + iter_payload
payload = payload[:-1]
payload = payload + "\n\t}\n}"
# connection setup
conn = httplib.HTTPSConnection("connectedspa.watkinsmfg.com")
if (sys.version_info > (2,7,9)):
conn = httplib.HTTPSConnection("connectedspa.watkinsmfg.com", context = ssl._create_unverified_context())
headers = {"Content-Type": "application/json", "appkey": "4c53253f-efac-46a8-a7df-6d8c5a63df26"}
url = "/Thingworx/Things/00000000000000000004F3FFFF559D0A/Services/GetData"
# try the connection, if it doenst work, dont error out
try:
conn.request("POST", url, payload, headers)
except:
pass
time.sleep(delay_time)
conn.close()
ds_loop_count = 20
json_last = json_rec.copy()
return core
def Thingworx_con(core):
global iter_number
global old_list_values
""" This routine pushes all data channels to the cloud at intervals of 3 minutes. All channels are pushed
to the cloud at 3 minute intervals. Every three seconds only those channels that have changed are pushed.
Copyright (c)2021 R.DRIVEN-4 - 0615201
"""
#get current time and format it
rtime = int(round(time.time()))
strn_rtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(rtime))
payload = "{\n\t\"Param\": {\n\t\"Time_Stamp\": \"%s\",\n\t\"dia_version\": \"PROD_1.0\"," % strn_rtime
if iter_number == 0:
old_list_values = {}
cdb = core.get_service('channel_manager').channel_database_get()
old_list = cdb.__dict__['_ChannelDatabase__channels'].keys()
for element in old_list:
channel = cdb.channel_get(element)
sample = channel.get()
value = sample.value
name = element.split('.')[1]
old_list_values[name] = value
#if it has been 30 seconds since startup, go ahead and pull channels and push to cloud
if (rtime - stime) >= 30:
new_list_values = {}
cdb = core.get_service('channel_manager').channel_database_get()
data_list = cdb.__dict__['_ChannelDatabase__channels'].keys()
for element in data_list:
channel = cdb.channel_get(element)
sample = channel.get()
value = sample.value
name = element.split('.')[1]
new_list_values[name] = value
#(not name.startswith("set_") and old_list_values[name] != new_list_values[name]) or
if (iter_number == 0) or (iter_number == 60):
iter_payload = "\n\t\"%s\": \"%s\"," % (name, value)
payload = payload + iter_payload
elif 1 <= iter_number <= 59:
if new_list_values[name] != old_list_values[name]:
iter_payload = "\n\t\"%s\": \"%s\"," % (name, value)
payload = payload + iter_payload
### Save every 3 minutes
if (iter_number == 0) or (iter_number == 60):
if not os.path.isfile(gabe_thinkworks_logfile):
with open(gabe_thinkworks_logfile, 'a') as filehandle:
writer = csv.DictWriter(filehandle, fieldnames=csv_columns, restval="")
writer.writeheader()
with open(gabe_thinkworks_logfile, 'a') as filehandle:
gabe_settings_dict = copy.deepcopy(new_list_values)
gabe_settings_dict['time'] = strn_rtime
# filehandle.write(json.dumps(gabe_settings_dict, default=convert) + "\n")
writer = csv.DictWriter(filehandle, fieldnames=csv_columns, restval="")
writer.writerow(gabe_settings_dict)
payload = payload[:-1]
payload = payload + "\n\t}\n}"
iter_number = iter_number + 1
if iter_number >= 61:
iter_number = 1
old_list_values = new_list_values
#connection setup
conn = httplib.HTTPSConnection("connectedspa.watkinsmfg.com")
if (sys.version_info > (2,7,9)):
conn = httplib.HTTPSConnection("connectedspa.watkinsmfg.com", context = ssl._create_unverified_context())
headers = {"Content-Type": "application/json", "appkey": "4c53253f-efac-46a8-a7df-6d8c5a63df26"}
url = "/Thingworx/Things/00000000000000000004F3FFFF559D0A/Services/GetData"
# try the connection, if it doenst work, dont error out
try:
conn.request("POST", url, payload, headers)
except:
pass
#time.sleep(5)
conn.close()
return core
def setup_path_and_zip():
""" Sets up the paths to import from the appropriate locations.
Also detects if a zip file is present and returns the path to the
archive
"""
#Does the dia.zip exist in our local directory?
expected_zip_path = os.path.join(os.path.abspath("."), 'dia.zip')
if os.path.exists(expected_zip_path):
#Yes, add it to the path:
sys.path.append(expected_zip_path)
#Add the paths internal to the zip file
for lib_path in ['lib', 'src']:
sys.path.insert(0, os.path.join(expected_zip_path, lib_path))
return expected_zip_path
else:
#We may be operating in a environment that doesn't need dia.zip
#find files, like /src/core/core_services.py
if not os.path.exists(os.path.join('src', 'core', 'core_services.py')):
raise RuntimeError("Unable to find dia.zip or core libraries"
", please load dia.zip and try again")
else:
#We're running in the root of the source tree, directly add the
#/src and /lib directories, insert them, so they are imported
#before base libraries, in case of conflict
for lib_path in ['lib', 'src']:
sys.path.insert(0, lib_path)
return None
def locate_configuration_file(expected_zip_path):
""" Locates the settings file, returns the settings in a file like object,
returns the source name of the settings (the file name used),
and returns the destination of where the settings could be saved.
The source and destination will differ if the source is the dia.zip file.
"""
settings_file = None
settings_flo = None
dest_file = None
#To locate the settings file, parse command line
if sys.argv and len(sys.argv) > 1:
#Use the one supplied by the args
settings_file = sys.argv[1]
if not os.path.exists(settings_file):
#Try again for NDS, doesn't have concept of local directory
settings_file = os.path.abspath(settings_file)
if not os.path.exists(settings_file):
raise RuntimeError("Settings file: %s given, but not found"
" in path" %(settings_file))
#settings file found, read it in
settings_flo = open(settings_file, 'r')
dest_file = settings_file
else:
#No direction from args to find settings file
#Search local path for dia.pyr, dia.yml in order
for possible_fname in ['dia.pyr', 'dia.yml']:
if os.path.exists(os.path.abspath(possible_fname)):
dest_file = settings_file = os.path.abspath(possible_fname)
settings_flo = open(settings_file, 'r')
break
if (expected_zip_path is not None) and (settings_flo is None):
#Previous attempts at finding the configuration are unsuccessful
#Search the dia.zip archive for the settings file, dia.pyr/dia.yml
import zipfile
import StringIO
dia_zip_flo = open(expected_zip_path, 'rb')
dia_zip = zipfile.ZipFile(dia_zip_flo)
#Find the fname, then break
for possible_fname in ['dia.pyr', 'dia.yml']:
if possible_fname in dia_zip.namelist():
#Set the flo object to the file buffer in the zip file
#Provide an alternative location, we cannot overwrite the
#configuration in the zipfile
settings_file = os.path.join(expected_zip_path, possible_fname)
dest_file = os.path.join(os.path.abspath("."), possible_fname)
settings_flo = StringIO.StringIO(dia_zip.read(possible_fname))
dia_zip_flo.close()
dia_zip.close()
break
return settings_file, settings_flo, dest_file
def do_slowdown_check():
""" Performs the slow down check and if true, slows down the startup of
the Dia. This is done to prevent platforms that have an auto restart on
exit of the Dia from spinning fast enough to prevent modification of the
platform if something causes the startup to fail immediately.
"""
#Check for the file that enables this feature
stop_fname = os.path.join(os.path.abspath("."), "nospin.txt")
if not os.path.exists(stop_fname):
print "Dia auto-detect of rapid reboot DISABLED"
return
print "Dia auto-detect of rapid reboot ENABLED"
#Continue, check the timestamp file for entries
slowdown = False
ts_file = open(stop_fname, 'r')
entries = ts_file.readlines()
ts_file.close()
##Remove all non-float compatible entries
for ent in entries[:]:
try:
float(ent)
except ValueError:
entries.remove(ent)
#If more than 9 entries, find average of last 10 entries
if len(entries) >= 10:
curr_time = time.time()
avg_time = sum(float(x.strip()) for x in entries[-10:]) / 10.0
#Compare them to the current time, and if less than 20 minutes
#mark us ready for slow down
diff_time = curr_time - avg_time
if diff_time < 1200:
print ("Initiating slow down in order to prevent the Dia from "
"spinning too fast")
slowdown = True
#Cycle out old timestamps
entries.append(str(time.time()) + os.linesep)
ts_file = open(stop_fname, "w")
ts_file.writelines(entries[-10:])
ts_file.close()
#If need to slow down, do so here
if slowdown:
print "Slowing down, pausing for 10 minutes"
time.sleep(600)
print "Done slowing down."
def main():
""" Acts as the startup script for the Dia. Sets up the environment
(sys.path) and loads the configuration file for use by the core services
to load the rest of the system
"""
#Perform slow down check in case of reboot cycle
do_slowdown_check()
#File name of the settings being used
settings_file = None
#File object derived from opening the settings file
settings_flo = None
#If using a zip file, will return path to zip
expected_zip_path = setup_path_and_zip()
#We've found the library files, verify matching version
try:
from src.common.dia_version import DIA_VERSION
except Exception:
if expected_zip_path:
print ("Error reading from Zipfile: %s Common cause for error"
"is that the files inside the zip file were compiled with "
"the incorrect version of python." %expected_zip_path)
else:
print ("No dia.zip found and unable to locate the file "
"src/common/dia_version.py in the local path.")
raise
if DIA_VERSION != BOOTSTRAP_VERSION:
raise RuntimeError("Library files found, but bootstrap and library "
"versions do not match! Expected: %s, Found: %s"
%(BOOTSTRAP_VERSION, DIA_VERSION))
#Locate the settings file that we use for starting up
settings_file, settings_flo, dest_file = \
locate_configuration_file(expected_zip_path)
if settings_file is None or settings_flo is None:
raise RuntimeError("Unable to locate settings file in local directory,"
" from command line input, or dia.zip. Please "
"provide a configuration file in one of these "
"locations.")
print "Running in environment: %s" % sys.platform
print "iDigi Device Integration Application Version %s" % DIA_VERSION
print "Source settings file: %s" % settings_file
print "Destination settings file: %s" % dest_file
from core.core_services import CoreServices
core = CoreServices(settings_flo=settings_flo,
settings_filename=dest_file)
if __name__ == "__main__":
# Don't exit. If not __main__ then the caller needs to guarantee this.
spin_forever(core)
return core
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment