Created
January 9, 2025 21:09
-
-
Save rajat-peloton/48d698ca785c177d18ef3b8dc5e5ef96 to your computer and use it in GitHub Desktop.
For each service in Argo CD, gets its ingress information and categorizes the service as internal or external. Also extracts host information.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import subprocess | |
import csv | |
import yaml | |
# ArgoCD CLI command lines. Each one is passed to subprocess.check_output
# with shell=True by the functions below.

# Logs the CLI in to the ArgoCD server using SSO (--sso).
LOGIN_ARGOCD_CLI = "argocd login ui.argocd.build.k8s.pelotime.com --sso --grpc-web"
# Lists the clusters registered in ArgoCD; the output is parsed as JSON.
GET_CLUSTER_CLI = "argocd cluster list -o json --grpc-web"
# Lists all ArgoCD applications; the output is parsed as JSON.
GET_ALL_SERVICES_CLI = "argocd app list -o json --grpc-web"
# Fetches the manifests of one application; {app_name} is filled in with
# str.format and the output is parsed as a multi-document YAML stream.
# NOTE(review): unlike the commands above this one omits --grpc-web - confirm
# that is intentional.
GET_MANIFEST_FOR_APP= "argocd app manifests {app_name}"
# Login to ArgoCD
def login_to_argocd():
    """Authenticate the ArgoCD CLI by running ``LOGIN_ARGOCD_CLI`` in a shell.

    A progress message is printed first; afterwards the command's captured
    standard output is printed as returned by ``check_output`` (raw bytes).

    Raises:
        subprocess.CalledProcessError: if the login command exits non-zero.
    """
    print('Logging in to argocd')
    login_stdout = subprocess.check_output([LOGIN_ARGOCD_CLI], shell=True)
    # Echo whatever the CLI reported so the operator can see the outcome.
    print(login_stdout)
# Get the server and cluster information
def get_server_cluster_info():
    """Map every ArgoCD cluster server URL to its cluster names.

    Runs ``GET_CLUSTER_CLI`` in a shell and parses its JSON output.

    Returns:
        A dictionary keyed by server URL whose values hold the ArgoCD cluster
        name and the AWS cluster name, for example::

            {'https://kubernetes.default.svc':
                {'clusterName': 'k8s-pelotime-com',
                 'AWSClusterName': 'k8s-pelotime-com'}}

        ``AWSClusterName`` is ``'No awsAuthConfig'`` when the cluster entry
        carries no ``awsAuthConfig``. Entries without a ``server`` are skipped.
        ``None`` is returned when the CLI command exits non-zero (the error is
        printed).
    """
    try:
        command_output = subprocess.check_output([GET_CLUSTER_CLI], shell=True)
    except subprocess.CalledProcessError as e:
        print(f"Error occurred: {e}")
        return None

    # A JSON ``null`` payload is treated like an empty cluster list.
    clusters = json.loads(command_output) or []

    server_cluster_dict = {}
    for item in clusters:
        server = item.get("server")
        if not server:
            continue

        # ``config`` and ``awsAuthConfig`` may be absent or JSON null; neither
        # case should abort the whole listing with an AttributeError.
        config = item.get("config") or {}
        aws_auth_config = config.get("awsAuthConfig")
        if aws_auth_config is None:
            aws_cluster_name = "No awsAuthConfig"
        else:
            aws_cluster_name = aws_auth_config.get("clusterName")

        server_cluster_dict[server] = {
            "clusterName": item.get("name"),
            "AWSClusterName": aws_cluster_name,
        }
    return server_cluster_dict
# Get all ArgoCD applications whose destination cluster is named 'prod'
def get_service_info(cluster_server_cluster_name_dict):
    """Collect summary information for every application deployed to 'prod'.

    Runs ``GET_ALL_SERVICES_CLI`` in a shell, parses its JSON output and keeps
    only the applications whose destination server maps to the cluster name
    ``'prod'``.

    Args:
        cluster_server_cluster_name_dict: mapping of server URL to a dict with
            ``clusterName`` and ``AWSClusterName`` keys. ``None`` is treated
            as an empty mapping.

    Returns:
        A list with one dictionary per kept application, with the keys
        ``name``, ``namespace``, ``creation_timestamp``, ``app_git_repo``,
        ``server``, ``cluster_name``, ``aws_cluster_name``, ``external_url``,
        ``images`` and ``ingress_list`` (in that order).

    Raises:
        subprocess.CalledProcessError: if the CLI command exits non-zero.
    """
    command_output = subprocess.check_output([GET_ALL_SERVICES_CLI], shell=True)
    applications = json.loads(command_output) or []

    # The cluster lookup may have failed upstream; fall back to an empty table
    # instead of raising AttributeError on None.
    cluster_lookup = cluster_server_cluster_name_dict or {}

    response = []
    for item in applications:
        metadata = item.get("metadata") or {}
        destination = (item.get("spec") or {}).get("destination") or {}
        status = item.get("status") or {}
        summary = status.get("summary") or {}

        server = destination.get("server")
        cluster_info = cluster_lookup.get(server, {})
        # Both names share the same placeholder when the server is unknown.
        cluster_name = cluster_info.get("clusterName", "No Server Info")
        aws_cluster_name = cluster_info.get("AWSClusterName", "No Server Info")
        if cluster_name not in {'prod'}:
            continue

        # Apply the placeholder to the annotation itself so it is also used
        # when annotations exist but carry no app-repo-url entry.
        annotations = metadata.get("annotations") or {}
        app_git_repo = annotations.get("app-repo-url", "No App-repo-url Info")

        response.append({
            "name": metadata.get("name"),
            "namespace": destination.get("namespace"),
            "creation_timestamp": metadata.get("creationTimestamp"),
            "app_git_repo": app_git_repo,
            "server": server,
            "cluster_name": cluster_name,
            "aws_cluster_name": aws_cluster_name,
            "external_url": summary.get('externalURLs', []),
            "images": summary.get('images', []),
            "ingress_list": get_ingress_name_in_resource(status.get('resources', [])),
        })
    return response
def _ingress_rows_from_doc(doc):
    """Flatten one Ingress manifest (a dict) into a list of row dictionaries."""
    # Any of these sections may be missing or explicitly null in the YAML.
    metadata = doc.get('metadata') or {}
    annotations = metadata.get('annotations') or {}
    labels = metadata.get('labels') or {}
    spec = doc.get('spec') or {}

    # The annotation wins; spec.ingressClassName is only a fallback.
    ingress_class = annotations.get('kubernetes.io/ingress.class')
    ingress_class_from_spec = spec.get('ingressClassName')
    if ingress_class_from_spec and not ingress_class:
        ingress_class = ingress_class_from_spec

    # NOTE(review): the scm placeholder reads 'No Service Name'; this looks
    # like a copy-paste of another default - kept as is so the CSV values do
    # not change.
    ingress_info = {
        'ingress_name': metadata.get('name'),
        'ingress_class': ingress_class,
        'vhost': annotations.get('nginx.ingress.kubernetes.io/upstream-vhost', 'No upstream-vhost info'),
        'slack': labels.get('app.onepeloton.com/slack', 'No Slack info'),
        'team': labels.get('app.onepeloton.com/team', 'No team info'),
        'scm': labels.get('app.onepeloton.com/scm', 'No Service Name'),
        'is_external': ingress_class in {'public-nginx', 'kong', 'exposed-public-nginx'},
    }

    rules = spec.get('rules') or []
    if not rules:
        # An ingress without rules is still reported, just without host data.
        return [ingress_info]

    rows = []
    for rule in rules:
        host = rule.get('host')
        # Rules without a host carry nothing worth reporting.
        if not host:
            continue
        http = rule.get('http') or {}
        for path_entry in http.get('paths') or []:
            backend = path_entry.get('backend') or {}
            backend_service = backend.get('service') or {}
            port = backend_service.get('port') or {}
            # One row per (host, path) pair, carrying the shared ingress data.
            rows.append({
                **ingress_info,
                'host': host,
                'service': backend_service,
                'port_name': port.get('name'),
                'path': path_entry.get('path'),
            })
    return rows


def get_manifest_for_app(app_name):
    """Extract ingress information from the manifests of one ArgoCD app.

    Runs ``GET_MANIFEST_FOR_APP`` for ``app_name`` in a shell, parses the
    output as a multi-document YAML stream and inspects every document whose
    ``kind`` is ``Ingress``.

    Args:
        app_name: name of the ArgoCD application.

    Returns:
        A list of dictionaries. Every entry has the keys ``ingress_name``,
        ``ingress_class``, ``vhost``, ``slack``, ``team``, ``scm`` and
        ``is_external``. An ingress with rules yields one entry per
        (host, path) pair, which additionally has ``host``, ``service`` (the
        backend service mapping), ``port_name`` and ``path``; an ingress
        without rules yields a single entry without those four keys.
        ``is_external`` is True when the ingress class is ``public-nginx``,
        ``kong`` or ``exposed-public-nginx``.
        ``None`` is returned when the CLI command exits non-zero. If the YAML
        stream fails to parse, the error is printed and the entries collected
        so far are returned.
    """
    try:
        manifest_yaml = subprocess.check_output(
            [GET_MANIFEST_FOR_APP.format(app_name=app_name)], shell=True
        )
    except subprocess.CalledProcessError as e:
        print(f"Error occurred: {e}")
        return None

    ingress_info_list = []
    try:
        # safe_load_all is lazy, so parse errors surface during iteration.
        for doc in yaml.safe_load_all(manifest_yaml):
            # Skip empty documents and anything that is not a mapping.
            if not isinstance(doc, dict) or doc.get('kind') != 'Ingress':
                continue
            ingress_info_list.extend(_ingress_rows_from_doc(doc))
    except yaml.YAMLError as exc:
        print(f"Error Parsing yaml file : {exc}")
    return ingress_info_list
# Write a list of dictionaries to a CSV file with a header row
def write_dict_list_to_csv(response, file_name='services.csv'):
    """Write ``response`` to ``file_name`` as CSV, one row per dictionary.

    The header is the union of the keys of all rows, in first-seen order, so
    rows with differing key sets can be written together; a key missing from
    a row is written as an empty cell. The header is also printed.

    Args:
        response: list of dictionaries to write.
        file_name: path of the CSV file to create (overwritten if present).

    Returns:
        None. When ``response`` is empty a notice is printed and no file is
        created.
    """
    if not response:
        print(f"No rows to write; skipping {file_name}")
        return

    # dict.fromkeys acts as an ordered set: duplicates dropped, order kept.
    fieldnames = list(dict.fromkeys(key for row in response for key in row))
    print(fieldnames)

    with open(file_name, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames, restval='')
        writer.writeheader()
        writer.writerows(response)
def get_ingress_name_in_resource(resources):
    """Return the namespace and name of every Ingress in ``resources``.

    Args:
        resources: list of resource dictionaries, each with a ``kind`` key;
            Ingress entries must also have ``namespace`` and ``name``.
            May be empty or ``None``.

    Returns:
        A list of ``{"namespace": ..., "name": ...}`` dictionaries, one per
        resource whose ``kind`` is ``'Ingress'``, in input order.
    """
    if not resources:
        return []
    return [
        {"namespace": entry['namespace'], "name": entry['name']}
        for entry in resources
        if entry['kind'] == 'Ingress'
    ]
def get_manifest_for_all_services(service_list):
    """Combine each service with the ingress rows found in its manifests.

    Services whose ``ingress_list`` is empty or missing are skipped without
    fetching their manifests. For every other service the manifests are
    fetched by service name; when nothing comes back a message is printed.

    Args:
        service_list: list of service dictionaries with at least ``name`` and
            ``ingress_list`` keys.

    Returns:
        A list with one dictionary per ingress row, made of the service's
        fields overlaid with the ingress row's fields.
    """
    merged_rows = []
    services_with_ingress = (svc for svc in service_list if svc.get('ingress_list'))
    for svc in services_with_ingress:
        app_name = svc['name']
        ingress_rows = get_manifest_for_app(app_name)
        if ingress_rows:
            # Ingress fields take precedence over service fields on key clashes.
            merged_rows.extend({**svc, **row} for row in ingress_rows)
        else:
            print(f"No ingress info found for {app_name}")
    return merged_rows
# Description:
#   1. Log in to ArgoCD.
#   2. Get the server and cluster information.
#   3. Get the service information and map each service's server to its
#      cluster_name and aws_cluster_name.
#   4. Get the manifests of each service that has ingress resources.
#   5. An ingress whose class is one of {'public-nginx', 'kong',
#      'exposed-public-nginx'} is external.
#   6. Write the data to a CSV file.
#
# Optimizations:
#   1. The cluster info is fetched in one go and stored in a dictionary
#      (1 call to ArgoCD).
#   2. The service info is fetched in one go and stored in a list of
#      dictionaries (1 call to ArgoCD).
#   3. Manifests are fetched only for services that have ingress resources
#      (1 call per such service).
login_to_argocd()

cluster_server_cluster_name_dict = get_server_cluster_info()
if cluster_server_cluster_name_dict is None:
    # The cluster listing failed; without it no service can be mapped to a
    # cluster, so stop with a clear message instead of failing later.
    raise SystemExit("Could not retrieve cluster information from ArgoCD; aborting.")

service_list_dev_to_prod = get_service_info(cluster_server_cluster_name_dict)

# For each service in service_list_dev_to_prod, get the ingress info.
all_ingress_for_services = get_manifest_for_all_services(service_list_dev_to_prod)
if not all_ingress_for_services:
    # Nothing was collected, so there is no header or row to write.
    raise SystemExit("No ingress information found; nothing to write.")

write_dict_list_to_csv(all_ingress_for_services, 'ingress-upload-to-quicksight-with-all-hosts.csv')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment