Skip to content

Instantly share code, notes, and snippets.

@langmi
Created February 3, 2012 07:23
Show Gist options
  • Save langmi/1728744 to your computer and use it in GitHub Desktop.
Save langmi/1728744 to your computer and use it in GitHub Desktop.
CompositeItemStreamReader wraps multiple ItemStreamReaders and, on every read() call, reads one item from each of them in turn (in lockstep, not concurrently).
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
/**
 * Composite {@link ItemStreamReader} that wraps several delegate readers under
 * one hood. Every call to {@link #read()} pulls exactly one item from each
 * registered reader, in registration order, and hands the collected items to
 * the configured {@link UnifyingItemsMapper}, which merges them into a single
 * result.
 *
 * <p>The stream lifecycle calls ({@code open}, {@code update}, {@code close})
 * are broadcast to all registered readers, following the approach of Spring
 * Batch's CompositeItemStream.
 *
 * <p>Is not threadsafe, at least not on purpose.
 *
 * @param <T> type of the unified item produced by the mapper
 * @author Michael R. Lange
 */
public class CompositeItemStreamReader<T> implements ItemStreamReader<T> {

    /** Registered ItemStreamReaders; never {@code null}, empty until configured. */
    private List<ItemStreamReader<?>> itemReaderStreams = new ArrayList<ItemStreamReader<?>>();

    /** Mandatory Unifying Mapper Implementation. */
    private UnifyingItemsMapper<T> unifyingMapper;

    /**
     * Reads one item from every registered reader and lets the mapper unify them.
     *
     * <p>The collected list has one entry per registered reader, in registration
     * order. Whatever a delegate returns is passed on unchanged, including
     * {@code null}; interpreting such entries (for example as end of input) is
     * left entirely to the mapper.
     *
     * @return whatever the mapper produces for the collected items
     * @throws IllegalStateException if no mapper has been configured
     * @throws Exception if a delegate reader or the mapper fails
     */
    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Fail fast before any delegate is advanced, so a configuration error
        // does not silently consume input.
        if (unifyingMapper == null) {
            throw new IllegalStateException("unifyingMapper must be set before calling read()");
        }

        // read from all registered readers
        List<Object> items = new ArrayList<Object>(itemReaderStreams.size());
        for (ItemStreamReader<?> itemReaderStream : itemReaderStreams) {
            items.add(itemReaderStream.read());
        }

        // delegate to mapper
        return unifyingMapper.mapItems(items);
    }

    /**
     * Broadcast the call to update to all registered readers.
     *
     * @param executionContext context handed to every registered reader
     * @see org.springframework.batch.item.ItemStream#update(ExecutionContext)
     */
    @Override
    public void update(ExecutionContext executionContext) {
        for (ItemStream itemStream : itemReaderStreams) {
            itemStream.update(executionContext);
        }
    }

    /**
     * Broadcast the call to close to all registered readers.
     *
     * <p>Every reader is closed even when an earlier one fails, so a single
     * broken delegate cannot leak the resources held by the others. The first
     * failure is rethrown once all readers have been attempted; later failures
     * are dropped.
     *
     * @throws ItemStreamException if a registered reader fails to close
     */
    @Override
    public void close() throws ItemStreamException {
        RuntimeException firstFailure = null;
        for (ItemStream itemStream : itemReaderStreams) {
            try {
                itemStream.close();
            } catch (RuntimeException e) {
                // Remember the first problem but keep closing the rest.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Broadcast the call to open to all registered readers.
     *
     * @param executionContext context handed to every registered reader
     * @throws ItemStreamException if a registered reader fails to open
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        for (ItemStream itemStream : itemReaderStreams) {
            itemStream.open(executionContext);
        }
    }

    /**
     * Sets the mapper that unifies the items read from all registered readers.
     *
     * @param mapper the mapper; required before {@link #read()} is called
     */
    public void setUnifyingMapper(UnifyingItemsMapper<T> mapper) {
        this.unifyingMapper = mapper;
    }

    /**
     * Register ItemStreamReaders.
     *
     * @param itemReaderStreams the readers to wrap, in the order their items
     *        should be passed to the mapper; must not be {@code null}
     * @throws IllegalArgumentException if {@code itemReaderStreams} is {@code null}
     */
    public void setItemReaderStreams(List<ItemStreamReader<?>> itemReaderStreams) {
        if (itemReaderStreams == null) {
            throw new IllegalArgumentException("itemReaderStreams must not be null");
        }
        this.itemReaderStreams = itemReaderStreams;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment