Skip to content

Instantly share code, notes, and snippets.

@tpjfern03
Last active November 29, 2018 14:40
Show Gist options
  • Select an option

  • Save tpjfern03/26624b053e97dddc2297ca2f62080032 to your computer and use it in GitHub Desktop.

Select an option

Save tpjfern03/26624b053e97dddc2297ca2f62080032 to your computer and use it in GitHub Desktop.
[Python Template] Starting Template #template
# Program: extractUDMdata.py
# Description: This is a program to extract the customer UDM from a formatted Excel file
#
# Author: Terence Fernandes
# Date: 03/16/2018
# Version: 1.0
# pyinstaller extractUDMdata.py --add-data=logging.yaml:. --add-data=extractUDMdata.ini:. --add-data=udmFile.xlsx:.
# OR pyinstaller -F extractUDMdata.py --hidden-import=_cffi_backend
# --hidden-import=_cffi_backend
# pyi_rth_qt4plugins.py
# tar -czvf extractUDMdata.tar.gz *
# Find a string matching any in list: https://python-forum.io/Thread-Ho-to-check-if-string-contains-substring-from-list
# any(substring in string for substring in substring_list)
# if any(s in echipa1 for s in NotOKlist):
import os
os.environ['LANG'] = 'en_US.utf8'
import sys, getopt
from sys import platform
import configparser
from pathlib import Path
import codecs
# import cx_Oracle
#import re
import sqlalchemy
import argparse
import ruamel
import logging
import logging.config
#from openpyxl import load_workbook, utils
import pandas as pd
import numpy as np
import time
#from Crypto.Cipher import AES
#import base64
import subprocess
import dash
import dash_html_components as html
import dash_core_components as dcc
from dash.dependencies import Input, Output
from flask_caching import Cache
#<Major>.<YYMMDD>.<ChangeSet>
# Placeholder version string; the __main__ block rebuilds it from
# __major_version__, the current date (yymmdd) and __change_set__.
__version_info__ = 'N.YYMMDD.N'
__major_version__ = '1'
__change_set__ = '0'
# Base name of the script: used in log messages and to locate the
# '<name>.yaml' configuration file next to this script.
__scriptName__ = 'name'
# NOTE: a `global` statement at module scope has no effect; neither name is
# assigned anywhere in the visible code.
global generalSettings
global scadaSettings
def parseIntSet(nputstr=""):
    ''' Parse a comma separated list of integers and ranges into a set.

    Each token may be a plain integer ("7"), a dash separated range
    ("3-6", both ends inclusive, given in either order) or an upper bound
    written "<N", which is shorthand for "1-N".  Empty tokens (an empty
    string, a doubled or trailing comma) are ignored.

    :param nputstr: text such as "1,3-5,<3"
    :return: the set of selected integers when every token parsed;
        otherwise the set of offending token strings.  Callers have to
        inspect the element type to tell the two outcomes apart.
    '''
    selection = set()
    invalid = set()
    # tokens are comma separated values
    for token in (part.strip() for part in nputstr.split(',')):
        if not token:
            continue
        if token.startswith("<"):
            token = "1-%s" % (token[1:])
        try:
            # typically tokens are plain old integers
            selection.add(int(token))
        except ValueError:
            # if not, then it might be a range of items separated by dashes
            try:
                bounds = sorted(int(piece.strip()) for piece in token.split('-'))
            except ValueError:
                # not an int and not a range...
                invalid.add(token)
            else:
                if len(bounds) > 1:
                    selection.update(range(bounds[0], bounds[-1] + 1))
    # Report invalid tokens instead of the valid selection
    if invalid:
        return invalid
    return selection
# end parseIntSet
def str_to_bool(s):
    ''' Convert the text 'true' / 'false' (any letter case) to a bool.

    :param s: string to convert
    :return: True for 'TRUE', False for 'FALSE', compared case-insensitively
    :raises ValueError: for any other text; the message names the bad value
    '''
    normalized = s.upper()
    if normalized == 'TRUE':
        return True
    if normalized == 'FALSE':
        return False
    raise ValueError("Cannot convert %r to bool; expected 'true' or 'false'" % (s,))
def setupLogging(logConfig, logLevel):
    ''' Configure logging from a YAML file, falling back to basicConfig.

    When logConfig exists it is parsed as YAML and handed to
    logging.config.dictConfig.  Otherwise logging.basicConfig is used with
    the level selected by logLevel.  A start-up banner is then written to
    the module-level `logger`, which must already exist.

    :param logConfig: path of the YAML file holding the logging configuration
    :param logLevel: fallback level; 'INFO' or 'ERROR' (any letter case) or a
        numeric logging level.  Any other value selects DEBUG.
    '''
    # `import ruamel` alone does not load the yaml submodule.
    import ruamel.yaml

    # The level comes from logLevel; the original compared the file path.
    if isinstance(logLevel, int):
        fallback_level = logLevel
    else:
        level_by_name = {'INFO': logging.INFO, 'ERROR': logging.ERROR}
        fallback_level = level_by_name.get(str(logLevel).upper(), logging.DEBUG)

    if os.path.exists(logConfig):
        with open(logConfig, 'rt') as f:
            config = ruamel.yaml.safe_load(f.read())
        logging.config.dictConfig(config)
        print('Logging Module Enabled')
    else:
        print('%s file not found.' % (logConfig))
        logging.basicConfig(level=fallback_level)
        print('ERROR Logging Enabled')

    logger.info("#######################################")
    logger.info('####### Start of new Process ##########')
    logger.info('### %s Version %s ###' % (__scriptName__, __version_info__))
    logger.info("#######################################")
def getYAML(configFile, node):
    ''' Return the value stored under a top-level key of a YAML file.

    The file is parsed with the safe loader, so YAML tags that construct
    arbitrary Python objects are rejected.

    :param configFile: path of the YAML file to read
    :param node: top-level key to look up
    :return: the value under node, or None when the key is absent or the
        file is empty
    '''
    # The bare name `yaml` was never imported; use ruamel.yaml like the rest
    # of this file.  `import ruamel` alone does not load the submodule.
    import ruamel.yaml

    with open(configFile, 'r') as yamlfile:
        stream = ruamel.yaml.safe_load(yamlfile)
    if stream is None:
        # An empty document parses to None, which has no .get()
        return None
    # Get the root Item
    return stream.get(node)
def setYAML(configFile, node, key, val):
    ''' Set doc[node][key] = val in a YAML file and write the file back.

    The document is loaded and dumped with ruamel's round-trip loader and
    dumper.

    :param configFile: path of the YAML file to update in place
    :param node: existing top-level key whose mapping is modified
    :param key: key inside that mapping to set
    :param val: new value
    :raises KeyError: when node is not present in the document
    '''
    # `import ruamel` alone does not load the yaml submodule.
    import ruamel.yaml

    with open(configFile, 'r') as f:
        doc = ruamel.yaml.load(f, Loader=ruamel.yaml.RoundTripLoader)
    doc[node][key] = val
    with open(configFile, 'w') as f:
        ruamel.yaml.dump(doc, f, Dumper=ruamel.yaml.RoundTripDumper)
def encode(inString, encodeFlag=True):
    ''' Encrypt a string to base64 text, or decrypt such text back.

    Encoding right-justifies the text to a width of 32 with spaces before
    encrypting; decoding strips surrounding whitespace from the result, so
    leading and trailing whitespace of the original text is not preserved.

    :param inString: plain text to encode, or base64 text to decode
    :param encodeFlag: True to encrypt and base64-encode, False to
        base64-decode and decrypt
    :return: the resulting text as a str
    '''
    # The file-level `import base64` is commented out, so import it here.
    import base64

    # NOTE(review): `AES` is not imported in the visible file (the
    # `from Crypto.Cipher import AES` line is commented out); this function
    # raises NameError until that import is restored.
    # NOTE(review): a key hard-coded in source combined with ECB mode is weak
    # protection; create a new key and store it somewhere safe.
    secret_key = 'sfnwJkhFPZXm9KUwWsj3xLWieWo31zWD'
    cipher = AES.new(secret_key, AES.MODE_ECB)
    if encodeFlag:
        # Padding is only needed when encrypting
        padded = inString.rjust(32)
        encoded = base64.b64encode(cipher.encrypt(padded))
        return encoded.decode("utf-8")
    decoded = cipher.decrypt(base64.b64decode(inString))
    return decoded.strip().decode("utf-8")
def db_connect(schema, pw, host, dbport, sid):
    ''' Connect to Oracle

    Builds a "schema/pw@host:dbport/sid" connect string, logs it with the
    password masked and opens the connection.

    :param schema: user / schema name
    :param pw: password (never written to the log)
    :param host: database host
    :param dbport: listener port; a str or an int
    :param sid: last component of the connect string
    :return: the object returned by cx_Oracle.connect
    '''
    # NOTE(review): `import cx_Oracle` is commented out at the top of the
    # visible file; it has to be restored for this call to work.
    # %-formatting accepts an int port, which '+' concatenation did not.
    connstr = '%s/%s@%s:%s/%s' % (schema, pw, host, dbport, sid)
    logger.info('%s/****@%s:%s/%s' % (schema, host, dbport, sid))
    return cx_Oracle.connect(connstr)
def db_close(conn):
    ''' Close the given database connection; returns None '''
    conn.close()
def db_read_query(connection, query, params):
    ''' Execute the SQL and any parameters

    :param connection: open database connection providing cursor()
    :param query: SQL text to run
    :param params: bind parameters passed to cursor.execute
    :return: pandas DataFrame holding all fetched rows, with columns named
        after the first field of each cursor.description entry
    '''
    cursor = connection.cursor()
    try:
        # prepare() is inside the try so the cursor is closed on failure too
        cursor.prepare(query)
        cursor.execute(query, params)
        ## Convert the cursor result to a pandas dataframe
        names = [column[0] for column in cursor.description]
        rows = cursor.fetchall()
        return pd.DataFrame.from_records(rows, columns=names)
    finally:
        # The original `else` branch here ran only when cursor was None and
        # then called a method on it, so it could never work; it is removed.
        cursor.close()
def db_generate_table(dataframe, max_rows=10):
    '''Render the leading rows of a dataframe as a Dash html.Table.

    :param dataframe: pandas DataFrame to display
    :param max_rows: upper limit on the number of body rows emitted
    :return: html.Table with one header row followed by the body rows
    '''
    columns = dataframe.columns
    row_count = min(len(dataframe), max_rows)

    # Header
    header_row = html.Tr([html.Th(name) for name in columns])

    # Body
    body_rows = []
    for position in range(row_count):
        cells = [html.Td(dataframe.iloc[position][name]) for name in columns]
        body_rows.append(html.Tr(cells))

    return html.Table([header_row] + body_rows)
def runScript(scriptName):
    ''' Run a shell script through `sh` and log its return code.

    :param scriptName: path of the script handed to `sh`
    :return: None; the outcome is only reported through the module logger
    '''
    logger.info('Running %s from runScript def' % (scriptName))
    result = subprocess.call(['sh', scriptName])
    # Any non-zero code is a failure; a negative code means the child was
    # terminated by a signal and used to be logged as success.
    if result != 0:
        logger.error('%d in subprocess call for %s' % (result, scriptName))
    else:
        logger.info('Return code for %s : %d' % (scriptName, result))
if __name__ == '__main__':
    # Script entry point: stamp the version, set up logging, load the
    # '<script name>.yaml' configuration, connect to the database and run
    # the Dash server until it stops.
    # datetime is not imported at the top of the file, so import it here.
    from datetime import datetime

    today = datetime.today()
    __version_info__ = __major_version__ + '.' + today.strftime("%y%m%d") + '.' + __change_set__
    logger = logging.getLogger(__name__)
    setupLogging('logging.yaml', 'INFO')

    # The configuration file lives next to this script; os.path.join picks
    # the platform's separator.
    dir_path = os.path.dirname(os.path.realpath(__file__))
    configFile = os.path.join(dir_path, __scriptName__ + '.yaml')

    # NOTE(review): initialize(), app, app_host, app_port and the db_*
    # settings are not defined in the visible part of this template; they
    # must be supplied before this block can run.
    initialize(configFile)
    logger.info('Started server... %s' %(__scriptName__))
    conn = db_connect(db_user, db_passwd, db_host, db_port, db_sid)
    try:
        app.run_server(host=app_host, port=app_port, debug=True)
    finally:
        # Release the database connection even if the server exits on error
        db_close(conn)
    logger.info('Server stopped... %s' %(__scriptName__))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment