Skip to content

Instantly share code, notes, and snippets.

@chiral
Last active December 11, 2015 17:48
Show Gist options
  • Select an option

  • Save chiral/4636878 to your computer and use it in GitHub Desktop.

Select an option

Save chiral/4636878 to your computer and use it in GitHub Desktop.
a solution of producer-consumer problem without using lock primitives.
class coro { // コルーチンもどき
protected:
int coro_state;
public:
coro():coro_state(0){}
virtual ~coro(){}
void init() {coro_state=0;}
};
#define begin switch(coro_state){ case 0:
#define end default: break;}
#define yield(x) do {coro_state=__LINE__; return x; case __LINE__:; } while(0)
// 生産者消費者問題を同期プリミティブ無しで解く
// 生産者1名 vs 消費者(複数名)
//
// コンパイルは以下で
// # g++ -pthread lockless_producer_consumer.cpp
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "coroutine.hpp"
const int num_threads = 5; // 生産者1 + 消費者(残り)
const int num_data = 256; // キュー(みたいなの)の長さ
const int buf_size = 256;
const int word_len_max=10;
const int tag_reject = -1; // 返品
const int tag_empty = 0; // 空
const int tag_done = 1; // 完了
const int tag_tid = 10; // 送付済 : tag = 10 + thread_id
const int printf_interval = 4096;
/*
data:
producerは data.tag<=tag_done なデータのみ書換え可
consumerは data.tag>=tag_tid なデータのみ書換え可
読み出しはいずれも自由に出来る
idle:
消費者スレッドが自分のスレッドの状態を書き込む
読み出しは生産者スレッド
*/
struct data {
int tag;
int size,nwords;
char buf[buf_size];
};
data mem[num_data];
int check_data(int tag) {
for (int i=0; i<num_data; i++) {
if (mem[i].tag == tag) return i;
}
return -1;
}
bool idle[num_threads];
int random_idle() {
int tmp[num_threads],num=0;
for (int i=1; i<num_threads; i++) {
if (idle[i]) {
tmp[num++]=tag_tid+i;
}
}
if (num>0) {
int r=rand()%num;
int tid=tmp[r];
return tid;
}
return -1;
}
class sentence_generator : coro {
int j,tid;
public:
char char_gen(int word_len) {
if (word_len>0) {
if (word_len==word_len_max ||
rand()%word_len_max < word_len)
return ' ';
}
int r = rand() % (10 + 26 + 26);
#define g1(n,c) if (r<n) return c; else r-=n;
g1(10, '0'+r);
g1(26, 'a'+r);
g1(26, 'A'+r);
exit(-1);
}
void sentence_gen(data &d) {
int len=0;
for (int i=0; i<d.size; i++) {
char c = d.buf[i] = char_gen(len);
if (c==' ') len=0; else len++;
}
d.buf[d.size]='\0';
}
int task() {
begin;
for (j=0; j<num_data; j++) {
if (mem[j].tag == tag_empty) {
mem[j].size = rand() % (buf_size-1);
sentence_gen(mem[j]);
for(;;) {
tid = random_idle();
if (tid>=0) { mem[j].tag=tid; break; }
yield(-1);
}
yield(tid);
}
}
end;
init();
return -1;
}
};
class word_counter : coro {
int c;
int i;
public:
bool task(data &d) {
begin;
c = 0;
d.nwords=0;
for (i=0; i<d.size; i++) {
if (d.buf[i]==' ') {
if (c>0) {
c=0; d.nwords++;
if (!(d.nwords&7))
yield(false); // 8ワード毎にチェック
}
} else {
c++;
}
}
end;
init();
return true;
}
};
void *producer(void *arg) {
int tid = *(static_cast<int*>(arg)); // == 0
sentence_generator sg;
int loop=0,gen=0,done=0,reject=0;
for(;;) {
int i,tid;
// 処理済みデータを回収
while((i=check_data(tag_done))>=0) {
done++;
mem[i].tag = tag_empty;
}
// 返品データを再送付
while((i=check_data(tag_reject))>=0) {
tid = random_idle();
if (tid<0) break; // idleスレッドがないので即中断
reject++;
mem[i].tag = tid;
}
tid=sg.task();
if (tid>=0) gen++;
loop++;
if (loop%printf_interval==0) {
printf("preemption=%d, gen=%d, done=%d, reject=%d (x%d)\n",
loop,gen,done,reject,gen>0 ? reject/gen : 0);
}
}
}
void *consumer(void *arg) {
int tid = *(static_cast<int*>(arg));
word_counter wc;
data *current=NULL;
for(;;) {
// 自分宛のデータをチェック
int i = check_data(tid);
if (i>=0) {
if (current==NULL) {
// 手が空いてるので取掛り
idle[tid-tag_tid] = false;
current = mem+i;
} else {
// 忙しいので返品
mem[i].tag = tag_reject;
}
} else {
if (current==NULL) continue; // ひたすら到着を待つ
}
bool done = wc.task(*current);
if (done) {
current->tag = tag_done;
current = NULL;
idle[tid-tag_tid] = true;
}
}
}
int main() {
pthread_t th[num_threads];
int tid[num_threads];
for (int i=0; i<num_data; i++)
mem[i].tag = tag_empty;
for (int i=0; i<num_threads; i++) {
tid[i] = tag_tid + i; // スレッドIDの値
idle[i] = true;
if (i==0) { // 生産者1名
pthread_create(th+i, NULL, producer, tid+i);
} else { // 残りは全て消費者
pthread_create(th+i, NULL, consumer, tid+i);
}
}
getchar();
//sleep(600);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment