Created
September 11, 2012 20:29
-
-
Save 0x1b-xyz/3701760 to your computer and use it in GitHub Desktop.
A generic ehcache "Stage"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pbatch.rt.staging | |
import groovy.util.logging.Log4j | |
import net.sf.ehcache.Cache | |
import net.sf.ehcache.CacheManager | |
import net.sf.ehcache.Element | |
import net.sf.ehcache.config.CacheConfiguration | |
import net.sf.ehcache.store.MemoryStoreEvictionPolicy | |
import org.springframework.util.Assert | |
import pbatch.BatchId | |
import pbatch.NotFoundException | |
import pbatch.staging.Predicate | |
import pbatch.staging.Extractor | |
import pbatch.staging.MissingIdentifierException | |
import org.springframework.util.ClassUtils | |
import pbatch.staging.Chunkable | |
import pbatch.rt.BatchImpl | |
import pbatch.staging.Visitor | |
/**
 * Implements a {@link pbatch.staging.Stage} using EhCache instances, one for each {@link Descriptor} in the
 * {@link Registry}. Each cache is named by {@link #getCacheName(Descriptor)}, sized by
 * {@link #getCacheSize(Descriptor)} and configured as an eternal LRU cache that overflows to disk.
 * <p>
 * Every multi-entity read ({@code findAll}, {@code extract}, {@code visit}) returns entities in the natural
 * order of their keys, which is the ordering documented on {@link pbatch.staging.Stage}.
 *
 * @author jstiefel
 */
@Log4j
class EhcacheStage implements StageOperations {

    /** Smallest size handed to {@link CacheConfiguration} for any cache built by this stage. */
    public static final int DEFAULT_CACHE_SIZE = 1000

    final Registry registry
    private final BatchImpl batch
    private final CacheManager cacheManager

    /**
     * Builds a {@link pbatch.staging.Stage} for the specified {@link Registry}, registering one cache per
     * descriptor with the {@code cacheManager}.
     *
     * @param batchImpl the batch whose id prefixes every cache name; required
     * @param registry the registry supplying the descriptors to stage; required
     * @param cacheManager the manager that owns the created caches; required
     * @throws IllegalArgumentException when any argument is {@code null}
     */
    EhcacheStage(BatchImpl batchImpl, Registry registry, CacheManager cacheManager) {
        // Validate before touching any state so a bad argument never leaves a half-built stage.
        Assert.notNull(batchImpl, 'A pbatch is required')
        Assert.notNull(registry, 'A registry is required')
        Assert.notNull(cacheManager, 'A cache manager is required')
        this.batch = batchImpl
        this.registry = registry
        this.cacheManager = cacheManager

        registry.descriptors.each { Descriptor descriptor ->
            int size = getCacheSize(descriptor)
            String name = getCacheName(descriptor)
            CacheConfiguration config = new CacheConfiguration(name, size)
                    .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
                    .overflowToDisk(true)
                    .eternal(true)
            log.info("Building stage for pbatch ${batch.id} named ${name} of size ${size} ")
            cacheManager.addCache(new Cache(config))
        }
    }

    /**
     * Removes every staged entity of every registered type for this batch.
     */
    @Override
    void clear() {
        log.info("Clearing all staged data for ${batch.id}")
        for (Descriptor descriptor in registry.descriptors) {
            log.info("Clearing ${ClassUtils.getShortName(descriptor.type)} of ${batch.id}")
            cacheFor(descriptor).removeAll()
        }
    }

    /**
     * Intentionally a no-op: flushing the underlying caches was disabled by the original author because of
     * unexplained behavior. A warning is logged on every call so the omission stays visible.
     *
     * @param type ignored
     */
    @Override
    void flush(Class<? extends Serializable> type) {
        log.warn("EhcacheStage#flush() is not implemented due to some really odd behavior.")
    }

    /**
     * Returns the number of entries held by the cache of the specified type.
     */
    @Override
    int count(Class<? extends Serializable> type) {
        cacheFor(type).getSize()
    }

    /**
     * Indicates whether the cache of the specified type holds the {@code identifier} as a key.
     */
    @Override
    def <T extends Serializable> boolean exists(Class<T> type, Serializable identifier) {
        cacheFor(type).isKeyInCache(identifier)
    }

    /**
     * Looks up a single staged entity by its identifier.
     *
     * @throws NotFoundException when the cache holds no element for {@code identifier}
     */
    @Override
    def <T extends Serializable> T get(Class<T> type, Serializable identifier) throws NotFoundException {
        Element element = cacheFor(type).get(identifier)
        if (element == null)
            throw new NotFoundException(batch.id, identifier)
        (T) element.value
    }

    /**
     * Stages a list of entities that all share one type. The whole list is validated before anything is
     * written, so a rejected list leaves the cache untouched. A {@code null} or empty list is a no-op.
     *
     * @throws IllegalArgumentException when the list mixes entity types
     * @throws MissingIdentifierException when an entity yields a {@code null} or empty-string identifier
     */
    @Override
    def <T extends Serializable> void save(List<T> entities) {
        if (!entities)
            return
        Descriptor descriptor = registry.getDescriptor(entities.first().getClass())
        Cache cache = cacheFor(descriptor)

        List<Element> elements = new ArrayList<Element>(entities.size())
        for (T entity in entities) {
            if (entity.getClass() != descriptor.type)
                throw new IllegalArgumentException("Attempt to save a list of varied entity types")
            Serializable identifier = descriptor.getIdentifier(entity)
            if (isMissing(identifier))
                throw new MissingIdentifierException(batch.id, entity.getClass().toString())
            elements << new Element(identifier, entity)
        }
        for (Element element in elements)
            cache.put(element)
    }

    /**
     * Returns one page of entities ordered by the natural sort of their keys.
     *
     * @param pageSize number of entities per page; must be positive
     * @param page zero-based page index; must not be negative
     * @return the entities of the page, or an empty list when the page lies past the last key
     * @throws IllegalStateException when {@code pageSize} is not positive or {@code page} is negative
     */
    @Override
    def <T extends Serializable> List<T> findAll(Class<T> type, int pageSize, int page) {
        log.debug("Reading page ${page}*${pageSize} of ${batch.id}#${type}")
        // IllegalStateException (rather than IllegalArgumentException) is kept for callers that already
        // catch it for negative offsets.
        if (pageSize <= 0 || page < 0)
            throw new IllegalStateException("Invalid page ${page} or pageSize ${pageSize}")

        Cache cache = cacheFor(type)
        // Sort a copy so the list handed out by the cache is never mutated.
        List<Serializable> keys = new ArrayList<Serializable>(cache.keys).sort()

        // long arithmetic: pageSize * page must not wrap around for large arguments.
        long start = (long) pageSize * page
        if (start >= keys.size()) {
            log.debug("Read last page of ${batch.id}#${type}")
            return []
        }
        int end = (int) Math.min(start + pageSize, (long) keys.size())
        entitiesFor(cache, keys.subList((int) start, end))
    }

    /**
     * Returns every staged entity of the type by delegating to {@link #findAll(Class, Predicate)} with
     * {@link Predicate#EMPTY}.
     */
    @Override
    def <T extends Serializable> List<T> findAll(Class<T> type) {
        findAll(type, Predicate.EMPTY)
    }

    /**
     * Returns the entities whose keys satisfy the {@code predicate}, ordered by the natural sort of those keys.
     */
    @Override
    def <T extends Serializable> List<T> findAll(Class<T> type, Predicate predicate) {
        Cache cache = cacheFor(type)
        entitiesFor(cache, findKeys(cache, type, predicate))
    }

    /**
     * Returns the entity of the first key found that satisfies the {@code predicate}.
     *
     * @throws NotFoundException when no key matches; carries the {@code toString} of the predicate
     */
    @Override
    def <T extends Serializable> T find(Class<T> type, Predicate predicate) {
        Serializable key = (Serializable) cacheFor(type).keys.find { predicate.apply((Serializable) it) }
        // Explicit null check: Groovy truth would reject legitimate keys such as 0 or "".
        if (key == null)
            throw new NotFoundException(batch.id, predicate.toString())
        get(type, key)
    }

    /**
     * Applies the {@code extractor} to every entity returned by {@link #findAll(Class, Predicate)} and
     * returns the extracted values in the same order.
     */
    @Override
    def <T extends Serializable, AT> List<AT> extract(Class<T> type, Class<AT> attributeType, Predicate predicate, Extractor<T, AT> extractor) {
        findAll(type, predicate).collect { (AT) extractor.extract(it) }
    }

    /**
     * Calls the {@code visitor} once per matching entity, passing a zero-based running index and the entity,
     * in the key order produced by {@link #findKeys(Cache, Class, Predicate)}.
     */
    @Override
    def <T extends Serializable> void visit(Class<T> type, Predicate predicate, Visitor<T> visitor) {
        Cache cache = cacheFor(type)
        findKeys(cache, type, predicate).eachWithIndex { Serializable key, int index ->
            visitor.visit(index, cache.get(key).value)
        }
    }

    /**
     * Returns the keys matching the {@link Predicate}, sorted by their natural order.
     */
    protected <T extends Serializable> List<Serializable> findKeys(Cache cache, Class<T> type, Predicate predicate) {
        // findAll produces a fresh list, so sorting it in place leaves the cache's own key list alone.
        cache.keys.findAll { predicate.apply((Serializable) it) }.sort()
    }

    /**
     * Takes the highest of the {@link pbatch.staging.Chunkable#getChunkSize} from the entity
     * {@link pbatch.staging.Loader} or {@link pbatch.staging.Processor}, or just uses the
     * {@link #DEFAULT_CACHE_SIZE}, whichever is largest.
     */
    protected int getCacheSize(Descriptor descriptor) {
        int size = DEFAULT_CACHE_SIZE
        for (beanRef in [descriptor.loader, descriptor.processor]) {
            if (beanRef) {
                Chunkable chunkable = batch.context.getBean(beanRef)
                if (chunkable.chunkSize > size)
                    size = chunkable.chunkSize
            }
        }
        size
    }

    /**
     * Prepends the {@link BatchId} to the {@link Descriptor#type} name to create a unique name for the specific
     * cache.
     */
    protected String getCacheName(Descriptor descriptor) {
        "${batch.id}_${descriptor.type.name}"
    }

    /** Resolves the cache backing the staged {@code type}. */
    private Cache cacheFor(Class<? extends Serializable> type) {
        cacheFor(registry.getDescriptor(type))
    }

    /** Resolves the cache backing the {@code descriptor}, failing loudly when it was never registered. */
    private Cache cacheFor(Descriptor descriptor) {
        String name = getCacheName(descriptor)
        Cache cache = cacheManager.getCache(name)
        if (cache == null)
            throw new IllegalStateException("No cache named ${name} is registered with the cache manager")
        cache
    }

    /** Reads the value stored under each key, preserving the order of {@code keys}. */
    private List entitiesFor(Cache cache, List<Serializable> keys) {
        // collect, not collectAll: collectAll would recurse into keys that are themselves collections.
        keys.collect { Serializable key -> cache.get(key).value }
    }

    /** An identifier counts as missing when it is null or an empty string; numeric zero is a valid identifier. */
    private static boolean isMissing(Serializable identifier) {
        identifier == null || (identifier instanceof CharSequence && identifier.length() == 0)
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pbatch.staging; | |
import pbatch.NotFoundException; | |
import java.io.Serializable; | |
import java.util.List; | |
/** | |
* A referential datastore abstraction that maps a set of "@{@link pbatch.staging.annotation.Staged}" | |
* annotations to a persistence implementation like ehcache. Entities are loaded into the stage using | |
* {@link pbatch.Batch#load(List)} or the more efficient | |
* {@link pbatch.Batch#load(Class, org.springframework.core.io.Resource)} which takes advantage of a memory | |
* and garbage collection efficient chunking strategy within Spring Batch and allows batch sizes to approach gigabytes | |
* with a standard VM heap size without falling over. | |
* <p/> | |
* Entities are discovered by the {@code registry} using the | |
* {@link pbatch.Config#CONFIG_PACKAGES_TO_SCAN} | |
* configuration override. Each entity generates a graph of {@link pbatch.staging.annotation.Staged}, | |
* {@link pbatch.staging.annotation.StagedIdentifier} and | |
* {@link pbatch.staging.annotation.StagedAttribute} references that are persistable and the | |
* {@link Loader}s, {@link Transformer}s | |
* and {@link Processor}s that act upon them in the lifecycle of a {@link pbatch.Batch}. | |
* | |
* @see pbatch.staging.annotation.Staged | |
* @see pbatch.staging.annotation.StagedAttribute | |
* @see Loader | |
* @see Processor | |
* | |
* @author jstiefel | |
*/ | |
public interface Stage { | |
/** | |
* Returns a count of the specified {@link Stage} data. | |
*/ | |
int count(Class<? extends Serializable> type); | |
/** | |
* Indicates if a {@link pbatch.staging.annotation.Staged} entity with the specified identifier exists | |
* in the {@code Stage}. | |
*/ | |
<T extends Serializable> boolean exists(Class<T> type, Serializable identifier); | |
/** | |
* Finds the specified {@link pbatch.staging.annotation.Staged}. Should be the most efficient use of | |
* caching, requires a fully formed identifier for the entity (this includes the nested | |
* {@link pbatch.BatchId}. | |
*/ | |
<T extends Serializable> T get(Class<T> type, Serializable identifier) throws NotFoundException; | |
/** | |
* Finds all {@link pbatch.staging.annotation.Staged} of the specified type using an "empty" predicate | |
* delegated to {@link #findAll(Class, Predicate)}. | |
* | |
* @see Predicate#EMPTY | |
*/ | |
<T extends Serializable> List<T> findAll(Class<T> type); | |
/** | |
* Finds all of the specified {@link pbatch.staging.annotation.Staged} using the provided | |
* {@link Predicate}. Ordering is based on a natural sort of the keys of the entities, so implement | |
* {@link Comparable} if want to impose your own order. | |
*/ | |
<T extends Serializable> List<T> findAll(Class<T> type, Predicate predicate); | |
/** | |
* Searches against the identifiers of a {@code Stage} for those that match the {@code predicate}. | |
* | |
* @throws NotFoundException When nothing matches. The "identifier" in the exception will be the | |
* {@code toString} of the {@link Predicate}. | |
*/ | |
<T extends Serializable> T find(Class<T> type, Predicate predicate) throws NotFoundException; | |
/** | |
* Finds a page of {@link pbatch.staging.annotation.Staged} entities. Ordering is imposed by | |
* {@link #findAll(Class, Predicate)}. | |
*/ | |
<T extends Serializable> List<T> findAll(Class<T> type, int pageSize, int page); | |
/** | |
* Enables extraction of data from each entity identified by the {@code predicate}, returning a list of whatever | |
* you're interested in in the order imposed by {@link #findAll(Class)}. Ordering is imposed by | |
* {@link #findAll(Class, Predicate)} | |
*/ | |
<T extends Serializable, AT> List<AT> extract(Class<T> type, Class<AT> attributeType, Predicate predicate, Extractor<T, AT> extractor); | |
/** | |
* Visits each entity matching the {@link Predicate}, executing {@link Visitor#visit(int, java.io.Serializable)}. | |
* Ordering is imposed by {@link #findAll(Class, Predicate)} | |
*/ | |
<T extends Serializable> void visit(Class<T> type, Predicate predicate, Visitor<T> visitor); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment