Skip to content

Instantly share code, notes, and snippets.

@jcrist
Created February 9, 2021 15:18
Show Gist options
  • Save jcrist/237354299fe331e274e5f1ee691681a4 to your computer and use it in GitHub Desktop.
Save jcrist/237354299fe331e274e5f1ee691681a4 to your computer and use it in GitHub Desktop.
Garnet.ai client diff from dask-gateway 0.8.0
diff --git a/dask_gateway/client.py b/garnet/client.py
index db044b9..6f35ea1 100644
--- a/dask_gateway/client.py
+++ b/garnet/client.py
@@ -27,25 +27,25 @@ from .utils import format_template, cancel_task
del comm
-__all__ = ("Gateway", "GatewayCluster", "GatewayClusterError", "GatewayServerError")
+__all__ = ("Garnet", "GarnetCluster", "GarnetClusterError", "GarnetServerError")
-class GatewayClusterError(Exception):
+class GarnetClusterError(Exception):
"""Exception related to starting/stopping/scaling of a gateway cluster"""
-class GatewayServerError(Exception):
- """Exception related to the operation of the gateway server.
+class GarnetServerError(Exception):
+ """Exception related to the operation of the garnet-gateway server.
- Indicates an internal error in the gateway server.
+ Indicates an internal error in the garnet-gateway server.
"""
-class GatewayWarning(UserWarning):
- """Warnings raised by the Gateway client"""
+class GarnetWarning(UserWarning):
+ """Warnings raised by the Garnet client"""
-class GatewaySecurity(Security):
+class GarnetSecurity(Security):
"""A security implementation that temporarily stores credentials on disk.
The normal ``Security`` class assumes credentials already exist on disk,
@@ -58,7 +58,7 @@ class GatewaySecurity(Security):
self.tls_cert = tls_cert
def __repr__(self):
- return "GatewaySecurity<...>"
+ return "GarnetSecurity<...>"
def get_connection_args(self, role):
with tempfile.TemporaryDirectory() as tempdir:
@@ -187,7 +187,7 @@ class ClusterReport(object):
return (
None
if self.tls_key is None
- else GatewaySecurity(tls_key=self.tls_key, tls_cert=self.tls_cert)
+ else GarnetSecurity(tls_key=self.tls_key, tls_cert=self.tls_cert)
)
@classmethod
@@ -222,7 +222,7 @@ class ClusterReport(object):
def _get_default_request_kwargs(scheme):
proxy = proxy_auth = None
- http_proxy = format_template(dask.config.get("gateway.http-client.proxy"))
+ http_proxy = format_template(dask.config.get("garnet.http-client.proxy"))
if http_proxy is True:
proxies = aiohttp.helpers.proxies_from_env()
info = proxies.get(scheme)
@@ -236,23 +236,48 @@ def _get_default_request_kwargs(scheme):
return {"proxy": proxy, "proxy_auth": proxy_auth}
-class Gateway(object):
- """A client for a Dask Gateway Server.
+#New class for automatic initialization of garnet gateway client
+# class GarnetCloud:
+
+# def __init__(
+# self,
+# address=None,
+# proxy_address=None,
+# public_address=None,
+# auth=None,
+# asynchronous=False,
+# loop=None,
+# ):
+
+# self.address = address
+# self._public_address = public_address
+# self.proxy_address = proxy_address
+
+# self.auth = get_auth(auth)
+# self._session = None
+
+# self._asynchronous = asynchronous
+# self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
+# self._loop_runner.start()
+
+
+class PyNetes(object):
+ """A client for a Garnet gateway Server.
Parameters
----------
address : str, optional
- The address to the gateway server.
+ The address to the garnet-gateway server.
proxy_address : str, int, optional
The address of the scheduler proxy server. Defaults to `address` if not
provided. If an int, it's used as the port, with the host/ip taken from
``address``. Provide a full address if a different host/ip should be
used.
public_address : str, optional
- The address to the gateway server, as accessible from a web browser.
+ The address to the garnet-gateway server, as accessible from a web browser.
This will be used as the root of all browser-facing links (e.g. the
dask dashboard). Defaults to ``address`` if not provided.
- auth : GatewayAuth, optional
+ auth : GarnetAuth, optional
The authentication method to use.
asynchronous : bool, optional
If true, starts the client in asynchronous mode, where it can be used
@@ -272,22 +297,22 @@ class Gateway(object):
loop=None,
):
if address is None:
- address = format_template(dask.config.get("gateway.address"))
+ address = format_template(dask.config.get("garnet.address"))
if address is None:
raise ValueError(
- "No dask-gateway address provided or found in configuration"
+ "No garnet-gateway address provided or found in configuration"
)
address = address.rstrip("/")
if public_address is None:
- public_address = format_template(dask.config.get("gateway.public-address"))
+ public_address = format_template(dask.config.get("garnet.public-address"))
if public_address is None:
public_address = address
else:
public_address = public_address.rstrip("/")
if proxy_address is None:
- proxy_address = format_template(dask.config.get("gateway.proxy-address"))
+ proxy_address = format_template(dask.config.get("garnet.proxy-address"))
if proxy_address is None:
parsed = urlparse(address)
if parsed.netloc:
@@ -342,7 +367,7 @@ class Gateway(object):
raise
def close(self):
- """Close the gateway client"""
+ """Close the garnet client"""
if self.asynchronous:
return self._cleanup()
elif self.loop.asyncio_loop.is_running():
@@ -375,12 +400,12 @@ class Gateway(object):
self.close()
def __repr__(self):
- return "Gateway<%s>" % self.address
+ return "Garnet<%s>" % self.address
async def _request(self, method, url, json=None):
if self._session is None:
# "unsafe" allows cookies to be set for ip addresses, which can
- # commonly serve dask-gateway deployments. Since this client is
+ # commonly serve garnet-gateway deployments. Since this client is
# only ever used with a single endpoint, there is no danger of
# leaking cookies to a different server that happens to have the
# same ip.
@@ -408,9 +433,9 @@ class Gateway(object):
if resp.status in {404, 422}:
raise ValueError(msg)
elif resp.status == 409:
- raise GatewayClusterError(msg)
+ raise GarnetClusterError(msg)
elif resp.status == 500:
- raise GatewayServerError(msg)
+ raise GarnetServerError(msg)
else:
resp.raise_for_status()
else:
@@ -485,7 +510,7 @@ class Gateway(object):
return self.sync(self._get_versions)
def _config_cluster_options(self):
- opts = dask.config.get("gateway.cluster.options")
+ opts = dask.config.get("garnet.cluster.options")
return {k: format_template(v) for k, v in opts.items()}
async def _cluster_options(self, use_local_defaults=True):
@@ -508,13 +533,43 @@ class Gateway(object):
Returns
-------
- cluster_options : dask_gateway.options.Options
+ cluster_options : garnet.options.Options
A dict of cluster options.
"""
return self.sync(
self._cluster_options, use_local_defaults=use_local_defaults, **kwargs
)
+ # def cluster_configure(self, environment, worker_cores: int, worker_memory: str, scheduler_cores: int, scheduler_memory: str):
+ # """Configure cluster options based on user preferences.
+ # Usage:
+
+ # config = garnet.cluster_configure(worker_cores=1, worker_memory=0.5, scheduler_cores=1, scheduler_memory=0.5,
+ # environment={"EXTRA_CONDA_PACKAGES":"bokeh>=2.1.1 s3fs -c conda-forge"})
+ # """
+ # options = self.cluster_options(self)
+ # options.worker_cores = worker_cores
+ # options.worker_memory = worker_memory
+ # options.scheduler_cores = scheduler_cores
+ # options.scheduler_memory = scheduler_memory
+ # options.environment = environment
+ # return options
+
+ def cluster_configure(self, environment, minion_cores: int, minion_memory: str, master_cores: int, master_memory: str, **kwargs):
+ """Configure cluster options based on user preferences.
+ Usage:
+
+ config = garnet.cluster_configure(minion_cores=1, minion_memory=0.5, master_cores=1, master_memory=0.5,
+ environment={"EXTRA_CONDA_PACKAGES":"bokeh>=2.1.1 s3fs -c conda-forge"})
+ """
+ options = self.cluster_options(self)
+ options.worker_cores = minion_cores
+ options.worker_memory = minion_memory
+ options.scheduler_cores = master_cores
+ options.scheduler_memory = master_memory
+ options.environment = environment
+ return options
+
async def _submit(self, cluster_options=None, **kwargs):
url = "%s/api/v1/clusters/" % self.address
if cluster_options is not None:
@@ -540,12 +595,12 @@ class Gateway(object):
Parameters
----------
- cluster_options : dask_gateway.options.Options, optional
+ cluster_options : garnet.options.Options, optional
An ``Options`` object describing the desired cluster configuration.
**kwargs :
Additional cluster configuration options. If ``cluster_options`` is
provided, these are applied afterwards as overrides. Available
- options are specific to each deployment of dask-gateway, see
+ options are specific to each deployment of garnet-gateway, see
``cluster_options`` for more information.
Returns
@@ -573,12 +628,12 @@ class Gateway(object):
if report.status is ClusterStatus.RUNNING:
return report
elif report.status is ClusterStatus.FAILED:
- raise GatewayClusterError(
+ raise GarnetClusterError(
"Cluster %r failed to start, see logs for "
"more information" % cluster_name
)
elif report.status is ClusterStatus.STOPPED:
- raise GatewayClusterError(
+ raise GarnetClusterError(
"Cluster %r is already stopped" % cluster_name
)
# Not started yet, try again later
@@ -597,9 +652,9 @@ class Gateway(object):
Returns
-------
- cluster : GatewayCluster
+ cluster : GarnetCluster
"""
- return GatewayCluster.from_name(
+ return GarnetCluster.from_name(
cluster_name,
shutdown_on_close=shutdown_on_close,
address=self.address,
@@ -609,14 +664,14 @@ class Gateway(object):
loop=self.loop,
)
- def new_cluster(self, cluster_options=None, shutdown_on_close=True, **kwargs):
+ def cluster_launch(self, cluster_options=None, shutdown_on_close=True, **kwargs):
"""Submit a new cluster to the gateway, and wait for it to be started.
Same as calling ``submit`` and ``connect`` in one go.
Parameters
----------
- cluster_options : dask_gateway.options.Options, optional
+ cluster_options : garnet.options.Options, optional
An ``Options`` object describing the desired cluster configuration.
shutdown_on_close : bool, optional
If True (default), the cluster will be automatically shutdown on
@@ -625,14 +680,14 @@ class Gateway(object):
**kwargs :
Additional cluster configuration options. If ``cluster_options`` is
provided, these are applied afterwards as overrides. Available
- options are specific to each deployment of dask-gateway, see
+ options are specific to each deployment of garnet-gateway, see
``cluster_options`` for more information.
Returns
-------
- cluster : GatewayCluster
+ cluster : GarnetCluster
"""
- return GatewayCluster(
+ return GarnetCluster(
address=self.address,
proxy_address=self.proxy_address,
auth=self.auth,
@@ -665,7 +720,7 @@ class Gateway(object):
except Exception:
msg = {}
if not msg.get("ok", True) and msg.get("msg"):
- warnings.warn(GatewayWarning(msg["msg"]))
+ warnings.warn(GarnetWarning(msg["msg"]))
def scale_cluster(self, cluster_name, n, **kwargs):
"""Scale a cluster to n workers.
@@ -692,7 +747,7 @@ class Gateway(object):
except Exception:
msg = {}
if not msg.get("ok", True) and msg.get("msg"):
- warnings.warn(GatewayWarning(msg["msg"]))
+ warnings.warn(GarnetWarning(msg["msg"]))
def adapt_cluster(
self, cluster_name, minimum=None, maximum=None, active=True, **kwargs
@@ -747,7 +802,7 @@ _widget_status_template = """
@atexit.register
def cleanup_lingering_clusters():
- for o in list(GatewayCluster._instances):
+ for o in list(GarnetCluster._instances):
try:
if not o.asynchronous:
o.close()
@@ -757,8 +812,8 @@ def cleanup_lingering_clusters():
warnings.warn("".join(lines))
-class GatewayCluster(object):
- """A dask-gateway cluster.
+class GarnetCluster(object):
+ """A Garnet Cloud cluster.
Parameters
----------
@@ -772,7 +827,7 @@ class GatewayCluster(object):
The address to the gateway server, as accessible from a web browser.
This will be used as the root of all browser-facing links (e.g. the
dask dashboard). Defaults to ``address`` if not provided.
- auth : GatewayAuth, optional
+ auth : GarnetAuth, optional
The authentication method to use.
cluster_options : mapping, optional
A mapping of cluster options to use to start the cluster.
@@ -788,8 +843,8 @@ class GatewayCluster(object):
**kwargs :
Additional cluster configuration options. If ``cluster_options`` is
provided, these are applied afterwards as overrides. Available options
- are specific to each deployment of dask-gateway, see
- ``Gateway.cluster_options`` for more information.
+ are specific to each deployment of garnet, see
+ ``Garnet.cluster_options`` for more information.
"""
_instances = weakref.WeakSet()
@@ -840,12 +895,12 @@ class GatewayCluster(object):
If True, the cluster will be automatically shutdown on close.
Default is False.
**kwargs :
- Additional parameters to pass to the ``GatewayCluster``
+ Additional parameters to pass to the ``GarnetCluster``
constructor. See the docstring for more information.
Returns
-------
- cluster : GatewayCluster
+ cluster : GarnetCluster
"""
self = object.__new__(cls)
self._init_internal(
@@ -879,7 +934,7 @@ class GatewayCluster(object):
self._instances.add(self)
- self.gateway = Gateway(
+ self.garnet = PyNetes(
address=address,
proxy_address=proxy_address,
public_address=public_address,
@@ -911,15 +966,15 @@ class GatewayCluster(object):
if name is not None:
self.status = "starting"
if not self.asynchronous:
- self.gateway.sync(self._start_internal)
+ self.garnet.sync(self._start_internal)
@property
def loop(self):
- return self.gateway.loop
+ return self.garnet.loop
@property
def asynchronous(self):
- return self.gateway.asynchronous
+ return self.garnet.asynchronous
async def _start_internal(self):
if self._start_task is None:
@@ -938,13 +993,13 @@ class GatewayCluster(object):
# Start cluster if not already started
if self.status == "created":
self.status = "starting"
- self.name = await self.gateway._submit(
+ self.name = await self.garnet._submit(
cluster_options=self._cluster_options, **self._cluster_kwargs
)
# Connect to cluster
try:
- report = await self.gateway._wait_for_start(self.name)
- except GatewayClusterError:
+ report = await self.garnet._wait_for_start(self.name)
+ except GarnetClusterError:
raise
self.scheduler_address = report.scheduler_address
self.dashboard_link = report.dashboard_link
@@ -980,9 +1035,9 @@ class GatewayCluster(object):
shutdown = self.shutdown_on_close
if shutdown and self.name is not None:
- await self.gateway._stop_cluster(self.name)
+ await self.garnet._stop_cluster(self.name)
- await self.gateway._cleanup()
+ await self.garnet._cleanup()
async def _stop_async(self):
if self._start_task is not None:
@@ -1027,10 +1082,10 @@ class GatewayCluster(object):
if self.status == "closed":
return
if self.loop.asyncio_loop.is_running():
- self.gateway.sync(self._stop_internal, shutdown=shutdown)
- self.gateway.close()
+ self.garnet.sync(self._stop_internal, shutdown=shutdown)
+ self.garnet.close()
- def shutdown(self):
+ def teardown(self):
"""Shutdown this cluster. Alias for ``close(shutdown=True)``."""
return self.close(shutdown=True)
@@ -1042,7 +1097,7 @@ class GatewayCluster(object):
self.close()
def __del__(self):
- if not hasattr(self, "gateway"):
+ if not hasattr(self, "garnet"):
return
if self.asynchronous:
# No del for async mode
@@ -1051,7 +1106,7 @@ class GatewayCluster(object):
self.close()
def __repr__(self):
- return "GatewayCluster<%s, status=%s>" % (self.name, self.status)
+ return "GarnetCluster<%s, status=%s>" % (self.name, self.status)
def get_client(self, set_as_default=True):
"""Get a ``Client`` for this cluster.
@@ -1071,7 +1126,7 @@ class GatewayCluster(object):
self._clients.add(client)
return client
- def scale(self, n, **kwargs):
+ def resize(self, n, **kwargs):
"""Scale the cluster to ``n`` workers.
Parameters
@@ -1079,7 +1134,7 @@ class GatewayCluster(object):
n : int
The number of workers to scale to.
"""
- return self.gateway.scale_cluster(self.name, n, **kwargs)
+ return self.garnet.scale_cluster(self.name, n, **kwargs)
def adapt(self, minimum=None, maximum=None, active=True, **kwargs):
"""Configure adaptive scaling for the cluster.
@@ -1094,7 +1149,7 @@ class GatewayCluster(object):
If ``True`` (default), adaptive scaling is activated. Set to
``False`` to deactivate adaptive scaling.
"""
- return self.gateway.adapt_cluster(
+ return self.garnet.adapt_cluster(
self.name, minimum=minimum, maximum=maximum, active=active, **kwargs
)
@@ -1157,7 +1212,7 @@ class GatewayCluster(object):
layout = Layout(width="150px")
- title = HTML("<h2>GatewayCluster</h2>")
+ title = HTML("<h2>GarnetCluster</h2>")
status = HTML(self._widget_status(), layout=Layout(min_width="150px"))
@@ -1221,7 +1276,7 @@ class GatewayCluster(object):
return (
"<div style='background-color: #f2f2f2; display: inline-block; "
"padding: 10px; border: 1px solid #999999;'>\n"
- " <h3>GatewayCluster</h3>\n"
+ " <h3>GarnetCluster</h3>\n"
" <ul>\n"
" <li><b>Name: </b>{name}\n"
" <li><b>Dashboard: </b>{dashboard}\n"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment