Skip to content

Instantly share code, notes, and snippets.

@NitriKx
Last active July 18, 2023 13:31
Show Gist options
  • Save NitriKx/95dfc55a0bf213222a3f0373830632b3 to your computer and use it in GitHub Desktop.
Save NitriKx/95dfc55a0bf213222a3f0373830632b3 to your computer and use it in GitHub Desktop.
Delete all the Wiz Kubernetes connector in Error
import logging
import requests
from builtins import print, str, len, int, range, enumerate
from python_graphql_client import GraphqlClient
from requests import HTTPError
from typing import List, Optional, Dict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("wiz-client")
client_id = "TOBECHANGED"
client_secret = "TOBECHANGED"
# You can find this value in https://app.wiz.io/user/tenant
wiz_api_url = "https://api.eu7.app.wiz.io/graphql"
def list_kubernetes_clusters(client, first: int = 100, page_info: Optional[Dict] = None):
query = """
query ClustersPage(
$filterBy: KubernetesClusterFilters
$first: Int
$after: String
) {
kubernetesClusters(filterBy: $filterBy, first: $first, after: $after) {
nodes {
id
externalId
name
kind
status
isConnectedUsingBroker
criticalSystemHealthIssueCount
highSystemHealthIssueCount
mediumSystemHealthIssueCount
lowSystemHealthIssueCount
__typename
}
pageInfo {
endCursor
hasNextPage
__typename
}
totalCount
__typename
}
}
"""
variables = {"first": first,"filterBy":{}}
if page_info is not None and page_info.get("hasNextPage", False):
variables["after"] = page_info.get("endCursor")
cluster_list = client.execute(query=query, variables=variables, operation_name="ClustersPage")
return cluster_list.get("data").get("kubernetesClusters")
def list_connectors(client, source_ids: List[str], first: int = 100, page_info: Optional[Dict] = None):
query = """
query DeploymentsPageTable(
$after: String
$first: Int
$filterBy: DeploymentFilters
) {
deployments(after: $after, first: $first, filterBy: $filterBy) {
nodes {
...DeploymentPageDetails
__typename
}
pageInfo {
endCursor
hasNextPage
__typename
}
totalCount
__typename
}
}
fragment DeploymentPageDetails on Deployment {
id
name
type
status
version
criticalSystemHealthIssueCount
highSystemHealthIssueCount
__typename
}
"""
variables = {
"first": first,
"filterBy":{"sourceIds": source_ids}
}
if page_info is not None and page_info.get("hasNextPage", False):
variables["after"] = page_info.get("endCursor")
result = client.execute(query=query, variables=variables, operation_name="DeploymentsPageTable")
return result.get("data").get("deployments")
def list_connectors(client, source_ids: List[str], first: int = 100, page_info: Optional[Dict] = None):
query = """
query DeploymentsPageTable(
$after: String
$first: Int
$filterBy: DeploymentFilters
) {
deployments(after: $after, first: $first, filterBy: $filterBy) {
nodes {
...DeploymentPageDetails
__typename
}
pageInfo {
endCursor
hasNextPage
__typename
}
totalCount
__typename
}
}
fragment DeploymentPageDetails on Deployment {
id
name
type
status
version
criticalSystemHealthIssueCount
highSystemHealthIssueCount
__typename
}
"""
variables = {
"first": first,
"filterBy":{"sourceIds": source_ids}
}
if page_info is not None and page_info.get("hasNextPage", False):
variables["after"] = page_info.get("endCursor")
result = client.execute(query=query, variables=variables, operation_name="DeploymentsPageTable")
return result.get("data").get("deployments")
# Set the API endpoint
auth_endpoint = "https://auth.app.wiz.io/oauth/token"
# Set the request headers
headers = {
"Content-Type": "application/x-www-form-urlencoded",
}
# Set the request body
body = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"audience": "wiz-api"
}
# Make the request
response = requests.post(auth_endpoint, headers=headers, data=body)
# Check the response status code
if response.status_code == 200:
# The request was successful
data = response.json()
access_token = data["access_token"]
refresh_token = data["refresh_token"]
token_type = data["token_type"]
client = GraphqlClient(endpoint=wiz_api_url, headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": 'application/json'
})
try:
# Find all the Kubernetes clusters
kubernetes_clusters = []
kubernetes_clusters_page = list_kubernetes_clusters(client)
kubernetes_clusters = kubernetes_clusters + kubernetes_clusters_page.get("nodes")
while kubernetes_clusters_page.get("pageInfo").get("hasNextPage"):
kubernetes_clusters_page = list_kubernetes_clusters(client, page_info=kubernetes_clusters_page.get("pageInfo"))
kubernetes_clusters = kubernetes_clusters + kubernetes_clusters_page.get("nodes")
# Find all the associated connectors
source_ids = [cluster.get("id") for cluster in kubernetes_clusters]
connectors = []
connectors_page = list_connectors(client, source_ids=source_ids)
connectors = connectors + connectors_page.get("nodes")
while connectors_page.get("pageInfo").get("hasNextPage"):
connectors_page = list_connectors(client, source_ids=source_ids, page_info=connectors_page.get("pageInfo"))
connectors = connectors + connectors_page.get("nodes")
# Build the list of connectors to delete
connectors_with_issues = []
for connector in connectors:
if connector.get('criticalSystemHealthIssueCount') >= 1:
connectors_with_issues.append(connector)
logger.info(f"{len(connectors_with_issues)} connectors to delete")
# Delete the connectors by chunk of 20
n = 20
for chunk in [connectors_with_issues[i:i + n] for i in range(0, len(connectors_with_issues), n)]:
query = "mutation {\n"
connector_ids = []
for index, connector_to_delete in enumerate(chunk):
connector_ids.append(connector_to_delete.get("id"))
query += ' index' + str(index) + ': deleteConnector(input: {id: "' + connector_to_delete.get("id") + '"}) {\n _stub\n __typename\n }\n'
query += "}"
logger.info(f"Deleting {connector_ids}...")
logger.debug(query)
result = client.execute(query=query)
except HTTPError as e:
logger.exception("Error while deleting the duplicated connectors")
logger.debug(e.response.content)
else:
# The request failed
print("Error:", response.status_code)
print(response.content)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment