Skip to content

Instantly share code, notes, and snippets.

@johnou
Created February 27, 2017 17:41
Show Gist options
  • Save johnou/896a862c5612698b0679ea97d3a8d453 to your computer and use it in GitHub Desktop.
Save johnou/896a862c5612698b0679ea97d3a8d453 to your computer and use it in GitHub Desktop.
import cloud.orbit.actors.extensions.AbstractStorageExtension;
import cloud.orbit.actors.extensions.json.ActorReferenceModule;
import cloud.orbit.actors.runtime.DefaultDescriptorFactory;
import cloud.orbit.actors.runtime.RemoteReference;
import cloud.orbit.concurrent.Task;
import cloud.orbit.exception.UncheckedException;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import com.lambdaworks.redis.ReadFrom;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.api.async.RedisAsyncCommands;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.masterslave.MasterSlave;
import com.lambdaworks.redis.masterslave.StatefulRedisMasterSlaveConnection;
import com.lambdaworks.redis.resource.ClientResources;
import com.lambdaworks.redis.resource.DefaultClientResources;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Johno Crawford ([email protected])
*/
@Component
public class RedisStorageProvider extends AbstractStorageExtension {
private ExecutorService executorService;
private RedisClient client;
/**
* Lettuce itself does not block until a command is processed so we benefit
* from pipe-lining as commands are just written to the Redis server,
* as soon as Redis comes back with an response the command is completed.
*
* Lettuce auto-reconnects (transparently) so the connection is never "broken".
*/
private StatefulRedisConnection<String, String> connection;
private ObjectMapper mapper;
@Value("${redis.nodes:localhost:6379}")
private String redisNodes;
@Value("${redis.timeout:8000}")
private int timeout = 8000;
@Value("${orbit.forkJoinPool.parallelism:16}")
private int parallelism = 16;
@Override
public Task<Void> start() {
mapper = new ObjectMapper();
mapper.registerModule(new ActorReferenceModule(DefaultDescriptorFactory.get()));
mapper.registerModule(new AfterburnerModule());
mapper.setVisibility(mapper.getSerializationConfig().getDefaultVisibilityChecker().
withFieldVisibility(JsonAutoDetect.Visibility.ANY)
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
ClientResources resources = DefaultClientResources.builder().ioThreadPoolSize(parallelism).computationThreadPoolSize(parallelism).build();
client = RedisClient.create(resources);
if (redisNodes.indexOf(',') > -1) {
List<RedisURI> nodes = new ArrayList<>();
for (String node : StringUtils.tokenizeToStringArray(redisNodes, ",", true, true)) {
nodes.add(buildRedisUri(node));
}
connection = MasterSlave.connect(client, new Utf8StringCodec(), nodes);
((StatefulRedisMasterSlaveConnection) connection).setReadFrom(ReadFrom.MASTER_PREFERRED);
} else {
client = RedisClient.create(resources, buildRedisUri(redisNodes));
connection = client.connect();
}
return Task.done();
}
@NotNull
private RedisURI buildRedisUri(String node) {
RedisURI uri = RedisURI.create("redis://" + node);
uri.setTimeout(timeout);
uri.setUnit(TimeUnit.MILLISECONDS);
return uri;
}
@Override
public Task<Void> clearState(final RemoteReference<?> reference, final Object state) {
RedisAsyncCommands<String, String> async = connection.async();
return Task.from(async.del(asKey(reference)).thenApplyAsync(result -> null, executorService));
}
@Override
public Task<Void> stop() {
try {
connection.close();
} finally {
client.shutdown(0, 15, TimeUnit.SECONDS);
}
return Task.done();
}
@Override
@SuppressWarnings("unchecked")
public Task<Boolean> readState(final RemoteReference<?> reference, final Object state) {
RedisAsyncCommands<String, String> async = connection.async();
return Task.from(async.get(asKey(reference)).thenApplyAsync(data -> {
if (data != null) {
try {
mapper.readerForUpdating(state).readValue(data);
return true;
} catch (Exception e) {
throw new UncheckedException("Error parsing Redis response: " + data, e);
}
}
return false;
}, executorService)); // ensure actor activation continues in the Orbit thread pool.
}
@Override
@SuppressWarnings("unchecked")
public Task<Void> writeState(final RemoteReference<?> reference, final Object state) {
String data;
try {
data = mapper.writeValueAsString(state);
} catch (JsonProcessingException e) {
throw new UncheckedException(e);
}
RedisAsyncCommands<String, String> async = connection.async();
return Task.from(async.set(asKey(reference), data).thenApplyAsync(result -> null, executorService));
}
private static final String KEY_SEPARATOR = "|";
private String asKey(final RemoteReference<?> reference) {
return RemoteReference.getInterfaceClass(reference).getSimpleName() + KEY_SEPARATOR + String.valueOf(RemoteReference.getId(reference));
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment