Skip to content

Instantly share code, notes, and snippets.

@fuyufjh
Last active February 2, 2020 20:46
Show Gist options
  • Save fuyufjh/9657835df8202b29af8be1610c8327ad to your computer and use it in GitHub Desktop.
Save fuyufjh/9657835df8202b29af8be1610c8327ad to your computer and use it in GitHub Desktop.
Performance Test: BlockingQueue (Java) vs. Channel (Go)
package main
import (
"fmt"
"sync"
"time"
)
const channelNum = 8
const channelBufferSize = 65536
func main() {
var channels [channelNum]chan *Integer
channels[0] = make(chan *Integer, channelBufferSize)
for i := 1; i < channelNum; i++ {
channels[i] = make(chan *Integer, channelBufferSize)
go adder(i, channels[i-1], channels[i])
}
var wg sync.WaitGroup
wg.Add(1)
go ender(channels[channelNum-1], &wg)
startTime := time.Now()
var x int64
for x = 0; x < 10000000; x++ {
channels[0] <- &Integer{x}
}
close(channels[0])
wg.Wait()
fmt.Println("TIME:", time.Since(startTime))
}
type Integer struct {
value int64
}
func adder(no int, in <-chan *Integer, out chan<- *Integer) {
for {
if recv := <-in; recv != nil {
out <- recv;
} else {
close(out)
break
}
}
fmt.Printf("adder %d exited\n", no);
}
func ender(in <-chan *Integer, wg *sync.WaitGroup) {
var result int64
for {
if recv := <-in; recv != nil {
result += recv.value
} else {
break
}
}
fmt.Printf("ender exited\nsum = %d\n", result);
wg.Done()
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestArrayBlockingQueuePerf {
private static final int CHANNEL_NUM = 8;
private static final int CHANNEL_BUFFER_SIZE = 65536;
public static void main(String[] args) throws InterruptedException {
List<BlockingQueue<Integer>> channels = new ArrayList<>(CHANNEL_NUM);
channels.add(new ArrayBlockingQueue<>(CHANNEL_BUFFER_SIZE));
for (int i = 1; i < CHANNEL_NUM; i++) {
channels.add(new ArrayBlockingQueue<>(CHANNEL_BUFFER_SIZE));
new Thread(new Adder(i, channels.get(i - 1), channels.get(i))).start();
}
long startTime = System.currentTimeMillis();
Thread threadEnder = new Thread(new Ender(channels.get(CHANNEL_NUM - 1)));
threadEnder.start();
BlockingQueue<Integer> initChannel = channels.get(0);
for (int x = 0; x < 10000000; x++) {
initChannel.put(x);
}
initChannel.put(-1);
threadEnder.join();
System.out.println((System.currentTimeMillis() - startTime) / 1000.0);
}
static class Adder implements Runnable {
private final int no;
private final BlockingQueue<Integer> inputQueue;
private final BlockingQueue<Integer> outputQueue;
public Adder(int no, BlockingQueue<Integer> inputQueue, BlockingQueue<Integer> outputQueue) {
this.no = no;
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
@Override
public void run() {
Integer x;
try {
do {
x = inputQueue.take();
outputQueue.put(x);
} while (x != -1);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
System.out.println("Adder " + no + " exited");
}
}
static class Ender implements Runnable {
private final BlockingQueue<Integer> inputQueue;
public Ender(BlockingQueue<Integer> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
long sum = 0;
Integer x;
try {
while (true) {
x = inputQueue.take();
if (x != -1) {
sum += x;
} else {
break;
}
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
System.out.println("Ender exited, sum = " + sum);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment