Created
February 17, 2021 17:37
-
-
Save yogananda-muthaiah/4588305ae3ba6ae11df9608b31dfbb4a to your computer and use it in GitHub Desktop.
SAP Cloud Integration API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard-library modules used by SAPCloudIntegration below.
import base64
import json
import uuid


class SAPCloudIntegration:
    """Small client for the SAP Cloud Integration design-time/runtime API.

    Every request is sent through the session object supplied by the caller
    and carries HTTP Basic credentials plus the CSRF token fetched once when
    the instance is created.

    NOTE(review): ``session`` is presumably a ``requests.Session`` (it must
    offer ``get``/``post``/``put``/``delete`` accepting ``headers=`` and
    ``data=`` and returning an object with ``headers`` and ``status_code``)
    -- confirm against the caller.
    """

    def __init__(self, session, api_endpoint, base64AuthString):
        """Store the connection settings and fetch a CSRF token.

        Args:
            session: HTTP session used for every request.
            api_endpoint: Base URL of the API, without a trailing slash
                (resource paths are appended as ``"/..."``).
            base64AuthString: Already base64-encoded credentials placed
                after ``Basic `` in the ``Authorization`` header.

        Raises:
            KeyError: If the token response has no ``X-CSRF-Token`` header.
        """
        self.sess = session
        self.api_endpoint = api_endpoint
        self.base64AuthString = base64AuthString
        # Performs a network round-trip; the token is reused afterwards.
        self.CSRF_Token = self.get_csrf_token()
        print(f"Token {self.CSRF_Token}")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _read_file_as_base64(file_path):
        """Return the content of ``file_path`` as a base64 text string."""
        with open(file_path, "rb") as handle:
            return base64.b64encode(handle.read()).decode('utf-8')

    @staticmethod
    def _designtime_artifact_path(iflow_id, version):
        """Return the entity path (no leading slash) of a design-time iFlow."""
        return f"IntegrationDesigntimeArtifacts(Id='{iflow_id}',Version='{version}')"

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------
    def get_csrf_token(self):
        """Request a CSRF token from the API endpoint and return it.

        Sends a GET to ``api_endpoint`` with ``X-CSRF-Token: Fetch`` and
        reads the token from the same header of the response.

        Raises:
            KeyError: If the response carries no ``X-CSRF-Token`` header;
                the message includes the HTTP status to ease diagnosis.
        """
        headers = {
            'Authorization': f"Basic {self.base64AuthString}",
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-CSRF-Token': 'Fetch'
        }
        response = self.sess.get(self.api_endpoint, headers=headers)
        try:
            return response.headers['X-CSRF-Token']
        except KeyError as err:
            # Same exception type as before, but with actionable context.
            raise KeyError(
                "X-CSRF-Token header missing from token response "
                f"(HTTP status {response.status_code})"
            ) from err

    def get_authentication_headers(self):
        """Return a fresh header dict with credentials and the CSRF token.

        A new dict is built on every call, so callers may modify it freely.
        """
        return {
            'Authorization': f"Basic {self.base64AuthString}",
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-CSRF-token': self.CSRF_Token
        }

    # ------------------------------------------------------------------
    # Design-time artifacts
    # ------------------------------------------------------------------
    def create_iflow(self, name, iflow_id, package_id, zip_file_path):
        """Upload a zipped iFlow as a new design-time artifact.

        Args:
            name: Value sent as ``Name``.
            iflow_id: Value sent as ``Id``.
            package_id: Value sent as ``PackageId``.
            zip_file_path: Path of the file whose base64-encoded content is
                sent as ``ArtifactContent``.

        Returns:
            The response object returned by the session's ``post``.

        Raises:
            OSError: If ``zip_file_path`` cannot be read.
        """
        headers = self.get_authentication_headers()
        payload = {
            "Name": name,
            "Id": iflow_id,
            "PackageId": package_id,
            "ArtifactContent": self._read_file_as_base64(zip_file_path)
        }
        url = self.api_endpoint + "/IntegrationDesigntimeArtifacts"
        # Fixed: use the injected session rather than an undefined global.
        response = self.sess.post(url, headers=headers, data=json.dumps(payload))
        print(f"create_iflow -> {response.status_code}")
        return response

    def delete_iflow(self, iflow_id, version):
        """Send a DELETE for the design-time iFlow ``iflow_id``/``version``.

        Returns:
            The response object returned by the session's ``delete``.
        """
        headers = self.get_authentication_headers()
        url = f"{self.api_endpoint}/{self._designtime_artifact_path(iflow_id, version)}"
        response = self.sess.delete(url, headers=headers)
        print(f"delete_iflow -> {response.status_code}")
        return response

    def get_iflow_configuration(self, iflow_id, version):
        """GET the ``Configurations`` collection of a design-time iFlow.

        Returns:
            The response object returned by the session's ``get``.
        """
        url = (
            f"{self.api_endpoint}/"
            f"{self._designtime_artifact_path(iflow_id, version)}/Configurations"
        )
        response = self.sess.get(url, headers=self.get_authentication_headers())
        print(f"get_iflow_configuration -> {response.status_code}")
        return response

    def update_iflow_configuration(self, iflow_config, new_config, iflow_id, version):
        """Update iFlow parameters through a single ``$batch`` request.

        One ``PUT`` change-set part is emitted for every entry of
        ``iflow_config``; the value sent is ``new_config[ParameterKey]``
        converted to a string.

        Args:
            iflow_config: Iterable of dicts, each providing ``ParameterKey``
                and ``DataType``.
            new_config: Mapping from parameter key to the new value; it must
                contain every key listed in ``iflow_config``.
            iflow_id: Id of the design-time artifact.
            version: Version of the design-time artifact.

        Returns:
            The response object returned by the session's ``post``.

        Raises:
            KeyError: If an entry lacks ``ParameterKey``/``DataType`` or
                ``new_config`` has no value for one of the keys.
        """
        batch_id = str(uuid.uuid4())
        change_set_id = str(uuid.uuid4())
        headers = self.get_authentication_headers()
        headers['Content-Type'] = f"multipart/mixed; boundary=batch_{batch_id}"

        artifact = self._designtime_artifact_path(iflow_id, version)

        # Line endings must be \r\n throughout: per the original author,
        # using only \n makes the service answer with a 400 error.
        parts = [
            f"--batch_{batch_id}\r\n",
            f"Content-Type: multipart/mixed; boundary=changeset_{change_set_id}\r\n\r\n",
        ]
        for param in iflow_config:
            key = param['ParameterKey']
            param_value_json = {
                "ParameterKey": key,
                "ParameterValue": f"{new_config[key]}",
                "DataType": param['DataType']
            }
            # One change-set part per parameter.
            parts.extend((
                f"--changeset_{change_set_id}\r\n",
                "Content-Type: application/http\r\n",
                "Content-Transfer-Encoding: binary\r\n\r\n",
                f"PUT {artifact}/$links/Configurations('{key}') HTTP/1.1\r\n",
                "Accept: application/json\r\n",
                "Content-Type: application/json\r\n\r\n",
                json.dumps(param_value_json, indent=0) + "\r\n\r\n",
            ))
        # Close the change set, then the batch.
        parts.append(f"--changeset_{change_set_id}--\r\n")
        parts.append(f"--batch_{batch_id}--\r\n")
        payload = "".join(parts)

        url = self.api_endpoint + "/$batch"
        response = self.sess.post(url, headers=headers, data=payload)
        print(f"update_iflow_configuration -> {response.status_code}")
        return response

    def deploy_iflow(self, iflow_id, version):
        """POST to ``DeployIntegrationDesigntimeArtifact`` for the iFlow.

        Returns:
            The response object returned by the session's ``post``.
        """
        url = self.api_endpoint + f"/DeployIntegrationDesigntimeArtifact?Id='{iflow_id}'&Version='{version}'"
        response = self.sess.post(url, headers=self.get_authentication_headers())
        print(f"deploy_iflow -> {response.status_code}")
        return response

    def save_version_iflow(self, iflow_id, new_version):
        """POST to ``IntegrationDesigntimeArtifactSaveAsVersion``.

        Args:
            iflow_id: Id of the design-time artifact.
            new_version: Value sent as ``SaveAsVersion``.

        Returns:
            The response object returned by the session's ``post``.
        """
        url = self.api_endpoint + f"/IntegrationDesigntimeArtifactSaveAsVersion?SaveAsVersion='{new_version}'&Id='{iflow_id}'"
        response = self.sess.post(url, headers=self.get_authentication_headers())
        print(f"save_version_iflow -> {response.status_code}")
        return response

    # ------------------------------------------------------------------
    # Runtime artifacts
    # ------------------------------------------------------------------
    def undeploy_iflow(self, iflow_id):
        """Send a DELETE for the runtime artifact ``iflow_id``.

        Returns:
            The response object returned by the session's ``delete``.
        """
        url = self.api_endpoint + f"/IntegrationRuntimeArtifacts('{iflow_id}')"
        response = self.sess.delete(url, headers=self.get_authentication_headers())
        print(f"undeploy_iflow -> {response.status_code}")
        return response

    def update_resource_in_iflow(self, iflow_id, version, resource, resource_type, file_path):
        """Replace the content of one resource inside a design-time iFlow.

        Args:
            iflow_id: Id of the design-time artifact.
            version: Version of the design-time artifact.
            resource: Value used as the resource ``Name`` key.
            resource_type: Value used as the ``ResourceType`` key.
            file_path: Path of the file whose base64-encoded content is
                sent as ``ResourceContent``.

        Returns:
            The response object returned by the session's ``put``.

        Raises:
            OSError: If ``file_path`` cannot be read.
        """
        payload = {
            "ResourceContent": self._read_file_as_base64(file_path)
        }
        url = (
            f"{self.api_endpoint}/"
            f"{self._designtime_artifact_path(iflow_id, version)}"
            f"/$links/Resources(Name='{resource}',ResourceType='{resource_type}')"
        )
        response = self.sess.put(url, headers=self.get_authentication_headers(), data=json.dumps(payload))
        print(f"update_resource_in_iflow -> {response.status_code}")
        return response
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment