For each service in ArgoCD, gets its ingress information and categorizes the service as internal or external. Also extracts host information.
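Assumes the argocd CLI is installed and able to authenticate via SSO (the script shells out to argocd for every query) and that PyYAML is installed for parsing the rendered manifests. The results are written to a CSV file in the working directory.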
import json
import subprocess
import csv
import yaml
LOGIN_ARGOCD_CLI = "argocd login ui.argocd.build.k8s.pelotime.com --sso --grpc-web"
GET_CLUSTER_CLI = "argocd cluster list -o json --grpc-web"
GET_ALL_SERVICES_CLI = "argocd app list -o json --grpc-web"
GET_MANIFEST_FOR_APP = "argocd app manifests {app_name}"
# Login to ArgoCD
def login_to_argocd():
    # Call the CLI command and capture the output
    print('Logging in to argocd')
    command_output = subprocess.check_output([LOGIN_ARGOCD_CLI], shell=True)
    # Print the output
    print(command_output)
# Get the server and cluster information
def get_server_cluster_info():
    # Returns a dictionary keyed by server, with a clusterName/AWSClusterName dictionary as value.
    # Example: {'https://kubernetes.default.svc': {'clusterName': 'k8s-pelotime-com', 'AWSClusterName': 'k8s-pelotime-com'}}
    # Call the CLI command and capture the output
    try:
        command_output = subprocess.check_output([GET_CLUSTER_CLI], shell=True)
    except subprocess.CalledProcessError as e:
        print(f"Error occurred: {e}")
        return
    # Parse the JSON output
    data = json.loads(command_output)
    # Create a dictionary to store the server and cluster information
    server_cluster_dict = {}
    # Extract the required fields and populate the dictionary
    for item in data:
        server = item.get("server")
        cluster_name = item.get("name")
        aws_cluster_name = item.get("config", {}).get("awsAuthConfig", {'clusterName': 'No awsAuthConfig'}).get("clusterName")
        if server:
            server_cluster_dict[server] = {"clusterName": cluster_name, "AWSClusterName": aws_cluster_name}
    return server_cluster_dict
# Get all services from ArgoCD; only services whose destination cluster is named 'prod' are kept
def get_service_info(cluster_server_cluster_name_dict):
    # Call the CLI command and capture the output
    command_output = subprocess.check_output([GET_ALL_SERVICES_CLI], shell=True)
    # Parse the JSON output
    data = json.loads(command_output)
    response = []
    # Go over each service and collect its metadata
    for item in data:
        server = item.get("spec").get("destination").get("server")
        cluster_name = cluster_server_cluster_name_dict.get(server, {'clusterName': "No Server Info"}).get("clusterName")
        aws_cluster_name = cluster_server_cluster_name_dict.get(server, {'AWSClusterName': "No Server Info"}).get("AWSClusterName")
        if cluster_name not in {'prod'}:
            continue
        name = item.get("metadata").get("name")
        namespace = item.get("spec").get("destination").get("namespace")
        creation_timestamp = item.get("metadata").get("creationTimestamp")
        app_git_repo = item.get("metadata").get("annotations", {"app-repo-url": "No App-repo-url Info"}).get("app-repo-url")
        external_url = item.get('status', {}).get('summary', {}).get('externalURLs', [])
        images = item.get('status', {}).get('summary', {}).get('images', [])
        ingress_list = get_ingress_name_in_resource(item.get('status', {}).get('resources', []))
        # if cluster_name == 'prod-sre-k8s' and ingress_list:
        #     # get ingress info from manifest
        #     ingress_info = get_manifest_for_app(name)
        # else:
        #     ingress_info = []
        service_dict = {
            "name": name,
            "namespace": namespace,
            "creation_timestamp": creation_timestamp,
            "app_git_repo": app_git_repo,
            "server": server,
            "cluster_name": cluster_name,
            "aws_cluster_name": aws_cluster_name,
            "external_url": external_url,
            "images": images,
            "ingress_list": ingress_list,
        }
        response.append(service_dict)
    return response
# Fetch the rendered manifests for an app and extract ingress information
def get_manifest_for_app(app_name):
    try:
        manifest_yaml = subprocess.check_output([GET_MANIFEST_FOR_APP.format(app_name=app_name)], shell=True)
    except subprocess.CalledProcessError as e:
        print(f"Error occurred: {e}")
        return
    ingress_info_list = []
    try:
        yaml_parsed = yaml.safe_load_all(manifest_yaml)
        for doc in yaml_parsed:
            if doc is None:
                continue
            # Only Ingress manifests are of interest
            if doc.get('kind') != 'Ingress':
                continue
            ingress_info = {}
            metadata = doc.get('metadata', {})
            annotations = metadata.get('annotations', {})
            labels = metadata.get('labels', {})
            ingress_class = annotations.get('kubernetes.io/ingress.class')
            service = labels.get('app.onepeloton.com/service', 'No Service Name')
            scm = labels.get('app.onepeloton.com/scm', 'No scm info')
            slack = labels.get('app.onepeloton.com/slack', 'No Slack info')
            team = labels.get('app.onepeloton.com/team', 'No team info')
            vhost = annotations.get('nginx.ingress.kubernetes.io/upstream-vhost', 'No upstream-vhost info')
            ingress_info['ingress_name'] = metadata.get('name')
            ingress_info['ingress_class'] = ingress_class
            ingress_info['vhost'] = vhost
            ingress_info['slack'] = slack
            ingress_info['team'] = team
            ingress_info['scm'] = scm
            spec = doc.get('spec', {})
            ingress_class_from_spec = spec.get('ingressClassName')
            # Fall back to spec.ingressClassName when the annotation is not set
            if ingress_class_from_spec and not ingress_class:
                ingress_info['ingress_class'] = ingress_class_from_spec
            # These ingress classes expose the service publicly
            if ingress_info['ingress_class'] in {'public-nginx', 'kong', 'exposed-public-nginx'}:
                ingress_info['is_external'] = True
            else:
                ingress_info['is_external'] = False
            rules = spec.get('rules', [])
            if not rules:
                ingress_info_list.append(ingress_info)
            for rule in rules:
                host = rule.get('host')
                # If there is no host, there is no need to add this info
                if not host:
                    continue
                http = rule.get('http', {})
                paths = http.get('paths', [])
                for path in paths:
                    backend = path.get('backend', {})
                    backend_service = backend.get('service', {})
                    port = backend_service.get('port', {})
                    port_name = port.get('name')
                    # For each rule, create a new dictionary and add it to the list
                    rules_dict = {
                        'host': host,
                        'service': backend_service,
                        'port_name': port_name,
                        'path': path.get('path'),
                    }
                    ingress_info_list.append({**ingress_info, **rules_dict})
    except yaml.YAMLError as exc:
        print(f"Error Parsing yaml file: {exc}")
    return ingress_info_list
# Write a list of dictionaries to a CSV file with headers
def write_dict_list_to_csv(response, file_name='services.csv'):
    if not response:
        print("Nothing to write")
        return
    # Rows may have different keys (e.g. ingresses without rules), so take the union as headers
    fieldnames = list(dict.fromkeys(key for row in response for key in row))
    print(fieldnames)
    with open(file_name, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        for row in response:
            writer.writerow(row)
# Collect the namespace/name of every Ingress in an app's resource list
def get_ingress_name_in_resource(resources):
    ingress_list = []
    if not resources:
        return ingress_list
    for resource in resources:
        if resource.get('kind') == 'Ingress':
            ingress_dict = {
                "namespace": resource['namespace'],
                "name": resource['name'],
            }
            ingress_list.append(ingress_dict)
    return ingress_list
# For every service that has ingresses, pull its manifests and merge the ingress info into the service record
def get_manifest_for_all_services(service_list):
    all_ingress_for_services = []
    for service in service_list:
        if not service.get('ingress_list'):
            continue
        ingress_info = get_manifest_for_app(service['name'])
        if not ingress_info:
            print(f"No ingress info found for {service['name']}")
            continue
        for ingress in ingress_info:
            all_ingress_for_services.append({**service, **ingress})
    return all_ingress_for_services
'''
Description:
1. Log in to ArgoCD.
2. Get the server and cluster information.
3. Get the service information and map each service's server to its cluster_name and aws_cluster_name.
4. Get the manifest for each service that has ingress information.
5. If the ingress class is in {'public-nginx', 'kong', 'exposed-public-nginx'}, the service is external.
6. Write the data to a CSV file.
Optimizations:
1. Get the cluster info in one go and store it in a dictionary. (1 call to ArgoCD)
2. Get the service info in one go and store it in a list of dictionaries. (1 call to ArgoCD)
3. For each service, check if it has ingress info; if yes, get the manifest for the app and extract the ingress info. (one call per service that has ingress info)
4. Write the data to a CSV file.
'''
login_to_argocd()
cluster_server_cluster_name_dict = get_server_cluster_info()
service_list_dev_to_prod = get_service_info(cluster_server_cluster_name_dict)
# For each service in service_list_dev_to_prod that has ingress resources, get the ingress info
all_ingress_for_services = get_manifest_for_all_services(service_list_dev_to_prod)
write_dict_list_to_csv(all_ingress_for_services, 'ingress-upload-to-quicksight-with-all-hosts.csv')
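# Optional sanity check on the output (a minimal sketch, assuming the CSV written above and the
# is_external/host columns produced by get_manifest_for_app): list only the externally exposed hosts.
#
# import csv
# with open('ingress-upload-to-quicksight-with-all-hosts.csv', newline='') as f:
#     for row in csv.DictReader(f):
#         # csv.DictWriter stores booleans as the strings 'True'/'False'
#         if row.get('is_external') == 'True' and row.get('host'):
#             print(row['name'], row['host'])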