Created
February 9, 2021 15:18
-
-
Save jcrist/237354299fe331e274e5f1ee691681a4 to your computer and use it in GitHub Desktop.
Garnet.ai client diff from dask-gateway 0.8.0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/dask_gateway/client.py b/garnet/client.py | |
index db044b9..6f35ea1 100644 | |
--- a/dask_gateway/client.py | |
+++ b/garnet/client.py | |
@@ -27,25 +27,25 @@ from .utils import format_template, cancel_task | |
del comm | |
-__all__ = ("Gateway", "GatewayCluster", "GatewayClusterError", "GatewayServerError") | |
+__all__ = ("Garnet", "GarnetCluster", "GarnetClusterError", "GarnetServerError") | |
-class GatewayClusterError(Exception): | |
+class GarnetClusterError(Exception): | |
"""Exception related to starting/stopping/scaling of a gateway cluster""" | |
-class GatewayServerError(Exception): | |
- """Exception related to the operation of the gateway server. | |
+class GarnetServerError(Exception): | |
+ """Exception related to the operation of the garnet-gateway server. | |
- Indicates an internal error in the gateway server. | |
+ Indicates an internal error in the garnet-gateway server. | |
""" | |
-class GatewayWarning(UserWarning): | |
- """Warnings raised by the Gateway client""" | |
+class GarnetWarning(UserWarning): | |
+ """Warnings raised by the Garnet client""" | |
-class GatewaySecurity(Security): | |
+class GarnetSecurity(Security): | |
"""A security implementation that temporarily stores credentials on disk. | |
The normal ``Security`` class assumes credentials already exist on disk, | |
@@ -58,7 +58,7 @@ class GatewaySecurity(Security): | |
self.tls_cert = tls_cert | |
def __repr__(self): | |
- return "GatewaySecurity<...>" | |
+ return "GarnetSecurity<...>" | |
def get_connection_args(self, role): | |
with tempfile.TemporaryDirectory() as tempdir: | |
@@ -187,7 +187,7 @@ class ClusterReport(object): | |
return ( | |
None | |
if self.tls_key is None | |
- else GatewaySecurity(tls_key=self.tls_key, tls_cert=self.tls_cert) | |
+ else GarnetSecurity(tls_key=self.tls_key, tls_cert=self.tls_cert) | |
) | |
@classmethod | |
@@ -222,7 +222,7 @@ class ClusterReport(object): | |
def _get_default_request_kwargs(scheme): | |
proxy = proxy_auth = None | |
- http_proxy = format_template(dask.config.get("gateway.http-client.proxy")) | |
+ http_proxy = format_template(dask.config.get("garnet.http-client.proxy")) | |
if http_proxy is True: | |
proxies = aiohttp.helpers.proxies_from_env() | |
info = proxies.get(scheme) | |
@@ -236,23 +236,48 @@ def _get_default_request_kwargs(scheme): | |
return {"proxy": proxy, "proxy_auth": proxy_auth} | |
-class Gateway(object): | |
- """A client for a Dask Gateway Server. | |
+#New class for automatic initialization of garnet gateway client | |
+# class GarnetCloud: | |
+ | |
+# def __init__( | |
+# self, | |
+# address=None, | |
+# proxy_address=None, | |
+# public_address=None, | |
+# auth=None, | |
+# asynchronous=False, | |
+# loop=None, | |
+# ): | |
+ | |
+# self.address = address | |
+# self._public_address = public_address | |
+# self.proxy_address = proxy_address | |
+ | |
+# self.auth = get_auth(auth) | |
+# self._session = None | |
+ | |
+# self._asynchronous = asynchronous | |
+# self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) | |
+# self._loop_runner.start() | |
+ | |
+ | |
+class PyNetes(object): | |
+ """A client for a Garnet gateway Server. | |
Parameters | |
---------- | |
address : str, optional | |
- The address to the gateway server. | |
+ The address to the garnet-gateway server. | |
proxy_address : str, int, optional | |
The address of the scheduler proxy server. Defaults to `address` if not | |
provided. If an int, it's used as the port, with the host/ip taken from | |
``address``. Provide a full address if a different host/ip should be | |
used. | |
public_address : str, optional | |
- The address to the gateway server, as accessible from a web browser. | |
+ The address to the garnet-gateway server, as accessible from a web browser. | |
This will be used as the root of all browser-facing links (e.g. the | |
dask dashboard). Defaults to ``address`` if not provided. | |
- auth : GatewayAuth, optional | |
+ auth : GarnetAuth, optional | |
The authentication method to use. | |
asynchronous : bool, optional | |
If true, starts the client in asynchronous mode, where it can be used | |
@@ -272,22 +297,22 @@ class Gateway(object): | |
loop=None, | |
): | |
if address is None: | |
- address = format_template(dask.config.get("gateway.address")) | |
+ address = format_template(dask.config.get("garnet.address")) | |
if address is None: | |
raise ValueError( | |
- "No dask-gateway address provided or found in configuration" | |
+ "No garnet-gateway address provided or found in configuration" | |
) | |
address = address.rstrip("/") | |
if public_address is None: | |
- public_address = format_template(dask.config.get("gateway.public-address")) | |
+ public_address = format_template(dask.config.get("garnet.public-address")) | |
if public_address is None: | |
public_address = address | |
else: | |
public_address = public_address.rstrip("/") | |
if proxy_address is None: | |
- proxy_address = format_template(dask.config.get("gateway.proxy-address")) | |
+ proxy_address = format_template(dask.config.get("garnet.proxy-address")) | |
if proxy_address is None: | |
parsed = urlparse(address) | |
if parsed.netloc: | |
@@ -342,7 +367,7 @@ class Gateway(object): | |
raise | |
def close(self): | |
- """Close the gateway client""" | |
+ """Close the garnet client""" | |
if self.asynchronous: | |
return self._cleanup() | |
elif self.loop.asyncio_loop.is_running(): | |
@@ -375,12 +400,12 @@ class Gateway(object): | |
self.close() | |
def __repr__(self): | |
- return "Gateway<%s>" % self.address | |
+ return "Garnet<%s>" % self.address | |
async def _request(self, method, url, json=None): | |
if self._session is None: | |
# "unsafe" allows cookies to be set for ip addresses, which can | |
- # commonly serve dask-gateway deployments. Since this client is | |
+ # commonly serve garnet-gateway deployments. Since this client is | |
# only ever used with a single endpoint, there is no danger of | |
# leaking cookies to a different server that happens to have the | |
# same ip. | |
@@ -408,9 +433,9 @@ class Gateway(object): | |
if resp.status in {404, 422}: | |
raise ValueError(msg) | |
elif resp.status == 409: | |
- raise GatewayClusterError(msg) | |
+ raise GarnetClusterError(msg) | |
elif resp.status == 500: | |
- raise GatewayServerError(msg) | |
+ raise GarnetServerError(msg) | |
else: | |
resp.raise_for_status() | |
else: | |
@@ -485,7 +510,7 @@ class Gateway(object): | |
return self.sync(self._get_versions) | |
def _config_cluster_options(self): | |
- opts = dask.config.get("gateway.cluster.options") | |
+ opts = dask.config.get("garnet.cluster.options") | |
return {k: format_template(v) for k, v in opts.items()} | |
async def _cluster_options(self, use_local_defaults=True): | |
@@ -508,13 +533,43 @@ class Gateway(object): | |
Returns | |
------- | |
- cluster_options : dask_gateway.options.Options | |
+ cluster_options : garnet.options.Options | |
A dict of cluster options. | |
""" | |
return self.sync( | |
self._cluster_options, use_local_defaults=use_local_defaults, **kwargs | |
) | |
+ # def cluster_configure(self, environment, worker_cores: int, worker_memory: str, scheduler_cores: int, scheduler_memory: str): | |
+ # """Configure cluster options based on user preferences. | |
+ # Usage: | |
+ | |
+ # config = garnet.cluster_configure(worker_cores=1, worker_memory=0.5, scheduler_cores=1, scheduler_memory=0.5, | |
+ # environment={"EXTRA_CONDA_PACKAGES":"bokeh>=2.1.1 s3fs -c conda-forge"}) | |
+ # """ | |
+ # options = self.cluster_options(self) | |
+ # options.worker_cores = worker_cores | |
+ # options.worker_memory = worker_memory | |
+ # options.scheduler_cores = scheduler_cores | |
+ # options.scheduler_memory = scheduler_memory | |
+ # options.environment = environment | |
+ # return options | |
+ | |
+ def cluster_configure(self, environment, minion_cores: int, minion_memory: str, master_cores: int, master_memory: str, **kwargs): | |
+ """Configure cluster options based on user preferences. | |
+ Usage: | |
+ | |
+ config = garnet.cluster_configure(minion_cores=1, minion_memory=0.5, master_cores=1, master_memory=0.5, | |
+ environment={"EXTRA_CONDA_PACKAGES":"bokeh>=2.1.1 s3fs -c conda-forge"}) | |
+ """ | |
+ options = self.cluster_options(self) | |
+ options.worker_cores = minion_cores | |
+ options.worker_memory = minion_memory | |
+ options.scheduler_cores = master_cores | |
+ options.scheduler_memory = master_memory | |
+ options.environment = environment | |
+ return options | |
+ | |
async def _submit(self, cluster_options=None, **kwargs): | |
url = "%s/api/v1/clusters/" % self.address | |
if cluster_options is not None: | |
@@ -540,12 +595,12 @@ class Gateway(object): | |
Parameters | |
---------- | |
- cluster_options : dask_gateway.options.Options, optional | |
+ cluster_options : garnet.options.Options, optional | |
An ``Options`` object describing the desired cluster configuration. | |
**kwargs : | |
Additional cluster configuration options. If ``cluster_options`` is | |
provided, these are applied afterwards as overrides. Available | |
- options are specific to each deployment of dask-gateway, see | |
+ options are specific to each deployment of garnet-gateway, see | |
``cluster_options`` for more information. | |
Returns | |
@@ -573,12 +628,12 @@ class Gateway(object): | |
if report.status is ClusterStatus.RUNNING: | |
return report | |
elif report.status is ClusterStatus.FAILED: | |
- raise GatewayClusterError( | |
+ raise GarnetClusterError( | |
"Cluster %r failed to start, see logs for " | |
"more information" % cluster_name | |
) | |
elif report.status is ClusterStatus.STOPPED: | |
- raise GatewayClusterError( | |
+ raise GarnetClusterError( | |
"Cluster %r is already stopped" % cluster_name | |
) | |
# Not started yet, try again later | |
@@ -597,9 +652,9 @@ class Gateway(object): | |
Returns | |
------- | |
- cluster : GatewayCluster | |
+ cluster : GarnetCluster | |
""" | |
- return GatewayCluster.from_name( | |
+ return GarnetCluster.from_name( | |
cluster_name, | |
shutdown_on_close=shutdown_on_close, | |
address=self.address, | |
@@ -609,14 +664,14 @@ class Gateway(object): | |
loop=self.loop, | |
) | |
- def new_cluster(self, cluster_options=None, shutdown_on_close=True, **kwargs): | |
+ def cluster_launch(self, cluster_options=None, shutdown_on_close=True, **kwargs): | |
"""Submit a new cluster to the gateway, and wait for it to be started. | |
Same as calling ``submit`` and ``connect`` in one go. | |
Parameters | |
---------- | |
- cluster_options : dask_gateway.options.Options, optional | |
+ cluster_options : garnet.options.Options, optional | |
An ``Options`` object describing the desired cluster configuration. | |
shutdown_on_close : bool, optional | |
If True (default), the cluster will be automatically shutdown on | |
@@ -625,14 +680,14 @@ class Gateway(object): | |
**kwargs : | |
Additional cluster configuration options. If ``cluster_options`` is | |
provided, these are applied afterwards as overrides. Available | |
- options are specific to each deployment of dask-gateway, see | |
+ options are specific to each deployment of garnet-gateway, see | |
``cluster_options`` for more information. | |
Returns | |
------- | |
- cluster : GatewayCluster | |
+ cluster : GarnetCluster | |
""" | |
- return GatewayCluster( | |
+ return GarnetCluster( | |
address=self.address, | |
proxy_address=self.proxy_address, | |
auth=self.auth, | |
@@ -665,7 +720,7 @@ class Gateway(object): | |
except Exception: | |
msg = {} | |
if not msg.get("ok", True) and msg.get("msg"): | |
- warnings.warn(GatewayWarning(msg["msg"])) | |
+ warnings.warn(GarnetWarning(msg["msg"])) | |
def scale_cluster(self, cluster_name, n, **kwargs): | |
"""Scale a cluster to n workers. | |
@@ -692,7 +747,7 @@ class Gateway(object): | |
except Exception: | |
msg = {} | |
if not msg.get("ok", True) and msg.get("msg"): | |
- warnings.warn(GatewayWarning(msg["msg"])) | |
+ warnings.warn(GarnetWarning(msg["msg"])) | |
def adapt_cluster( | |
self, cluster_name, minimum=None, maximum=None, active=True, **kwargs | |
@@ -747,7 +802,7 @@ _widget_status_template = """ | |
@atexit.register | |
def cleanup_lingering_clusters(): | |
- for o in list(GatewayCluster._instances): | |
+ for o in list(GarnetCluster._instances): | |
try: | |
if not o.asynchronous: | |
o.close() | |
@@ -757,8 +812,8 @@ def cleanup_lingering_clusters(): | |
warnings.warn("".join(lines)) | |
-class GatewayCluster(object): | |
- """A dask-gateway cluster. | |
+class GarnetCluster(object): | |
+ """A Garnet Cloud cluster. | |
Parameters | |
---------- | |
@@ -772,7 +827,7 @@ class GatewayCluster(object): | |
The address to the gateway server, as accessible from a web browser. | |
This will be used as the root of all browser-facing links (e.g. the | |
dask dashboard). Defaults to ``address`` if not provided. | |
- auth : GatewayAuth, optional | |
+ auth : GarnetAuth, optional | |
The authentication method to use. | |
cluster_options : mapping, optional | |
A mapping of cluster options to use to start the cluster. | |
@@ -788,8 +843,8 @@ class GatewayCluster(object): | |
**kwargs : | |
Additional cluster configuration options. If ``cluster_options`` is | |
provided, these are applied afterwards as overrides. Available options | |
- are specific to each deployment of dask-gateway, see | |
- ``Gateway.cluster_options`` for more information. | |
+ are specific to each deployment of garnet, see | |
+ ``Garnet.cluster_options`` for more information. | |
""" | |
_instances = weakref.WeakSet() | |
@@ -840,12 +895,12 @@ class GatewayCluster(object): | |
If True, the cluster will be automatically shutdown on close. | |
Default is False. | |
**kwargs : | |
- Additional parameters to pass to the ``GatewayCluster`` | |
+ Additional parameters to pass to the ``GarnetCluster`` | |
constructor. See the docstring for more information. | |
Returns | |
------- | |
- cluster : GatewayCluster | |
+ cluster : GarnetCluster | |
""" | |
self = object.__new__(cls) | |
self._init_internal( | |
@@ -879,7 +934,7 @@ class GatewayCluster(object): | |
self._instances.add(self) | |
- self.gateway = Gateway( | |
+ self.garnet = PyNetes( | |
address=address, | |
proxy_address=proxy_address, | |
public_address=public_address, | |
@@ -911,15 +966,15 @@ class GatewayCluster(object): | |
if name is not None: | |
self.status = "starting" | |
if not self.asynchronous: | |
- self.gateway.sync(self._start_internal) | |
+ self.garnet.sync(self._start_internal) | |
@property | |
def loop(self): | |
- return self.gateway.loop | |
+ return self.garnet.loop | |
@property | |
def asynchronous(self): | |
- return self.gateway.asynchronous | |
+ return self.garnet.asynchronous | |
async def _start_internal(self): | |
if self._start_task is None: | |
@@ -938,13 +993,13 @@ class GatewayCluster(object): | |
# Start cluster if not already started | |
if self.status == "created": | |
self.status = "starting" | |
- self.name = await self.gateway._submit( | |
+ self.name = await self.garnet._submit( | |
cluster_options=self._cluster_options, **self._cluster_kwargs | |
) | |
# Connect to cluster | |
try: | |
- report = await self.gateway._wait_for_start(self.name) | |
- except GatewayClusterError: | |
+ report = await self.garnet._wait_for_start(self.name) | |
+ except GarnetClusterError: | |
raise | |
self.scheduler_address = report.scheduler_address | |
self.dashboard_link = report.dashboard_link | |
@@ -980,9 +1035,9 @@ class GatewayCluster(object): | |
shutdown = self.shutdown_on_close | |
if shutdown and self.name is not None: | |
- await self.gateway._stop_cluster(self.name) | |
+ await self.garnet._stop_cluster(self.name) | |
- await self.gateway._cleanup() | |
+ await self.garnet._cleanup() | |
async def _stop_async(self): | |
if self._start_task is not None: | |
@@ -1027,10 +1082,10 @@ class GatewayCluster(object): | |
if self.status == "closed": | |
return | |
if self.loop.asyncio_loop.is_running(): | |
- self.gateway.sync(self._stop_internal, shutdown=shutdown) | |
- self.gateway.close() | |
+ self.garnet.sync(self._stop_internal, shutdown=shutdown) | |
+ self.garnet.close() | |
- def shutdown(self): | |
+ def teardown(self): | |
"""Shutdown this cluster. Alias for ``close(shutdown=True)``.""" | |
return self.close(shutdown=True) | |
@@ -1042,7 +1097,7 @@ class GatewayCluster(object): | |
self.close() | |
def __del__(self): | |
- if not hasattr(self, "gateway"): | |
+ if not hasattr(self, "garnet"): | |
return | |
if self.asynchronous: | |
# No del for async mode | |
@@ -1051,7 +1106,7 @@ class GatewayCluster(object): | |
self.close() | |
def __repr__(self): | |
- return "GatewayCluster<%s, status=%s>" % (self.name, self.status) | |
+ return "GarnetCluster<%s, status=%s>" % (self.name, self.status) | |
def get_client(self, set_as_default=True): | |
"""Get a ``Client`` for this cluster. | |
@@ -1071,7 +1126,7 @@ class GatewayCluster(object): | |
self._clients.add(client) | |
return client | |
- def scale(self, n, **kwargs): | |
+ def resize(self, n, **kwargs): | |
"""Scale the cluster to ``n`` workers. | |
Parameters | |
@@ -1079,7 +1134,7 @@ class GatewayCluster(object): | |
n : int | |
The number of workers to scale to. | |
""" | |
- return self.gateway.scale_cluster(self.name, n, **kwargs) | |
+ return self.garnet.scale_cluster(self.name, n, **kwargs) | |
def adapt(self, minimum=None, maximum=None, active=True, **kwargs): | |
"""Configure adaptive scaling for the cluster. | |
@@ -1094,7 +1149,7 @@ class GatewayCluster(object): | |
If ``True`` (default), adaptive scaling is activated. Set to | |
``False`` to deactivate adaptive scaling. | |
""" | |
- return self.gateway.adapt_cluster( | |
+ return self.garnet.adapt_cluster( | |
self.name, minimum=minimum, maximum=maximum, active=active, **kwargs | |
) | |
@@ -1157,7 +1212,7 @@ class GatewayCluster(object): | |
layout = Layout(width="150px") | |
- title = HTML("<h2>GatewayCluster</h2>") | |
+ title = HTML("<h2>GarnetCluster</h2>") | |
status = HTML(self._widget_status(), layout=Layout(min_width="150px")) | |
@@ -1221,7 +1276,7 @@ class GatewayCluster(object): | |
return ( | |
"<div style='background-color: #f2f2f2; display: inline-block; " | |
"padding: 10px; border: 1px solid #999999;'>\n" | |
- " <h3>GatewayCluster</h3>\n" | |
+ " <h3>GarnetCluster</h3>\n" | |
" <ul>\n" | |
" <li><b>Name: </b>{name}\n" | |
" <li><b>Dashboard: </b>{dashboard}\n" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment