Created
April 1, 2012 17:16
-
-
Save rocketraman/2277155 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.vivosys.cluster.hazelcast; | |
import com.vivosys.cluster.api.Cluster; | |
import com.vivosys.cluster.api.ClusterException; | |
import com.google.common.collect.ImmutableList; | |
import com.hazelcast.core.*; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.Iterator; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Lock; | |
/** | |
* An implementation of Cluster using Hazelcast. | |
*/ | |
public class HazelcastCluster implements Cluster { | |
private static final Logger log = LoggerFactory.getLogger(HazelcastCluster.class); | |
private HazelcastInstance hazelcastInstance; | |
private IMap<Object, Member> lockMap; | |
public void init() { | |
lockMap = hazelcastInstance.getMap("lockMap"); | |
hazelcastInstance.getCluster().addMembershipListener(new MembershipListener() { | |
@Override | |
public void memberAdded(MembershipEvent membershipEvent) { | |
log.info("{} is now part of the cluster.", membershipEvent.getMember()); | |
} | |
@Override | |
public void memberRemoved(MembershipEvent membershipEvent) { | |
final Member member = membershipEvent.getMember(); | |
log.info("{} is no longer part of the cluster.", member); | |
try { | |
clearTokenLocks(member); | |
} catch (Exception e) { | |
/* ignore */ | |
} | |
} | |
}); | |
} | |
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { | |
this.hazelcastInstance = hazelcastInstance; | |
} | |
public LifecycleService getLifecycleService() { | |
return hazelcastInstance.getLifecycleService(); | |
} | |
@Override | |
public Lock getThreadLock(Object object) { | |
return hazelcastInstance.getLock(object); | |
} | |
@Override | |
public boolean tryTokenLock(final Object object) { | |
return safeMapOp(new Callable<Boolean>() { | |
@Override | |
public Boolean call() { | |
Member value = lockMap.get(object); | |
if (value == null) { | |
lockMap.put(object, thisMember()); | |
return true; | |
} | |
return false; | |
} | |
}, lockMap); | |
} | |
@Override | |
public void tokenUnlock(final Object object) { | |
safeMapOp(new Callable<Void>() { | |
@Override | |
public Void call() { | |
lockMap.remove(object); | |
return null; | |
} | |
}, lockMap); | |
} | |
@Override | |
public void clearAllTokenLocks() { | |
safeMapOp(new Callable<Void>() { | |
@Override | |
public Void call() { | |
for (Iterator keyIterator = lockMap.keySet().iterator(); keyIterator.hasNext(); keyIterator.next()) { | |
keyIterator.remove(); | |
} | |
return null; | |
} | |
}, lockMap); | |
} | |
@Override | |
public ImmutableList<Object> getThisMemberTokenLocks() { | |
return safeMapOp(new Callable<ImmutableList<Object>>() { | |
@Override | |
public ImmutableList<Object> call() { | |
final ImmutableList.Builder<Object> lockListBuilder = new ImmutableList.Builder<>(); | |
for (final Object lockedObject : lockMap.keySet()) { | |
if (lockMap.get(lockedObject).equals(thisMember())) { | |
lockListBuilder.add(lockedObject); | |
} | |
} | |
return lockListBuilder.build(); | |
} | |
}, lockMap); | |
} | |
@Override | |
public void clearThisMemberTokenLocks() { | |
clearTokenLocks(thisMember()); | |
} | |
private void clearTokenLocks(final Member member) { | |
safeMapOp(new Callable<Void>() { | |
@Override | |
public Void call() { | |
for (Iterator keyIterator = lockMap.keySet().iterator(); keyIterator.hasNext(); ) { | |
final Member lockMember = lockMap.get(keyIterator.next()); | |
if (lockMember.equals(member)) { | |
keyIterator.remove(); | |
} | |
} | |
return null; | |
} | |
}, lockMap); | |
} | |
private <T> T safeMapOp(Callable<T> callable, IMap<Object, Member> iMap) { | |
try { | |
if(! iMap.lockMap(10, TimeUnit.SECONDS)) { | |
throw new ClusterException("Unable to lock distributed map " + iMap.getName() + | |
" in cluster " + thisMember() + "."); | |
} | |
} catch (Exception e) { | |
log.warn("Exception while locking distributed map, perhaps a node was shutting down?", e); | |
} | |
try { | |
try { | |
return callable.call(); | |
} catch (Exception e) { | |
throw new ClusterException(e); | |
} | |
} finally { | |
try { | |
iMap.unlockMap(); | |
} catch (Exception e) { | |
log.warn("Exception while unlocking distributed map, perhaps a node was shutting down?", e); | |
} | |
} | |
} | |
private Member thisMember() { | |
return hazelcastInstance.getCluster().getLocalMember(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment