Created
September 25, 2012 20:43
-
-
Save Randgalt/3784312 to your computer and use it in GitHub Desktop.
InterProcessSemaphoreV2.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2012 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.netflix.curator.framework.recipes.locks; | |
import com.google.common.base.Preconditions; | |
import com.google.common.collect.ImmutableList; | |
import com.google.common.io.Closeables; | |
import com.netflix.curator.framework.CuratorFramework; | |
import com.netflix.curator.framework.recipes.shared.SharedCountListener; | |
import com.netflix.curator.framework.recipes.shared.SharedCountReader; | |
import com.netflix.curator.framework.state.ConnectionState; | |
import com.netflix.curator.utils.ZKPaths; | |
import org.apache.zookeeper.CreateMode; | |
import org.apache.zookeeper.KeeperException; | |
import org.apache.zookeeper.WatchedEvent; | |
import org.apache.zookeeper.Watcher; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.util.Collection; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
public class InterProcessSemaphoreV2 | |
{ | |
private final Logger log = LoggerFactory.getLogger(getClass()); | |
private final InterProcessMutex lock; | |
private final CuratorFramework client; | |
private final String leasesPath; | |
private final Watcher watcher = new Watcher() | |
{ | |
@Override | |
public void process(WatchedEvent event) | |
{ | |
notifyFromWatcher(); | |
} | |
}; | |
private volatile int maxLeases; | |
private static final String LOCK_PARENT = "locks"; | |
private static final String LEASE_PARENT = "leases"; | |
private static final String LEASE_BASE_NAME = "lease-"; | |
/** | |
* @param client the client | |
* @param path path for the semaphore | |
* @param maxLeases the max number of leases to allow for this instance | |
*/ | |
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) | |
{ | |
this(client, path, maxLeases, null); | |
} | |
/** | |
* @param client the client | |
* @param path path for the semaphore | |
* @param count the shared count to use for the max leases | |
*/ | |
public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count) | |
{ | |
this(client, path, 0, count); | |
} | |
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) | |
{ | |
this.client = client; | |
lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT)); | |
this.maxLeases = (count != null) ? count.getCount() : maxLeases; | |
leasesPath = ZKPaths.makePath(path, LEASE_PARENT); | |
if ( count != null ) | |
{ | |
count.addListener | |
( | |
new SharedCountListener() | |
{ | |
@Override | |
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception | |
{ | |
InterProcessSemaphoreV2.this.maxLeases = newCount; | |
} | |
@Override | |
public void stateChanged(CuratorFramework client, ConnectionState newState) | |
{ | |
// no need to handle this here - clients should set their own connection state listener | |
} | |
} | |
); | |
} | |
} | |
/** | |
* Convenience method. Closes all leases in the given collection of leases | |
* | |
* @param leases leases to close | |
*/ | |
public void returnAll(Collection<Lease> leases) | |
{ | |
for ( Lease l : leases ) | |
{ | |
Closeables.closeQuietly(l); | |
} | |
} | |
/** | |
* Convenience method. Closes the lease | |
* | |
* @param lease lease to close | |
*/ | |
public void returnLease(Lease lease) | |
{ | |
Closeables.closeQuietly(lease); | |
} | |
/** | |
* <p>Acquire a lease. If no leases are available, this method blocks until either the maximum | |
* number of leases is increased or another client/process closes a lease.</p> | |
* | |
* <p>The client must close the lease when it is done with it. You should do this in a | |
* <code>finally</code> block.</p> | |
* | |
* @return the new lease | |
* @throws Exception ZK errors, interruptions, etc. | |
*/ | |
public Lease acquire() throws Exception | |
{ | |
Collection<Lease> leases = acquire(1, 0, null); | |
return leases.iterator().next(); | |
} | |
/** | |
* <p>Acquire <code>qty</code> leases. If there are not enough leases available, this method | |
* blocks until either the maximum number of leases is increased enough or other clients/processes | |
* close enough leases.</p> | |
* | |
* <p>The client must close the leases when it is done with them. You should do this in a | |
* <code>finally</code> block. NOTE: You can use {@link #returnAll(Collection)} for this.</p> | |
* | |
* @param qty number of leases to acquire | |
* @return the new leases | |
* @throws Exception ZK errors, interruptions, etc. | |
*/ | |
public Collection<Lease> acquire(int qty) throws Exception | |
{ | |
return acquire(qty, 0, null); | |
} | |
/** | |
* <p>Acquire a lease. If no leases are available, this method blocks until either the maximum | |
* number of leases is increased or another client/process closes a lease. However, this method | |
* will only block to a maximum of the time parameters given.</p> | |
* | |
* <p>The client must close the lease when it is done with it. You should do this in a | |
* <code>finally</code> block.</p> | |
* | |
* @param time time to wait | |
* @param unit time unit | |
* @return the new lease or null if time ran out | |
* @throws Exception ZK errors, interruptions, etc. | |
*/ | |
public Lease acquire(long time, TimeUnit unit) throws Exception | |
{ | |
Collection<Lease> leases = acquire(1, time, unit); | |
return (leases != null) ? leases.iterator().next() : null; | |
} | |
/** | |
* <p>Acquire <code>qty</code> leases. If there are not enough leases available, this method | |
* blocks until either the maximum number of leases is increased enough or other clients/processes | |
* close enough leases. However, this method will only block to a maximum of the time | |
* parameters given. If time expires before all leases are acquired, the subset of acquired | |
* leases are automatically closed.</p> | |
* | |
* <p>The client must close the leases when it is done with them. You should do this in a | |
* <code>finally</code> block. NOTE: You can use {@link #returnAll(Collection)} for this.</p> | |
* | |
* @param qty number of leases to acquire | |
* @param time time to wait | |
* @param unit time unit | |
* @return the new leases or null if time ran out | |
* @throws Exception ZK errors, interruptions, etc. | |
*/ | |
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception | |
{ | |
long startMs = System.currentTimeMillis(); | |
boolean hasWait = (unit != null); | |
long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0; | |
Preconditions.checkArgument(qty > 0, "qty cannot be 0"); | |
ImmutableList.Builder<Lease> builder = ImmutableList.builder(); | |
boolean success = false; | |
try | |
{ | |
while ( qty-- > 0 ) | |
{ | |
if ( hasWait ) | |
{ | |
long thisWaitMs = getThisWaitMs(startMs, waitMs); | |
if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) ) | |
{ | |
return null; | |
} | |
} | |
else | |
{ | |
lock.acquire(); | |
} | |
try | |
{ | |
String path = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME)); | |
builder.add(makeLease(path)); | |
synchronized(this) | |
{ | |
for(;;) | |
{ | |
List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath); | |
if ( children.size() <= maxLeases ) | |
{ | |
break; | |
} | |
if ( hasWait ) | |
{ | |
long thisWaitMs = getThisWaitMs(startMs, waitMs); | |
if ( thisWaitMs <= 0 ) | |
{ | |
return null; | |
} | |
wait(thisWaitMs); | |
} | |
else | |
{ | |
wait(); | |
} | |
} | |
} | |
} | |
finally | |
{ | |
lock.release(); | |
} | |
} | |
success = true; | |
} | |
finally | |
{ | |
if ( !success ) | |
{ | |
returnAll(builder.build()); | |
} | |
} | |
return builder.build(); | |
} | |
private long getThisWaitMs(long startMs, long waitMs) | |
{ | |
long elapsedMs = System.currentTimeMillis() - startMs; | |
return waitMs - elapsedMs; | |
} | |
private Lease makeLease(final String path) | |
{ | |
return new Lease() | |
{ | |
@Override | |
public void close() throws IOException | |
{ | |
try | |
{ | |
client.delete().guaranteed().forPath(path); | |
} | |
catch ( KeeperException.NoNodeException e ) | |
{ | |
log.warn("Lease already released", e); | |
} | |
catch ( Exception e ) | |
{ | |
throw new IOException(e); | |
} | |
} | |
}; | |
} | |
private synchronized void notifyFromWatcher() | |
{ | |
notifyAll(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment