|
import com.yammer.dropwizard.util.Duration; |
|
|
|
import com.google.common.cache.*; |
|
import org.slf4j.Logger; |
|
import org.slf4j.LoggerFactory; |
|
import java.util.concurrent.*; |
|
|
|
import static com.google.common.base.Throwables.propagateIfInstanceOf; |
|
|
|
public class LocalAppConfigStorage implements AppConfigStorage { |
|
private static final Logger log = LoggerFactory.getLogger(LocalAppConfigStorage.class); |
|
|
|
private final LoadingCache<AppConfigParams, AppConfig> cache; |
|
|
|
public LocalAppConfigStorage(final AppConfigStorage appConfigUpstream, |
|
final ScheduledExecutorService scheduledExecutor, |
|
final Duration refreshInterval, final Duration expireAfterAccess) { |
|
|
|
cache = CacheBuilder.newBuilder() |
|
.expireAfterAccess(expireAfterAccess.getQuantity(), expireAfterAccess.getUnit()) |
|
.recordStats() |
|
.build(new Loader(appConfigUpstream, scheduledExecutor, refreshInterval)); |
|
} |
|
|
|
private class Loader extends CacheLoader<AppConfigParams, AppConfig> { |
|
|
|
private final AppConfigStorage upstream; |
|
private final ScheduledExecutorService scheduledExecutor; |
|
private final Duration refreshInterval; |
|
|
|
public Loader(AppConfigStorage appConfigUpstream, |
|
ScheduledExecutorService scheduledExecutor, |
|
Duration refreshInterval) { |
|
this.upstream = appConfigUpstream; |
|
this.scheduledExecutor = scheduledExecutor; |
|
this.refreshInterval = refreshInterval; |
|
} |
|
|
|
@Override |
|
public AppConfig load(AppConfigParams key) throws NotFoundException, TransientException { |
|
log.info("Loading {}", key); |
|
final AppConfig appConfig = upstream.get(key); |
|
schedule(new RefreshJob(key, appConfig)); |
|
return appConfig; |
|
} |
|
|
|
private ScheduledFuture<?> schedule(RefreshJob job) { |
|
return scheduledExecutor.schedule( |
|
job, |
|
refreshInterval.getQuantity(), refreshInterval.getUnit()); |
|
} |
|
|
|
private class RefreshJob implements Runnable { |
|
private final AppConfigParams key; |
|
private AppConfig oldValue; |
|
|
|
private RefreshJob(AppConfigParams key, AppConfig oldValue) { |
|
this.oldValue = oldValue; |
|
log.info("Scheduling new refresh job for key: {}", key); |
|
this.key = key; |
|
} |
|
|
|
@Override |
|
public void run() { |
|
if (cache.asMap().containsKey(key)) { |
|
log.debug("Refreshing key - {} {}", key, oldValue.getUpdateKey()); |
|
tryRefresh(); |
|
schedule(this); |
|
} else { |
|
log.info("Terminating refresh job for expired key - {}", key); |
|
} |
|
} |
|
|
|
private void tryRefresh() { |
|
try { |
|
final AppConfig newValue = upstream.get(key); |
|
if (newValue.getUpdateKey() != null && |
|
!newValue.getUpdateKey().equals(oldValue.getUpdateKey())) { |
|
log.info("Got an updated value for key: {} {}", key, newValue.getUpdateKey()); |
|
cache.put(key, newValue); |
|
oldValue = newValue; |
|
} |
|
} catch (Exception e) { |
|
log.info("Transient err from upstream - {} {}", key, oldValue.getUpdateKey()); |
|
} |
|
} |
|
} |
|
} |
|
|
|
@Override |
|
public AppConfig get(AppConfigParams key) throws NotFoundException, TransientException { |
|
/** elided |
|
* cache.get(key) |
|
**/ |
|
} |
|
} |