Skip to content

Instantly share code, notes, and snippets.

@oximenvn
Created August 19, 2019 08:44
Show Gist options
  • Save oximenvn/1c496a9d9999e656e41a31540d58c5fd to your computer and use it in GitHub Desktop.
Save oximenvn/1c496a9d9999e656e41a31540d58c5fd to your computer and use it in GitHub Desktop.
demo access salesforce rest api
import requests
import json
class PyForce:
    """Minimal Salesforce REST API client using the OAuth 2.0 username-password flow.

    Call :meth:`sf_api_login` once to obtain an access token and instance URL,
    then use :meth:`api_call` to issue authenticated requests.
    """

    # OAuth token endpoint used by sf_api_login().
    LOGIN_URL = "https://login.salesforce.com/services/oauth2/token"

    # HTTP verbs api_call() knows how to issue.
    _BODY_METHODS = ('post', 'patch', 'put', 'delete')

    def __init__(self, client_id, client_secret, username, password, security_token):
        """Store the connected-app credentials and the user's login details.

        Args:
            client_id: Consumer Key of the connected app.
            client_secret: Consumer Secret of the connected app.
            username: The email used to log in.
            password: The user's password (without the security token).
            security_token: The user's security token; appended to the password at login.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.username = username
        self.password = password
        self.security_token = security_token
        # Populated by sf_api_login(); None means "not logged in yet".
        self.access_token = None
        self.instance_url = None

    def sf_api_login(self):
        """Authenticate and remember the access token and instance URL.

        Returns:
            A ``(access_token, instance_url)`` tuple.

        Raises:
            Exception: if the token endpoint answers with an error status or the
                response lacks an access token / instance URL.
        """
        credentials = {
            "grant_type": "password",
            "client_id": self.client_id,  # Consumer Key
            "client_secret": self.client_secret,  # Consumer Secret
            "username": self.username,  # The email you use to login
            # Concat your password and your security token
            "password": self.password + self.security_token,
        }
        # Send the credentials in the form-encoded body (data=), not the query
        # string (params=): URLs end up in logs and error messages, secrets must not.
        r = requests.post(self.LOGIN_URL, data=credentials, timeout=30)
        if r.status_code >= 300:
            raise Exception('API error when calling %s : %s' % (r.url, r.content))

        body = r.json()  # parse once, read both fields from the same object
        self.access_token = body.get("access_token")
        self.instance_url = body.get("instance_url")
        if not self.access_token or not self.instance_url:
            raise Exception('Login response from %s did not contain access_token/instance_url' % r.url)

        # NOTE(review): printing a bearer token is only acceptable in a demo.
        print("Access Token:", self.access_token)
        print("Instance URL", self.instance_url)
        return self.access_token, self.instance_url

    def api_call(self, action, parameters=None, method='get', data=None):
        """Make an authenticated call to the Salesforce REST API.

        Args:
            action: Path appended to the instance URL (e.g. ``/services/data/...``).
            parameters: Optional dict of URL query parameters.
            method: One of ``get``, ``post``, ``patch``, ``put`` or ``delete``
                (case-insensitive).
            data: Optional JSON-serialisable payload for non-GET requests.

        Returns:
            The decoded JSON response, or ``None`` for ``patch``/``delete`` calls
            and for successful responses with an empty body.

        Raises:
            ValueError: if ``method`` is not one of the supported verbs.
            RuntimeError: if called before a successful :meth:`sf_api_login`.
            Exception: if the API answers with a status code of 300 or above.
        """
        if isinstance(method, str):
            method = method.lower()
        if method != 'get' and method not in self._BODY_METHODS:
            raise ValueError('Method should be get, post, patch, put or delete.')

        token = getattr(self, 'access_token', None)
        base_url = getattr(self, 'instance_url', None)
        if token is None or base_url is None:
            raise RuntimeError('Not logged in: call sf_api_login() before api_call().')

        # Fresh dicts per call instead of shared mutable defaults; an omitted
        # payload is still sent as an empty JSON object, as before.
        parameters = {} if parameters is None else parameters
        data = {} if data is None else data

        headers = {
            'Content-type': 'application/json',
            'Accept-Encoding': 'gzip',
            'Authorization': 'Bearer %s' % token,
        }
        url = base_url + action
        if method == 'get':
            r = requests.request(method, url, headers=headers, params=parameters, timeout=30)
        else:
            r = requests.request(method, url, headers=headers, json=data, params=parameters, timeout=10)
        print('Debug: API %s call: %s' % (method, r.url))

        if r.status_code >= 300:
            raise Exception('API error when calling %s : %s' % (r.url, r.content))
        # patch/delete never return a payload here; other verbs may legitimately
        # answer with an empty body (e.g. 204), which r.json() cannot decode.
        if method in ('patch', 'delete') or not r.content:
            return None
        return r.json()
def main():
    """Demo driver: log in, then issue GET, POST, PUT, PATCH, DELETE and query calls."""
    teams_endpoint = '/services/apexrest/Teams/'

    def show(payload):
        # Pretty-print an API result as indented JSON.
        print(json.dumps(payload, indent=2))

    client = PyForce(
        client_id="11111",
        client_secret="11111",
        username="[email protected]",
        password="aaaaa",
        security_token="bbbbb",
    )
    client.sf_api_login()

    # GET: list all teams, then fetch a single one by record id.
    show(client.api_call(teams_endpoint))
    show(client.api_call(teams_endpoint + 'a042v000027DqomAAC'))

    # POST: create a team and keep the response for the following calls.
    created = client.api_call(teams_endpoint, {}, 'post', {"name": "Team 7 by REST API"})
    print(created['Id'])
    show(created)

    # PUT: the record id travels in the payload.
    put_body = {
        "id": created['Id'],
        "name": "Team 6 by REST part3",
    }
    show(client.api_call(teams_endpoint, {}, 'put', put_body))

    # PATCH: the record id travels in the URL.
    patch_body = {"name": "Team 6 by REST part3"}
    show(client.api_call(teams_endpoint + created['Id'], {}, 'patch', patch_body))

    # DELETE
    show(client.api_call(teams_endpoint + created['Id'], {}, 'delete', {}))

    # QUERY
    query_params = {
        'q': 'SELECT Account.Name, Name, CloseDate from Opportunity where IsClosed = False order by CloseDate ASC LIMIT 10'
    }
    show(client.api_call('/services/data/v39.0/query/', query_params))
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
@oximenvn
Copy link
Author

Reference: https://jereze.com/code/authentification-salesforce-rest-api-python

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment