from google.oauth2 import service_account
from google.cloud import resourcemanager_v3
import collections
import logging
import time
import re
from google.cloud import recommender

# Root logging goes to INFO so the script reports progress when run directly;
# the module logger is pinned to the same level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# One broad scope: the key in secret.json carries every role the service
# account holds, so that file must stay out of version control.
SCOPE = ["https://www.googleapis.com/auth/cloud-platform"]
credentials = service_account.Credentials.from_service_account_file(
    "secret.json",
    scopes=SCOPE,
)

# Clients used by the functions below: project discovery and IAM recommendations.
resource_mgr = resourcemanager_v3.ProjectsClient(credentials=credentials)
rec_client = recommender.RecommenderClient(credentials=credentials)

# Module-level accumulator; extract_details appends one entry per operation,
# across every project that is processed.
final_recommendations = {"recommendations": []}
def get_projects():
    '''Get the list of projects using resource manager - List projects that service accounts have access to.

    Returns the ``project_id`` of every project yielded by
    ``resource_mgr.search_projects()``, in the order the API returns them.
    '''
    return [found.project_id for found in resource_mgr.search_projects()]
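def recom_data(project_id):
    '''Get the active recommendation using the recommender client

    Lists the IAM policy recommendations (``google.iam.policy.Recommender``)
    for ``project_id`` in the ``global`` location.

    Returns the result of ``rec_client.list_recommendations``. On any error
    the exception is logged with its traceback and an empty list is returned,
    so callers can iterate the result unconditionally (returning ``None``
    here made ``extract_details`` fail with a TypeError on the next project).
    '''
    parent = f"projects/{project_id}/locations/global/recommenders/google.iam.policy.Recommender"
    try:
        return rec_client.list_recommendations(parent = parent)
    except Exception:
        # Broad on purpose: a denied or missing project must not abort the
        # sweep over the remaining projects; the traceback is kept in the log.
        logger.exception("failed to list recommendations for project %s", project_id)
        return []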
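def verify_srv_acc(email , project_id):
    '''verify if the service account is User managed one here

    A user-managed service account address lives in the project's own domain,
    ``<name>@<project_id>.iam.gserviceaccount.com``. The check therefore looks
    for ``@<project_id>`` followed by a dot, with the id regex-escaped. The
    earlier ``'@' + project_id`` search was a bare prefix match, so
    ``my-proj`` also accepted ``sa@my-proj-2.iam.gserviceaccount.com`` and
    misclassified another project's account as user-managed.

    Returns the ``re.Match`` when ``email`` belongs to ``project_id``'s domain,
    otherwise ``None``; callers use the result for its truthiness only.
    '''
    if not email or not project_id:
        return None
    pattern = "@" + re.escape(project_id) + r"\."
    return re.search(pattern, email)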
def extract_details(data , project_id):
    '''Extracting the required details from recommender response for its fine usage over updating role bindings

    Walks every operation of every recommendation in ``data`` and appends an
    ``{"action", "role", "service_account"}`` entry to the module-level
    ``final_recommendations`` dict. That dict is shared, so entries accumulate
    across calls (one call per project in ``__main__``) rather than being
    limited to this project.

    Returns the shared ``final_recommendations`` dict and a list of
    ``(recommendation.name, recommendation.etag)`` tuples; the etag is
    presumably meant for a later update of the recommendation — not shown here.
    '''
    # NOTE(review): the gist lost its indentation; the nesting below is the most
    # plausible reconstruction — confirm against the original file.
    def_acc = set()
    update_data = []
    for recommendation in data:
        for op_groups in recommendation.content.operation_groups:
            for op in op_groups.operations:
                try:
                    # Member affected by the binding change, when the operation carries a path filter.
                    srv_email = op.path_filters["/iamPolicy/bindings/*/members/*"]
                except:
                    # NOTE(review): bare except also swallows KeyboardInterrupt/SystemExit;
                    # narrow it to the exception a missing key raises on path_filters — confirm which type.
                    srv_email = op.value
                # Addresses outside "@<project_id>" are collected as default accounts
                # (see the commented print below); they are still recorded.
                if not verify_srv_acc(srv_email ,project_id):
                    def_acc.add(srv_email)
                action = op.action
                role = op.path_filters["/iamPolicy/bindings/*/role"]
                final_recommendations["recommendations"].append({"action" : action , "role" : role , "service_account" : srv_email})
        update_data.append((recommendation.name , recommendation.etag))
    return final_recommendations , update_data
    # print("The Default service accounts are %s" % ",".join(def_acc))
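if __name__ == "__main__":
    proj_ids = get_projects()
    for project_id in proj_ids:
        rec_data = recom_data(project_id)
        # recom_data yields nothing usable on an API error; skip the project
        # instead of crashing inside extract_details.
        if not rec_data:
            logger.info("no recommendations returned for project %s", project_id)
            continue
        # extract_details appends into the module-level dict, which already
        # holds every earlier project; remember the old length so only this
        # project's entries are printed (the previous loop re-printed all of them).
        already_seen = len(final_recommendations["recommendations"])
        rec, _ = extract_details(rec_data, project_id)
        for r in rec.get("recommendations")[already_seen:]:
            srv_acc = r.get("service_account")
            role = r.get("role")
            action = r.get("action")
            print(f"Service account - {srv_acc} - {action} the role {role}")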