Created
February 27, 2017 17:41
-
-
Save johnou/896a862c5612698b0679ea97d3a8d453 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cloud.orbit.actors.extensions.AbstractStorageExtension; | |
import cloud.orbit.actors.extensions.json.ActorReferenceModule; | |
import cloud.orbit.actors.runtime.DefaultDescriptorFactory; | |
import cloud.orbit.actors.runtime.RemoteReference; | |
import cloud.orbit.concurrent.Task; | |
import cloud.orbit.exception.UncheckedException; | |
import com.fasterxml.jackson.annotation.JsonAutoDetect; | |
import com.fasterxml.jackson.core.JsonProcessingException; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.fasterxml.jackson.module.afterburner.AfterburnerModule; | |
import com.lambdaworks.redis.ReadFrom; | |
import com.lambdaworks.redis.RedisClient; | |
import com.lambdaworks.redis.RedisURI; | |
import com.lambdaworks.redis.api.StatefulRedisConnection; | |
import com.lambdaworks.redis.api.async.RedisAsyncCommands; | |
import com.lambdaworks.redis.codec.Utf8StringCodec; | |
import com.lambdaworks.redis.masterslave.MasterSlave; | |
import com.lambdaworks.redis.masterslave.StatefulRedisMasterSlaveConnection; | |
import com.lambdaworks.redis.resource.ClientResources; | |
import com.lambdaworks.redis.resource.DefaultClientResources; | |
import org.jetbrains.annotations.NotNull; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.stereotype.Component; | |
import org.springframework.util.StringUtils; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* @author Johno Crawford ([email protected]) | |
*/ | |
@Component | |
public class RedisStorageProvider extends AbstractStorageExtension { | |
private ExecutorService executorService; | |
private RedisClient client; | |
/** | |
* Lettuce itself does not block until a command is processed so we benefit | |
* from pipe-lining as commands are just written to the Redis server, | |
* as soon as Redis comes back with an response the command is completed. | |
* | |
* Lettuce auto-reconnects (transparently) so the connection is never "broken". | |
*/ | |
private StatefulRedisConnection<String, String> connection; | |
private ObjectMapper mapper; | |
@Value("${redis.nodes:localhost:6379}") | |
private String redisNodes; | |
@Value("${redis.timeout:8000}") | |
private int timeout = 8000; | |
@Value("${orbit.forkJoinPool.parallelism:16}") | |
private int parallelism = 16; | |
@Override | |
public Task<Void> start() { | |
mapper = new ObjectMapper(); | |
mapper.registerModule(new ActorReferenceModule(DefaultDescriptorFactory.get())); | |
mapper.registerModule(new AfterburnerModule()); | |
mapper.setVisibility(mapper.getSerializationConfig().getDefaultVisibilityChecker(). | |
withFieldVisibility(JsonAutoDetect.Visibility.ANY) | |
.withGetterVisibility(JsonAutoDetect.Visibility.NONE) | |
.withIsGetterVisibility(JsonAutoDetect.Visibility.NONE) | |
.withSetterVisibility(JsonAutoDetect.Visibility.NONE) | |
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE)); | |
ClientResources resources = DefaultClientResources.builder().ioThreadPoolSize(parallelism).computationThreadPoolSize(parallelism).build(); | |
client = RedisClient.create(resources); | |
if (redisNodes.indexOf(',') > -1) { | |
List<RedisURI> nodes = new ArrayList<>(); | |
for (String node : StringUtils.tokenizeToStringArray(redisNodes, ",", true, true)) { | |
nodes.add(buildRedisUri(node)); | |
} | |
connection = MasterSlave.connect(client, new Utf8StringCodec(), nodes); | |
((StatefulRedisMasterSlaveConnection) connection).setReadFrom(ReadFrom.MASTER_PREFERRED); | |
} else { | |
client = RedisClient.create(resources, buildRedisUri(redisNodes)); | |
connection = client.connect(); | |
} | |
return Task.done(); | |
} | |
@NotNull | |
private RedisURI buildRedisUri(String node) { | |
RedisURI uri = RedisURI.create("redis://" + node); | |
uri.setTimeout(timeout); | |
uri.setUnit(TimeUnit.MILLISECONDS); | |
return uri; | |
} | |
@Override | |
public Task<Void> clearState(final RemoteReference<?> reference, final Object state) { | |
RedisAsyncCommands<String, String> async = connection.async(); | |
return Task.from(async.del(asKey(reference)).thenApplyAsync(result -> null, executorService)); | |
} | |
@Override | |
public Task<Void> stop() { | |
try { | |
connection.close(); | |
} finally { | |
client.shutdown(0, 15, TimeUnit.SECONDS); | |
} | |
return Task.done(); | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public Task<Boolean> readState(final RemoteReference<?> reference, final Object state) { | |
RedisAsyncCommands<String, String> async = connection.async(); | |
return Task.from(async.get(asKey(reference)).thenApplyAsync(data -> { | |
if (data != null) { | |
try { | |
mapper.readerForUpdating(state).readValue(data); | |
return true; | |
} catch (Exception e) { | |
throw new UncheckedException("Error parsing Redis response: " + data, e); | |
} | |
} | |
return false; | |
}, executorService)); // ensure actor activation continues in the Orbit thread pool. | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public Task<Void> writeState(final RemoteReference<?> reference, final Object state) { | |
String data; | |
try { | |
data = mapper.writeValueAsString(state); | |
} catch (JsonProcessingException e) { | |
throw new UncheckedException(e); | |
} | |
RedisAsyncCommands<String, String> async = connection.async(); | |
return Task.from(async.set(asKey(reference), data).thenApplyAsync(result -> null, executorService)); | |
} | |
private static final String KEY_SEPARATOR = "|"; | |
private String asKey(final RemoteReference<?> reference) { | |
return RemoteReference.getInterfaceClass(reference).getSimpleName() + KEY_SEPARATOR + String.valueOf(RemoteReference.getId(reference)); | |
} | |
public void setExecutorService(ExecutorService executorService) { | |
this.executorService = executorService; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment