Last active
August 29, 2015 13:57
-
-
Save tarqd/9560518 to your computer and use it in GitHub Desktop.
Simple Standalone Single Producer Single Consumer Queue (Based on Facebook/Folly)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// based on facebook/folly/ProducerConsumerQueue.h
// main differences: the inclusion of a pop_all() method,
// more STL-ish method names,
// and a vector for the underlying storage (which lets us leave out lots of manual cleanup/memory management)
// this is fine because we never resize the vector
// also, the capacity parameter is the usable capacity; this class automatically allocates one more slot for the dummy value
// original copyright:
/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// @author Bo Hu ([email protected]) | |
// @author Jordan DeLong ([email protected]) | |
#ifndef SPSC_QUEUE_H_ | |
#define SPSC_QUEUE_H_ | |
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <iterator>
#include <type_traits>
#include <utility>
#include <vector>
// Bounded ring-buffer queue intended for exactly one producer thread (calling
// try_push) and one consumer thread (calling try_pop / pop_all).
//
// Storage is a std::vector that is sized once in the constructor and never
// resized. One slot is always left unused so that "read index == write index"
// unambiguously means "empty"; the buffer therefore holds capacity + 1 slots.
//
// Requirements on the element type (std::decay<T>::type):
//   * default-constructible, because every slot is value-initialised up front;
//   * copy-assignable for try_push(const&), move-assignable for the other
//     operations, because elements are assigned into / out of existing slots.
template <class T>
class spsc_queue {
public:
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;
    using value_type = typename std::decay<T>::type;
    using pointer = value_type*;
    // Spelled out by hand: std::add_const applied to a pointer type yields
    // `T* const`, and applied to a reference type it is a no-op, so neither
    // produces a pointer/reference to const.
    using const_pointer = const value_type*;
    using reference = value_type&;
    using const_reference = const value_type&;

    // Creates an empty queue able to hold `capacity` elements.
    // One extra slot is allocated for the dummy value that indicates "empty".
    spsc_queue(size_type capacity = 16)
        : m_write_index(0),
          m_read_index(0),
          m_capacity(capacity + 1),
          m_buffer(m_capacity) {}

    // Producer side: copies `value` into the queue.
    // Returns true on success, false if the queue is full.
    bool try_push(const_reference value) {
        return push_impl(value);
    }

    // Producer side: same as above but moves from an r-value.
    // Returns true on success, false if the queue is full (in which case
    // `value` is left untouched).
    bool try_push(value_type&& value) {
        return push_impl(std::move(value));
    }

    // Consumer side: tries to pop the oldest element into `value`.
    // Uses 2 atomic reads and 1 atomic write.
    // Returns false (leaving `value` untouched) if the queue is empty.
    // Generally you'll want to use pop_all() instead.
    bool try_pop(reference value) {
        auto const read_index = m_read_index.load(std::memory_order_relaxed);
        if (read_index == m_write_index.load(std::memory_order_acquire)) {
            // queue is empty
            return false;
        }
        value = std::move(m_buffer[read_index]);
        // release: the slot must be fully read before the producer may reuse it
        m_read_index.store(wrap_next(read_index), std::memory_order_release);
        return true;
    }

    // Consumer side: use this for message queues and the like.
    // Moves every element currently visible to the consumer into a vector
    // (oldest first) and returns it; returns an empty vector if the queue is
    // empty. Only uses 2 atomic reads and 1 atomic write for N elements,
    // whereas try_pop() in a loop would use 2N atomic reads and N atomic writes.
    std::vector<value_type> pop_all() {
        std::vector<value_type> drained;
        auto const read_index = m_read_index.load(std::memory_order_relaxed);
        auto const write_index = m_write_index.load(std::memory_order_acquire);
        if (read_index == write_index) {
            // queue is empty
            return drained;
        }

        auto const buffer_begin = m_buffer.begin();
        auto const buffer_end = m_buffer.end();
        auto const read_begin = buffer_begin + static_cast<difference_type>(read_index);
        auto const read_end = buffer_begin + static_cast<difference_type>(write_index);

        if (read_index < write_index) {
            // fast case: the live elements form one contiguous run
            assert(read_end <= buffer_end);
            drained.reserve(write_index - read_index);
            drained.insert(drained.end(),
                           std::make_move_iterator(read_begin),
                           std::make_move_iterator(read_end));
        } else {
            // wrapped case: take [read, end of buffer) and then [begin, write).
            // The second run is empty when write_index == 0.
            size_type const total_size = (m_capacity - read_index) + write_index;
            drained.reserve(total_size);
            assert(read_begin < buffer_end);
            drained.insert(drained.end(),
                           std::make_move_iterator(read_begin),
                           std::make_move_iterator(buffer_end));
            drained.insert(drained.end(),
                           std::make_move_iterator(buffer_begin),
                           std::make_move_iterator(read_end));
            assert(drained.size() == total_size);
        }

        // we've emptied the queue (as far as we know): everything up to the
        // write index we observed has been consumed
        m_read_index.store(write_index, std::memory_order_release);
        return drained;
    }

    // True if the read and write indices currently coincide.
    // Loads use acquire rather than consume, whose use is discouraged.
    bool is_empty() const {
        return m_write_index.load(std::memory_order_acquire) ==
               m_read_index.load(std::memory_order_acquire);
    }

    // True if advancing the write index would collide with the read index,
    // i.e. a try_push() at this instant would fail.
    bool is_full() const {
        auto const next = wrap_next(m_write_index.load(std::memory_order_acquire));
        return next == m_read_index.load(std::memory_order_acquire);
    }

    // Number of elements between the read and write indices.
    // Obviously this is only an estimate while the other side is concurrently
    // pushing or popping, since the two indices are not sampled atomically.
    size_type size() const {
        auto const write_index = m_write_index.load(std::memory_order_acquire);
        auto const read_index = m_read_index.load(std::memory_order_acquire);
        if (write_index >= read_index) {
            return write_index - read_index;
        }
        // write index has wrapped around past the end of the buffer
        return write_index + m_capacity - read_index;
    }

    // Usable capacity: the buffer size minus the dummy slot required to
    // distinguish "empty" from "full".
    size_type capacity() const {
        return m_buffer.size() - 1;
    }

    // The queue has a fixed size, so this is just the capacity.
    size_type max_size() const {
        return capacity();
    }

private:
    // Returns the slot index that follows `index`, wrapping to 0 at the end
    // of the buffer.
    size_type wrap_next(size_type index) const noexcept {
        ++index;
        return index == m_capacity ? 0 : index;
    }

    // Shared implementation of both try_push overloads: assigns `value`
    // (copying or moving, depending on U) into the slot at the write index and
    // publishes it, unless doing so would make the write index catch up with
    // the read index (queue full).
    template <class U>
    bool push_impl(U&& value) {
        auto const write_index = m_write_index.load(std::memory_order_relaxed);
        auto const next = wrap_next(write_index);
        if (next == m_read_index.load(std::memory_order_acquire)) {
            // queue is full
            return false;
        }
        m_buffer[write_index] = std::forward<U>(value);
        // release: the element must be visible before the consumer sees the new index
        m_write_index.store(next, std::memory_order_release);
        return true;
    }

    std::atomic<size_type> m_write_index;  // next slot to write; stored only by try_push
    std::atomic<size_type> m_read_index;   // next slot to read; stored only by try_pop / pop_all
    // NOTE: m_capacity must stay declared before m_buffer, because the
    // constructor sizes m_buffer from it and members initialise in declaration order.
    const size_type m_capacity;            // number of slots, including the dummy slot
    std::vector<value_type> m_buffer;      // fixed-size ring storage, never resized
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment