Created
September 9, 2025 03:38
-
-
Save adiralashiva8/8a4277c5be66d282d757618095626060 to your computer and use it in GitHub Desktop.
Upload Robot Framework test results into Xray in parallel using atlassian-python-api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import re | |
from robot.api import ExecutionResult, ResultVisitor | |
class XrayIdChecker(ResultVisitor):
    """Collect one result row per unique "ITDFT" tag found in a Robot run.

    After visiting an execution result, ``itdft_tags`` holds rows of the form
    ``[tag, status, message]`` and ``non_duplicate_tags_list`` holds the tags
    that have already been recorded (first occurrence wins).
    """

    def __init__(self):
        # Rows of [tag, test.status, test.message], one per unique tag.
        self.itdft_tags = []
        # Tags already recorded; used to skip repeated tags.
        self.non_duplicate_tags_list = []

    def visit_test(self, test):
        """Record the status and message of *test* for each new ITDFT tag.

        A tag qualifies when it contains "ITDFT" (case-insensitive). A tag
        that was already recorded for an earlier test is ignored.
        """
        for tag in test.tags:
            # Normalise once so the membership test and the stored value are
            # the same type; the original compared str(tag) against a list
            # holding the raw tag object.
            tag_name = str(tag)
            if "ITDFT" not in tag_name.upper():
                continue
            if tag_name in self.non_duplicate_tags_list:
                continue
            self.non_duplicate_tags_list.append(tag_name)
            self.itdft_tags.append([tag_name, test.status, test.message])
if __name__ == '__main__':
    # Walk the Robot Framework output and gather one row per Xray-tagged test.
    execution_result = ExecutionResult("output.xml")
    xray_checker = XrayIdChecker()
    execution_result.visit(xray_checker)

    # Persist each row as the textual form of a list, one row per line.
    with open("test_results_with_xray_id.txt", 'w') as out_file:
        out_file.writelines(
            str(row) + "\n" for row in xray_checker.itdft_tags
        )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ast
import math
import multiprocessing
import os
import re
import time

from atlassian import Xray
class XrayUpload():
    '''
    Push Robot Framework results into a Jira Xray test execution.

    Each input "result row" is one line of text holding the literal form of
    a list: [test key, robot status, robot message].

    Reference:
    - Github: https://github.com/atlassian-api/atlassian-python-api
    - Documentation: https://atlassian-python-api.readthedocs.io/xray.html
    '''

    # <br>/<hr> style tags (any case, optional attributes or self-closing).
    _LINE_BREAK_TAG = re.compile(r"<\s*[bh]r\b[^>]*>", re.IGNORECASE)
    # Any remaining tag on a single line.
    _ANY_TAG = re.compile(r"<.*?>")

    def __init__(self, server="https://<jira-url>/", username="abcdef",
                 password="12345", test_exe_id='IDTA-1345'):
        '''
        Store the connection settings and create the Xray client.

        The defaults are the placeholders that were previously hard-coded;
        pass real values instead of editing this file.
        '''
        self.JIRA_SERVER = server
        self.JIRA_UNAME = username
        self.JIRA_PWD = password
        self.TEST_EXE_ID = test_exe_id
        self.xray_object = Xray(url=self.JIRA_SERVER, username=self.JIRA_UNAME, password=self.JIRA_PWD)

    @staticmethod
    def _parse_result_row(result_row):
        '''Parse one text row into a list; return None for a blank row.'''
        text = result_row.strip()
        if not text:
            return None
        # The rows are plain list literals, so literal_eval is sufficient.
        # It replaces eval(), which would execute arbitrary code placed in
        # the results file.
        return ast.literal_eval(text)

    def remove_html_tags(self, text):
        '''
        Return *text* with <br>/<hr> tags turned into newlines and all other
        HTML tags removed.
        '''
        # The previous pattern "<.r*?>" also matched every one-letter tag
        # (<b>, <i>, <p>, ...) and replaced it with a newline.
        with_line_breaks = self._LINE_BREAK_TAG.sub("\n", text)
        return self._ANY_TAG.sub('', with_line_breaks)

    def update_test_into_text_execution(self, exe_id, robot_results_with_xray):
        '''
        Add the test key of every result row to the test execution *exe_id*.

        A failure of the Xray call is reported on stdout and not re-raised.
        '''
        tests = []
        for result_row in robot_results_with_xray:
            row_data = self._parse_result_row(result_row)
            if row_data is not None:
                tests.append(row_data[0])
        try:
            self.xray_object.update_test_execution(exe_id, add=tests)
        except Exception as err:
            print("Failed to include " + str(tests) + " in " + exe_id + ": " + str(err))

    def map_xray_id_with_test_id(self, xray_test_execution_list, robot_results_with_xray):
        '''
        Pair each result row with its entry in the test execution.

        xray_test_execution_list is a sequence of mappings with 'key' and
        'id' entries. Returns a list of [id, test key, status, comment]
        rows, one per result row whose key appears in that sequence. The
        robot status 'SKIP' is reported as 'ABORTED'; the comment is empty
        for 'PASS' and the tag-stripped robot message otherwise.
        '''
        # Index once by key; on duplicate keys the last entry wins, which is
        # what the former full scan (without break) produced as well.
        entries_by_key = {}
        for xray_result_row in xray_test_execution_list:
            entries_by_key[xray_result_row['key']] = xray_result_row

        result_list = []
        for result_row in robot_results_with_xray:
            result_data = self._parse_result_row(result_row)
            if not result_data or not result_data[0]:
                continue
            entry = entries_by_key.get(result_data[0])
            if entry is None:
                # Not part of the execution. The old code appended the
                # previous row's result again here (stale variable).
                continue
            status = 'ABORTED' if result_data[1] == 'SKIP' else result_data[1]
            msg = self.remove_html_tags(result_data[2]) if result_data[1] != 'PASS' else ''
            result_list.append([entry['id'], result_data[0], status, msg])
        return result_list

    def update_xray_test_result_status(self, result):
        '''
        Update the status of one test run and, unless it passed, its comment.

        *result* is one row produced by map_xray_id_with_test_id. A failure
        is reported on stdout and not re-raised, so one bad row does not
        stop the rest of a batch.
        '''
        try:
            self.xray_object.update_test_run_status(result[0], result[2])
            if result[2] != 'PASS':
                self.xray_object.update_test_run_comment(result[0], result[-1])
        except Exception as err:
            # Was print(print(...)), which also printed a stray "None".
            print("Failed to update status of " + str(result[1]) + ": " + str(err))
if __name__ == '__main__':
    # End-to-end upload: add the tests to the execution, fetch the execution's
    # test runs, pair them with the robot results, then push status/comment
    # for every pair using a pool of worker processes.
    upload_start_time = time.time()
    xray_upload = XrayUpload()

    print("Adding robot results to test execution id...")
    # add tests to test executions
    start_time = time.time()
    with open("test_results_with_xray_id.txt") as robot_results_with_xray_info:
        xray_upload.update_test_into_text_execution(xray_upload.TEST_EXE_ID, robot_results_with_xray_info)
    elapsed_time = time.time() - start_time
    print('Add test cases to test execution:', time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

    # get list of test cases associated with test execution
    # (at most 49 pages of 200 entries, as before; stop at the first empty
    # page instead of requesting the remaining pages anyway)
    latest_tests_in_test_execution = []
    for page in range(1, 50):
        page_results = xray_upload.xray_object.get_tests_with_test_execution(
            xray_upload.TEST_EXE_ID, page=page, limit=200)
        if not page_results:
            break
        latest_tests_in_test_execution.extend(page_results)

    # The results file is read a second time because the first pass consumed it.
    with open("test_results_with_xray_id.txt") as robot_results_with_xray_info:
        result_list = xray_upload.map_xray_id_with_test_id(latest_tests_in_test_execution, robot_results_with_xray_info)

    # create a pool of processes
    # What is the Maximum Number of Worker Processes in the Pool?
    # The maximum number of worker processes may be limited by your operating system.
    # For example, on windows, you will not be able to create more than 61 child processes in your Python program.
    start_time = time.time()
    with multiprocessing.Pool() as pool:
        # map the list to the pool of processes and execute them in parallel;
        # map() blocks until every item has been processed
        pool.map(xray_upload.update_xray_test_result_status, result_list)
        # close the pool of processes and wait for all processes to complete
        pool.close()
        pool.join()
    elapsed_time = time.time() - start_time
    print('Update test cases status/comment:', time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

    upload_elapsed_time = time.time() - upload_start_time
    print('XRAY Result Upload Total Time:', time.strftime("%H:%M:%S", time.gmtime(upload_elapsed_time)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment