Created
September 15, 2011 02:22
-
-
Save tux21b/1218360 to your computer and use it in GitHub Desktop.
Disruptor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (C) 2011 by Christoph Hack. All rights reserved. | |
// Use of this source code is governed by the New BSD License. | |
/* | |
This program contains a simple micro-benchmark for a Disruptor [1] like | |
event buffer (or at least what I think that Disruptor might be). There | |
isn't any kind of API yet and this test is currently limited to a | |
single producer - single consumer architecture, but generally the | |
architecture of Disruptor is also well suited for multiple producers and | |
consumers. | |
According to the Disruptor technical paper [2], its design has
several benefits over traditional message queues. Some of the
highlights are:
* Preallocated ring buffers are faster than singly linked lists because
they make better use of the CPU caches and produce no garbage.
* The lock free implementation avoids context switches which might lead | |
the queue to be rescheduled on another core with an empty cache. | |
* Producers and consumers will start to batch event messaging / receiving | |
as soon as they fall behind. This avoids some synchronization overhead. | |
* Disruptor-like message buffers can be used in interesting architectures,
like the one from LMAX [3]. The full article can be found here [4].
Disclaimer: I haven't written any other concurrent programs yet, and I doubt
that the implementation is correct, so please be patient.
[1]: http://code.google.com/p/disruptor/ | |
[2]: http://disruptor.googlecode.com/files/Disruptor-1.0.pdf | |
[3]: http://martinfowler.com/articles/images/lmax/arch-full.png | |
[4]: http://martinfowler.com/articles/lmax.html | |
*/ | |
package disruptor | |
import ( | |
"testing" | |
"sync/atomic" | |
) | |
// BufferSize is the number of slots in the ring buffer. It must be a power of
// two so that a sequence number can be mapped to its slot with a cheap
// seq & BufferMask instead of a modulo.
const (
	BufferSize = 2048
	// BufferMask maps a sequence number to its ring slot (seq & BufferMask).
	BufferMask = BufferSize - 1
)

// Compile-time guard for the invariant above: if BufferSize is not a power of
// two, BufferSize&BufferMask is non-zero and the array type on the right no
// longer matches [0]struct{}, so the package fails to build.
var _ [0]struct{} = [BufferSize & BufferMask]struct{}{}

// Sequence is 64 bytes large (the usual size of today's cache lines) to avoid
// false sharing: the first elements of two adjacent Sequences are 64 bytes
// apart and therefore cannot fall into the same 64-byte cache line. Only the
// first element should be used; the rest is padding.
type Sequence [8]uint64
func BenchmarkChannels(b *testing.B) { | |
ch := make(chan int, BufferSize) | |
go func() { | |
for i := 0; i < b.N; i++ { | |
ch <- int(i) | |
} | |
}() | |
for i := 0; i < b.N; i++ { | |
val := <-ch | |
if val != int(i) { | |
panic("invalid result") | |
} | |
} | |
} | |
func BenchmarkDisruptor(b *testing.B) { | |
var ring [BufferSize]int // the ring buffer | |
var cseq, pseq Sequence // consumer and producer sequence number (+padding) | |
go func() { // producer goroutine | |
var seq, max_seq Sequence | |
for seq[0] = uint64(0); seq[0] < uint64(b.N); seq[0]++ { | |
// busy spin until there is room for writing | |
for seq[0] >= max_seq[0] { | |
max_seq[0] = atomic.LoadUint64(&cseq[0]) + BufferSize - 2 | |
} | |
// send the message | |
ring[seq[0]&BufferMask] = int(seq[0]) | |
atomic.StoreUint64(&pseq[0], seq[0]+1) // Better: LazyStoreUint64 | |
} | |
}() | |
var seq, max_seq Sequence | |
for seq[0] = uint64(0); seq[0] < uint64(b.N); seq[0]++ { | |
// busy spin until there is data available | |
for seq[0] >= max_seq[0] { | |
max_seq[0] = atomic.LoadUint64(&pseq[0]) | |
} | |
// data can now be read from ring[seq&BufferMask]. | |
val := ring[seq[0]&BufferMask] | |
atomic.StoreUint64(&cseq[0], seq[0]) // Better: LazyStoreUint64() | |
if val != int(seq[0]) { | |
panic("invalid result") | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (C) 2011 by Christoph Hack. All rights reserved. | |
// Use of this source code is governed by the New BSD License. | |
/** | |
* Disruptor | |
* | |
* Compile with: | |
* g++ -std=gnu++0x -pthread -Wall -o disruptor -O3 disruptor.cc | |
*/ | |
#include <thread> | |
#include <atomic> | |
#include <iostream> | |
#include <pthread.h> | |
#include <errno.h> | |
// The benchmark transfers N + 1 messages: producer and consumer both loop
// over seq = 0..N inclusive. 10e9 == 1e10, written as an exact integer
// literal instead of a floating-point one.
const unsigned long N = 10000000000UL;
// Where unsigned long is 32-bit the value above wraps, so refuse to build.
static_assert(N == 10000000000ULL, "N does not fit in unsigned long");

const int RING_SIZE = 2048;
// Slot index of a sequence number is seq & RING_MASK, which is only a valid
// modulo when RING_SIZE is a power of two.
const int RING_MASK = RING_SIZE - 1;
static_assert((RING_SIZE & RING_MASK) == 0, "RING_SIZE must be a power of two");

unsigned long ring[RING_SIZE];

// Producer/consumer sequence counters. Each starts on its own 64-byte
// boundary, so the two cannot share a 64-byte cache line.
std::atomic<unsigned long> __attribute__((aligned(64))) pseq(0);
std::atomic<unsigned long> __attribute__((aligned(64))) cseq(0);
// Pins the calling thread to the single CPU `cpu`. Best effort: if the
// affinity cannot be set (e.g. the CPU does not exist), the failure is
// reported on stderr and the thread keeps running with its old affinity.
static void bind_to_cpu(int cpu)
{
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(cpu, &mask);
    // pthread_setaffinity_np reports failure by *returning* a positive error
    // number; it neither returns -1 nor sets errno. Check for non-zero and
    // hand the code to perror through errno.
    int err = pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask);
    if (err != 0) {
        errno = err;
        perror("pthread_setaffinity_np");
    }
}
void producer_thread() | |
{ | |
bind_to_cpu(1); | |
unsigned long max_seq(0); | |
for (unsigned long seq = 0; seq <= N; seq++) | |
{ | |
// busy spin until there is room for writing | |
while (seq >= max_seq) { | |
max_seq = cseq.load(); | |
max_seq += RING_SIZE - 2; | |
} | |
// send the message | |
ring[seq&RING_MASK] = seq; | |
pseq.store(seq+1, std::memory_order_release); | |
} | |
} | |
int main() | |
{ | |
std::thread t(producer_thread); | |
bind_to_cpu(0); | |
unsigned long max_seq(0); | |
for (unsigned long seq = 0; seq <= N; seq++) | |
{ | |
// busy spin until there is data available | |
while (seq >= max_seq) { | |
max_seq = pseq.load(); | |
} | |
// retrieve message | |
unsigned long msg = ring[seq&RING_MASK]; | |
cseq.store(seq, std::memory_order_release); | |
if (msg != seq) { | |
std::cerr << "expected " << seq << " got " << msg << std::endl; | |
} | |
} | |
t.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment