Last active
December 21, 2023 12:53
-
-
Save hynekcer/a9d72c959c4569af32141e2586c7b86e to your computer and use it in GitHub Desktop.
Parse Salesforce report data in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Parse Salesforce report data in Python.

Details in my answer https://stackoverflow.com/a/45645135/448474
"""
import json
from collections import OrderedDict

import pandas as pd
from simple_salesforce import Salesforce
class SfReportsApi(Salesforce):
    """Salesforce session with helpers for the analytics/reports REST endpoints.

    Adds report description, export of report detail rows to a pandas
    DataFrame, and a chunked download that works around the row limit of a
    single report call.
    """

    def __init__(self, *args, **kwargs):
        """Create a Salesforce session some way.

        All arguments are passed unchanged to the ``Salesforce`` constructor,
        e.g. the first or the second way:

        >>> sf = SfReportsApi(instance_url='https://na1.salesforce.com', session_id='')
        >>> sf = SfReportsApi(username='user@example.com', password='password', security_token='token')
        """
        super(SfReportsApi, self).__init__(*args, **kwargs)
        # Parsed JSON of the most recent report call; combine_call() reads its
        # 'allData' flag to decide whether another chunk must be fetched.
        self._last_resp = None

    def describe_report(self, report_id):
        """Return the parsed response of the report's ``/describe`` endpoint."""
        return self._call_report(report_id, command='/describe')

    def to_pandas_dataframe(self, report_id, metadata=None):
        """SF report details exported to DataFrame, can be modified by metadata.

        :param report_id: id of the report to run.
        :param metadata: optional ``reportMetadata`` dict; when given, the
            report is run with it (see ``_call_report``).
        :return: DataFrame with one row per detail row found in any
            ``factMap`` section and one column per entry of
            ``detailColumnInfo``, named by the column label.

        Prints a notice when the response says the data are incomplete
        (``allData`` is false).
        """
        resp = self._call_report(report_id, metadata=metadata)
        if not resp['allData']:
            print("Detailed data have been truncated to the usual report limit (2000).")

        def get_label(cell):
            return cell['label']

        def get_value(cell):
            return cell['value']

        def get_timestamp(cell):
            return pd.Timestamp(cell['value'])

        def get_amount(cell):
            # A currency cell without a value carries no 'amount' to read.
            return cell['value']['amount'] if cell['value'] else None

        # Converter per Salesforce data type; unlisted types fall back to the
        # cell's display label.
        sf_pandas_map = {
            'boolean': get_value,
            'currency': get_amount,
            'date': get_timestamp,
            'datetime': get_timestamp,
            'double': get_value,
            'picklist': get_label,
            'string': get_label,
            'textarea': get_label,
        }

        columns = []
        converters = []
        # NOTE(review): this relies on detailColumnInfo being in the same order
        # as each row's dataCells; _call_report parses JSON objects into
        # OrderedDict so that the response order is kept.
        for col in resp['reportExtendedMetadata']['detailColumnInfo'].values():
            columns.append(col['label'])
            converters.append(sf_pandas_map.get(col['dataType'], get_label))

        data = [
            [conv(cell) for conv, cell in zip(converters, row['dataCells'])]
            for section in resp['factMap'].values()
            if 'rows' in section  # sections without detail rows are skipped
            for row in section['rows']
        ]
        return pd.DataFrame(data, columns=columns)

    def _call_report(self, report_id, metadata=None, command=None):
        """Call the report endpoint and return the parsed JSON response.

        A GET request is sent when ``metadata`` is empty; otherwise the
        metadata is POSTed as ``{'reportMetadata': metadata}``. ``command`` is
        an optional URL suffix such as ``'/describe'``. The parsed response is
        also stored in ``self._last_resp``.
        """
        url = '{}analytics/reports/{}{}'.format(self.base_url, report_id, command or '')
        if metadata:
            method = 'POST'
            data = json.dumps({'reportMetadata': metadata})
        else:
            method = 'GET'
            data = None
        resp = self._call_salesforce(method, url, data=data)
        # OrderedDict keeps the key order of the JSON objects in the response.
        parsed = resp.json(object_pairs_hook=OrderedDict)
        self._last_resp = parsed
        return parsed

    def combine_call(self, report_id, id_column):
        """Split report data to chunks and combine them to one DataFrame.

        >>> sf.combine_call(report_id, id_column='ACCOUNT_ID')

        :param report_id: id of the report to download.
        :param id_column: ('ACCOUNT_ID') column that allows the 'greater'
            operator, is sorted case sensitive and doesn't have 2000
            duplicates (beware that CreatedDate and LastModifiedDate are
            rounded to day).
        :return: DataFrame with the rows of all chunks, re-indexed from 0.
        :raises ValueError: if a truncated chunk does not advance the
            splitting value, which would otherwise repeat the same request
            forever.
        :raises AssertionError: if the combined data contain duplicate values
            in the id column.
        """
        description = self.describe_report(report_id)
        self._report_description = description
        orig_metadata = description['reportMetadata']
        # DataFrame columns are named by label, so the id column is looked up
        # by its label in the chunks.
        sf_id_column = description['reportExtendedMetadata']['detailColumnInfo'][id_column]['label']
        print(sf_id_column)

        orig_filters = list(orig_metadata['reportFilters'] or [])
        id_filter = {'column': id_column, 'operator': 'greaterThan', 'value': ''}

        # Without custom filter logic the describe call reports no boolean
        # filter; formatting it would produce the invalid '(None) AND n'.
        # NOTE(review): assumes filters are implicitly ANDed when the logic is
        # null - confirm against the Reports REST API reference.
        orig_logic = orig_metadata.get('reportBooleanFilter')
        if orig_logic:
            boolean_filter = '({}) AND {}'.format(orig_logic, len(orig_filters) + 1)
        else:
            boolean_filter = None

        # modify them or write only modified keys, e.g. change filters or remove subtotals
        metadata = {
            'reportBooleanFilter': boolean_filter,
            'reportFilters': orig_filters + [id_filter],
            'detailColumns': list(orig_metadata['detailColumns']),
            'sortBy': [{'sortColumn': id_column, 'sortOrder': 'Asc'}],
        }
        if id_column not in metadata['detailColumns']:
            metadata['detailColumns'].append(id_column)

        chunks = []
        splitting_value = ''
        while True:
            id_filter['value'] = splitting_value
            chunk = self.to_pandas_dataframe(report_id, metadata)
            if chunk.empty:
                # Nothing beyond the splitting value; keep the first (empty)
                # frame so that the result still has the report's columns.
                if not chunks:
                    chunks.append(chunk)
                break
            chunks.append(chunk)
            last_value = chunk.iloc[-1][sf_id_column]
            print(len(chunk), last_value)
            if self._last_resp['allData']:
                break
            if last_value == splitting_value:
                raise ValueError(
                    "Column {!r} does not advance past {!r}; it has too many "
                    "duplicate values to split the report".format(id_column, last_value))
            splitting_value = last_value

        df = pd.concat(chunks, ignore_index=True)
        # Explicit raise instead of `assert` so the check survives `python -O`.
        if not df[sf_id_column].is_unique:
            raise AssertionError(
                "Duplicate values in column {!r} of the combined report".format(sf_id_column))
        return df
@hynekcer thanks for this solution. When I am writing sf.get_report(report_id, command='/describe'), I'm getting a TypeError that says 'SFType' object is not callable.
@dan-rossano I fixed the code to work with more types of reports. It works now with any report that contains original rows. It is not for summary reports without original rows. (The old code worked only for reports with rows and subtotals.)
@hynekcer Thanks for posting this! I am interested in using similar code to ingest salesforce reports. I prefer not to rely on simple_salesforce. If you have a minute, will you give me some pointers on replacing the simple_salesforce dependency in this class with only the request library? Thanks so much, buddy!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@lukebyrne I prefer a simple solution to download all report data. I haven't used my code for "Salesforce Reports and Dashboards REST API" since writing this in August 2017. The API was complicated and the restriction for 2000 rows was terrible.