Created
March 10, 2021 12:56
-
-
Save broschke/e064f8458fed91c1313d5715426473b8 to your computer and use it in GitHub Desktop.
Download and parse Tableau Server workbooks for metadata
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
import xml.etree.ElementTree as ET | |
import tableauserverclient as TSC | |
import pandas as pd | |
import numpy as np | |
import os | |
import shutil | |
from pathlib import Path | |
import glob | |
import zipfile | |
from datetime import datetime | |
# Connection settings. Credentials are read from the environment; SERVER and
# SITE_ID are blank placeholders that must be filled in before running.
USERNAME = os.getenv('AD_USER')
PW = os.getenv('AD_PASS')
SERVER = ''
SITE_ID = ''

tableau_auth = TSC.TableauAuth(USERNAME, PW, site_id=SITE_ID)
request_options = TSC.RequestOptions(pagesize=1000)
server = TSC.Server(SERVER, use_server_version=True)

with server.auth.sign_in(tableau_auth):
    # A single workbooks.get() call returns one page only. Pager keeps
    # requesting pages until the listing is exhausted, so sites with more
    # workbooks than one page holds are no longer silently truncated.
    all_workbooks = list(TSC.Pager(server.workbooks, request_options))

# One [id, name, project_name] entry per workbook; the ids drive the downloads.
workbook_list = [[wb.id, wb.name, wb.project_name] for wb in all_workbooks]
workbook_ids = [item[0] for item in workbook_list]
# Start every run from an empty "wb_downloads" directory under the current
# working directory, then switch into it. The paths are built with pathlib so
# the separator is correct on every platform, not only on Windows.
p = Path(os.getcwd())
download_dir = p / 'wb_downloads'
if download_dir.exists():
    shutil.rmtree(download_dir)
os.makedirs(download_dir)
os.chdir(download_dir)

with server.auth.sign_in(tableau_auth):
    for wb in workbook_ids:
        try:
            # No target path is passed; the files are later globbed from the
            # current directory, so the download presumably lands there.
            file_path = server.workbooks.download(wb)
        except Exception as exc:
            # Best effort: report the failure and carry on with the next
            # workbook. Catching Exception (not a bare except) lets Ctrl-C
            # and SystemExit still stop the run.
            print(f'Workbook {wb} could not be found. ({exc})')
        else:
            # Only report success when the download really happened, so a
            # stale or undefined file_path is never printed.
            print("\nDownloaded workbook to {0}.".format(file_path))
print("Download complete!")
def unpack_workbooks():
    """Turn every workbook in the current directory into a plain .xml file.

    Packaged workbooks (names ending in "twbx") are zip archives. Only their
    ".twb" members are extracted, because nothing else in the archive is read
    afterwards and the remaining members can be large. Every ".twb" file in
    the current directory is then renamed to ".xml".

    Returns:
        The list of "*.xml" file names found in the current directory.
    """
    for archive in glob.glob('*.twb*'):
        if archive[-4:] != 'twbx':
            continue
        with zipfile.ZipFile(archive, 'r') as pack_wb:
            twb_members = [
                member for member in pack_wb.namelist()
                if member.endswith('.twb')
            ]
            pack_wb.extractall(os.getcwd(), members=twb_members)

    for twb in glob.glob('*.twb'):
        pre, _ext = os.path.splitext(twb)
        os.rename(twb, pre + '.xml')

    return glob.glob('*.xml')


xml_files = unpack_workbooks()
# Collect one (workbook, table, query) triple per <relation> element found in
# each workbook XML file. The three lists stay index-aligned.
wb_name_list, wb_table_list, wb_query_list = [], [], []
print('Parsing data sources')
for xml_path in xml_files:
    for relation in ET.parse(xml_path).getroot().iter('relation'):
        # Drop the 4-character ".xml" suffix to get the workbook name.
        wb_name_list.append(xml_path[:-4])
        wb_table_list.append(relation.get('table'))
        sql_text = relation.text
        if sql_text is not None:
            # Flatten multi-line custom SQL onto a single line.
            sql_text = sql_text.replace('\r\n', ' ').replace('\n', ' ')
        wb_query_list.append(sql_text)

df_query = pd.DataFrame(
    zip(wb_name_list, wb_table_list, wb_query_list),
    columns=['workbook', 'table', 'query'],
)
# Cleanup, in order: drop extract relations, renumber rows, turn blank cells
# into NaN, keep rows with at least two non-missing values, then de-duplicate.
df_query = (
    df_query[df_query['table'] != '[Extract].[Extract]']
    .reset_index(drop=True)
    .replace(r'^\s*$', np.nan, regex=True)
    .dropna(thresh=2)
    .drop_duplicates()
)
print('Parsing calculated fields')
# Maps a calculated column's internal name to its bracketed caption so that
# formulas can be rewritten with the captions.
# NOTE(review): the mapping accumulates across all workbooks rather than being
# reset per file -- confirm that cross-workbook substitution is intended.
calcDict = {}
calcList = []
for file in xml_files:
    root = ET.parse(file).getroot()
    captioned_columns = root.findall('.//column[@caption]')

    # First pass: register every captioned column that has a calculation.
    # This must finish before any formula is rewritten, because a formula may
    # reference a column that appears later in the file.
    for item in captioned_columns:
        if item.find('.//calculation') is not None:
            calcDict[item.attrib['name']] = '[' + item.attrib['caption'] + ']'

    # Second pass: emit one row per column whose calculation has a formula.
    for item in captioned_columns:
        # Read the formula from the element that actually carries one. Taking
        # the first <calculation> instead raises KeyError when that first
        # element has no formula but a later one does.
        calc_elem = item.find('.//calculation[@formula]')
        if calc_elem is None:
            continue

        wb_name = file[:-4]  # strip the ".xml" suffix
        calc_caption = '[' + item.attrib['caption'] + ']'
        calc_name = item.attrib['name']
        calc_raw_formula = calc_elem.attrib['formula']

        # Separate "//" comment lines from formula lines. Both "\r\n" and
        # bare "\n" line endings are accepted, matching the normalisation
        # applied to data source queries.
        comment_lines = []
        formula_lines = []
        for line in calc_raw_formula.replace('\r\n', '\n').split('\n'):
            if line.startswith('//'):
                comment_lines.append(line)
            else:
                formula_lines.append(line)
        # Each line keeps a trailing space, as in the original output format.
        calc_comment = ''.join(line + ' ' for line in comment_lines)
        calc_formula = ''.join(line + ' ' for line in formula_lines)

        # Swap internal column names for their captions.
        for name, caption in calcDict.items():
            calc_formula = calc_formula.replace(name, caption)

        calcList.append(
            [wb_name, calc_caption, calc_name, calc_formula, calc_comment]
        )

df_calcs = pd.DataFrame(
    calcList,
    columns=['Workbook', 'Name', 'Remote Name', 'Formula', 'Comment'],
)
# Remove duplicate rows, keeping the first occurrence.
df_calcs = df_calcs.drop_duplicates()
# Write both result tables to one dated Excel workbook in the current
# (download) directory, copy it to the parent directory, then delete the
# download directory.
today = datetime.now().date()
report_name = f'{today}_zico_audit_{SITE_ID}.xlsx'
with pd.ExcelWriter(report_name) as writer:
    df_query.to_excel(writer, sheet_name='data_sources', index=False)
    df_calcs.to_excel(writer, sheet_name='calc_fields', index=False)

# Paths are built with pathlib instead of "\\" concatenation so this works on
# any platform. Path objects are not used as context managers, because that
# support was deprecated in Python 3.9 and removed in 3.13.
report_path = Path(os.getcwd()) / report_name
destination = Path(os.path.dirname(os.getcwd()))
shutil.copy(report_path, destination)

# Step out of the download directory before removing it.
os.chdir('..')
shutil.rmtree(Path(os.getcwd()) / 'wb_downloads')
print("Audit complete.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment