Created
October 5, 2020 19:13
-
-
Save maxpert/73afa69bf26fba38a44761594410b7cc to your computer and use it in GitHub Desktop.
Redis PING Implementation for JGroups
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package luna.lib | |
import io.lettuce.core.RedisClient | |
import io.lettuce.core.RedisException | |
import io.lettuce.core.cluster.RedisClusterClient | |
import io.lettuce.core.cluster.api.sync.RedisClusterCommands | |
import org.jgroups.Address | |
import org.jgroups.annotations.MBean | |
import org.jgroups.annotations.Property | |
import org.jgroups.conf.ClassConfigurator | |
import org.jgroups.protocols.Discovery | |
import org.jgroups.protocols.FILE_PING | |
import org.jgroups.protocols.PingData | |
import org.jgroups.util.Responses | |
import java.util.Base64 | |
@MBean(description = "Redis based discovery protocol") | |
open class REDIS_PING : FILE_PING() { | |
companion object { | |
private val base64Encoder = Base64.getEncoder() | |
private val base64Decoder = Base64.getDecoder() | |
init { | |
ClassConfigurator.addProtocol(2200, REDIS_PING::class.java) | |
} | |
} | |
@Property(description = "URL of Redis client") | |
protected var url: String = "redis://localhost:6379/0" | |
@Property(description = "Is redis cluster") | |
protected var cluster: Boolean = false | |
@Property(description = "Name of the group of pods") | |
protected var group: String = "jgroups" | |
@Property(description = "Don't allow cluster to operate without redis instance") | |
protected var strict: Boolean = false | |
private var clusterMembers: Map<String, String> = mapOf() | |
protected val redisCommands: RedisClusterCommands<String, String> by lazy { | |
if (cluster) | |
RedisClusterClient.create(url).connect().sync() | |
else | |
RedisClient.create(url).connect().sync() | |
} | |
override fun init() { | |
super.init() | |
failOpen { | |
redisCommands.ping() | |
} | |
} | |
override fun write(data: MutableList<PingData>, clustername: String) { | |
data.forEach { d -> | |
val serData = serializeWithoutView(d).toBase64() | |
failOpen { | |
redisCommands.hset(clustername, Discovery.addressAsString(d.address), serData) | |
} | |
} | |
} | |
override fun removeAll(clustername: String?) { | |
if (clustername == null) { | |
return | |
} | |
failOpen { redisCommands.del(clustername) } | |
} | |
override fun remove(clustername: String?, addr: Address?) { | |
if (clustername == null || addr == null) { | |
return | |
} | |
failOpen { redisCommands.hdel(clustername, Discovery.addressAsString(addr)) } | |
} | |
override fun readAll(members: MutableList<Address>?, clustername: String, responses: Responses) { | |
clusterMembers = failOpen (mapOf()) { redisCommands.hgetall(clustername) } | |
for ((ownAddr, serData) in clusterMembers) { | |
val data = try { | |
Discovery.deserialize(serData.fromBase64()) | |
} catch (e: Throwable) { | |
log.error("Unable to read member %s %s; error: %s", local_addr, ownAddr, e) | |
null | |
} | |
if (data == null) { | |
failOpen { | |
redisCommands.hdel(clustername, ownAddr) | |
} | |
continue | |
} | |
if (members != null && !members.contains(data.address)) { | |
continue | |
} | |
responses.addResponse(data, false) | |
if (local_addr != null && local_addr != data.address) { | |
addDiscoveryResponseToCaches(data.address, data.logicalName, data.physicalAddr) | |
} | |
} | |
} | |
private fun failOpen(call: () -> Unit) { | |
try { | |
call() | |
} catch (e: RedisException) { | |
if (strict) { | |
throw e | |
} | |
log.warn("Running in non-strict mode ignoring redis exception: ${e.message}") | |
} | |
} | |
private fun <T> failOpen(default: T, call: () -> T): T { | |
return try { | |
call() | |
} catch (e: RedisException) { | |
if (strict) { | |
throw e | |
} | |
log.warn("Running in non-strict mode ignoring redis exception", e) | |
default | |
} | |
} | |
fun ByteArray.toBase64(): String = base64Encoder.encodeToString(this) | |
fun String.fromBase64(): ByteArray = base64Decoder.decode(this) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment