Created
August 19, 2019 08:44
-
-
Save oximenvn/1c496a9d9999e656e41a31540d58c5fd to your computer and use it in GitHub Desktop.
Demo: accessing the Salesforce REST API from Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
class PyForce:
    """Minimal Salesforce REST API client (OAuth 2.0 ``password`` grant).

    Call ``sf_api_login`` once; it stores ``access_token`` and
    ``instance_url`` on the instance, which ``api_call`` then uses to build
    authenticated requests.
    """

    # Verbs that api_call sends together with a JSON body.
    _BODY_METHODS = ('post', 'patch', 'put', 'delete')
    # Verbs for which api_call returns None on success instead of parsed JSON.
    _NO_RESULT_METHODS = ('patch', 'delete')

    def __init__(self, client_id, client_secret, username, password, security_token):
        """Store the credentials; no network request is made here."""
        self.client_id = client_id
        self.client_secret = client_secret
        self.username = username
        self.password = password
        self.security_token = security_token

    def sf_api_login(self):
        """Request an access token and remember it on the instance.

        Sets ``self.access_token`` and ``self.instance_url`` from the JSON
        response of the token endpoint and prints both.

        Returns:
            A tuple ``(access_token, instance_url)``.

        Raises:
            Exception: if the token endpoint answers with a status code of
                300 or above (for example, rejected credentials).
        """
        payload = {
            "grant_type": "password",
            "client_id": self.client_id,  # Consumer Key
            "client_secret": self.client_secret,  # Consumer Secret
            "username": self.username,  # The email you use to login
            # Concat your password and your security token
            "password": self.password + self.security_token,
        }
        # Send the credentials in the form-encoded body rather than the query
        # string, so the password and client secret do not become part of
        # the request URL.
        r = requests.post(
            "https://login.salesforce.com/services/oauth2/token",
            data=payload,
            timeout=30,
        )
        if r.status_code >= 300:
            # Fail loudly here instead of storing None tokens that would only
            # break later inside api_call.
            raise Exception('API error when calling %s : %s' % (r.url, r.content))
        body = r.json()
        self.access_token = body.get("access_token")
        self.instance_url = body.get("instance_url")
        # NOTE: debug output; this writes the bearer token to stdout.
        print("Access Token:", self.access_token)
        print("Instance URL", self.instance_url)
        return self.access_token, self.instance_url

    def api_call(self, action, parameters=None, method='get', data=None):
        """Make an authenticated call to the Salesforce REST API.

        Args:
            action: URL path appended to ``self.instance_url``.
            parameters: optional dict of URL query parameters.
            method: HTTP verb, case-insensitive: get, post, patch, put or
                delete.
            data: optional JSON-serialisable body for non-GET requests.

        Returns:
            The decoded JSON response, or ``None`` for patch/delete calls
            and for successful responses that carry no body.

        Raises:
            ValueError: if ``method`` is not one of the supported verbs.
            Exception: if the response status code is 300 or above.
        """
        verb = method.lower()
        # Validate before doing any work so a bad verb never hits the network.
        if verb != 'get' and verb not in self._BODY_METHODS:
            raise ValueError('Method should be get, post, patch, put or delete.')

        # None defaults avoid sharing one mutable dict between calls.
        parameters = {} if parameters is None else parameters
        data = {} if data is None else data

        headers = {
            'Content-type': 'application/json',
            'Accept-Encoding': 'gzip',
            'Authorization': 'Bearer %s' % self.access_token
        }
        url = self.instance_url + action
        if verb == 'get':
            r = requests.request(verb, url, headers=headers, params=parameters, timeout=30)
        else:
            r = requests.request(verb, url, headers=headers, json=data, params=parameters, timeout=10)

        print('Debug: API %s call: %s' % (verb, r.url))
        if r.status_code >= 300:
            raise Exception('API error when calling %s : %s' % (r.url, r.content))
        if verb in self._NO_RESULT_METHODS:
            return None
        if r.status_code == 204 or not r.content:
            # A successful response without a body has no JSON to decode.
            return None
        return r.json()
def main():
    """Demo run: log in, then issue GET, POST, PUT, PATCH, DELETE and a query call.

    The credentials below are placeholders; each result is printed as
    indented JSON.
    """
    def show(value):
        # Pretty-print one API result.
        print(json.dumps(value, indent=2))

    client = PyForce(
        client_id="11111",
        client_secret="11111",
        username="[email protected]",
        password="aaaaa",
        security_token="bbbbb",
    )
    client.sf_api_login()

    teams = '/services/apexrest/Teams/'

    # GET: the collection, then a single record by id.
    show(client.api_call(teams))
    show(client.api_call(teams + 'a042v000027DqomAAC'))

    # POST: create a record and keep the response for the later calls.
    created = client.api_call(teams, {}, 'post', {"name": "Team 7 by REST API"})
    print(created['Id'])
    show(created)

    # PUT: the record id travels in the body.
    put_body = {
        "id": created['Id'],
        "name": "Team 6 by REST part3",
    }
    show(client.api_call(teams, {}, 'put', put_body))

    # PATCH: the record id travels in the URL.
    patch_body = {"name": "Team 6 by REST part3"}
    show(client.api_call(teams + created['Id'], {}, 'patch', patch_body))

    # DELETE
    show(client.api_call(teams + created['Id'], {}, 'delete', {}))

    # QUERY
    query = {
        'q': 'SELECT Account.Name, Name, CloseDate from Opportunity where IsClosed = False order by CloseDate ASC LIMIT 10'
    }
    show(client.api_call('/services/data/v39.0/query/', query))
# Run the demo only when this file is executed as a script, not on import.
if "__main__" == __name__:
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Reference: https://jereze.com/code/authentification-salesforce-rest-api-python