Skip to content

Instantly share code, notes, and snippets.

@dhbrojas
Last active July 6, 2020 14:14
Show Gist options
  • Save dhbrojas/29cce628c315f72d661d39f3c7587eae to your computer and use it in GitHub Desktop.
Save dhbrojas/29cce628c315f72d661d39f3c7587eae to your computer and use it in GitHub Desktop.

Ray - Distributed training in Python

This Gist serves the sole purpose of condensing the Ray documentation in regards to Ray Core, Tune and RLlib. It was originally written for my personal use: as some quick notes to skim through. Moreover, the code is heavily if not entirely based on the Documentation snippets which can be found here

Ray Core

Description

Allows for easy multiprocessing and use of resources.

Code examples

During the initialisation of Ray you can specify the resources you wish to allocate for the main or driver process.

import ray

ray.init(num_cpus=8, num_gpus=1)

Ray introduces remote objects such as classes and functions which are asynchronous and nature. These objects run on separate processes and can share data using the Ray store. Remote functions are called through the .remote() method and return an ObjectId which refers to an element in the Ray store much like a promise in javascript. Remote functions are reffered to as Tasks and remote classes as Actors Calls like ray.get() or ray.wait() are used to wait for a "promise" or "future" to resolve.

A complete account of this can be found here

import ray

ray.init()

@ray.remote
function myRemoteFunction(value):
    return value + 2
    
objectId = myRemoteFunction.remote()

value = ray.get(objectId)

The snippet above fetches the return value of the function.

import ray
ray.init() # Only call this once.

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [1, 1, 1, 1]

The snippet above shows how to use python classes in Ray. We parallelize this class using actors. Finally this is an implementation from the Ray docs of a messaging system between processes.

import ray
import time

ray.init()

@ray.remote
class MessageActor(object):
    def __init__(self):
        self.messages = []

    def add_message(self, message):
        self.messages.append(message)

    def get_and_clear_messages(self):
        messages = self.messages
        self.messages = []
        return messages

@ray.remote
def worker(message_actor, j):
    for i in range(100):
        time.sleep(1)
        message_actor.add_message.remote(
            "Message {} from worker {}.".format(i, j))

message_actor = MessageActor.remote()

[worker.remote(message_actor, j) for j in range(3)]

for _ in range(100):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages:", new_messages)
    time.sleep(1)

Tune

Description

Tune is a library built on top of Ray for experiment execution and hyperparameter tuning at any scale. Hyperparameter refers to the experiment parameters which are not determined by learning. For example the weights of a Neural Network are not hyperparams but the number of CPUs used is.

Tune has built in support for pytorch and tensorflow as well as a collection of built in algorithms. Tune is mainly used to conduct reports and identify the optimal hyperparameters also known as Hyperparameter Tuning.

alt text

The image above depicts a simple Tune workflow where you can setup an experiment, run it, optimise it and analyse the results.

Code examples

To conduct an experiment with Tune you must use the tune.run() call. As a paramater, it takes a Tune Trainable.

Here is the most basic example of a Tune trainable which takes a config object and reports data to during "training".

def trainable(config):
    # config (dict): A dict of hyperparameters.

    for x in range(20):
        score = objective(x, config["a"], config["b"])

        tune.report(score=score)  # This sends the score to Tune.

You can use Search algorithms to find the optimal hyperparameters. Tune also supports external optimisation algorithms.

configs {
    "a": tune.grid_search([0, 1, 2, 3]),
    "b": hp.uniform(0, 1),
}

You can use Tune Schedulers to filter ineffective or non-rewarding trials

from ray.tune.schedulers import HyperBandScheduler

# Create HyperBand scheduler and maximize score
hyperband = HyperBandScheduler(metric="score", mode="max")

# Execute 20 trials using HyperBand using a search space
configs = {"a": tune.uniform(0, 1), "b": tune.uniform(0, 1)}

tune.run(
    MyTrainableClass,
    config=configs,
    num_samples=20,
    scheduler=hyperband
)

The tune.run call returns an Analysis object from which you can extract valuable information.

analysis = tune.run(...)

# Obtain the best hyperparamaters from the trials ran
print(analysis.get_best_config(metric="score", mode="max"))
df = analysis.dataframe(metric="score", mode="max")

RLlib

Description

RLlib provides a scalable approach to Reinforcement Learning techniques. At its core, Rollout Workers query user-defined Policies to determin the action to be taken on the environment.

Those Policies are defined through Python Classes in symbiosis with RL frameworks such as Tensorflow and Pytorch

RL Workflow using RLlib

Code examples

The code below is entirely drawn from the Ray / Tune docs. With this example we can see that Tune stopped the processes with the least promising results, hence, optimising speed and resource use.

"""Tune / Pytorch implementation of Hyperparamter tuning"""

import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn

from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F

from ray import tune
from ray.tune.schedulers import ASHAScheduler

# Definition of the Policy from a Pytorch nn.Module
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)


# Change these values if you want the training to run quicker or slower.
EPOCH_SIZE = 512
TEST_SIZE = 256


def train(model, optimizer, train_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # We set this just for the example to run quickly.
        if batch_idx * len(data) > EPOCH_SIZE:
            return
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()


def test(model, data_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(data_loader):
            # We set this just for the example to run quickly.
            if batch_idx * len(data) > TEST_SIZE:
                break
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    return correct / total


def train_mnist(config):
    # Data Setup
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.1307, ), (0.3081, ))])

    train_loader = DataLoader(
        datasets.MNIST("~/data", train=True, download=True,
                       transform=mnist_transforms),
        batch_size=64,
        shuffle=True)
    test_loader = DataLoader(
        datasets.MNIST("~/data", train=False, transform=mnist_transforms),
        batch_size=64,
        shuffle=True)

    model = ConvNet()
    optimizer = optim.SGD(
        model.parameters(), lr=config["lr"], momentum=config["momentum"])
    for i in range(10):
        train(model, optimizer, train_loader)
        acc = test(model, test_loader)

        # Send the current training result back to Tune
        tune.report(mean_accuracy=acc)

        if i % 5 == 0:
            # This saves the model to the trial directory
            torch.save(model.state_dict(), "./model.pth")


search_space = {
    "lr": tune.sample_from(lambda spec: 10**(-10 * np.random.rand())),
    "momentum": tune.uniform(0.1, 0.9)
}

# Uncomment this to enable distributed execution
# `ray.init(address="auto")`

# Download the dataset first
datasets.MNIST("~/data", train=True, download=True)

analysis = tune.run(
    train_mnist,
    num_samples=20,
    scheduler=ASHAScheduler(metric="mean_accuracy", mode="max"),
    config=search_space)

# Obtain a trial dataframe from all run trials of this `tune.run` call.
dfs = analysis.trial_dataframes
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment