Created
April 24, 2016 02:53
-
-
Save sunng87/285530b9199f7b560f33cb59ee936644 to your computer and use it in GitHub Desktop.
A file-based blocking queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.squareup.tape.FileObjectQueue;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/** | |
* Created by nsun on 16-4-22. | |
*/ | |
public class InfQueue<E extends Serializable> extends LinkedBlockingQueue<E> implements BlockingQueue<E> { | |
private int memoryQueueSize; | |
private FileObjectQueue<E> backendQueue; | |
private ReentrantLock backendQueueLock = new ReentrantLock(); | |
public InfQueue(int inMemorySize) throws IOException { | |
super(inMemorySize); | |
this.memoryQueueSize = inMemorySize; | |
File backendFile = File.createTempFile(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); | |
backendQueue = new FileObjectQueue<E>(backendFile, new SerializableConvertor()); | |
} | |
@Override | |
public int size() { | |
return super.size() + backendQueue.size(); | |
} | |
@Override | |
public void put(E e) throws InterruptedException { | |
if (this.size() < this.memoryQueueSize) { | |
super.put(e); | |
} else { | |
backendQueueLock.lock(); | |
backendQueue.add(e); | |
backendQueueLock.unlock(); | |
} | |
} | |
@Override | |
public boolean offer(E e, long l, TimeUnit timeUnit) throws InterruptedException { | |
boolean offered = super.offer(e, l, timeUnit); | |
if (! offered) { | |
backendQueueLock.lock(); | |
this.backendQueue.add(e); | |
backendQueueLock.unlock(); | |
} | |
return true; | |
} | |
@Override | |
public boolean offer(E e) { | |
try { | |
this.put(e); | |
} catch (InterruptedException e1) { | |
} | |
return true; | |
} | |
@Override | |
public E poll(long l, TimeUnit timeUnit) throws InterruptedException { | |
backendQueueLock.lock(); | |
if (this.backendQueue.size() > 0) { | |
E obj = this.backendQueue.peek(); | |
this.backendQueue.remove(); | |
backendQueueLock.unlock(); | |
return obj; | |
} else { | |
backendQueueLock.unlock(); | |
return super.poll(l, timeUnit); | |
} | |
} | |
@Override | |
public E poll() { | |
backendQueueLock.lock(); | |
if (this.backendQueue.size() > 0) { | |
E obj = this.backendQueue.peek(); | |
this.backendQueue.remove(); | |
backendQueueLock.unlock(); | |
return obj; | |
} else { | |
backendQueueLock.unlock(); | |
return super.poll(); | |
} | |
} | |
@Override | |
public E peek() { | |
backendQueueLock.lock(); | |
if (this.backendQueue.size() > 0) { | |
E obj = this.backendQueue.peek(); | |
backendQueueLock.unlock(); | |
return obj; | |
} else { | |
backendQueueLock.unlock(); | |
return super.peek(); | |
} | |
} | |
@Override | |
public E take() throws InterruptedException { | |
backendQueueLock.lock(); | |
if (this.backendQueue.size() > 0) { | |
E obj = this.backendQueue.peek(); | |
this.backendQueue.remove(); | |
backendQueueLock.unlock(); | |
return obj; | |
} else { | |
backendQueueLock.unlock(); | |
return super.take(); | |
} | |
} | |
@Override | |
public boolean remove(Object o) { | |
throw new UnsupportedOperationException("Not work for now"); | |
} | |
@Override | |
public boolean contains(Object o) { | |
throw new UnsupportedOperationException("Not work for now"); | |
} | |
@Override | |
public void clear() { | |
throw new UnsupportedOperationException("Not work for now."); | |
} | |
@Override | |
public boolean isEmpty() { | |
return this.backendQueue.size() == 0 && super.isEmpty(); | |
} | |
public void close() { | |
this.backendQueue.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment