Skip to content

Instantly share code, notes, and snippets.

@adiralashiva8
Created September 9, 2025 03:38
Show Gist options
  • Save adiralashiva8/8a4277c5be66d282d757618095626060 to your computer and use it in GitHub Desktop.
Save adiralashiva8/8a4277c5be66d282d757618095626060 to your computer and use it in GitHub Desktop.
Upload robotframework test results into xray in parallel using atlassian-python-api
import sys
import re
from robot.api import ExecutionResult, ResultVisitor
class XrayIdChecker(ResultVisitor):
def __init__(self):
self.itdft_tags = []
self.non_duplicate_tags_list = []
def visit_test(self, test):
tags = test.tags
for tag in tags:
if "ITDFT" in str(tag).upper():
if str(tag) not in self.non_duplicate_tags_list:
self.non_duplicate_tags_list.append(tag)
result = [tag, test.status, test.message]
self.itdft_tags.append(result)
if __name__ == '__main__':
result = ExecutionResult("output.xml")
checker = XrayIdChecker()
result.visit(checker)
with open("test_results_with_xray_id.txt", 'w') as file:
for tag in checker.itdft_tags:
file.write(str(tag) + "\n")
import os
import re
import math
import time
from atlassian import Xray
import multiprocessing
class XrayUpload():
'''
Reference:
- Github: https://github.com/atlassian-api/atlassian-python-api
- Documentation: https://atlassian-python-api.readthedocs.io/xray.html
'''
def __init__(self):
self.JIRA_SERVER = "https://<jira-url>/"
self.JIRA_UNAME = "abcdef"
self.JIRA_PWD = "12345"
self.TEST_EXE_ID = 'IDTA-1345'
self.xray_object = Xray(url=self.JIRA_SERVER, username=self.JIRA_UNAME, password=self.JIRA_PWD)
def remove_html_tags(self, text):
# replace hr with new line
new_line_text = re.sub(r"<.r*?>", "\n", text)
# replace other html tags
new_line_text = re.sub(r"<.*?>", '', new_line_text)
return new_line_text
def update_test_into_text_execution(self, exe_id, robot_results_with_xray):
tests = []
for result_row in robot_results_with_xray:
row_data = eval(result_row)
tests.append(row_data[0])
try:
self.xray_object.update_test_execution(exe_id, add=tests)
except:
print("Failed to include " + str(tests) + " in " + exe_id)
def map_xray_id_with_test_id(self, xray_test_execution_list, robot_results_with_xray):
result_list = []
for result_row in robot_results_with_xray:
result_data = eval(result_row)
if result_data[0]:
for xray_result_row in xray_test_execution_list:
if result_data[0] == xray_result_row['key']:
status = 'ABORTED' if result_data[1] == 'SKIP' else result_data[1]
msg = self.remove_html_tags(result_data[2]) if result_data[1] != 'PASS' else ''
xray_result = [xray_result_row['id'], result_data[0], status, msg]
else:
continue
try:
result_list.append(xray_result)
except:
pass
return result_list
def update_xray_test_result_status(self, result):
try:
self.xray_object.update_test_run_status(result[0], result[2])
if result[2] != 'PASS':
self.xray_object.update_test_run_comment(result[0], result[-1])
except:
print(print("Failed to update status of " + result[1]))
if __name__ == '__main__':
upload_start_time = time.time()
xray_upload = XrayUpload()
robot_results_with_xray_info = open("test_results_with_xray_id.txt")
print("Adding robot results to test execution id...")
# add tests to test executions
start_time = time.time()
xray_upload.update_test_into_text_execution(xray_upload.TEST_EXE_ID, robot_results_with_xray_info)
elapsed_time = time.time() - start_time
print('Add test cases to test execution:', time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))
robot_results_with_xray_info.close()
# get list of test cases associated with test execution
latest_tests_in_test_execution = []
for i in range(1, 50):
list_results = xray_upload.xray_object.get_tests_with_test_execution(xray_upload.TEST_EXE_ID, page=i, limit=200)
if len(list_results):
for result_item in list_results:
latest_tests_in_test_execution.append(result_item)
robot_results_with_xray_info = open("test_results_with_xray_id.txt")
result_list = xray_upload.map_xray_id_with_test_id(latest_tests_in_test_execution, robot_results_with_xray_info)
robot_results_with_xray_info.close()
# create a pool of processes
# What is the Maximum Number of Worker Processes in the Pool?
# The maximum number of worker processes may be limited by your operating system.
# For example, on windows, you will not be able to create more than 61 child processes in your Python program.
pool = multiprocessing.Pool()
# map the list to the pool of processes and execute them in parallel
start_time = time.time()
results = pool.map(xray_upload.update_xray_test_result_status, result_list)
elapsed_time = time.time() - start_time
print('Update test cases status/comment:', time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))
robot_results_with_xray_info.close()
# close the pool of processes and wait for all processes to complete
pool.close()
pool.join()
upload_elapsed_time = time.time() - upload_start_time
print('XRAY Result Upload Total Time:', time.strftime("%H:%M:%S", time.gmtime(upload_elapsed_time)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment