Created
October 29, 2012 11:53
-
-
Save pokutuna/3973153 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MultiplyBlock extends ProcessBlock<Integer, String> { | |
int keisuu = 1; | |
public MultiplyBlock(boolean tail, int keisuu) { | |
super(tail); | |
this.keisuu = keisuu; | |
} | |
@Override | |
public String process(Integer job) { | |
return job + " * " + keisuu + " = " + job * keisuu; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class NumberProducer extends ProcessBlock<Object, Integer>{ | |
int i = 0; | |
int max = 10000; | |
public NumberProducer(boolean tail) { | |
super(tail); | |
} | |
@Override | |
public Object takeJob() { | |
try { | |
Thread.sleep(300); | |
} catch (InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
Integer next = i; | |
if (next > max) { | |
this.requestExit(); | |
next = null; | |
} | |
i++; | |
return next; | |
} | |
@Override | |
public Integer process(Object job) { | |
return (Integer)job; | |
} | |
public static void main(String[] args) throws InterruptedException { | |
System.out.println("hoge"); | |
NumberProducer np = new NumberProducer(false); | |
MultiplyBlock mb10 = new MultiplyBlock(false, 10); | |
mb10.start(); | |
MultiplyBlock mb5 = new MultiplyBlock(false, 5); | |
mb5.start(); | |
MultiplyBlock mb3 = new MultiplyBlock(false, 3); | |
mb3.start(); | |
PrintBlock<String, Object> pb = new PrintBlock<String, Object>(true); | |
pb.start(); | |
np.addConsumer(mb10); | |
np.addConsumer(mb5); | |
np.addConsumer(mb3); | |
mb10.addConsumer(pb); | |
mb5.addConsumer(pb); | |
mb3.addConsumer(pb); | |
np.start(); | |
np.thread.join(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class PrintBlock<T1 extends Object, T2> extends ProcessBlock<T1, T2>{ | |
public PrintBlock(boolean tail) { | |
super(tail); | |
} | |
@Override | |
public Object process(Object job) { | |
System.out.println("[PrintBlock]: " + job.toString()); | |
return job; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import javax.management.RuntimeErrorException; | |
abstract class ProcessBlock<IN, OUT> implements Runnable { | |
Queue<IN> jobQueue = new LinkedBlockingQueue<IN>(); | |
Vector<ProcessBlock<OUT, ?>> consumers = new Vector<ProcessBlock<OUT,?>>(); | |
int balanceIndex = 0; | |
boolean isTail = false; | |
boolean exitRequested = false; | |
Thread thread; | |
public ProcessBlock(boolean tail) { | |
this.isTail = tail; | |
this.thread = new Thread(this); | |
} | |
public boolean addConsumer(ProcessBlock<OUT, ?> consumer) { | |
return this.consumers.add(consumer); | |
} | |
public boolean removeConsumer(ProcessBlock<OUT, ?> consumer) { | |
return this.consumers.remove(consumer); | |
} | |
protected boolean addJob(IN job) { | |
return this.jobQueue.offer(job); | |
} | |
synchronized protected boolean sendConsumer(OUT out) throws RuntimeException { | |
if (consumers.size() < 1) throw new RuntimeException("empty consumer"); | |
if (consumers.size() <= balanceIndex) balanceIndex = 0; | |
ProcessBlock<OUT, ?> pb = consumers.elementAt(balanceIndex); | |
balanceIndex += 1; | |
return pb.addJob(out); | |
} | |
public void start() { | |
this.thread.start(); | |
} | |
@Override | |
public void run() { | |
IN job = null; | |
while (true) { | |
job = takeJob(); | |
if (job != null) { | |
OUT out = process(job); | |
if (out != null && isTail == false) sendJob(out); | |
} else { | |
try { | |
Thread.sleep(200); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
if (exitRequested && job == null) { | |
for (ProcessBlock<OUT,?> pb : consumers) { | |
pb.requestExit(); | |
} | |
break; | |
} | |
} | |
} | |
public void requestExit() { | |
this.exitRequested = true; | |
} | |
public IN takeJob() { | |
IN job = jobQueue.poll(); | |
// if (job == null) System.out.println(this.toString() + ": empty job"); | |
return job; | |
} | |
public abstract OUT process(IN job); | |
public boolean sendJob(OUT out) { | |
return sendConsumer(out); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment