Created
January 31, 2022 17:40
-
-
Save dharmab/e31af8a6c28a023b39409e3fafbb3ce8 to your computer and use it in GitHub Desktop.
k8s-proxy.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Optional, Any, Union, Dict, List, Set | |
import json | |
import kubernetes.client | |
import urllib3.response | |
from yarl import URL | |
import functools | |
import operator | |
import datetime | |
import dataclasses | |
class ProxiedResponse(urllib3.response.HTTPResponse):
    """
    Wraps an HTTPResponse and adds a requests-style raise_for_status() method.

    Apart from the wrapped response itself and raise_for_status(), every
    attribute lookup on this object is forwarded to the wrapped response.
    """

    def __init__(self, base_response: urllib3.response.HTTPResponse):
        self._base_response = base_response
        # NOTE(review): the wrapper hands itself to the parent constructor as
        # its first positional argument - confirm this is intended.
        super().__init__(self)

    def raise_for_status(self) -> None:
        """
        Raise urllib3.exceptions.HTTPError when the wrapped response has a
        4xx or 5xx status; otherwise do nothing.

        https://requests.readthedocs.io/en/master/api/#requests.Response.raise_for_status
        """
        wrapped = self._base_response
        if not 400 <= wrapped.status < 600:
            return
        raise urllib3.exceptions.HTTPError(
            f"{wrapped.status} Error: {wrapped.reason} for url: {wrapped.geturl()}"
        )

    def __getattribute__(self, item: str) -> Any:
        # https://docs.python.org/3/reference/datamodel.html#object.__getattribute__
        # Only these two names are resolved on the wrapper itself; everything
        # else is delegated to the wrapped response.
        own_attributes = ("_base_response", "raise_for_status")
        if item not in own_attributes:
            return getattr(self._base_response, item)
        return object.__getattribute__(self, item)
# Type alias for values accepted as a JSON request body: scalars, None, dicts and lists.
JSONType = Optional[Union[str, int, float, bool, Dict, List]]
def proxy_request(
    *,
    method: str,
    core_client: kubernetes.client.CoreV1Api,
    namespace: str,
    service: str,
    path: str,
    params: Optional[Dict[str, Union[list, str]]] = None,
    timeout: Optional[int] = None,
    headers: Optional[Dict] = None,
    json_request_body: Optional[JSONType] = None,
    data: Optional[Union[str, Dict]] = None,
) -> ProxiedResponse:
    """
    Send an HTTP request to a service by proxying it through the Kubernetes API server.

    If both ``data`` and ``json_request_body`` are given, ``data`` wins and the
    JSON body is ignored. Caller-supplied ``headers`` override the headers this
    function sets itself (``Accept``, ``Content-Type``).

    :param method: HTTP method as string ("GET", "POST")
    :param core_client: Kubernetes core client
    :param namespace: Kubernetes service's Namespace
    :param service: Target service in format "serviceName:port", e.g. "web:8080"
    :param path: Target URL path
    :param params: Target URL parameters; a list value produces one query parameter per element
    :param timeout: Request timeout
    :param headers: Request HTTP headers
    :param json_request_body: JSON request data
    :param data: Non-JSON request data. See: https://requests.readthedocs.io/en/master/user/quickstart/#more-complicated-post-requests
    :return: Response from the target service
    :raises TypeError: If ``data`` is neither a string nor a dictionary
    """
    extra_headers = {} if headers is None else headers
    url_params = {} if params is None else params

    # Taken from kubernetes.client.CoreV1Api.connect_get_namespaced_service_proxy_with_path_with_http_info
    proxy_root = URL("/api/v1/namespaces") / namespace / "services" / service / "proxy"
    relative_path = path.lstrip("/")
    # core_client.api_client.call_api requires that the path have no queries on it.
    resource_path = str(URL(f"{proxy_root}/{relative_path}").with_query({}))

    # core_client.api_client.call_api prefers lists of tuples over dictionaries. Using a dictionary leads to
    # serialization issues in the path.
    query_params: List[tuple] = []
    for key, value in url_params.items():
        if isinstance(value, str):
            query_params.append((key, value))
        else:
            # Non-string values are expanded into one (key, element) pair each.
            query_params.extend((key, element) for element in value)

    api_client = core_client.api_client

    # See kubernetes.client.CoreV1Api.connect_get_namespaced_service_proxy_with_path_with_http_info
    header_params = {"Accept": api_client.select_header_accept(["*/*"])}

    body: Optional[Union[JSONType, Union[str, Dict]]] = None
    post_params = None
    if data is None:
        if json_request_body is not None:
            header_params["Content-Type"] = api_client.select_header_content_type(["*/*"])
            body = json_request_body
    elif isinstance(data, str):
        # Raw string payloads are sent as the request body.
        header_params["Content-Type"] = "application/octet-stream"
        body = data
    elif isinstance(data, dict):
        # Dictionaries are sent as form fields.
        header_params["Content-Type"] = "application/x-www-form-urlencoded"
        post_params = list(data.items())
    else:
        raise TypeError(f"Cannot POST data with type {type(data)}")

    # Caller-supplied headers take precedence over the defaults chosen above.
    header_params = {**header_params, **extra_headers}

    raw_response = api_client.call_api(
        resource_path=resource_path,
        method=method,
        query_params=query_params,
        header_params=header_params,
        post_params=post_params,
        body=body,
        response_type="str",
        auth_settings=["BearerToken"],
        async_req=False,
        collection_formats={},
        _return_http_data_only=True,
        _preload_content=False,
        _request_timeout=timeout,
    )
    return ProxiedResponse(raw_response)
# Usage example: types and functions for managing Alertmanager silences
@dataclasses.dataclass(eq=True, frozen=True)
class Matcher:
    """
    Matchers are conditions for Silences in the Alertmanager API.

    Instances are immutable and hashable, so they can be collected in sets.
    """

    # Name of the label the matcher applies to.
    name: str
    # Value the label is compared against (a pattern when the flag below is set).
    value: str
    # True when `value` is a regular expression rather than a literal string.
    is_value_regular_expresion: bool

    def as_dict(self) -> Dict[str, Any]:
        """
        Return the matcher as a dictionary with the keys "name", "value" and "isRegex".
        """
        return {
            "name": self.name,
            "value": self.value,
            "isRegex": self.is_value_regular_expresion,
        }

    def as_string(self) -> str:
        """
        Return the matcher in textual form: ``name="value"`` for literal
        matchers and ``name=~"value"`` for regular-expression matchers.
        """
        # A plain "=" would make a regex matcher read as a literal comparison.
        comparison = "=~" if self.is_value_regular_expresion else "="
        return f'{self.name}{comparison}"{self.value}"'
class AlertNameMatcher(Matcher):
    """
    A Matcher which filters for the alerts with the given names.

    The names are combined into one regular-expression alternation on the
    "alertname" label, e.g. {"A", "B"} becomes "(A|B)". Names are inserted
    as-is, without regex escaping.
    """

    def __init__(self, alert_names: Set[str]):
        # Sets have no defined iteration order; sorting makes the generated
        # pattern deterministic, so matchers built from equal name sets
        # compare equal and hash identically.
        super().__init__(
            name="alertname",
            is_value_regular_expresion=True,
            value="({})".format("|".join(sorted(alert_names))),
        )
class AlertLabelMatcher(Matcher):
    """
    A Matcher selecting alerts by a single label name and value.

    The matcher is created with the regular-expression flag set, so the
    label value is passed on as a pattern.
    """

    def __init__(self, label_name: str, label_value: str):
        super().__init__(
            is_value_regular_expresion=True,
            name=label_name,
            value=label_value,
        )
def _format_time(time: datetime.datetime) -> str: | |
""" | |
Format the given time in the format expected by the Alertmanager API. | |
""" | |
return time.replace(tzinfo=None).isoformat(timespec="seconds") | |
def silence_alertmanager(
    *,
    duration: datetime.timedelta,
    matchers: Set[Matcher],
    core_client: kubernetes.client.CoreV1Api,
    namespace: str = "monitoring",
    service: str = "alertmanager-main:9093",
) -> Optional[str]:
    """
    Attempt to create an alert silence.

    The silence starts now and lasts for ``duration``. The request is sent to
    Alertmanager's ``api/v2/silences`` endpoint through the Kubernetes API
    server proxy.

    Silence creation may fail, e.g. if Alertmanager is down. If that occurs,
    this function returns None.

    :param duration: Silence lifetime
    :param matchers: Matcher conditions which the silence will use to select alerts
    :param core_client: Kubernetes API client
    :param namespace: Namespace of the Alertmanager service
    :param service: Alertmanager service in format "serviceName:port"
    :return: The created silence's ID, or None if the creation failed
    """
    # datetime.utcnow() is deprecated; an aware UTC "now" is formatted to the
    # same string because _format_time() drops the tzinfo.
    start_time = datetime.datetime.now(tz=datetime.timezone.utc)
    end_time = start_time + duration
    try:
        response = proxy_request(
            method="POST",
            core_client=core_client,
            namespace=namespace,
            path="api/v2/silences",
            service=service,
            timeout=10,
            json_request_body={
                "matchers": [m.as_dict() for m in matchers],
                "startsAt": _format_time(start_time),
                "endsAt": _format_time(end_time),
                "createdBy": "k8s-infrastructure",
                "comment": "k8s-infrastructure automated silence",
            },
        )
        response.raise_for_status()
        return json.loads(response.read())["silenceID"]
    except Exception:
        # Deliberately best-effort: any failure (transport error, error
        # status, unexpected response body) is reported as None.
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment