This Gist serves the sole purpose of condensing the Ray documentation regarding Ray Core, Tune and RLlib. It was originally written for my personal use, as quick notes to skim through. Moreover, the code is heavily if not entirely based on the documentation snippets, which can be found here
Ray allows for easy multiprocessing and efficient use of resources.
During the initialisation of Ray you can specify the resources you wish to allocate for the main or driver process.
import ray
ray.init(num_cpus=8, num_gpus=1)
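Resources can also be requested per task or actor (both introduced below) by passing arguments to the @ray.remote decorator. A minimal sketch, assuming a machine with at least two CPUs and a GPU:
@ray.remote(num_cpus=2, num_gpus=0.5)
def heavy_task(data):
    # Runs in a worker process that has reserved 2 CPUs and half a GPU.
    return data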
Ray introduces remote objects such as classes and functions, which are asynchronous in nature.
These objects run on separate processes and can share data using the Ray object store.
Remote functions are called through the .remote() method and return an ObjectId which refers to an element in the Ray object store, much like a promise in JavaScript.
Remote functions are referred to as Tasks and remote classes as Actors.
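Data can also be placed in the object store explicitly with ray.put; a minimal sketch:
data_id = ray.put([1, 2, 3])  # store a value once, get back an ObjectId
print(ray.get(data_id))  # [1, 2, 3]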
Calls like ray.get() or ray.wait() are used to wait for a "promise" or "future" to resolve.
A complete account of this can be found here
import ray
ray.init()
@ray.remote
def my_remote_function(value):
    return value + 2
object_id = my_remote_function.remote(1)
value = ray.get(object_id)  # value == 3
The snippet above fetches the return value of the function.
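ray.wait is useful when you only need some of the results before continuing; a minimal sketch building on the function above:
object_ids = [my_remote_function.remote(i) for i in range(4)]
# Block until 2 of the 4 results are ready; the rest remain pending.
ready_ids, remaining_ids = ray.wait(object_ids, num_returns=2)
print(ray.get(ready_ids))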
import ray
ray.init() # Only call this once.
@ray.remote
class Counter(object):
def __init__(self):
self.n = 0
def increment(self):
self.n += 1
def read(self):
return self.n
counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [1, 1, 1, 1]
The snippet above shows how to use Python classes in Ray; we parallelise the class by making it an actor. Finally, here is an implementation from the Ray docs of a messaging system between processes.
import ray
import time
ray.init()
@ray.remote
class MessageActor(object):
def __init__(self):
self.messages = []
def add_message(self, message):
self.messages.append(message)
def get_and_clear_messages(self):
messages = self.messages
self.messages = []
return messages
@ray.remote
def worker(message_actor, j):
for i in range(100):
time.sleep(1)
message_actor.add_message.remote(
"Message {} from worker {}.".format(i, j))
message_actor = MessageActor.remote()
[worker.remote(message_actor, j) for j in range(3)]
for _ in range(100):
new_messages = ray.get(message_actor.get_and_clear_messages.remote())
print("New messages:", new_messages)
time.sleep(1)
Tune is a library built on top of Ray for experiment execution and hyperparameter tuning at any scale. A hyperparameter is an experiment parameter that is not determined by learning. For example, the weights of a neural network are not hyperparameters, but its learning rate and layer sizes are.
Tune has built-in support for PyTorch and TensorFlow, as well as a collection of built-in algorithms.
Tune is mainly used to run experiments and identify the optimal hyperparameters, a process known as hyperparameter tuning.
A simple Tune workflow: set up an experiment, run it, optimise it and analyse the results.
To conduct an experiment with Tune you must use the tune.run() call. As a parameter, it takes a Tune Trainable.
Here is the most basic example of a Tune trainable, which takes a config object and reports data to Tune during "training".
from ray import tune

def objective(x, a, b):
    # An example objective to optimise; any user-defined metric works here.
    return a * (x ** 0.5) + b

def trainable(config):
    # config (dict): A dict of hyperparameters.
    for x in range(20):
        score = objective(x, config["a"], config["b"])
        tune.report(score=score)  # This sends the score to Tune.
You can use Search Algorithms to find the optimal hyperparameters; Tune also supports external optimisation algorithms, as sketched after the config below.
configs = {
    "a": tune.grid_search([0, 1, 2, 3]),
    "b": tune.uniform(0, 1),
}
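For example, an external optimiser such as HyperOpt can be plugged in through the search_alg argument of tune.run. A minimal sketch, assuming a Ray 1.x-era API and the hyperopt package installed:
from ray.tune.suggest.hyperopt import HyperOptSearch

hyperopt_search = HyperOptSearch(metric="score", mode="max")
analysis = tune.run(
    trainable,
    config={"a": tune.uniform(0, 1), "b": tune.uniform(0, 1)},
    search_alg=hyperopt_search,
    num_samples=20)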
You can use Tune Schedulers to stop ineffective or unpromising trials early.
from ray.tune.schedulers import HyperBandScheduler
# Create HyperBand scheduler and maximize score
hyperband = HyperBandScheduler(metric="score", mode="max")
# Execute 20 trials using HyperBand and a search space
configs = {"a": tune.uniform(0, 1), "b": tune.uniform(0, 1)}
tune.run(
MyTrainableClass,
config=configs,
num_samples=20,
scheduler=hyperband
)
The tune.run call returns an Analysis object from which you can extract valuable information.
analysis = tune.run(...)
# Obtain the best hyperparameters from the trials run
print(analysis.get_best_config(metric="score", mode="max"))
# Obtain a dataframe of results, taking each trial's best "score"
df = analysis.dataframe(metric="score", mode="max")
RLlib provides a scalable approach to Reinforcement Learning.
At its core, Rollout Workers query user-defined Policies to determine the action to be taken on the environment.
Those Policies are defined through Python classes built on top of RL frameworks such as TensorFlow and PyTorch.
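A minimal sketch of this loop, assuming a Ray 1.x-era RLlib API and OpenAI Gym's CartPole-v0 environment:
import ray
from ray.rllib.agents.ppo import PPOTrainer

ray.init()
# Two rollout workers collect experience by querying the PPO policy.
trainer = PPOTrainer(env="CartPole-v0", config={"num_workers": 2})
for _ in range(3):
    result = trainer.train()  # one training iteration
    print(result["episode_reward_mean"])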
The code below is drawn entirely from the Ray / Tune docs. With this example we can see that Tune stops the trials with the least promising results early, thereby optimising speed and resource use.
"""Tune / Pytorch implementation of Hyperparamter tuning"""
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F
from ray import tune
from ray.tune.schedulers import ASHAScheduler
# Definition of the Policy as a PyTorch nn.Module
class ConvNet(nn.Module):
def __init__(self):
super(ConvNet, self).__init__()
self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
self.fc = nn.Linear(192, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 3))
x = x.view(-1, 192)
x = self.fc(x)
return F.log_softmax(x, dim=1)
# Change these values if you want the training to run quicker or slower.
EPOCH_SIZE = 512
TEST_SIZE = 256
def train(model, optimizer, train_loader):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
# We set this just for the example to run quickly.
if batch_idx * len(data) > EPOCH_SIZE:
return
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
def test(model, data_loader):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.eval()
correct = 0
total = 0
with torch.no_grad():
for batch_idx, (data, target) in enumerate(data_loader):
# We set this just for the example to run quickly.
if batch_idx * len(data) > TEST_SIZE:
break
data, target = data.to(device), target.to(device)
outputs = model(data)
_, predicted = torch.max(outputs.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
return correct / total
def train_mnist(config):
# Data Setup
mnist_transforms = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.1307, ), (0.3081, ))])
train_loader = DataLoader(
datasets.MNIST("~/data", train=True, download=True,
transform=mnist_transforms),
batch_size=64,
shuffle=True)
test_loader = DataLoader(
datasets.MNIST("~/data", train=False, transform=mnist_transforms),
batch_size=64,
shuffle=True)
model = ConvNet()
optimizer = optim.SGD(
model.parameters(), lr=config["lr"], momentum=config["momentum"])
for i in range(10):
train(model, optimizer, train_loader)
acc = test(model, test_loader)
# Send the current training result back to Tune
tune.report(mean_accuracy=acc)
if i % 5 == 0:
# This saves the model to the trial directory
torch.save(model.state_dict(), "./model.pth")
search_space = {
"lr": tune.sample_from(lambda spec: 10**(-10 * np.random.rand())),
"momentum": tune.uniform(0.1, 0.9)
}
# Uncomment this to enable distributed execution
# `ray.init(address="auto")`
# Download the dataset first
datasets.MNIST("~/data", train=True, download=True)
analysis = tune.run(
train_mnist,
num_samples=20,
scheduler=ASHAScheduler(metric="mean_accuracy", mode="max"),
config=search_space)
# Obtain a trial dataframe from all run trials of this `tune.run` call.
dfs = analysis.trial_dataframes
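These per-trial dataframes can then be plotted to compare learning curves, as in the Ray docs tutorial; a short sketch assuming matplotlib is installed:
# Plot the mean_accuracy curve of every trial on a single axis.
ax = None
for d in dfs.values():
    ax = d.mean_accuracy.plot(ax=ax, legend=False)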