Skip to content

Instantly share code, notes, and snippets.

@rajat-peloton
Created March 25, 2025 22:55
Show Gist options
  • Save rajat-peloton/b1644331a3109b966f89a35aa8a0e3f6 to your computer and use it in GitHub Desktop.
Save rajat-peloton/b1644331a3109b966f89a35aa8a0e3f6 to your computer and use it in GitHub Desktop.
Extracts ingress info for each application in argocd in the multicluster
import json
import subprocess
import csv
import yaml
LOGIN_ARGOCD_CLI = "argocd login argocd.fleet.k8s.pelotime.com --sso --grpc-web"
GET_CLUSTER_CLI = "argocd cluster list -o json --grpc-web"
GET_ALL_SERVICES_CLI = "argocd app list -o json --grpc-web"
GET_MANIFEST_FOR_APP= "argocd app manifests {app_name}"
def login_to_argocd():
# Call the CLI command and capture the output
print('Logging in to argocd')
command_output = subprocess.check_output([LOGIN_ARGOCD_CLI], shell=True)
# Print the output
print(command_output)
def get_server_cluster_info():
try:
command_output = subprocess.check_output([GET_CLUSTER_CLI], shell=True)
except subprocess.CalledProcessError as e:
print(f"Error occurred: {e}")
return
data = json.loads(command_output)
server_cluster_dict = {}
for item in data:
server = item.get("server")
cluster = item.get("name")
cluster_name = item.get("config", {}).get("awsAuthConfig", {}).get("clusterName")
labels = item.get("labels", {})
annotations = item.get("annotations", {})
env = annotations.get("env") or labels.get("env")
cluster_alias = annotations.get("cluster_alias") or labels.get("cluster_alias")
route53_domains_raw = annotations.get("route53_domains")
route53_domains = [d.strip() for d in route53_domains_raw.split(',')] if route53_domains_raw else []
applications_count = item.get("info", {}).get("applicationsCount")
if server:
server_cluster_dict[server] = {
"cluster": cluster,
"clusterName": cluster_name,
"env": env,
"cluster_alias": cluster_alias,
"route53_domains": route53_domains,
"applicationsCount": applications_count
}
return server_cluster_dict
def get_service_info(cluster_server_cluster_name_dict):
# Call the CLI command and capture the output
command_output = subprocess.check_output([GET_ALL_SERVICES_CLI], shell=True)
# Parse the JSON output
data = json.loads(command_output)
response = []
for item in data:
metadata = item.get("metadata", {})
spec = item.get("spec", {})
status = item.get("status", {})
name = metadata.get("name")
namespace = spec.get("destination", {}).get("namespace")
creation_timestamp = metadata.get("creationTimestamp")
# This field does not exist in the new JSON, so we mark it as None
app_git_repo = None
server = spec.get("destination", {}).get("server")
cluster = cluster_server_cluster_name_dict.get(server, {}).get("cluster", None)
cluster_name = cluster_server_cluster_name_dict.get(server, {}).get("clusterName", None)
summary = status.get("summary", {})
external_url = summary.get("externalURLs", None) # Not present in sample JSON
images = summary.get("images", None) # Not present in sample JSON
service = metadata.get("labels", {}).get("service", None)
# ingress_list = find_ingress_name_in_resource(status.get("resources", [])) # Not present in sample JSON
ingress_info = get_manifest_for_app(name)
service_dict = {
"name": name,
"namespace": namespace,
"service": service,
"creation_timestamp": creation_timestamp,
"app_git_repo": app_git_repo,
"server": server,
"cluster": cluster,
"cluster_name": cluster_name,
"external_url": external_url,
"images": images,
# "ingress": ingress_list,
"ingress_info": ingress_info
}
response.append(service_dict)
return response
import subprocess
import yaml
def get_manifest_for_app(app_name):
try:
manifest_yaml = subprocess.check_output([GET_MANIFEST_FOR_APP.format(app_name=app_name)], shell=True)
except subprocess.CalledProcessError as e:
print(f"Error occurred: {e}")
return []
ingress_info_list = []
try:
yaml_parsed = yaml.safe_load_all(manifest_yaml)
ingress_info = {}
for doc in yaml_parsed:
if doc is None:
continue
kind = doc.get("kind")
if doc.get('kind') == 'ConfigMap':
metadata = doc.get("metadata", {})
labels = metadata.get("labels", {})
scm = labels.get('app.onepeloton.com/scm', 'No SCM Info')
slack = labels.get('app.onepeloton.com/slack', 'No Slack Info')
team = labels.get('app.onepeloton.com/team', 'No Team Info')
service = labels.get('app.onepeloton.com/service', 'No Service Name')
repo_url = f"https://github.com/pelotoncycle/{scm}" if scm != 'No SCM Info' else None
ingress_info.update({
'scm': scm,
'repo_url': repo_url,
'slack': slack,
'team': team,
'service': service,
})
if kind == 'Weave':
spec = doc.get("spec", {})
gateways = spec.get('ingress', {}).get('gateways', [])
hosts = spec.get('ingress', {}).get('hosts', 'No Host Info')
ingress_info['public'] = 'public' in gateways
ingress_info['private'] = 'private' in gateways
ingress_info['internal'] = 'internal' in gateways
ingress_info['host'] = hosts # to do create hosts for each domain.
ingress_info_list.append(ingress_info)
except yaml.YAMLError as exc:
print(f"Error parsing YAML: {exc}")
return ingress_info_list
# Call the function to get the server and cluster information
# Note: comment out this line if you have already logged in
login_to_argocd()
cluster_server_cluster_name_dict = get_server_cluster_info()
response = get_service_info(cluster_server_cluster_name_dict)
print("Response in memory")
print(response[0])
# Define the file path for the CSV file
csv_file_path = "inventory_march.csv"
# Define the field names for the CSV file
field_names = ["name", "namespace", "service", "creation_timestamp", "app_git_repo", "server", "cluster", "cluster_name", "external_url", "images", "ingress_info"]
# Write the response dictionary to the CSV file
with open(csv_file_path, mode='w', newline='') as file:
writer = csv.DictWriter(file, fieldnames=field_names)
writer.writeheader()
writer.writerows(response)
# Print a success message
print(f"CSV file created successfully! : {csv_file_path}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment