import json | |
import re | |
import logging | |
from google.cloud import resourcemanager_v3 | |
from google.oauth2 import service_account | |
from google.cloud import recommender | |
from google.api_core import exceptions | |
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
logger.setLevel(logging.INFO)
# OAuth scope requested for the service-account credentials loaded below.
SCOPE = ["https://www.googleapis.com/auth/cloud-platform"]
# Credentials are read at import time from a service-account key file whose
# relative path is resolved against the current working directory.
# NOTE(review): this is a long-lived downloaded key; keep the file out of
# version control and consider short-lived credentials (ADC or impersonation)
# instead — confirm with the deployment.
credentials = service_account.Credentials.from_service_account_file(
    "say.json", scopes=SCOPE
)
# Module-level Resource Manager client; get_projects() iterates its results.
resource_mgr = resourcemanager_v3.ProjectsClient(credentials=credentials)
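def get_insights(projects):
    """Collect service-account usage insights for each project in *projects*.

    For every project ID the IAM recommender is queried for insights of type
    ``google.iam.serviceAccount.Insight``; only those whose subtype is
    ``SERVICE_ACCOUNT_USAGE`` are kept.

    Args:
        projects: iterable of project IDs to query.

    Returns:
        A list of JSON strings, one per matching insight, each of the form
        ``{"serviceAccountEmail": <email>, "project_ID": <project>}``.
        Projects the caller may not read are logged and skipped.
    """
    projects = list(projects)
    if not projects:
        # Nothing to query, so do not build a client for no work.
        return []
    recommender_client = recommender.RecommenderClient(credentials=credentials)
    insights = []
    for project in projects:
        parent = (
            f"projects/{project}/locations/global/insightTypes/"
            "google.iam.serviceAccount.Insight"
        )
        # The try wraps the iteration as well as the call: the pager may only
        # raise PermissionDenied when its first page is actually fetched.
        try:
            project_insights = recommender_client.list_insights(parent=parent)
            for insight in project_insights:
                if insight.insight_subtype != "SERVICE_ACCOUNT_USAGE":
                    continue
                try:
                    email = insight.content["email"]
                except KeyError:
                    # An insight without an email cannot be reported; log and move on.
                    logger.warning(
                        "Insight %s in project %s has no 'email' field; skipping",
                        getattr(insight, "name", "<unknown>"),
                        project,
                    )
                    continue
                insights.append(
                    json.dumps({"serviceAccountEmail": email, "project_ID": project})
                )
        except exceptions.PermissionDenied as e:
            logger.warning(
                "Permission denied listing insights for project %s: %s", project, e
            )
    return insights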
def get_projects():
    """Return the project IDs of every project found by ``search_projects``."""
    return [found.project_id for found in resource_mgr.search_projects()]
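def find_user_service_account(insights):
    """Return the emails from *insights* that belong to user-managed service accounts.

    An email is treated as user-managed when the first label of its domain
    equals the project ID recorded alongside it (the
    ``<name>@<project-id>.iam.gserviceaccount.com`` shape). Emails on other
    domains are left out.

    Args:
        insights: iterable of JSON strings as produced by ``get_insights``,
            each with ``serviceAccountEmail`` and ``project_ID`` keys.

    Returns:
        The matching emails, in input order.
    """
    matches = []
    for raw in insights:
        record = json.loads(raw)
        email = record["serviceAccountEmail"]
        project_id = record["project_ID"]
        # Compare the domain's first label instead of searching for
        # "@<project_id>" as a substring: the substring test also matched any
        # project whose ID merely starts with this one (e.g. "foo" inside
        # "x@foo-bar.iam.gserviceaccount.com"), and the unescaped ID was
        # interpreted as a regular expression.
        _, _, domain = email.partition("@")
        if project_id and domain.split(".", 1)[0] == project_id:
            matches.append(email)
    return matches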
def main():
    """Print the user-managed service accounts reported as inactive across all projects."""
    inactive = find_user_service_account(get_insights(get_projects()))
    joined = ",".join(inactive)
    print(f"User managed service account which is not active currently are : \n {joined}")
if __name__ == "__main__":
    # Run only when executed as a script, not when imported as a module.
    main()