Skip to content

Instantly share code, notes, and snippets.

@adiralashiva8
Created May 23, 2025 14:24
Show Gist options
  • Save adiralashiva8/4af3a818842eb194c7c5dd98eecca4fc to your computer and use it in GitHub Desktop.
Save adiralashiva8/4af3a818842eb194c7c5dd98eecca4fc to your computer and use it in GitHub Desktop.
import datetime
import time
from ntpath import join
import os
import json
from simple_salesforce import Salesforce, format_soql, SFType
from SSHLibrary import *
from urllib.parse import urlparse
from robot.api import logger
from robot.api.deco import keyword
from robot.libraries.BuiltIn import BuiltIn
from socket import gethostname
import shutil
class SalesforceHelper:
    """
    Library to add, update SF records and execute SOQL commands using Simple Salesforce library

    | = Reference = | = Links = |
    | simple-salesforce | https://github.com/simple-salesforce/simple-salesforce |
    """

    # Name of the locally cached token file; it lives in a directory named after the env.
    _TOKEN_FILE_NAME = 'sf_token.json'
    # Age after which the cached token file of an env is fetched again. Envs not
    # listed here are never refreshed once a local copy exists.
    _TOKEN_MAX_AGE = {
        'CRM': datetime.timedelta(hours=2),
        'UAT': datetime.timedelta(hours=6),
    }
    # Number of SSH download attempts before the fetch is reported as failed.
    _MAX_FETCH_ATTEMPTS = 5

    def __init__(self, env=None):
        """
        The following arguments can be optionally provided when importing this library:

        - `env`: Name of the environment whose access token is used to authenticate
          the Salesforce REST client (defaults to ``CRM``). The token is read from the
          local file ``<env>/sf_token.json``; when that file is missing or too old it is
          downloaded over SSH from the path given by the ``SF_TOKEN_DIR`` environment
          variable (see `Get File From Remote`).

        The token file must be JSON with the keys ``token`` and ``instance_url``.
        """
        self.token_file = None
        self.env = 'CRM' if env is None else env
        self.token_file = self.get_file_from_remote(self.env)
        with open(self.token_file, encoding='utf-8') as json_file:
            self.data = json.load(json_file)
        self.token = self.data['token']
        self.host = self.data['instance_url']
        # simple-salesforce is given the bare host name, not the full URL.
        self.instance = urlparse(self.host).netloc
        self.sf = Salesforce(instance=self.instance, session_id=self.token)
        # State of the most recent query, read by the `Get Query Result ...` keywords.
        self.response = None
        self.record_count = None

    @staticmethod
    def _fail(message, error=None):
        """Fail the running keyword with `message`, appending the details of `error` when given."""
        if error is not None:
            message = "{_message} Error: {_type}: {_error}".format(
                _message=message, _type=type(error).__name__, _error=error)
        BuiltIn().fail(message)

    @staticmethod
    def _required_env(name):
        """Return the stripped value of environment variable `name`, failing clearly when unset or blank."""
        value = os.environ.get(name)
        if value is None or not value.strip():
            BuiltIn().fail(
                "Environment variable {_name} is not set.".format(_name=name))
        return value.strip()

    def _sobject(self, table):
        """Return a simple-salesforce `SFType` handle for `table` using the current token."""
        return SFType(str(table), self.token, self.instance)

    @staticmethod
    def _to_plain(result):
        """Round-trip `result` through JSON so the caller receives plain built-in types."""
        return json.loads(json.dumps(result))

    def get_file_from_remote(self, env):
        """
        Returns the local path of the token file for `env`, downloading it over SSH when needed.

        The local copy ``<env>/sf_token.json`` is reused while `Check Last Modified Date`
        reports it as fresh. Otherwise the file is fetched from the remote host, retrying
        up to 5 times. Reads the environment variables ``TOKEN_HOST``, ``REMOTE_USER``,
        ``REMOTE_PASSWORD`` and ``SF_TOKEN_DIR`` (remote path of the token file).

        Fails if a required environment variable is missing or every attempt fails.
        """
        path = os.path.join(env, self._TOKEN_FILE_NAME)
        if not self.check_last_modified_date(env):
            return path

        # Environment is only consulted when a download is actually required.
        host = self._required_env("TOKEN_HOST")
        user = self._required_env("REMOTE_USER")
        password = self._required_env("REMOTE_PASSWORD")
        remote_file = self._required_env("SF_TOKEN_DIR")

        ssh = SSHLibrary()
        last_error = None
        for attempt in range(1, self._MAX_FETCH_ATTEMPTS + 1):
            try:
                ssh.open_connection(host, timeout='2 min')
                ssh.login(user, password)
                os.makedirs(env, exist_ok=True)
                # NOTE(review): fixed delay kept from the original implementation;
                # its purpose is not evident from this file - confirm before removing.
                time.sleep(10)
                ssh.get_file(remote_file, path)
                # A successful attempt ends the retry loop immediately; earlier
                # failed attempts no longer turn the overall result into a failure.
                return path
            except Exception as e:
                last_error = e
                logger.warn(
                    "Attempt {_attempt}/{_max} to fetch token file from {_host} failed: {_error}".format(
                        _attempt=attempt, _max=self._MAX_FETCH_ATTEMPTS, _host=host, _error=e))
            finally:
                ssh.close_all_connections()
        self._fail("Failed to connect to host:{_host}".format(_host=host), last_error)

    def check_last_modified_date(self, env):
        """
        Returns `True` when the local token file of `env` is missing or older than allowed.

        The allowed age is 2 hours for ``CRM`` and 6 hours for ``UAT``. For any other
        `env` an existing file is always treated as fresh (`False`).

        Fails if the modification time of an existing file cannot be read.
        """
        file_path = os.path.join(env, self._TOKEN_FILE_NAME)
        try:
            modified_time = datetime.datetime.fromtimestamp(
                os.path.getmtime(file_path))
        except FileNotFoundError:
            # No local copy yet: it has to be fetched.
            return True
        except Exception as e:
            self._fail(
                "Failed to compare last modified date of file:{_file} record.".format(
                    _file=file_path), e)
        max_age = self._TOKEN_MAX_AGE.get(env)
        if max_age is None:
            return False
        return datetime.datetime.now() - modified_time > max_age

    def copy_token_file_in_same_machine(self):
        """
        Copies the token file from the ``SF_TOKEN_DIR`` path on this machine to ``<env>/sf_token.json``.

        Only `token_file` is updated; the token already loaded into the Salesforce
        client is not re-read by this keyword.
        """
        path = os.path.join(self.env, self._TOKEN_FILE_NAME)
        os.makedirs(self.env, exist_ok=True)
        shutil.copy(self._required_env("SF_TOKEN_DIR"), path)
        self.token_file = path

    @keyword
    def create_salesforce_record(self, table, json_dict):
        """
        Create record in salesforce based on `table`, `dict` values and return result in dict

        Fails with the underlying error details when the record cannot be created.
        """
        try:
            # The API call itself is guarded, since it is the step that can fail.
            return self._to_plain(self._sobject(table).create(json_dict))
        except Exception as e:
            self._fail(
                "Failed to create salesforce table:{_table} record.".format(_table=table), e)

    @keyword
    def update_salesforce_record(self, table, id, json_dict):
        """
        Update record in salesforce using `id` & `table` with `dict` values

        Returns the result of the update call. Fails with the underlying error
        details when the record cannot be updated.
        """
        try:
            return self._to_plain(self._sobject(table).update(str(id), json_dict))
        except Exception as e:
            self._fail("Failed to update salesforce table:{_table} record: {_record}.".format(
                _table=table, _record=id), e)

    @keyword
    def delete_salesforce_record(self, table, id):
        """
        Delete record in salesforce using `id` & `table`

        Returns the result of the delete call. Fails with the underlying error
        details when the record cannot be deleted.
        """
        try:
            return self._to_plain(self._sobject(table).delete(str(id)))
        except Exception as e:
            self._fail("Failed to delete salesforce record:{_table} record: {_record}.".format(
                _table=table, _record=id), e)

    @keyword
    def bulk_delete_records(self, object_name, record_ids):
        """
        Delete many `object_name` records through the Salesforce Bulk API and return the results

        - `record_ids`: passed unchanged to the bulk ``delete`` call; presumably a list
          of dicts such as ``{'Id': '...'}`` as in the simple-salesforce bulk examples -
          verify against the caller.
        """
        bulk_op = getattr(self.sf.bulk, object_name)
        results = bulk_op.delete(record_ids)
        logger.info(str(results))
        return results

    @keyword
    def get_salesforce_record_info_by_id(self, table, id):
        """
        Returns detailed result of record (consist fields and values)

        Fails with the underlying error details when the record cannot be read.
        """
        try:
            return self._to_plain(self._sobject(table).get(str(id)))
        except Exception as e:
            self._fail("Failed to get salesforce record info.", e)

    def _store_query_result(self, query_result, multiple):
        """
        Extract the records of `query_result`, remember them on the instance and return them.

        With `multiple` the whole record list is kept, otherwise only the first record.
        When no record can be extracted the stored state is cleared (so later
        `Get Query Result ...` calls cannot see a previous query's data) and the
        keyword fails.
        """
        try:
            records = query_result['records']
            self.response = self._to_plain(records if multiple else records[0])
        except (KeyError, IndexError, TypeError, ValueError):
            self.response = None
            self.record_count = 0
            BuiltIn().fail(
                "Records not found. \nResult: {_query}".format(_query=query_result))
        self.record_count = len(self.response) if multiple else 1
        return self.response

    @keyword
    def execute_soql_query(self, query, multiple=False):
        """
        Execute and return soql query result in dict or list based on `multiple` parameter value

        - If `multiple` is `True` will return list of dicts
        - If `multiple` is `False` will return dict (default is False)

        The result is also stored for `Get Query Result Count` and
        `Get Query Result Field Values`. Fails when `multiple` is `False` and the
        query returns no record.
        """
        query_result = self.sf.query(format_soql(query))
        return self._store_query_result(query_result, multiple)

    @keyword
    def get_query_result_count(self):
        """
        Returns the count of records returned by the executed soql query. Should be used after `Execute SOQL Query` keyword
        """
        return self.record_count

    @keyword
    def get_query_result_field_values(self, field_name):
        """
        Returns the `field_name` values from query result. Should be used after `Execute SOQL Query` keyword.

        - Returns a list of values when the query was executed with `multiple` as `True`
          (also when it matched a single record), otherwise a scalar value
        - Returns `None` when there is no stored result
        """
        if not self.response:
            return None
        # Decide on the shape of the stored result rather than on the record count:
        # a one-element list must not be indexed by field name.
        if isinstance(self.response, list):
            return [record[field_name] for record in self.response]
        return self.response[field_name]

    @keyword
    def get_record_lightning_url(self, table, id):
        """
        Generates the Lightning URL to access the record `id` of `table`
        """
        return "https://{_instance}/lightning/r/{_table}/{_id}/view".format(
            _instance=self.instance, _table=table, _id=id)

    @keyword
    def execute_tooling_soql_query(self, query, multiple=False):
        """
        Execute soql query through the Tooling API and return result in dict or list based on `multiple` parameter value

        - If `multiple` is `True` will return list of dicts
        - If `multiple` is `False` will return dict (default is False)

        The result is also stored for `Get Query Result Count` and
        `Get Query Result Field Values`. Fails when `multiple` is `False` and the
        query returns no record.
        """
        query_result = self.sf.toolingexecute(
            "query", method="GET", data=None, params={"q": query})
        return self._store_query_result(query_result, multiple)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment