Created
March 25, 2025 22:55
-
-
Save rajat-peloton/b1644331a3109b966f89a35aa8a0e3f6 to your computer and use it in GitHub Desktop.
Extracts ingress info for each application in argocd in the multicluster
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import subprocess | |
import csv | |
import yaml | |
LOGIN_ARGOCD_CLI = "argocd login argocd.fleet.k8s.pelotime.com --sso --grpc-web" | |
GET_CLUSTER_CLI = "argocd cluster list -o json --grpc-web" | |
GET_ALL_SERVICES_CLI = "argocd app list -o json --grpc-web" | |
GET_MANIFEST_FOR_APP= "argocd app manifests {app_name}" | |
def login_to_argocd(): | |
# Call the CLI command and capture the output | |
print('Logging in to argocd') | |
command_output = subprocess.check_output([LOGIN_ARGOCD_CLI], shell=True) | |
# Print the output | |
print(command_output) | |
def get_server_cluster_info(): | |
try: | |
command_output = subprocess.check_output([GET_CLUSTER_CLI], shell=True) | |
except subprocess.CalledProcessError as e: | |
print(f"Error occurred: {e}") | |
return | |
data = json.loads(command_output) | |
server_cluster_dict = {} | |
for item in data: | |
server = item.get("server") | |
cluster = item.get("name") | |
cluster_name = item.get("config", {}).get("awsAuthConfig", {}).get("clusterName") | |
labels = item.get("labels", {}) | |
annotations = item.get("annotations", {}) | |
env = annotations.get("env") or labels.get("env") | |
cluster_alias = annotations.get("cluster_alias") or labels.get("cluster_alias") | |
route53_domains_raw = annotations.get("route53_domains") | |
route53_domains = [d.strip() for d in route53_domains_raw.split(',')] if route53_domains_raw else [] | |
applications_count = item.get("info", {}).get("applicationsCount") | |
if server: | |
server_cluster_dict[server] = { | |
"cluster": cluster, | |
"clusterName": cluster_name, | |
"env": env, | |
"cluster_alias": cluster_alias, | |
"route53_domains": route53_domains, | |
"applicationsCount": applications_count | |
} | |
return server_cluster_dict | |
def get_service_info(cluster_server_cluster_name_dict): | |
# Call the CLI command and capture the output | |
command_output = subprocess.check_output([GET_ALL_SERVICES_CLI], shell=True) | |
# Parse the JSON output | |
data = json.loads(command_output) | |
response = [] | |
for item in data: | |
metadata = item.get("metadata", {}) | |
spec = item.get("spec", {}) | |
status = item.get("status", {}) | |
name = metadata.get("name") | |
namespace = spec.get("destination", {}).get("namespace") | |
creation_timestamp = metadata.get("creationTimestamp") | |
# This field does not exist in the new JSON, so we mark it as None | |
app_git_repo = None | |
server = spec.get("destination", {}).get("server") | |
cluster = cluster_server_cluster_name_dict.get(server, {}).get("cluster", None) | |
cluster_name = cluster_server_cluster_name_dict.get(server, {}).get("clusterName", None) | |
summary = status.get("summary", {}) | |
external_url = summary.get("externalURLs", None) # Not present in sample JSON | |
images = summary.get("images", None) # Not present in sample JSON | |
service = metadata.get("labels", {}).get("service", None) | |
# ingress_list = find_ingress_name_in_resource(status.get("resources", [])) # Not present in sample JSON | |
ingress_info = get_manifest_for_app(name) | |
service_dict = { | |
"name": name, | |
"namespace": namespace, | |
"service": service, | |
"creation_timestamp": creation_timestamp, | |
"app_git_repo": app_git_repo, | |
"server": server, | |
"cluster": cluster, | |
"cluster_name": cluster_name, | |
"external_url": external_url, | |
"images": images, | |
# "ingress": ingress_list, | |
"ingress_info": ingress_info | |
} | |
response.append(service_dict) | |
return response | |
import subprocess | |
import yaml | |
def get_manifest_for_app(app_name): | |
try: | |
manifest_yaml = subprocess.check_output([GET_MANIFEST_FOR_APP.format(app_name=app_name)], shell=True) | |
except subprocess.CalledProcessError as e: | |
print(f"Error occurred: {e}") | |
return [] | |
ingress_info_list = [] | |
try: | |
yaml_parsed = yaml.safe_load_all(manifest_yaml) | |
ingress_info = {} | |
for doc in yaml_parsed: | |
if doc is None: | |
continue | |
kind = doc.get("kind") | |
if doc.get('kind') == 'ConfigMap': | |
metadata = doc.get("metadata", {}) | |
labels = metadata.get("labels", {}) | |
scm = labels.get('app.onepeloton.com/scm', 'No SCM Info') | |
slack = labels.get('app.onepeloton.com/slack', 'No Slack Info') | |
team = labels.get('app.onepeloton.com/team', 'No Team Info') | |
service = labels.get('app.onepeloton.com/service', 'No Service Name') | |
repo_url = f"https://github.com/pelotoncycle/{scm}" if scm != 'No SCM Info' else None | |
ingress_info.update({ | |
'scm': scm, | |
'repo_url': repo_url, | |
'slack': slack, | |
'team': team, | |
'service': service, | |
}) | |
if kind == 'Weave': | |
spec = doc.get("spec", {}) | |
gateways = spec.get('ingress', {}).get('gateways', []) | |
hosts = spec.get('ingress', {}).get('hosts', 'No Host Info') | |
ingress_info['public'] = 'public' in gateways | |
ingress_info['private'] = 'private' in gateways | |
ingress_info['internal'] = 'internal' in gateways | |
ingress_info['host'] = hosts # to do create hosts for each domain. | |
ingress_info_list.append(ingress_info) | |
except yaml.YAMLError as exc: | |
print(f"Error parsing YAML: {exc}") | |
return ingress_info_list | |
# Call the function to get the server and cluster information | |
# Note: comment out this line if you have already logged in | |
login_to_argocd() | |
cluster_server_cluster_name_dict = get_server_cluster_info() | |
response = get_service_info(cluster_server_cluster_name_dict) | |
print("Response in memory") | |
print(response[0]) | |
# Define the file path for the CSV file | |
csv_file_path = "inventory_march.csv" | |
# Define the field names for the CSV file | |
field_names = ["name", "namespace", "service", "creation_timestamp", "app_git_repo", "server", "cluster", "cluster_name", "external_url", "images", "ingress_info"] | |
# Write the response dictionary to the CSV file | |
with open(csv_file_path, mode='w', newline='') as file: | |
writer = csv.DictWriter(file, fieldnames=field_names) | |
writer.writeheader() | |
writer.writerows(response) | |
# Print a success message | |
print(f"CSV file created successfully! : {csv_file_path}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment