Created
May 23, 2025 14:24
-
-
Save adiralashiva8/4af3a818842eb194c7c5dd98eecca4fc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import time | |
from ntpath import join | |
import os | |
import json | |
from simple_salesforce import Salesforce, format_soql, SFType | |
from SSHLibrary import * | |
from urllib.parse import urlparse | |
from robot.api import logger | |
from robot.api.deco import keyword | |
from robot.libraries.BuiltIn import BuiltIn | |
from socket import gethostname | |
import shutil | |
class SalesforceHelper:
    """
    Library to add, update SF records and execute SOQL commands using Simple Salesforce library

    | = Reference = | = Links = |
    | simple-salesforce | https://github.com/simple-salesforce/simple-salesforce |
    """

    # Name of the locally cached token file; it lives in a directory named after the env.
    _TOKEN_FILE_NAME = 'sf_token.json'
    # How many times the remote download is attempted before giving up.
    _MAX_FETCH_ATTEMPTS = 5
    # Maximum age of the cached token file per env before it is downloaded again.
    # Envs that are not listed here never trigger a refresh once a cached file exists.
    _TOKEN_MAX_AGE = {
        "CRM": datetime.timedelta(hours=2),
        "UAT": datetime.timedelta(hours=6),
    }

    def __init__(self, env=None):
        """
        The following arguments can be optionally provided when importing this library:

        - `env`: Name of the environment (defaults to ``CRM``). It selects the local
          directory in which the token file ``sf_token.json`` is cached. When the cached
          file is missing or too old, the file located at the path given by the
          ``SF_TOKEN_DIR`` environment variable is downloaded from the ``TOKEN_HOST`` machine.

        The token file must be JSON containing the keys ``token`` and ``instance_url``;
        they are used to authenticate the Salesforce REST client.
        """
        self.token_file = None
        self.env = 'CRM' if env is None else env
        self.token_file = self.get_file_from_remote(self.env)
        with open(self.token_file) as json_file:
            self.data = json.load(json_file)
        self.token = self.data['token']
        self.host = self.data['instance_url']
        # Only the host part of the instance URL is handed to simple-salesforce.
        self.instance = urlparse(self.host).netloc
        self.sf = Salesforce(instance=self.instance, session_id=self.token)
        # State shared between the query keywords and the "Get Query Result ..." keywords.
        self.response = None
        self.record_count = None

    def get_file_from_remote(self, env):
        """
        Gets the token file from the remote machine (a Windows machine, per the original
        description) over SSH when the local copy needs refreshing.

        The host is read from ``TOKEN_HOST``, the credentials from ``REMOTE_USER`` /
        ``REMOTE_PASSWORD`` and the remote file path from ``SF_TOKEN_DIR``. The download is
        attempted up to 5 times; the keyword fails only when every attempt fails.

        Returns the local path ``<env>/sf_token.json``.
        """
        ssh = SSHLibrary()
        path = os.path.join(env, self._TOKEN_FILE_NAME)
        for _attempt in range(self._MAX_FETCH_ATTEMPTS):
            try:
                if self.check_last_modified_date(env):
                    ssh.open_connection(os.environ.get("TOKEN_HOST").strip(),
                                        timeout='2 min')
                    ssh.login(os.environ.get("REMOTE_USER").strip(),
                              os.environ.get("REMOTE_PASSWORD").strip())
                    os.makedirs(env, exist_ok=True)
                    # NOTE(review): the reason for this fixed pause is not visible here;
                    # it is kept unchanged from the original implementation.
                    time.sleep(10)
                    ssh.get_file(os.environ.get("SF_TOKEN_DIR").strip(), path)
                    ssh.close_all_connections()
                # Either the cached file is still fresh or the download succeeded.
                return path
            except Exception as e:
                # Best-effort retry: report the cause, drop any half-open connection
                # and try again. An earlier failure must not fail a later success.
                print(e)
                ssh.close_all_connections()
        BuiltIn().fail(
            "Failed to connect to host:{_host}".format(_host=os.environ.get("TOKEN_HOST")))

    def check_last_modified_date(self, env):
        """
        Checks the last modified date of the cached token file ``<env>/sf_token.json``.

        Returns ``True`` when the file does not exist, or when it is older than the
        limit for `env` (2 hours for ``CRM``, 6 hours for ``UAT``). Returns ``False``
        otherwise, including for any other `env` whose cached file exists.
        Fails the keyword when the modification time cannot be determined.
        """
        file_path = os.path.join(env, self._TOKEN_FILE_NAME)
        try:
            modified_time = datetime.datetime.fromtimestamp(os.path.getmtime(file_path))
            age = datetime.datetime.now() - modified_time
            max_age = self._TOKEN_MAX_AGE.get(env)
            return max_age is not None and age > max_age
        except FileNotFoundError:
            # No cached copy yet, so it has to be fetched.
            return True
        except Exception as e:
            # Print the cause first: BuiltIn().fail raises and nothing after it runs.
            print(e)
            BuiltIn().fail(
                "Failed to compare last modified date of file:{_file} record.".format(
                    _file=file_path))

    def copy_token_file_in_same_machine(self):
        """
        Copies the token file from the path given by ``SF_TOKEN_DIR`` into
        ``<env>/sf_token.json`` on the local machine and stores that path in
        ``self.token_file``. The ``<env>`` directory is created when missing.
        """
        path = os.path.join(self.env, self._TOKEN_FILE_NAME)
        if not os.path.isdir(self.env):
            os.mkdir(self.env)
        shutil.copy(os.environ.get("SF_TOKEN_DIR").strip(), path)
        self.token_file = path

    @staticmethod
    def _as_plain_json(result, failure_message):
        """Round-trip `result` through JSON to get plain types; fail with `failure_message` on error."""
        try:
            return json.loads(json.dumps(result))
        except Exception:
            BuiltIn().fail(failure_message)

    def _store_query_result(self, query_result, multiple):
        """Extract the records of `query_result`, remember them for the result keywords and return them."""
        try:
            records = query_result['records']
            selected = records if multiple else records[0]
            self.response = json.loads(json.dumps(selected))
            self.record_count = len(self.response) if multiple else 1
            return self.response
        except Exception:
            # Clear the previous result too, so later "Get Query Result ..." keywords
            # cannot read records that belong to an earlier query.
            self.response = None
            self.record_count = 0
            BuiltIn().fail(
                "Records not found. \nResult: {_query}".format(_query=query_result))

    @keyword
    def create_salesforce_record(self, table, json_dict):
        """
        Create record in salesforce based on `table`, `dict` values and return result in dict
        """
        table_obj = SFType(str(table), self.token, self.instance)
        result = table_obj.create(json_dict)
        return self._as_plain_json(
            result,
            "Failed to create salesforce table:{_table} record.".format(_table=table))

    @keyword
    def update_salesforce_record(self, table, id, json_dict):
        """
        Update record in salesforce using `id` & `table` with `dict` values

        Returns the result of the update call converted to plain JSON types.
        """
        table_obj = SFType(str(table), self.token, self.instance)
        result = table_obj.update(str(id), json_dict)
        return self._as_plain_json(
            result,
            "Failed to update salesforce table:{_table} record: {_record}.".format(
                _table=table, _record=id))

    @keyword
    def delete_salesforce_record(self, table, id):
        """
        Delete record in salesforce using `id` & `table`

        Returns the result of the delete call converted to plain JSON types.
        """
        table_obj = SFType(str(table), self.token, self.instance)
        result = table_obj.delete(str(id))
        return self._as_plain_json(
            result,
            "Failed to delete salesforce record:{_table} record: {_record}.".format(
                _table=table, _record=id))

    @keyword
    def bulk_delete_records(self, object_name, record_ids):
        """
        Delete records of `object_name` through the bulk API.

        `record_ids` is passed unchanged to the bulk delete call (presumably a list of
        dicts such as ``{'Id': '...'}`` - verify against the simple-salesforce bulk docs).
        The results are printed and returned.
        """
        bulk_op = getattr(self.sf.bulk, object_name)
        results = bulk_op.delete(record_ids)
        print(results)
        return results

    @keyword
    def get_salesforce_record_info_by_id(self, table, id):
        """
        Returns detailed result of record (consist fields and values)
        """
        table_obj = SFType(str(table), self.token, self.instance)
        result = table_obj.get(str(id))
        return self._as_plain_json(result, "Failed to get salesforce record info.")

    @keyword
    def execute_soql_query(self, query, multiple=False):
        """
        Execute and return soql query result in dict or list based on `multiple` parameter value

        - If `multiple` is `True` will return list of dicts
        - If `multiple` is `False` will return dict (default is False)

        Fails when the expected records are not present in the result. The result is
        stored for `Get Query Result Count` and `Get Query Result Field Values`.
        """
        query_result = self.sf.query(format_soql(query))
        return self._store_query_result(query_result, multiple)

    @keyword
    def get_query_result_count(self):
        """
        Returns the count of records returned by the executed soql query. Should be used after `Execute SOQL Query` keyword
        """
        return self.record_count

    @keyword
    def get_query_result_field_values(self, field_name):
        """
        Returns the `field_name` values from query result. Should be used after `Execute SOQL Query` keyword.

        - Returns list of values when the query was executed with `multiple` as `True`
          (also when it returned a single record), else the return type is scalar
        - Returns `None` when there is no stored result
        """
        if not self.response:
            return None
        if isinstance(self.response, list):
            return [record[field_name] for record in self.response]
        return self.response[field_name]

    @keyword
    def get_record_lightning_url(self, table, id):
        """
        Generates lightning url to access record
        """
        return "https://{}/lightning/r/{}/{}/view".format(self.instance, table, id)

    @keyword
    def execute_tooling_soql_query(self, query, multiple=False):
        """
        Execute soql query against the tooling API and return result in dict or list based on `multiple` parameter value

        - If `multiple` is `True` will return list of dicts
        - If `multiple` is `False` will return dict (default is False)

        Fails when the expected records are not present in the result. The result is
        stored for `Get Query Result Count` and `Get Query Result Field Values`.
        """
        query_result = self.sf.toolingexecute(
            "query", method="GET", data=None, params={"q": query})
        return self._store_query_result(query_result, multiple)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment