Skip to content

Instantly share code, notes, and snippets.

@adiralashiva8
Created September 9, 2025 03:23
Show Gist options
  • Save adiralashiva8/acff3e667492118c1fbfcc879c45b97a to your computer and use it in GitHub Desktop.
Save adiralashiva8/acff3e667492118c1fbfcc879c45b97a to your computer and use it in GitHub Desktop.
import os
import json
from time import sleep
import multiprocessing
import requests
class BuildJob():
def __init__(self):
self.JENKINS_URL = "http://<jenkins-url>:8081"
self.UNAME = "sadirala"
self.FULL_JOB_PATH = self.JENKINS_URL + "/job/ServiceNow/job/Playwright"
self.JOB_NAME = "snow-smoke-suite-parameters"
self.TOKEN = "11fca1bb0bb44bae7dd4652ea6cca824ef"
self.SLEEP_TIME = 10
def build_jenkin_param_job(self, suite):
headers = {
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',
"Connection": "Keep-Alive",
"Keep-Alive": "timeout=100, max=1000"
}
session = requests.Session()
session.verify = False
session.trust_env = False
os.environ['CURL_CA_BUNDLE']=""
suite_name = suite.replace("SNOW\\","")
suite_name = suite_name.replace("\\",".")
suite_name = suite_name.replace(".robot","")
try:
response = session.post(f"{self.FULL_JOB_PATH}/job/{self.JOB_NAME}/buildWithParameters?token={self.TOKEN}&suite_path={suite}&suite={suite_name}",
auth=(self.UNAME, self.TOKEN),headers=headers)
sleep(5)
if response.status_code == 201:
print(f"Started execution of suite: {suite}")
queue_item_info_end_point = response.headers['Location'] + "api/json"
print(queue_item_info_end_point)
try:
session = requests.Session()
session.verify = False
session.trust_env = False
response = session.get(queue_item_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers)
if response.status_code == 200:
result_dict = json.loads(response.text)
while 'executable' not in result_dict:
sleep(self.SLEEP_TIME)
session = requests.Session()
session.verify = False
session.trust_env = False
response = session.get(queue_item_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers)
result_dict = json.loads(response.text)
build_info_end_point = json.loads(response.text)['executable']['url'] + "api/json"
print(build_info_end_point)
try:
session = requests.Session()
session.verify = False
session.trust_env = False
response = session.get(build_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers)
if response.status_code == 200:
result_dict = json.loads(response.text)
while 'result' not in result_dict:
sleep(self.SLEEP_TIME)
session = requests.Session()
session.verify = False
session.trust_env = False
response = session.get(build_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers)
result_dict = json.loads(response.text)
except Exception as error:
print(error)
print(f"Unable to get suite: {suite} build details on fly")
while (result_dict['result'] == None):
sleep(self.SLEEP_TIME)
try:
session = requests.Session()
session.verify = False
session.trust_env = False
response = session.get(build_info_end_point, auth=(self.UNAME, self.TOKEN),headers=headers)
result_dict = json.loads(response.text)
except Exception as error:
print(error)
print(f"Unable to get suite: {suite} execution details on fly")
break
print(f"Completed execution of suite: {suite}")
print(f"{suite} - Status: {result_dict['result']}")
else:
print(f"Failed to get suite: {suite} execution details")
except Exception as error:
print(error)
print(f"Unable to get suite: {suite} execution details")
else:
print(f"Failed to start execution for suite: {suite}")
except Exception as error:
print(error)
print(f"Unable to start execution for suite: {suite}")
if __name__ == '__main__':
try:
folder = r'Playwright\Smoke'
ignore_paths = []
final_suites = set()
list_of_suites = set()
# r=root, d=directories, f = files
for r, d, f in os.walk(folder):
for file in f:
if file.lower().endswith(".robot"):
list_of_suites.add(os.path.join(r, file))
for suite in list_of_suites:
split_path = suite.split("\\")
suite_status = [True if _path not in ignore_paths else False for _path in split_path]
if all(suite_status):
final_suites.add(suite)
build_obj = BuildJob()
pool_obj = multiprocessing.Pool(60)
ans = pool_obj.map(build_obj.build_jenkin_param_job,final_suites)
pool_obj.close()
pool_obj.join()
except Exception as e:
print(e)
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment