Skip to content

Instantly share code, notes, and snippets.

@0x1b-xyz
Created September 11, 2012 20:29
Show Gist options
  • Save 0x1b-xyz/3701760 to your computer and use it in GitHub Desktop.
Save 0x1b-xyz/3701760 to your computer and use it in GitHub Desktop.
A generic ehcache "Stage"
package pbatch.rt.staging
import groovy.util.logging.Log4j
import net.sf.ehcache.Cache
import net.sf.ehcache.CacheManager
import net.sf.ehcache.Element
import net.sf.ehcache.config.CacheConfiguration
import net.sf.ehcache.store.MemoryStoreEvictionPolicy
import org.springframework.util.Assert
import pbatch.BatchId
import pbatch.NotFoundException
import pbatch.staging.Predicate
import pbatch.staging.Extractor
import pbatch.staging.MissingIdentifierException
import org.springframework.util.ClassUtils
import pbatch.staging.Chunkable
import pbatch.rt.BatchImpl
import pbatch.staging.Visitor
/**
 * Implements a {@link pbatch.staging.Stage} using EhCache instances, one for each
 * {@link pbatch.staging.annotation.Staged} in the {@link Registry}. The caches will be configured to
 * reflect the attributes of their {@link pbatch.staging.annotation.Staged} annotation.
 * <p>
 * Every cache is named after the owning batch and the staged type (see {@link #getCacheName(Descriptor)}),
 * so all lookups in this class resolve the cache through the {@link Descriptor} of the requested type.
 *
 * @author jstiefel
 */
@Log4j
class EhcacheStage implements StageOperations {

    /**
     * Smallest size ever handed to a {@link CacheConfiguration}; {@link #getCacheSize(Descriptor)} only
     * ever raises it.
     */
    public static final int DEFAULT_CACHE_SIZE = 1000;

    final Registry registry;
    private BatchImpl batch
    private CacheManager cacheManager

    /**
     * Builds a {@link pbatch.staging.Stage} for the specified {@link Registry}, registering one eternal,
     * LRU-evicted, disk-overflowing cache per descriptor with the given {@link CacheManager}.
     *
     * @param batchImpl the batch whose id prefixes every cache name; required
     * @param registry the registry whose descriptors each get a cache; required
     * @param cacheManager the manager the caches are added to; required
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    EhcacheStage(BatchImpl batchImpl, Registry registry, CacheManager cacheManager) {
        // Validate before touching any state so a bad argument never leaves a half-built stage behind.
        Assert.notNull(batchImpl, 'A pbatch is required')
        Assert.notNull(registry, 'A registry is required');
        Assert.notNull(cacheManager, 'A cache manager is required');

        this.batch = batchImpl;
        this.registry = registry
        this.cacheManager = cacheManager

        registry.descriptors.each { Descriptor descriptor ->
            def size = getCacheSize(descriptor)
            def name = getCacheName(descriptor)

            CacheConfiguration config = new CacheConfiguration(name, size)
                .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
                .overflowToDisk(true)
                .eternal(true)

            if (log.infoEnabled)
                log.info("Building stage for pbatch ${batch.id} named ${name} of size ${size} ")

            Cache cache = new Cache(config)
            cacheManager.addCache(cache)
        }
    }

    /**
     * Removes every element from every cache belonging to this stage. The caches themselves stay
     * registered with the {@link CacheManager}.
     */
    @Override
    void clear() {
        log.info("Clearing all staged data for ${batch.id}")
        registry.descriptors.each { Descriptor descriptor ->
            log.info("Clearing ${ClassUtils.getShortName(descriptor.type)} of ${batch.id}")
            cacheManager.getCache(getCacheName(descriptor)).removeAll();
        }
    }

    /**
     * Intentionally a no-op: only logs a warning. An earlier implementation that called
     * {@code Cache#flush()} per type (or for all descriptors when no type was given) was disabled by
     * the original author because of unexplained behavior.
     *
     * @param type ignored
     */
    @Override
    void flush(Class<? extends Serializable> type) {
        log.warn("EhcacheStage#flush() is not implemented due to some really odd behavior.")
    }

    /**
     * Returns the number of elements currently held by the cache of the given type, as reported by
     * {@code Cache#getSize()}.
     */
    @Override
    int count(Class<? extends Serializable> type) {
        cacheFor(type).getSize()
    }

    /**
     * Indicates whether the cache of the given type holds an element under {@code identifier}.
     */
    @Override
    def <T extends Serializable> boolean exists(Class<T> type, Serializable identifier) {
        cacheFor(type).isKeyInCache(identifier);
    }

    /**
     * Looks a single entity up by its identifier.
     *
     * @throws NotFoundException when the cache has no element for {@code identifier}
     */
    def <T extends Serializable> T get(Class<T> type, Serializable identifier) throws NotFoundException {
        Element element = cacheFor(type).get(identifier);
        if (element == null)
            throw new NotFoundException(batch.id, identifier);
        (T) element.value
    }

    /**
     * Stores the entities in the cache of their (single, shared) type, keyed by the identifier the
     * {@link Descriptor} extracts from each one. A {@code null} or empty list is a no-op.
     * <p>
     * Entities are written one at a time, so entities preceding an offending one remain stored when an
     * exception is thrown.
     *
     * @throws IllegalArgumentException when the list mixes entity types
     * @throws MissingIdentifierException when an entity yields no usable identifier (Groovy truth, so
     *         {@code null}, {@code 0} and empty values are all rejected)
     */
    @Override
    def <T extends Serializable> void save(List<T> entities) {
        // Groovy truth covers both null and empty; previously a null list failed with an NPE.
        if (!entities)
            return

        Descriptor descriptor = registry.getDescriptor(entities.first().getClass())
        Cache cache = cacheManager.getCache(getCacheName(descriptor));

        entities.each {
            if (it.getClass() != descriptor.type)
                throw new IllegalArgumentException("Attempt to save a list of varied entity types")

            Serializable identifier = descriptor.getIdentifier(it);
            if (!identifier)
                throw new MissingIdentifierException(batch.id, it.getClass().toString())

            cache.put(new Element(identifier, it))
        }
    }

    /**
     * Returns one zero-based page of entities, ordered by the natural sort of their keys.
     * <p>
     * An empty cache, or a page that starts beyond the last key, yields an empty list.
     * <p>
     * TODO This method needs refactoring to use the {@link #findAll(Class, Predicate)} implementation with an
     * intelligent {@link Predicate} in lieu of the more complicated implementation here.
     *
     * @param pageSize number of entities per page; must be at least 1
     * @param page zero-based page index; must not be negative
     * @throws IllegalStateException when {@code page} is negative or {@code pageSize} is less than 1
     *         (only checked once the cache is known to be non-empty)
     */
    @Override
    def <T extends Serializable> List<T> findAll(Class<T> type, int pageSize, int page) {
        if (log.debugEnabled)
            log.debug("Reading page ${page}*${pageSize} of ${batch.id}#${type}")

        Cache cache = cacheFor(type)

        // Sort a private copy so the list handed back by the cache is never mutated in place.
        List keys = new ArrayList(cache.keys).sort();
        if (keys.empty)
            return [];

        // A pageSize of 0 used to produce the range 0..-1, which Groovy reads as "the whole list";
        // two negative arguments slipped past the old "start < 0" check in the same way.
        if (page < 0 || pageSize < 1)
            throw new IllegalStateException("Invalid page ${page} for pageSize ${pageSize}")

        // Multiply as long: pageSize * page can overflow an int.
        long offset = (long) pageSize * page
        int total = keys.size()
        if (offset >= total) {
            if (log.debugEnabled)
                log.debug("Read last page of ${batch.id}#${type}")
            return [];
        }

        int first = (int) offset
        int last = ((int) Math.min(offset + pageSize, (long) total)) - 1

        // collect, not the deprecated recursive collectAll: a collection-valued key must be looked up
        // as a whole rather than descended into.
        keys[first..last].collect { cache.get(it).value }
    }

    /**
     * Returns every entity of the given type by delegating to {@link #findAll(Class, Predicate)} with
     * {@link Predicate#EMPTY}.
     */
    @Override
    def <T extends Serializable> List<T> findAll(Class<T> type) {
        findAll(type, Predicate.EMPTY)
    }

    /**
     * Returns the entities whose keys satisfy the predicate, sorted with Groovy's default
     * {@code sort()}.
     * <p>
     * NOTE(review): the sort here is applied to the entities, whereas the paged
     * {@link #findAll(Class, int, int)} sorts the keys. The two orders only agree when entity order
     * follows key order - confirm which one is intended before unifying them.
     */
    @Override
    def <T extends Serializable> List<T> findAll(Class<T> type, Predicate predicate) {
        Cache cache = cacheFor(type)
        findKeys(cache, type, predicate).collect { cache.get(it).value }.sort()
    }

    /**
     * Returns the entity stored under the first key that satisfies the predicate.
     *
     * @throws NotFoundException when no key matches; carries the predicate's {@code toString()}
     */
    @Override
    def <T extends Serializable> T find(Class<T> type, Predicate predicate) {
        Cache cache = cacheFor(type)
        Serializable key = (Serializable) cache.keys.find { predicate.apply((Serializable) it) }
        if (!key)
            throw new NotFoundException(batch.id, predicate.toString());
        get(type, key);
    }

    /**
     * Applies the extractor to every entity returned by {@link #findAll(Class, Predicate)} and returns
     * the extracted values in that same order.
     */
    @Override
    def <T extends Serializable, AT> List<AT> extract(Class<T> type, Class<AT> attributeType, Predicate predicate, Extractor<T, AT> extractor) {
        findAll(type, predicate).collect { (AT) extractor.extract(it) }
    }

    /**
     * Hands each matching entity to the visitor together with a running, zero-based index. Entities
     * are read from the cache one at a time, in the natural sort order of their keys, so repeated
     * visits see a stable sequence instead of the cache's arbitrary key order.
     */
    @Override
    def <T extends Serializable> void visit(Class<T> type, Predicate predicate, Visitor<T> visitor) {
        Cache cache = cacheFor(type)
        int index = 0;
        // findKeys builds a fresh list, so sorting it in place does not touch the cache's own key list.
        findKeys(cache, type, predicate).sort().each {
            visitor.visit(index++, cache.get(it).value)
        }
    }

    /**
     * Returns a list of keys matching the {@link Predicate}, in the order the cache reports them.
     */
    protected <T extends Serializable> List<Serializable> findKeys(Cache cache, Class<T> type, Predicate predicate) {
        cache.keys.findAll { predicate.apply((Serializable) it) }
    }

    /**
     * Takes the highest of the {@link pbatch.staging.Chunkable#getChunkSize} from the entity
     * {@link pbatch.staging.Loader} or {@link pbatch.staging.Processor}, or just uses the
     * {@link #DEFAULT_CACHE_SIZE}, whichever is largest.
     */
    protected int getCacheSize(Descriptor descriptor) {
        int size = DEFAULT_CACHE_SIZE
        [descriptor.loader, descriptor.processor].each {
            if (it) {
                // The loader/processor reference is resolved to a bean in the batch's context.
                Chunkable chunkable = batch.context.getBean(it)
                if (chunkable.chunkSize > size)
                    size = chunkable.chunkSize
            }
        }
        size
    }

    /**
     * Prepends the {@link BatchId} to the {@link Descriptor#type} name to create a unique name for the specific
     * cache.
     */
    protected String getCacheName(Descriptor descriptor) {
        "${batch.id}_${descriptor.type.name}"
    }

    /**
     * Resolves the cache that backs the given staged type. Only called from method bodies, never from
     * inside a closure.
     */
    private Cache cacheFor(Class type) {
        cacheManager.getCache(getCacheName(registry.getDescriptor(type)))
    }
}
package pbatch.staging;
import pbatch.NotFoundException;
import java.io.Serializable;
import java.util.List;
/**
 * A referential datastore abstraction that maps a set of "@{@link pbatch.staging.annotation.Staged}"
 * annotations to a persistence implementation such as ehcache.
 *
 * <p>Entities are loaded into the stage using {@link pbatch.Batch#load(List)} or the more efficient
 * {@link pbatch.Batch#load(Class, org.springframework.core.io.Resource)}, which takes advantage of a
 * memory and garbage collection efficient chunking strategy within Spring Batch and allows batch sizes
 * to approach gigabytes with a standard VM heap size without falling over.
 *
 * <p>Entities are discovered by the {@code registry} using the
 * {@link pbatch.Config#CONFIG_PACKAGES_TO_SCAN} configuration override. Each entity generates a graph
 * of {@link pbatch.staging.annotation.Staged}, {@link pbatch.staging.annotation.StagedIdentifier} and
 * {@link pbatch.staging.annotation.StagedAttribute} references that are persistable, together with the
 * {@link Loader}s, {@link Transformer}s and {@link Processor}s that act upon them in the lifecycle of
 * a {@link pbatch.Batch}.
 *
 * @see pbatch.staging.annotation.Staged
 * @see pbatch.staging.annotation.StagedAttribute
 * @see Loader
 * @see Processor
 *
 * @author jstiefel
 */
public interface Stage {

    /**
     * Returns a count of the specified {@link Stage} data.
     *
     * @param type the staged entity type to count
     * @return the number of staged entities of that type
     */
    int count(Class<? extends Serializable> type);

    /**
     * Indicates if a {@link pbatch.staging.annotation.Staged} entity with the specified identifier
     * exists in the {@code Stage}.
     *
     * @param type the staged entity type
     * @param identifier the identifier to look for
     * @return {@code true} when such an entity is staged
     */
    <T extends Serializable> boolean exists(Class<T> type, Serializable identifier);

    /**
     * Finds the specified {@link pbatch.staging.annotation.Staged}. Should be the most efficient use
     * of caching; requires a fully formed identifier for the entity (this includes the nested
     * {@link pbatch.BatchId}).
     *
     * @param type the staged entity type
     * @param identifier the fully formed identifier of the entity
     * @return the staged entity
     * @throws NotFoundException when no entity is staged under {@code identifier}
     */
    <T extends Serializable> T get(Class<T> type, Serializable identifier) throws NotFoundException;

    /**
     * Finds all {@link pbatch.staging.annotation.Staged} of the specified type using an "empty"
     * predicate delegated to {@link #findAll(Class, Predicate)}.
     *
     * @param type the staged entity type
     * @return every staged entity of that type
     * @see Predicate#EMPTY
     */
    <T extends Serializable> List<T> findAll(Class<T> type);

    /**
     * Finds all of the specified {@link pbatch.staging.annotation.Staged} using the provided
     * {@link Predicate}. Ordering is based on a natural sort of the keys of the entities, so implement
     * {@link Comparable} if you want to impose your own order.
     *
     * @param type the staged entity type
     * @param predicate selects the entities to return
     * @return the matching entities, in sorted order
     */
    <T extends Serializable> List<T> findAll(Class<T> type, Predicate predicate);

    /**
     * Searches against the identifiers of a {@code Stage} for those that match the {@code predicate}.
     *
     * @param type the staged entity type
     * @param predicate selects the identifier to look for
     * @return a staged entity whose identifier matches
     * @throws NotFoundException When nothing matches. The "identifier" in the exception will be the
     * {@code toString} of the {@link Predicate}.
     */
    <T extends Serializable> T find(Class<T> type, Predicate predicate) throws NotFoundException;

    /**
     * Finds a page of {@link pbatch.staging.annotation.Staged} entities. Ordering is imposed by
     * {@link #findAll(Class, Predicate)}.
     *
     * @param type the staged entity type
     * @param pageSize the number of entities per page
     * @param page the page to read
     * @return the entities on the requested page
     */
    <T extends Serializable> List<T> findAll(Class<T> type, int pageSize, int page);

    /**
     * Enables extraction of data from each entity identified by the {@code predicate}, returning a
     * list of whatever you're interested in. Ordering is imposed by
     * {@link #findAll(Class, Predicate)}.
     *
     * @param type the staged entity type
     * @param attributeType the type of the extracted values
     * @param predicate selects the entities to extract from
     * @param extractor produces one value per selected entity
     * @return the extracted values, one per matching entity
     */
    <T extends Serializable, AT> List<AT> extract(Class<T> type, Class<AT> attributeType, Predicate predicate, Extractor<T, AT> extractor);

    /**
     * Visits each entity matching the {@link Predicate}, executing
     * {@link Visitor#visit(int, java.io.Serializable)}. Ordering is imposed by
     * {@link #findAll(Class, Predicate)}.
     *
     * @param type the staged entity type
     * @param predicate selects the entities to visit
     * @param visitor called once per selected entity
     */
    <T extends Serializable> void visit(Class<T> type, Predicate predicate, Visitor<T> visitor);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment