Last active
December 13, 2024 07:21
-
-
Save Lowess/3a71792d2d09e38bf8f524644bbf8349 to your computer and use it in GitHub Desktop.
Databricks Prometheus Integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
#
# Databricks cluster init script: enable Spark's PrometheusServlet metrics
# sink (plus the Ganglia driver sink) by writing metrics.properties into
# both Spark configuration locations, then dump diagnostics to the init log.

### Functions

# Print the shared Prometheus sink configuration. Kept in one place so the
# two metrics.properties files below cannot drift apart.
function prometheus_metrics_conf() {
    cat <<'EOF'
# Enable Prometheus metrics
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
EOF
}

function setup_databricks_prometheus() {
    echo "Showing files in /databricks/spark/conf/*"
    ls -al /databricks/spark/conf/
    cat /databricks/spark/conf/spark.properties

    echo "Showing content of /databricks/spark/conf/metrics.properties"
    # tee creates the file itself (no touch needed) and echoes the written
    # content to stdout so it lands in the init-script log.
    {
        cat <<'EOF'
# Enable Ganglia metrics
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.port=8649
*.sink.ganglia.mode=unicast
EOF
        prometheus_metrics_conf
    } | sudo tee /databricks/spark/conf/metrics.properties

    prometheus_metrics_conf | sudo tee /databricks/spark/dbconf/log4j/master-worker/metrics.properties

    echo "Showing content of /databricks/spark/dbconf/log4j/master-worker/metrics.properties"
    cat /databricks/spark/dbconf/log4j/master-worker/metrics.properties

    echo "Showing SPARK_ related envvars"
    env | grep "SPARK_"
    # NOTE: ${VAR:-'NONE'} inside double quotes prints the literal quotes in
    # bash, so the default must be unquoted.
    echo "Local spark ip is: ${SPARK_LOCAL_IP:-NONE}"
}

### Main
setup_databricks_prometheus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import threading | |
import urllib.request | |
import logging | |
from time import sleep | |
# Module author (email address redacted by the hosting platform).
__author__ = "Florian Dambrine <[email protected]>"
class DatabricksPushgatewayExporter:
    """
    Pushgateway exporter to be used in Databricks notebooks.

    Spawns a background thread that periodically scrapes the local Spark
    PrometheusServlet endpoint and PUTs the raw payload to a Prometheus
    Pushgateway, grouped under ``job`` and ``instance`` labels.
    """

    __logger = logging.getLogger(__name__)

    def __init__(self, job="python-test", frequency=2,
                 pushgateway_endpoint="http://pushgateway.local"):
        """
        :param job: Pushgateway ``job`` grouping label.
        :param frequency: Seconds between scrape/push cycles.
        :param pushgateway_endpoint: Base URL of the Pushgateway service.
        """
        self._job = job
        self._instance = os.getenv("SPARK_LOCAL_IP", "unknown")
        self._frequency = frequency
        # Configure pushgateway endpoint with job and instance labels.
        self._pushgateway = (
            f"{pushgateway_endpoint}/metrics/job/{self._job}/instance/{self._instance}"
        )
        self._prometheus = self._get_local_prometheus()
        # Event instead of a plain bool: shutdown() interrupts the sleep
        # immediately rather than waiting out a full reporting period.
        self._closed = threading.Event()
        # Daemon thread so a notebook that forgets shutdown() cannot keep
        # the interpreter alive forever.
        self._reporter = threading.Thread(
            target=self._send_to_pushgateway, daemon=True
        )
        self._reporter.start()

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__} initialized with "
            f"prometheus endpoint {self._prometheus}"
        )

    def _get_local_prometheus(self):
        """Return the base URL of the local Spark UI serving the servlet."""
        ui_ip = os.getenv("SPARK_LOCAL_IP")
        # NOTE(review): `sc` is the SparkContext global injected by the
        # Databricks notebook runtime — this raises NameError outside one.
        ui_port = sc.getConf().get('spark.ui.port')
        return f"http://{ui_ip}:{ui_port}"

    def _send_to_pushgateway(self):
        """Reporter loop: scrape the servlet, PUT to the Pushgateway, sleep."""
        while not self._closed.is_set():
            metrics = None
            try:
                with urllib.request.urlopen(f"{self._prometheus}/metrics/prometheus") as response:
                    metrics = response.read()
            except Exception:
                self.__logger.exception("Failed retrieving metrics from %s", self._prometheus)
            if metrics:
                self.__logger.debug(metrics)
                req = urllib.request.Request(url=self._pushgateway, method='PUT', data=metrics)
                try:
                    with urllib.request.urlopen(req) as f:
                        self.__logger.info(
                            "Successfully reported %d bytes - %s - %s",
                            len(metrics), f.status, f.reason,
                        )
                except Exception:
                    self.__logger.exception("Failed reporting metrics to %s", self._pushgateway)
            # Returns early as soon as shutdown() sets the event.
            self._closed.wait(self._frequency)

    def shutdown(self) -> None:
        """
        Shutdown and terminate the DatabricksPushgatewayReporter.
        """
        self._closed.set()
        try:
            self._reporter.join()
            self.__logger.info(
                "%s thread %s successfully joined", self.__class__.__name__, self._reporter
            )
        except RuntimeError:
            # join() raises RuntimeError only when the thread was never
            # started or we tried to join the current thread.
            self.__logger.warning(
                "%s could not join reporter thread %s", self.__class__.__name__, self._reporter
            )
if __name__ == '__main__':
    # Quiet everything by default; keep this module's own logger chatty.
    LOG_FORMAT = "%(asctime)s,%(msecs)03d %(levelname)8s %(name)s %(filename)s:%(lineno)d - %(message)s"
    logging.basicConfig(format=LOG_FORMAT, level=logging.ERROR)
    logging.getLogger('py4j').setLevel(logging.ERROR)
    logging.getLogger(__name__).setLevel(logging.INFO)

    # Smoke test: run the exporter for ten seconds, then stop it cleanly.
    exporter = DatabricksPushgatewayExporter()
    print(exporter)
    sleep(10)
    exporter.shutdown()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment