Skip to content

Instantly share code, notes, and snippets.

@pokutuna
Created October 29, 2012 11:53
Show Gist options
  • Save pokutuna/3973153 to your computer and use it in GitHub Desktop.
Save pokutuna/3973153 to your computer and use it in GitHub Desktop.
/**
 * Pipeline stage that multiplies every incoming integer by a fixed
 * coefficient and emits the whole calculation as a readable string,
 * e.g. {@code "4 * 10 = 40"}.
 */
public class MultiplyBlock extends ProcessBlock<Integer, String> {

    /** Coefficient applied to each job; 1 until the constructor replaces it. */
    int keisuu = 1;

    /**
     * Creates a multiplying stage.
     *
     * @param tail   passed straight through to the {@code ProcessBlock} constructor
     * @param keisuu coefficient every incoming job is multiplied by
     */
    public MultiplyBlock(boolean tail, int keisuu) {
        super(tail);
        this.keisuu = keisuu;
    }

    /**
     * Renders the multiplication of {@code job} by the coefficient.
     *
     * @param job the left-hand operand
     * @return text of the form {@code "<job> * <keisuu> = <product>"}
     */
    @Override
    public String process(Integer job) {
        StringBuilder text = new StringBuilder();
        text.append(job).append(" * ").append(keisuu).append(" = ");
        int product = job * keisuu;
        text.append(product);
        return text.toString();
    }
}
/**
 * Source stage of the pipeline. Instead of reading from its job queue it
 * generates the integers {@code 0..max} itself, one roughly every 300 ms,
 * and requests its own exit once the counter passes {@code max}.
 */
public class NumberProducer extends ProcessBlock<Object, Integer> {

    /** Next number to emit. */
    int i = 0;

    /** Largest number that is still emitted. */
    int max = 10000;

    /**
     * @param tail passed straight through to the {@code ProcessBlock} constructor
     */
    public NumberProducer(boolean tail) {
        super(tail);
    }

    /**
     * Produces the next number after a 300 ms pause.
     *
     * <p>If the thread is interrupted while pausing, the interrupt status is
     * restored and an exit is requested, so the producer shuts down instead
     * of silently carrying on.
     *
     * @return the next number as an {@code Integer}, or {@code null} once the
     *         counter has passed {@code max} or the thread was interrupted
     */
    @Override
    public Object takeJob() {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            // Treat interruption as a shutdown request: keep the flag set for
            // the caller and stop producing rather than swallowing it.
            Thread.currentThread().interrupt();
            this.requestExit();
            return null;
        }
        Integer next = i;
        if (next > max) {
            // Range exhausted: ask the run loop to stop and emit nothing.
            this.requestExit();
            next = null;
        }
        i++;
        return next;
    }

    /**
     * Passes the generated number on unchanged.
     *
     * @param job a value previously returned by {@link #takeJob()}
     * @return the same value, cast to {@code Integer}
     */
    @Override
    public Integer process(Object job) {
        return (Integer) job;
    }

    /**
     * Demo wiring: one producer feeds three multiplier stages (x10, x5, x3),
     * which all feed a single printing tail stage. Waits for the producer
     * thread to finish.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the producer
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println("hoge");
        NumberProducer np = new NumberProducer(false);

        // Downstream stages are started first so they are already polling
        // when the producer begins to emit.
        MultiplyBlock mb10 = new MultiplyBlock(false, 10);
        mb10.start();
        MultiplyBlock mb5 = new MultiplyBlock(false, 5);
        mb5.start();
        MultiplyBlock mb3 = new MultiplyBlock(false, 3);
        mb3.start();
        PrintBlock<String, Object> pb = new PrintBlock<String, Object>(true);
        pb.start();

        np.addConsumer(mb10);
        np.addConsumer(mb5);
        np.addConsumer(mb3);
        mb10.addConsumer(pb);
        mb5.addConsumer(pb);
        mb3.addConsumer(pb);

        np.start();
        np.thread.join();
    }
}
/**
 * Pipeline stage that writes every job it receives to standard output,
 * prefixed with {@code "[PrintBlock]: "}, and hands the job back unchanged.
 */
public class PrintBlock<T1 extends Object, T2> extends ProcessBlock<T1, T2>{

    /**
     * @param tail passed straight through to the {@code ProcessBlock} constructor
     */
    public PrintBlock(boolean tail) {
        super(tail);
    }

    /**
     * Prints the job's string form on its own line.
     *
     * @param job the job to print; its {@code toString()} is invoked directly
     * @return the very same {@code job} object
     */
    @Override
    public Object process(Object job) {
        String line = "[PrintBlock]: " + job.toString();
        System.out.println(line);
        return job;
    }
}
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import javax.management.RuntimeErrorException;
/**
 * One stage of a multi-threaded processing pipeline.
 *
 * <p>Each block owns a worker thread that repeatedly takes a job of type
 * {@code IN}, turns it into an {@code OUT} via {@link #process(Object)} and,
 * unless the block is a tail, hands the result to one of its registered
 * consumers in round-robin order. When an exit has been requested and no job
 * is left, the block forwards the exit request to every consumer and its
 * worker loop ends.
 *
 * @param <IN>  type of the jobs this block accepts
 * @param <OUT> type of the results this block produces
 */
abstract class ProcessBlock<IN, OUT> implements Runnable {

    /** Pause, in milliseconds, between polls while there is nothing to do. */
    private static final long IDLE_SLEEP_MILLIS = 200L;

    /** Pending jobs; filled by upstream blocks, drained by this block's worker. */
    Queue<IN> jobQueue = new LinkedBlockingQueue<IN>();

    /** Downstream blocks that receive this block's results. */
    Vector<ProcessBlock<OUT, ?>> consumers = new Vector<ProcessBlock<OUT, ?>>();

    /** Index of the consumer that receives the next result (round-robin cursor). */
    int balanceIndex = 0;

    /** When {@code true}, results of {@link #process(Object)} are not forwarded. */
    boolean isTail = false;

    /**
     * Set by {@link #requestExit()}, usually from another thread. Volatile so
     * the worker reliably sees the request, and so every job enqueued before
     * the request is visible to the worker once it observes the flag.
     */
    volatile boolean exitRequested = false;

    /** Worker thread running {@link #run()}; created here, started by {@link #start()}. */
    Thread thread;

    /**
     * @param tail {@code true} if results of this block must not be forwarded
     *             to consumers
     */
    public ProcessBlock(boolean tail) {
        this.isTail = tail;
        this.thread = new Thread(this);
    }

    /**
     * Registers a downstream block.
     *
     * @param consumer block that should receive results
     * @return the result of adding it to the consumer list
     */
    public boolean addConsumer(ProcessBlock<OUT, ?> consumer) {
        return this.consumers.add(consumer);
    }

    /**
     * Unregisters a downstream block.
     *
     * @param consumer block to remove
     * @return {@code true} if it was registered
     */
    public boolean removeConsumer(ProcessBlock<OUT, ?> consumer) {
        return this.consumers.remove(consumer);
    }

    /**
     * Enqueues a job for this block.
     *
     * @param job the job to enqueue
     * @return {@code true} if the queue accepted it
     */
    protected boolean addJob(IN job) {
        return this.jobQueue.offer(job);
    }

    /**
     * Hands a result to the next consumer in round-robin order.
     *
     * @param out the result to forward
     * @return {@code true} if the chosen consumer's queue accepted it
     * @throws RuntimeException if no consumer is registered
     */
    protected synchronized boolean sendConsumer(OUT out) throws RuntimeException {
        ProcessBlock<OUT, ?> target;
        // Hold the Vector's own lock so the size check and the element lookup
        // cannot be separated by a concurrent removeConsumer().
        synchronized (consumers) {
            if (consumers.isEmpty()) {
                throw new RuntimeException("empty consumer");
            }
            if (balanceIndex >= consumers.size()) {
                balanceIndex = 0;
            }
            target = consumers.elementAt(balanceIndex);
            balanceIndex += 1;
        }
        return target.addJob(out);
    }

    /** Starts the worker thread. */
    public void start() {
        this.thread.start();
    }

    /**
     * Worker loop: process jobs as long as {@link #takeJob()} supplies them,
     * idle briefly when there are none, and shut down once an exit has been
     * requested and nothing is left to do.
     *
     * <p>On shutdown the job queue is drained one final time, so jobs that
     * were enqueued just before the exit request are not lost; these late
     * jobs are read from the queue directly rather than through
     * {@link #takeJob()}. Afterwards the exit request is passed on to every
     * consumer. Interrupting the worker while it idles is treated as an exit
     * request and leaves the thread's interrupt status set.
     */
    @Override
    public void run() {
        while (true) {
            IN job = takeJob();
            if (job != null) {
                handle(job);
                continue;
            }
            if (exitRequested) {
                drainQueue();
                notifyConsumersOfExit();
                return;
            }
            idle();
        }
    }

    /** Processes one job and forwards a non-null result unless this block is a tail. */
    private void handle(IN job) {
        OUT out = process(job);
        if (out != null && !isTail) {
            sendJob(out);
        }
    }

    /** Processes whatever is still sitting in the job queue. */
    private void drainQueue() {
        IN pending = jobQueue.poll();
        while (pending != null) {
            handle(pending);
            pending = jobQueue.poll();
        }
    }

    /** Forwards the exit request to a snapshot of the current consumers. */
    private void notifyConsumersOfExit() {
        // Copy first: iterating the live Vector could fail if a consumer is
        // added or removed concurrently.
        List<ProcessBlock<OUT, ?>> snapshot = new ArrayList<ProcessBlock<OUT, ?>>(consumers);
        for (ProcessBlock<OUT, ?> consumer : snapshot) {
            consumer.requestExit();
        }
    }

    /** Sleeps between polls; an interrupt becomes an exit request. */
    private void idle() {
        try {
            Thread.sleep(IDLE_SLEEP_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            requestExit();
        }
    }

    /** Asks the worker loop to stop once no job is left. */
    public void requestExit() {
        this.exitRequested = true;
    }

    /**
     * Fetches the next job without blocking.
     *
     * @return the next queued job, or {@code null} if the queue is empty
     */
    public IN takeJob() {
        return jobQueue.poll();
    }

    /**
     * Transforms one job into a result.
     *
     * @param job the job to process; the worker loop never passes {@code null}
     * @return the result, or {@code null} if nothing should be forwarded
     */
    public abstract OUT process(IN job);

    /**
     * Forwards a result downstream; delegates to {@link #sendConsumer(Object)}.
     *
     * @param out the result to forward
     * @return {@code true} if the chosen consumer's queue accepted it
     */
    public boolean sendJob(OUT out) {
        return sendConsumer(out);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment