Skip to content

Instantly share code, notes, and snippets.

@yuikns
Created February 12, 2015 01:46
Show Gist options
  • Save yuikns/aeef32ddfac7296e0424 to your computer and use it in GitHub Desktop.
Save yuikns/aeef32ddfac7296e0424 to your computer and use it in GitHub Desktop.
Producer And Consumer
#include <cstdio>
#include <cstring>
#include <pthread.h> // thread
#include <unistd.h> // usleep
#include <string>
#include <mutex> // std::lock_guard , mutex
#include <queue> // queue
template<class T>
class Pool{
public:
Pool(size_t max_size = 0 , size_t spin_time = 100 ) : max_size(max_size), spin_time(spin_time){ in_use = true;}
~Pool(){}
void push(const T& t) {
while(!try_push(t)){
usleep(spin_time);
}
}
bool pop(T * _val) {
int st;
while((st = try_pop(_val)) == 1) {
usleep(spin_time);
}
return st == 0 ;
}
inline bool try_push(const T& t) {
std::lock_guard<std::mutex> l(m_mutex);
if(max_size == 0 || m_queue.size() < max_size) {
m_queue.push(t);
return true;
}else {
return false;
}
}
// 0 : found
// 1 : to be continue
// 2 : all stopped
inline int try_pop(T * _val) {
std::lock_guard<std::mutex> l(m_mutex);
if(m_queue.empty()){
return in_use ? 1 : 2;
} else {
* _val = m_queue.front();
m_queue.pop();
return 0;
}
}
void stop_signal() {
std::lock_guard<std::mutex> l(m_mutex);
in_use = false;
}
bool empty() { std::lock_guard<std::mutex> l(m_mutex); return m_queue.empty(); }
size_t size() const { std::lock_guard<std::mutex> l(m_mutex); return m_queue.size(); }
T& front() { std::lock_guard<std::mutex> l(m_mutex); return m_queue.front(); }
const T& front() const { std::lock_guard<std::mutex> l(m_mutex); return m_queue.front(); }
private:
size_t max_size;
size_t spin_time;
bool in_use;
std::queue<T> m_queue;
std::mutex m_mutex;
};//endof Pool
class ThreadUtil {
public:
~ThreadUtil() {}
void start() {
pthread_create(&thread, 0, ThreadUtil::dispatch, this);
}
void join() {
pthread_join(thread, 0);
}
protected:
virtual void run() = 0;
private:
pthread_t thread;
static void * dispatch(void * ptr) {
if (!ptr) return 0;
static_cast<ThreadUtil *>(ptr)->run();
pthread_exit(ptr);
return 0;
}
};
class Producer : public ThreadUtil {
public:
Producer(Pool<std::string> * pool, std::string id) : pool(pool), id(id), num_items(0) {}
void produce(size_t num_items)
{
if (!pool) return;
if (num_items <= 0) return;
this->num_items = num_items;
start();
}
protected:
void run()
{
if (!pool) return;
for (size_t i = 0; i < num_items; ++i)
{
std::string to_push = id + "#" + std::to_string(i); // not suported
printf("push: %s\n",to_push.c_str());
pool->push(to_push);
usleep(100);
}
printf("push: stop signal\n");
pool->stop_signal();
}
private:
Pool<std::string> * pool;
std::string id;
size_t num_items;
};
class Consumer : public ThreadUtil {
public:
Consumer(Pool<std::string> * pool, std::string id, std::mutex * _mtx) : pool(pool), id(id), _mtx(_mtx){}
void consume()
{
if (!pool) return;
start();
}
protected:
void run()
{
if (!pool) return;
int count = 0;
while (true)
{
std::string item;
bool status = pool->pop(&item);
if(status){
_mtx->lock();
printf("[%s:%d] : <%s> \n", id.c_str(), count, item.c_str());
count ++ ;
_mtx->unlock();
}else{
_mtx->lock();
printf("[%s:%d] : empty, exit \n", id.c_str(), count);
count ++ ;
_mtx->unlock();
break;
}
}
}
private:
Pool<std::string> * pool;
std::string id;
std::mutex * _mtx;
};
int main(int argc, char* argv[]) {
std::mutex mtx;
Pool<std::string> pool(2,10000);
Producer prod(&pool, "Producer 1");
Consumer cons0(&pool, "Consumer 0", &mtx);
Consumer cons1(&pool, "Consumer 1", &mtx);
cons0.consume();
cons1.consume();
prod.produce(10);
prod.join();
cons0.join();
cons1.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment