Created
June 26, 2024 18:00
-
-
Save rajat-peloton/5471959454d4b55f2fbaa520c7f01799 to your computer and use it in GitHub Desktop.
Service to Cluster Mapping Using ArgoCD CLI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import json
import shlex
import subprocess
# ArgoCD CLI command lines run by this script. The two listing commands ask
# for JSON output (`-o json`), which the functions below parse with json.loads.
LOGIN_ARGOCD_CLI = "argocd login ui.argocd.build.k8s.pelotime.com --sso --grpc-web"
GET_CLUSTER_CLI = "argocd cluster list -o json --grpc-web"
GET_ALL_SERVICES_CLI = "argocd app list -o json --grpc-web"
def login_to_argocd():
    """Run the ArgoCD login command and print whatever it wrote to stdout.

    Raises:
        subprocess.CalledProcessError: If the login command exits non-zero.
    """
    print('Logging in to argocd')
    # The command is a fixed string, so split it into an argv list and run it
    # directly rather than passing a one-element list through the shell.
    command_output = subprocess.check_output(shlex.split(LOGIN_ARGOCD_CLI))
    # check_output returns bytes; decode so the text itself is printed
    # instead of its b'...' repr.
    print(command_output.decode("utf-8", errors="replace"))
def get_server_cluster_info():
    """Build a lookup from cluster server URL to its cluster names.

    Runs the cluster-list CLI command and parses its JSON output.

    Returns:
        dict | None: ``{server: {"cluster": <name>, "clusterName": <value>}}``
        for every listed entry that has a truthy ``server``. ``clusterName``
        is taken from ``config.awsAuthConfig.clusterName``, or is the string
        ``"No awsAuthConfig"`` when that section is absent. Returns ``None``
        (after printing the error) if the CLI command exits non-zero.
    """
    try:
        command_output = subprocess.check_output(shlex.split(GET_CLUSTER_CLI))
    except subprocess.CalledProcessError as e:
        print(f"Error occurred: {e}")
        return None

    data = json.loads(command_output)

    server_cluster_dict = {}
    # `or []` tolerates a JSON `null` payload instead of failing on iteration.
    for item in data or []:
        server = item.get("server")
        if not server:
            continue

        # "config" and "awsAuthConfig" may be missing or null; either case
        # means there is no AWS auth section to read a cluster name from.
        config = item.get("config") or {}
        aws_auth_config = config.get("awsAuthConfig")
        if aws_auth_config is None:
            cluster_name = "No awsAuthConfig"
        else:
            cluster_name = aws_auth_config.get("clusterName")

        server_cluster_dict[server] = {
            "cluster": item.get("name"),
            "clusterName": cluster_name,
        }
    return server_cluster_dict
def get_service_info(cluster_server_cluster_name_dict):
    """List every ArgoCD application together with its destination cluster.

    Runs the app-list CLI command, parses its JSON output, and resolves each
    application's destination server against the supplied lookup.

    Args:
        cluster_server_cluster_name_dict: Mapping of server URL to
            ``{"cluster": ..., "clusterName": ...}``, as produced by
            ``get_server_cluster_info``. ``None`` is treated as empty.

    Returns:
        list[dict]: One dict per application with the keys ``name``,
        ``namespace``, ``creation_timestamp``, ``app_git_repo``, ``server``,
        ``cluster``, ``cluster_name`` and ``external_url``.

    Raises:
        subprocess.CalledProcessError: If the CLI command exits non-zero.
    """
    # The cluster lookup can be None when the cluster listing failed; fall
    # back to an empty mapping so every app reports "No Server Info".
    cluster_lookup = cluster_server_cluster_name_dict or {}

    command_output = subprocess.check_output(shlex.split(GET_ALL_SERVICES_CLI))
    data = json.loads(command_output)

    response = []
    for item in data or []:
        # Null-safe access: any of these sections may be missing or null.
        metadata = item.get("metadata") or {}
        destination = (item.get("spec") or {}).get("destination") or {}
        annotations = metadata.get("annotations") or {}
        summary = (item.get("status") or {}).get("summary") or {}

        server = destination.get("server")
        cluster_entry = cluster_lookup.get(server)
        if cluster_entry is None:
            cluster = "No Server Info"
            cluster_name = "No Server Info"
        else:
            cluster = cluster_entry.get("cluster")
            cluster_name = cluster_entry.get("clusterName")

        response.append({
            "name": metadata.get("name"),
            "namespace": destination.get("namespace"),
            "creation_timestamp": metadata.get("creationTimestamp"),
            # Apply the placeholder whenever the annotation itself is absent,
            # not only when the whole annotations section is missing.
            "app_git_repo": annotations.get("app-repo-url", "No App-repo-url Info"),
            "server": server,
            "cluster": cluster,
            "cluster_name": cluster_name,
            "external_url": summary.get("externalURLs", []),
        })
    return response
# Log in to ArgoCD first.
# Note: comment out this line if you have already logged in
login_to_argocd()

# get_server_cluster_info() returns None when the cluster listing fails; use
# an empty mapping in that case so the service lookup below does not crash
# and instead reports "No Server Info" for every service.
cluster_server_cluster_name_dict = get_server_cluster_info() or {}
response = get_service_info(cluster_server_cluster_name_dict)

# Define the file path for the CSV file
csv_file_path = "inventory.csv"

# Column order of the CSV; matches the keys of each service dict
field_names = [
    "name",
    "namespace",
    "creation_timestamp",
    "app_git_repo",
    "server",
    "cluster",
    "cluster_name",
    "external_url",
]

# Write one row per service; newline='' lets the csv module control line endings
with open(csv_file_path, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=field_names)
    writer.writeheader()
    writer.writerows(response)

# Print a success message
print(f"CSV file created successfully! : {csv_file_path}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment