Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created October 13, 2016 06:33
Show Gist options
  • Save nsivabalan/8cab287d589835cedcb647c8e59dc7f1 to your computer and use it in GitHub Desktop.
Save nsivabalan/8cab287d589835cedcb647c8e59dc7f1 to your computer and use it in GitHub Desktop.
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.store.BlobStore;
import com.github.ambry.utils.Time;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * Manages scrubbing triggers and decides when to scrub a particular store.
 * <p>
 * {@link #start()} creates one self-rescheduling task per store. Each task scrubs its store and then schedules
 * its own next run after {@code storeConfig.MinInternalToScrubInMs} plus a random jitter, until
 * {@link #shutdown()} is called.
 */
class ScrubbingManager {
  private final Map<PartitionId, BlobStore> stores;
  private final StoreConfig storeConfig;
  // Not read by the current scheduling logic; kept because the constructor accepts it.
  private final Time time;
  private ScheduledExecutorService scheduledExecutorService;

  // the ScrubbingManager can evolve to receive objects denoting server state or triggers so as to decide when
  // is the right time to trigger scrubbing
  ScrubbingManager(Map<PartitionId, BlobStore> stores, StoreConfig storeConfig, Time time) {
    this.stores = stores;
    this.storeConfig = storeConfig;
    this.time = time;
  }

  /**
   * Starts the ScrubbingManager and schedules one scrubbing job per store.
   */
  void start() {
    int numThreads = 1; // pick up from config.
    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(numThreads);
    // Drop still-pending delayed scrubs on shutdown(); by default they would keep the pool alive until their
    // (potentially very long) delay elapsed.
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    scheduledExecutorService = executor;
    // schedule scrubbing jobs.
    for (final Map.Entry<PartitionId, BlobStore> storeEntry : stores.entrySet()) {
      final long stagger = 0;
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          storeEntry.getValue().scrub(0);
          if (executor.isShutdown()) {
            return;
          }
          try {
            executor.schedule(this, nextScrubDelayMs(), TimeUnit.MILLISECONDS);
          } catch (RejectedExecutionException ignored) {
            // shutdown() raced with the check above; there is nothing left to schedule.
          }
        }
      }, stagger, TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Computes the delay before a store is scrubbed again: the configured minimum interval plus a random jitter in
   * {@code [0, interval)}.
   * <p>
   * The delay is relative to "now", which is what {@code schedule} expects. Subtracting a clock reading from it
   * would presumably make it negative (assuming {@code Time.milliseconds()} is an absolute timestamp), and a
   * negative delay is treated by the executor as a request for immediate execution.
   * @return the non-negative delay in milliseconds.
   */
  private long nextScrubDelayMs() {
    // Widen to long so that interval + jitter cannot overflow.
    long minIntervalMs = Math.max(0L, storeConfig.MinInternalToScrubInMs);
    // nextLong(bound) rejects a non-positive bound, so skip the jitter for a zero/negative interval.
    long jitterMs = minIntervalMs > 0 ? ThreadLocalRandom.current().nextLong(minIntervalMs) : 0L;
    return minIntervalMs + jitterMs;
  }

  /**
   * Shuts down the {@link ScrubbingManager}, waiting up to one minute for a scrub that is already running.
   * @throws IllegalStateException if the calling thread is interrupted while waiting; the thread's interrupt
   *                               status is restored before throwing.
   */
  void shutdown() {
    if (scheduledExecutorService != null) {
      try {
        scheduledExecutorService.shutdown();
        scheduledExecutorService.awaitTermination(1, TimeUnit.MINUTES);
      } catch (InterruptedException e) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment