Created
February 12, 2015 01:46
-
-
Save yuikns/aeef32ddfac7296e0424 to your computer and use it in GitHub Desktop.
Producer And Consumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cstdio> | |
#include <cstring> | |
#include <pthread.h> // thread | |
#include <unistd.h> // usleep | |
#include <string> | |
#include <mutex> // std::lock_guard , mutex | |
#include <queue> // queue | |
template<class T> | |
class Pool{ | |
public: | |
Pool(size_t max_size = 0 , size_t spin_time = 100 ) : max_size(max_size), spin_time(spin_time){ in_use = true;} | |
~Pool(){} | |
void push(const T& t) { | |
while(!try_push(t)){ | |
usleep(spin_time); | |
} | |
} | |
bool pop(T * _val) { | |
int st; | |
while((st = try_pop(_val)) == 1) { | |
usleep(spin_time); | |
} | |
return st == 0 ; | |
} | |
inline bool try_push(const T& t) { | |
std::lock_guard<std::mutex> l(m_mutex); | |
if(max_size == 0 || m_queue.size() < max_size) { | |
m_queue.push(t); | |
return true; | |
}else { | |
return false; | |
} | |
} | |
// 0 : found | |
// 1 : to be continue | |
// 2 : all stopped | |
inline int try_pop(T * _val) { | |
std::lock_guard<std::mutex> l(m_mutex); | |
if(m_queue.empty()){ | |
return in_use ? 1 : 2; | |
} else { | |
* _val = m_queue.front(); | |
m_queue.pop(); | |
return 0; | |
} | |
} | |
void stop_signal() { | |
std::lock_guard<std::mutex> l(m_mutex); | |
in_use = false; | |
} | |
bool empty() { std::lock_guard<std::mutex> l(m_mutex); return m_queue.empty(); } | |
size_t size() const { std::lock_guard<std::mutex> l(m_mutex); return m_queue.size(); } | |
T& front() { std::lock_guard<std::mutex> l(m_mutex); return m_queue.front(); } | |
const T& front() const { std::lock_guard<std::mutex> l(m_mutex); return m_queue.front(); } | |
private: | |
size_t max_size; | |
size_t spin_time; | |
bool in_use; | |
std::queue<T> m_queue; | |
std::mutex m_mutex; | |
};//endof Pool | |
class ThreadUtil { | |
public: | |
~ThreadUtil() {} | |
void start() { | |
pthread_create(&thread, 0, ThreadUtil::dispatch, this); | |
} | |
void join() { | |
pthread_join(thread, 0); | |
} | |
protected: | |
virtual void run() = 0; | |
private: | |
pthread_t thread; | |
static void * dispatch(void * ptr) { | |
if (!ptr) return 0; | |
static_cast<ThreadUtil *>(ptr)->run(); | |
pthread_exit(ptr); | |
return 0; | |
} | |
}; | |
class Producer : public ThreadUtil { | |
public: | |
Producer(Pool<std::string> * pool, std::string id) : pool(pool), id(id), num_items(0) {} | |
void produce(size_t num_items) | |
{ | |
if (!pool) return; | |
if (num_items <= 0) return; | |
this->num_items = num_items; | |
start(); | |
} | |
protected: | |
void run() | |
{ | |
if (!pool) return; | |
for (size_t i = 0; i < num_items; ++i) | |
{ | |
std::string to_push = id + "#" + std::to_string(i); // not suported | |
printf("push: %s\n",to_push.c_str()); | |
pool->push(to_push); | |
usleep(100); | |
} | |
printf("push: stop signal\n"); | |
pool->stop_signal(); | |
} | |
private: | |
Pool<std::string> * pool; | |
std::string id; | |
size_t num_items; | |
}; | |
class Consumer : public ThreadUtil { | |
public: | |
Consumer(Pool<std::string> * pool, std::string id, std::mutex * _mtx) : pool(pool), id(id), _mtx(_mtx){} | |
void consume() | |
{ | |
if (!pool) return; | |
start(); | |
} | |
protected: | |
void run() | |
{ | |
if (!pool) return; | |
int count = 0; | |
while (true) | |
{ | |
std::string item; | |
bool status = pool->pop(&item); | |
if(status){ | |
_mtx->lock(); | |
printf("[%s:%d] : <%s> \n", id.c_str(), count, item.c_str()); | |
count ++ ; | |
_mtx->unlock(); | |
}else{ | |
_mtx->lock(); | |
printf("[%s:%d] : empty, exit \n", id.c_str(), count); | |
count ++ ; | |
_mtx->unlock(); | |
break; | |
} | |
} | |
} | |
private: | |
Pool<std::string> * pool; | |
std::string id; | |
std::mutex * _mtx; | |
}; | |
int main(int argc, char* argv[]) { | |
std::mutex mtx; | |
Pool<std::string> pool(2,10000); | |
Producer prod(&pool, "Producer 1"); | |
Consumer cons0(&pool, "Consumer 0", &mtx); | |
Consumer cons1(&pool, "Consumer 1", &mtx); | |
cons0.consume(); | |
cons1.consume(); | |
prod.produce(10); | |
prod.join(); | |
cons0.join(); | |
cons1.join(); | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment