Last active
August 29, 2015 14:27
-
-
Save thiagoh/ab54ca1049760cd7fe82 to your computer and use it in GitHub Desktop.
Input stream with multiple sources
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.io.InputStream; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
public class ExtendedInputStream extends InputStream { | |
private static final Log _log = LogFactory.getLog(ExtendedInputStream.class.getName()); | |
private final BlockingQueue<InputStream> queue; | |
private final Thread producerThread; | |
private final InputStreamProducer producer; | |
private final ExtendedInputStreamProdutor runnable; | |
private IOException _error; | |
private boolean _producerFinished = false; | |
private boolean _startedReading = false; | |
private InputStream currentInputStream; | |
private static final ThreadFactory _namedThreadFactory = new NamedThreadFactory(); | |
private static final int INITIAL_CAPACITY = 10; | |
public ExtendedInputStream(InputStreamProducer producer) { | |
this.producer = producer; | |
this.runnable = new ExtendedInputStreamProdutor(); | |
this.queue = new ArrayBlockingQueue<InputStream>(INITIAL_CAPACITY, true); | |
this.producerThread = _namedThreadFactory.newThread(runnable); | |
} | |
private static class NamedThreadFactory implements ThreadFactory { | |
private static AtomicInteger atomicInteger = new AtomicInteger(0); | |
@Override | |
public Thread newThread(Runnable runnable) { | |
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); | |
Thread thread = new Thread(threadGroup, runnable, | |
ExtendedInputStream.class.getName() + "_" + atomicInteger.getAndIncrement()); | |
thread.setDaemon(true); | |
return thread; | |
} | |
} | |
private void start() throws IOException { | |
_startedReading = true; | |
producerThread.start(); | |
fillNextInputStream(); | |
} | |
private boolean _checkError() throws IOException { | |
if (_error != null) { | |
throw _error; | |
} | |
return true; | |
} | |
private synchronized boolean fillNextInputStream() throws IOException { | |
try { | |
// If is finished stop consuming from queue | |
if (queue.isEmpty() && _producerFinished) { | |
return false; | |
} | |
// If there is an error stop consuming from queue | |
if (_error != null) { | |
currentInputStream = null; | |
return false; | |
} | |
InputStream is = null; | |
while (_checkError() && (is == null) && (!queue.isEmpty() || !_producerFinished)) { | |
is = queue.poll(10, TimeUnit.MILLISECONDS); | |
} | |
currentInputStream = is; | |
return is != null; | |
} catch (InterruptedException ie) { | |
_error = new IOException(ie); | |
try { | |
this.producerThread.interrupt(); | |
} catch (Exception e2) { | |
if (_log.isDebugEnabled()) { | |
_log.debug(e2); | |
} | |
} | |
throw _error; | |
} | |
} | |
@Override | |
public long skip(long n) throws IOException { | |
return currentInputStream.skip(n); | |
} | |
@Override | |
public synchronized void mark(int readlimit) { | |
currentInputStream.mark(readlimit); | |
} | |
@Override | |
public synchronized void reset() throws IOException { | |
throw new IOException("mark/reset not supported"); | |
} | |
@Override | |
public boolean markSupported() { | |
return currentInputStream.markSupported(); | |
} | |
@Override | |
public int available() throws IOException { | |
return currentInputStream.available(); | |
} | |
@Override | |
public int read(byte[] b) throws IOException { | |
return read(b, 0, b.length); | |
} | |
@Override | |
public synchronized int read() throws IOException { | |
if (!_startedReading) { | |
start(); | |
} | |
if (currentInputStream == null) { | |
return -1; | |
} | |
_checkError(); | |
int byteRead = currentInputStream.read(); | |
if (byteRead == -1) { | |
if (!fillNextInputStream() && _checkError()) { | |
return -1; | |
} | |
byteRead = currentInputStream.read(); | |
} | |
return byteRead; | |
} | |
@Override | |
public synchronized int read(byte b[], int off, int len) throws IOException { | |
if (!_startedReading) { | |
start(); | |
} | |
if (currentInputStream == null) { | |
return -1; | |
} | |
_checkError(); | |
int count = currentInputStream.read(b, off, len); | |
if (count == -1) { | |
if (!fillNextInputStream() && _checkError()) { | |
return -1; | |
} | |
count = currentInputStream.read(b, off, len); | |
} | |
return count; | |
} | |
@Override | |
public void close() throws IOException { | |
super.close(); | |
try { | |
this.producerThread.interrupt(); | |
} catch (Exception e) { | |
if (_log.isDebugEnabled()) { | |
_log.debug(e); | |
} | |
} | |
} | |
private void _error(IOException e) { | |
_error = e; | |
try { | |
this.producerThread.interrupt(); | |
} catch (Exception e2) { | |
if (_log.isDebugEnabled()) { | |
_log.debug(e2); | |
} | |
} | |
} | |
public void finished() { | |
_producerFinished = true; | |
} | |
public InputStream getNextInputStream() { | |
return currentInputStream; | |
} | |
public BlockingQueue<InputStream> getQueue() { | |
return queue; | |
} | |
private class ExtendedInputStreamProdutor implements Runnable { | |
@Override | |
public void run() { | |
if (_log.isDebugEnabled()) { | |
_log.debug("InputStream producer has been started"); | |
} | |
try { | |
while (producer.hasNext()) { | |
InputStream is = null; | |
try { | |
is = producer.produce(); | |
} catch (IOException e) { | |
_log.warn(e); | |
} | |
if (is != null) { | |
while (!queue.offer(is, 10, TimeUnit.MILLISECONDS)) { | |
// wait for the offer be accepted | |
} | |
} | |
} | |
// to finish the stream | |
_producerFinished = true; | |
} catch (Exception e) { | |
_log.warn(e); | |
_error(new IOException(e)); | |
} finally { | |
try { | |
close(); | |
} catch (IOException e) { | |
_log.warn(e); | |
} | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.io.InputStream; | |
/**
 * A source of {@link InputStream}s that is queried one stream at a time.
 */
public interface InputStreamProducer {

    /**
     * Supplies the next stream.
     *
     * @return the next stream
     * @throws IOException if the stream cannot be supplied
     */
    InputStream produce() throws IOException;

    /**
     * Tells whether another stream can be requested.
     *
     * @return {@code true} if {@link #produce()} may be called again
     */
    boolean hasNext();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment