Created
September 9, 2025 03:23
-
-
Save adiralashiva8/acff3e667492118c1fbfcc879c45b97a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
from time import sleep | |
import multiprocessing | |
import requests | |
class BuildJob(): | |
def __init__(self): | |
self.JENKINS_URL = "http://<jenkins-url>:8081" | |
self.UNAME = "sadirala" | |
self.FULL_JOB_PATH = self.JENKINS_URL + "/job/ServiceNow/job/Playwright" | |
self.JOB_NAME = "snow-smoke-suite-parameters" | |
self.TOKEN = "11fca1bb0bb44bae7dd4652ea6cca824ef" | |
self.SLEEP_TIME = 10 | |
def build_jenkin_param_job(self, suite): | |
headers = { | |
'Content-Type': 'application/json', | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36', | |
"Connection": "Keep-Alive", | |
"Keep-Alive": "timeout=100, max=1000" | |
} | |
session = requests.Session() | |
session.verify = False | |
session.trust_env = False | |
os.environ['CURL_CA_BUNDLE']="" | |
suite_name = suite.replace("SNOW\\","") | |
suite_name = suite_name.replace("\\",".") | |
suite_name = suite_name.replace(".robot","") | |
try: | |
response = session.post(f"{self.FULL_JOB_PATH}/job/{self.JOB_NAME}/buildWithParameters?token={self.TOKEN}&suite_path={suite}&suite={suite_name}", | |
auth=(self.UNAME, self.TOKEN),headers=headers) | |
sleep(5) | |
if response.status_code == 201: | |
print(f"Started execution of suite: {suite}") | |
queue_item_info_end_point = response.headers['Location'] + "api/json" | |
print(queue_item_info_end_point) | |
try: | |
session = requests.Session() | |
session.verify = False | |
session.trust_env = False | |
response = session.get(queue_item_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers) | |
if response.status_code == 200: | |
result_dict = json.loads(response.text) | |
while 'executable' not in result_dict: | |
sleep(self.SLEEP_TIME) | |
session = requests.Session() | |
session.verify = False | |
session.trust_env = False | |
response = session.get(queue_item_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers) | |
result_dict = json.loads(response.text) | |
build_info_end_point = json.loads(response.text)['executable']['url'] + "api/json" | |
print(build_info_end_point) | |
try: | |
session = requests.Session() | |
session.verify = False | |
session.trust_env = False | |
response = session.get(build_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers) | |
if response.status_code == 200: | |
result_dict = json.loads(response.text) | |
while 'result' not in result_dict: | |
sleep(self.SLEEP_TIME) | |
session = requests.Session() | |
session.verify = False | |
session.trust_env = False | |
response = session.get(build_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers) | |
result_dict = json.loads(response.text) | |
except Exception as error: | |
print(error) | |
print(f"Unable to get suite: {suite} build details on fly") | |
while (result_dict['result'] == None): | |
sleep(self.SLEEP_TIME) | |
try: | |
session = requests.Session() | |
session.verify = False | |
session.trust_env = False | |
response = session.get(build_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers) | |
result_dict = json.loads(response.text) | |
except Exception as error: | |
print(error) | |
print(f"Unable to get suite: {suite} execution details on fly") | |
break | |
print(f"Completed execution of suite: {suite}") | |
print(f"{suite} - Status: {result_dict['result']}") | |
else: | |
print(f"Failed to get suite: {suite} execution details") | |
except Exception as error: | |
print(error) | |
print(f"Unable to get suite: {suite} execution details") | |
else: | |
print(f"Failed to start execution for suite: {suite}") | |
except Exception as error: | |
print(error) | |
print(f"Unable to start execution for suite: {suite}") | |
if __name__ == '__main__': | |
try: | |
folder = r'Playwright\Smoke' | |
ignore_paths = [] | |
final_suites = set() | |
list_of_suites = set() | |
# r=root, d=directories, f = files | |
for r, d, f in os.walk(folder): | |
for file in f: | |
if file.lower().endswith(".robot"): | |
list_of_suites.add(os.path.join(r, file)) | |
for suite in list_of_suites: | |
split_path = suite.split("\\") | |
suite_status = [True if _path not in ignore_paths else False for _path in split_path] | |
if all(suite_status): | |
final_suites.add(suite) | |
build_obj = BuildJob() | |
pool_obj = multiprocessing.Pool(60) | |
ans = pool_obj.map(build_obj.build_jenkin_param_job,final_suites) | |
pool_obj.close() | |
pool_obj.join() | |
except Exception as e: | |
print(e) | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment