Skip to content

Instantly share code, notes, and snippets.

@mmeendez8
Created November 27, 2018 11:41
Show Gist options
  • Save mmeendez8/276961449e6113e81ef6a54b4afe5d6c to your computer and use it in GitHub Desktop.
Save mmeendez8/276961449e6113e81ef6a54b4afe5d6c to your computer and use it in GitHub Desktop.
Teradata access using Python 3.5, pyodbc, pandas and FastLoad
import logging
import os
from string import Template
from subprocess import DEVNULL, CalledProcessError, run

import pandas as pd
import pyodbc
''' ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'''
''' DATABASE CONNECTION FILE (keep it simple) '''
''' ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'''
# Module-level setting, applied once at import time and therefore before any
# DBSession in this file calls pyodbc.connect().
# NOTE(review): presumably disables ODBC connection pooling so that closing a
# connection really ends the Teradata session -- confirm against pyodbc docs.
pyodbc.pooling = False
class DBSession:
    """Small wrapper around a pyodbc connection to a Teradata DSN.

    Typical use: create the session, call ``open_conn()``, run queries with
    ``execute_query()`` / ``query_pandas()``, bulk-load with ``upload_csv()``
    and finish with ``close_conn()``.
    """

    def __init__(self, db_name, database=None):
        """
        :param db_name: ODBC DSN name; should be either TeradataRead or
            TeradataReadWrite (configure Teradata dbms)
        :param database: value placed before the "/" in the FastLoad
            ``.LOGON`` statement by ``upload_csv``. Optional; it may also be
            assigned later through the ``database`` attribute.
        """
        self.db_name = db_name
        self.database = database
        self.conn = None
        self.curs = None
        self.logger = logging.getLogger(__name__)

    def open_conn(self):
        """
        Open the ODBC connection (autocommit on, SQL_CHAR decoded as UTF-8).

        :raises pyodbc.Error: when the connection cannot be established; the
            failure is logged with its traceback before being re-raised.
        """
        try:
            # Using DSN see (https://docs.teradata.com/reader/DGwi~cQTnQKZsGY0o~jtCQ/8W6OdKmuRJU17dFYy0STzg)
            self.conn = pyodbc.connect('DSN=%s' % self.db_name + ';CHARSET=UTF8')
            self.conn.autocommit = True
            self.conn.setdecoding(pyodbc.SQL_CHAR, encoding='utf-8')
        except pyodbc.Error:
            self.logger.exception("Can't connect to database " + self.db_name)
            # Re-raise: continuing with self.conn unset would only fail later
            # with a much less helpful error.
            raise

    def close_conn(self):
        """Commit and close the connection; does nothing if none is open."""
        if self.conn is None:
            return
        self.conn.commit()
        self.conn.close()
        # Forget the closed handle so a repeated close_conn() is harmless.
        self.conn = None

    def execute_query(self, query, params=None):
        """
        Execute a SQL query against the database.

        :param query: SQL text
        :param params: optional parameters bound to the ``?`` markers in query
        :return: list of fetched rows when the first word of the query contains
            "select"; an empty list otherwise, and also when the module-level
            ``exceptionHandler`` tolerates the error instead of re-raising it
        :raises pyodbc.Error: re-raised by ``exceptionHandler``
        """
        cursor = None
        try:
            try:
                cursor = self.curs = self.conn.cursor()
                if params:
                    cursor.execute(query, params)
                else:
                    cursor.execute(query)
            except pyodbc.Error as ex:
                # The handler either re-raises or reports and returns. In the
                # latter case the statement did not run, so there is nothing
                # to fetch.
                exceptionHandler(ex, query, params)
                return []
            # Just get rows if query starts with select
            words = query.lower().split()
            if words and 'select' in words[0]:
                return cursor.fetchall()
            return []
        finally:
            # Always release the cursor, including on the error paths.
            if cursor is not None:
                cursor.close()

    def upload_csv(self, csv_file, table, columns, user, password, verbose=True):
        """
        Use the FastLoad utility to upload a csv file delimited with "|" instead
        of "," and where all values in the file are quoted.
        Ex: "value1" | "value2" | . . .

        The generated FastLoad script is fed to ``fastload`` on standard input,
        so the password is never written to disk and no shell is involved.

        :param csv_file: csv file without column names
        :param table: Insertion table
        :param columns: Column names
        :param user: username
        :param password: password
        :param verbose: True | False if fastload output is required
        :raises ValueError: when ``self.database`` has not been set
        :raises CalledProcessError: when fastload exits with a non-zero status
            other than 8
        """
        database = getattr(self, 'database', None)
        if not database:
            raise ValueError(
                "DBSession.database is not set; pass database=... to DBSession() "
                "or assign it before calling upload_csv()")

        column_definitions = ',\n'.join(
            '"' + column + '" (varchar(2000))' for column in columns)
        values = ',\n'.join(':' + '"' + column + '"' for column in columns)
        script_text = fastload_template.substitute(
            DATA_FILE=csv_file,
            COLUMN_DEFINITIONS=column_definitions,
            VALUES=values,
            DATABASE=database, TABLE=table, USER=user, PASSWORD=password)

        try:
            # The script declares a UTF8 session charset, so encode it as such.
            run(["fastload"],
                input=("%s\n" % script_text).encode('utf-8'),
                check=True,
                stdout=None if verbose else DEVNULL)
        except CalledProcessError as e:
            if e.returncode != 8:  # Fastload gives error 8 but insert is working.. so don't touch :)
                raise

    def query_pandas(self, query):
        """
        Run query through pandas on the open connection.

        :param query: SQL text
        :return: whatever ``pd.read_sql`` returns for the query
        """
        return pd.read_sql(query, self.conn)
def exceptionHandler(ex, q, p):
    """
    Report a database error and decide whether it is fatal.

    :param ex: the caught exception; args[0] is read as the SQLSTATE code and
        args[1] as the message text
    :param q: query that was being executed
    :param p: parameters that were passed with the query
    :raises: ex itself, for every SQLSTATE other than '42S02'
    """
    if ex.args[0] != '42S02':
        # Anything unexpected: show what was being run, then propagate.
        print(q)
        print(p)
        raise ex
    # Table does not exist (just inform in log)
    print(ex.args[1])
# FastLoad script skeleton rendered by DBSession.upload_csv via
# Template.substitute(). Placeholders:
#   DATABASE            - value placed before the "/" in the .LOGON statement
#   USER / PASSWORD     - logon credentials (inserted between double quotes)
#   TABLE               - target table; also prefixes the _ET / _UT error tables
#   COLUMN_DEFINITIONS  - body of the DEFINE clause, one entry per column
#   DATA_FILE           - path of the input file ('|' delimited, '"' quoted)
#   VALUES              - body of the INSERT ... VALUES list
# NOTE(review): the trailing "eof" line is sent to fastload as part of the
# script; it is unclear whether that is intentional -- confirm.
fastload_template = Template('''
SET SESSION CHARSET "UTF8";
.LOGON ${DATABASE}/"$USER","$PASSWORD";
.BEGIN LOADING $TABLE ERRORFILES ${TABLE}_ET, ${TABLE}_UT;
SET RECORD VARTEXT DELIMITER '|' QUOTE YES '"';
DEFINE
$COLUMN_DEFINITIONS
FILE='$DATA_FILE'; /* specifies the location path for the flat file */
INSERT INTO $TABLE
VALUES (
$VALUES
);
.END LOADING;
.LOGOFF;
eof
''')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment