Model deployment API example
"""
Core deployment plugin API methods.
These methods declared under ``mlflow.deployments``, with an analogous ``mlflow deployments`` CLI
"""
from abc import ABC


class BaseDeploymentClient(ABC):
    """
    Base class exposing Python model deployment APIs. Plugin implementors should define
    target-specific deployment logic via a subclass of ``BaseDeploymentClient`` named
    "DeploymentClient" within their plugin module, and customize the method docstrings of
    their subclass so that users can understand target-specific config via
    ``help(client.create_deployment)``, etc.
    """
    def __init__(self, target_uri):
        self.target_uri = target_uri

    def create_deployment(self, name, model_uri, flavor=None, config=None):
        """
        Deploy a model to the specified target. By default, this method should block until
        deployment completes (i.e. until it's possible to perform inference with the
        deployment). In the case of conflicts (e.g. if it's not possible to create the
        specified deployment due to a conflict with an existing deployment), raises an
        :py:class:`mlflow.exceptions.MlflowException`.
        See target-specific plugin documentation for additional detail on support for
        asynchronous deployment and other configuration.

        :param name: Unique name to use for the deployment. If another deployment exists
                     with the same name, create_deployment will raise an
                     :py:class:`mlflow.exceptions.MlflowException`
        :param model_uri: URI of the model to deploy
        :param flavor: (optional) Model flavor to deploy. If unspecified, a default flavor
                       will be chosen.
        :param config: (optional) Dict containing target-specific configuration for the
                       deployment
        :return: Dict corresponding to the created deployment, which must contain the
                 'name' key
        """
        pass

    def update_deployment(self, name, model_uri=None, flavor=None, config=None):
        """
        Update the deployment with the specified name. You can update the URI of the model,
        the flavor of the deployed model (in which case the model URI must also be
        specified), and/or any target-specific attributes of the deployment (via ``config``).
        By default, this method should block until deployment completes (i.e. until it's
        possible to perform inference with the updated deployment).
        See target-specific plugin documentation for additional detail on support for
        asynchronous deployment and other configuration.

        :param name: Unique name of the deployment to update
        :param model_uri: (optional) URI of a new model to deploy
        :param flavor: (optional) New model flavor to use for the deployment. If provided,
                       ``model_uri`` must also be specified. If ``flavor`` is unspecified
                       but ``model_uri`` is specified, a default flavor will be chosen and
                       the deployment will be updated using that flavor.
        :param config: (optional) Dict containing updated target-specific configuration for
                       the deployment
        :return: None
        """
        pass

    def delete_deployment(self, name):
        """
        Delete the deployment with name ``name`` from the specified target. Deletion should
        be idempotent (i.e. deletion should not fail if retried on a non-existent
        deployment).

        :param name: Name of the deployment to delete
        :return: None
        """
        pass

    def get_deployment(self, name):
        """
        Returns a dictionary describing the specified deployment, throwing an
        :py:class:`mlflow.exceptions.MlflowException` if no deployment exists with the
        provided name.
        The dict is guaranteed to contain a 'name' key containing the deployment name.
        The other fields of the returned dictionary and their types may vary across
        deployment targets.

        :param name: Name of the deployment to fetch
        """
        pass

    def list_deployments(self):
        """
        List deployments. This method is expected to return an unpaginated list of all
        deployments (an alternative would be to return a dict with a 'deployments' field
        containing the actual deployments, with plugins able to specify other fields, e.g.
        a ``next_page_token`` field, in the returned dictionary for pagination, and to
        accept a ``pagination_args`` argument to this method for passing pagination-related
        args).

        :return: A list of dicts corresponding to deployments. Each dict is guaranteed to
                 contain a 'name' key containing the deployment name. The other fields of
                 the returned dictionary and their types may vary across deployment targets.
        """
        pass

    def predict(self, deployment_name, df):
        """
        Compute predictions on the pandas DataFrame ``df`` using the specified deployment.
        Note that the input/output types of this method match those of
        ``mlflow pyfunc predict`` (we accept a pandas DataFrame as input and return either
        a pandas DataFrame, pandas Series, or numpy array as output).

        :param deployment_name: Name of the deployment to predict against
        :param df: pandas DataFrame to use for inference
        :return: A pandas DataFrame, pandas Series, or numpy array
        """
        pass

# Exposed separately from the client object, as models deployed this way cannot be
# managed via the client update/get/delete/list APIs.
def run_local(target, model_uri, flavor=None, config=None):
    """
    Deploys the specified model locally, for testing.

    :param target: Target to deploy to locally (used to select the appropriate plugin)
    :param model_uri: URI of the model to deploy
    :param flavor: (optional) Model flavor to deploy. If unspecified, a default flavor
                   will be chosen.
    :param config: (optional) Dict containing target-specific configuration for the
                   deployment
    :return: None
    """
    pass

def target_help():
    """
    Return a string containing detailed documentation on the current deployment target,
    to be displayed when users invoke the ``mlflow deployments help -t <target-name>`` CLI
    command. This method should be defined within the module specified by the plugin
    author.
    The string should contain:

    * An explanation of target-specific fields in the ``config`` passed to
      ``create_deployment`` and ``update_deployment``
    * How to specify a ``target_uri`` (e.g. for AWS SageMaker, target URIs have a scheme
      of "sagemaker://<aws-cli-profile-name>", where aws-cli-profile-name is the name of
      an AWS CLI profile: https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html)
    * Any other target-specific details
    """
    pass

def get_deploy_client(target_uri):
    """
    Return an instance of a subclass of :py:class:`BaseDeploymentClient` that can be used
    to deploy models to the specified target.

    :param target_uri: URI of the target to deploy to. Run ``mlflow deployments --help``
                       via the CLI for more information on supported deployment targets.
    """
    pass
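
# To make the contract above concrete, here's a minimal sketch of what a plugin module
# might look like. The in-memory behavior below is hypothetical and for illustration only;
# a real plugin would talk to an actual serving target. ``predict`` is omitted for brevity.
from mlflow.deployments import BaseDeploymentClient
from mlflow.exceptions import MlflowException


class DeploymentClient(BaseDeploymentClient):
    """Toy plugin that tracks "deployments" in an in-memory dict."""

    def __init__(self, target_uri):
        super().__init__(target_uri)
        self._deployments = {}

    def create_deployment(self, name, model_uri, flavor=None, config=None):
        # Raise on conflicts, per the create_deployment contract
        if name in self._deployments:
            raise MlflowException("A deployment named '%s' already exists" % name)
        self._deployments[name] = {
            "name": name, "model_uri": model_uri, "flavor": flavor, "config": config or {}}
        return self._deployments[name]

    def update_deployment(self, name, model_uri=None, flavor=None, config=None):
        deployment = self._deployments[name]
        if model_uri is not None:
            deployment["model_uri"] = model_uri
        if config is not None:
            deployment["config"].update(config)

    def delete_deployment(self, name):
        # Idempotent per the API contract: deleting a non-existent deployment is a no-op
        self._deployments.pop(name, None)

    def get_deployment(self, name):
        return self._deployments[name]

    def list_deployments(self):
        return list(self._deployments.values())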
"""
Example of model deployment with Sagemaker
Setup steps (note that `create_deployment` will attempt to run steps 4-6 for the user if no mlflow-sagemaker
docker image is detected in the current user's ECR repository, see code below for details)
1. Install AWS CLI via `pip install awscli`
2. Configure AWS credentials locally via `aws configure`
3. Log into dockerhub via `docker login`
4. Run `docker pull mlflow/mlflow-sagemaker` (to pull a docker image for MLflow model deployment to Sagemaker locally)
5. Run `aws configure get region` to get the current region, and `aws sts get-caller-identity` to get the default AWS account name
6. Run `docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/mlflow-sagemaker` to upload the image to your ECR repository
"""
# Test deployment locally first
import pandas as pd

from mlflow.deployments import run_local

run_local(target="sagemaker", model_uri="models:/faceToPotatoModel/prod")
# Deploy the latest prod version of a model that transforms human faces to
# potatoes to SageMaker. If no SageMaker image is detected in ECR for the current user's AWS
# account, the `create_deployment` method will attempt to push an image to the current user's
# ECR account. If the user lacks permissions to push to ECR, create_deployment will fail and
# instruct the user to reach out to their AWS account admin to manually perform steps 4-6 above.
from mlflow.deployments import get_deploy_client
# Alternatively, use get_deploy_client("sagemaker://aws-profile-name") to get a client
# for a non-default AWS CLI profile
client = get_deploy_client("sagemaker")
help(client)
deployment = client.create_deployment(
    name="faceToPotato", model_uri="models:/faceToPotatoModel/prod",
    config={"instance_type": "i3.xlarge"})
# Load & score a pandas DF with our newly created deployment
pandas_df = pd.read_csv("...")
prediction_df = client.predict(deployment_name="faceToPotato", df=pandas_df)
# Replace the model served by our existing SageMaker deployment with a new one
client.update_deployment(name="faceToPotato", model_uri="models:/anotherModel/prod")
# You can also use `update_deployment` to add a model to an existing SageMaker deployment
# (analog of the existing `mlflow sagemaker deploy -m add`)
client.update_deployment(name="faceToPotato", model_uri="models:/anotherModel/prod",
                         config={"mode": "add"})
client.update_deployment(name="faceToPotato",
                         model_uri="models:/newPotatoModelForAbTest/prod",
                         config={"mode": "add"})
# Get our endpoint to double-check its properties (e.g. that it's running) before deleting it
deployment = client.get_deployment(name="faceToPotato")
print("Deployment status: %s" % deployment["EndpointStatus"])
# Delete the endpoint
client.delete_deployment(name="faceToPotato")
# List and delete all endpoints created via the deploy plugin.
# We'll use SageMaker tags (e.g. a tag like "createdByMlflowDeployPlugin=true")
# to label endpoints created via the deploy plugin at creation time, and filter results
# while listing (see the sketch below)
all_deployments = client.list_deployments()
for dep in all_deployments:
    client.delete_deployment(name=dep["name"])
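
# A rough sketch (boto3-based; the tag key and helper name are hypothetical, and pagination
# is elided) of how the plugin's `list_deployments` could implement the tag-based filtering
# described above:
import boto3


def list_plugin_endpoints():
    sm = boto3.client("sagemaker")
    deployments = []
    for endpoint in sm.list_endpoints()["Endpoints"]:
        tags = sm.list_tags(ResourceArn=endpoint["EndpointArn"])["Tags"]
        if any(t["Key"] == "createdByMlflowDeployPlugin" and t["Value"] == "true"
               for t in tags):
            deployments.append({"name": endpoint["EndpointName"]})
    return deployments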
"""Bare-bones example showcasing the deployment APIs with a hypothetical RedisAI plugin"""
from mlflow.deployments import get_deploy_client
client = get_deploy_client(target_uri="redisai")
# Deploy a model to RedisAI & get a dict describing the resulting deployment
deployment = client.create_deployment(
    name="faceToPotatoTransformer", model_uri="models:/...", config={"numReplicas": 2})
# Get a dictionary representation of the deployment we created earlier. The
# dictionary must contain a "name" field, and will likely contain other
# deployment-target-specific fields, e.g. the number of replicas in the deployment
deployment_dict = client.get_deployment(name="faceToPotatoTransformer")
# Load & score a pandas DF with our newly created deployment
pandas_df = pd.read_csv("...")
prediction_df = client.predict(deployment_name="faceToPotatoTransformer", df=pandas_df)
# List deployments to my RedisAI cluster, obtaining a list of dictionaries (one per deployment)
all_redis_deployments = client.list_deployments()
# Update our RedisAI deployment with a new model
client.update_deployment(name="faceToPotatoTransformer",
                         model_uri="models:/mynewmodel/1")
# Update our RedisAI deployment with a new model & flavor
client.update_deployment(
    name="faceToPotatoTransformer",
    model_uri="models:/mynewmodel/2", flavor="tensorflow")
# Update the number of replicas in our RedisAI deployment
client.update_deployment(
    name="faceToPotatoTransformer", config={"numReplicas": 3})
# Delete the deployment
client.delete_deployment(name="faceToPotatoTransformer")
# CLI usage is pretty similar to the Python API. The main notable item is that we
# pass custom config options via repeated `-C key0=value0 -C key1=value1` syntax, i.e. one
# -C per key-value pair. This mimics how we pass parameters to the `mlflow run` CLI
# (a sketch of how the CLI could parse these follows the commands below)
mlflow deployments create -t redisai -m models:/myPotatoModel/production -C numReplicas=3
mlflow deployments help -t <target-name>
# For comparison:
mlflow run https://github.com/mlflow/mlflow-example -P alpha=0.1
mlflow deployments create -t kube -n spamDetector -m models:/spam/production
mlflow deployments predict -t kube -n spamDetector -f new-emails.json
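
# A minimal sketch of how the CLI could fold repeated `-C key=value` pairs into the
# `config` dict passed to `create_deployment`. This assumes a click-based CLI (like the
# rest of the `mlflow` CLI); the command and option names mirror the examples above but
# are not a confirmed implementation.
import click

from mlflow.deployments import get_deploy_client


@click.command("create")
@click.option("--target", "-t", required=True)
@click.option("--name", "-n", required=True)
@click.option("--model-uri", "-m", required=True)
@click.option("--config", "-C", "config_pairs", multiple=True,
              help="Extra target-specific config; one -C per key=value pair")
def create(target, name, model_uri, config_pairs):
    # ("numReplicas=3", "mode=add") -> {"numReplicas": "3", "mode": "add"}
    config = dict(pair.split("=", 1) for pair in config_pairs)
    client = get_deploy_client(target)
    client.create_deployment(name=name, model_uri=model_uri, config=config)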
"""
A 'typical' example showing use cases with an example kube plugin
Prereqs: Assumes the current user has credentials to talk to various kubernetes clusters
"""
from mlflow.deployments import get_deploy_client
# Deploy the latest staging version of a model that transforms human faces to
# potatos to staging k8s. Note that in the `config` argument we pass a YAML template
# to use for deployment - in practice, we may be able to define some default template for k8s deployments.
staging_client = get_deploy_client(target_uri="kube://staging-aws-us-west-2")
deployment = staging_client.create_deployment(
    name="potato",
    model_uri="models:/faceToPotatoModel/staging",
    config={"numReplicas": 2, "template": "/mlflow-template.yaml"})
# Run a loop that repeatedly redeploys the latest prod version of our face-to-potato
# model to the prod k8s cluster. This enables deployment rollouts/rollbacks based on the
# state of the model registry.
prod_client = get_deploy_client(target_uri="kube://prod-aws-us-west-2")
prod_deployment = prod_client.create_deployment(
    name="potato", model_uri="models:/faceToPotatoModel/prod", config={"numReplicas": 2})
while True:
    # Sleep 5s, then redeploy
    time.sleep(5)
    prod_client.update_deployment(
        name="potato",
        model_uri="models:/faceToPotatoModel/prod",
        config={"numReplicas": 2})
# Find a deployment
all_deployments = prod_client.list_deployments()
my_potato_deployment = [dep for dep in all_deployments if dep["numReplicas"] == 2][0]
### Deployments & rollbacks
### In practice, we might recommend users perform deployments & rollbacks via the model
### registry - deployment logic can simply apply the necessary model updates, with the
### state of which model versions are in production managed by the registry
# Get the deployment info to enable rollbacks
deployment = prod_client.get_deployment(name="potato")
old_config = deployment["config"]  # plugins may be encouraged to return config to enable this
# Update the deployment
new_config = old_config.copy()
new_config["numReplicas"] = 3
prod_client.update_deployment(name="potato", config=new_config)
# Now roll it back
prod_client.update_deployment(name="potato", config=old_config)

Sid's notes after writing out these code samples

  • The API is a little verbose
    • Methods like create_deployment and list_deployments could just be named create and list, but then we'd risk shadowing Python's built-in list when users run from mlflow.deployments import *
  • Should we accept config arguments as kwargs or as a dict? A dictionary makes it easier to map from the Python API to the CLI command (i.e. you won't get confused about which kwargs should be passed via -C to the CLI), but is less friendly to the Python user; the sketch below contrasts the two
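For illustration, here's how the two calling conventions in the last bullet would look side by side (the names and config values are made up, and only the dict form exists in the API as sketched above):

# config as a dict: maps 1:1 onto repeated `-C numReplicas=2` CLI flags
client.create_deployment(name="potato", model_uri="models:/m/1", config={"numReplicas": 2})
# config as kwargs: friendlier to the Python user, but ambiguous at the CLI boundary
client.create_deployment(name="potato", model_uri="models:/m/1", numReplicas=2)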
"""
Minimal example showing creating & predicting on a deployment
"""
from mlflow.deployments import get_deploy_client
# Deploy a model to kubernetes and use it to classify spam emails in real time
client = get_deploy_client("kube")
client.create_deployment(name="spamDetector", model_uri="models:/spam/production")
emails_df = pd.DataFrame([{"sender": "[email protected]", "contents": "..."}])
predictions = client.predict_deployment("spamDetector", emails_df)
print(predictions["isSpam"])