Last active
December 11, 2015 17:48
-
-
Save chiral/4636878 to your computer and use it in GitHub Desktop.
a solution of producer-consumer problem without using lock primitives.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class coro { // コルーチンもどき | |
| protected: | |
| int coro_state; | |
| public: | |
| coro():coro_state(0){} | |
| virtual ~coro(){} | |
| void init() {coro_state=0;} | |
| }; | |
| #define begin switch(coro_state){ case 0: | |
| #define end default: break;} | |
| #define yield(x) do {coro_state=__LINE__; return x; case __LINE__:; } while(0) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // 生産者消費者問題を同期プリミティブ無しで解く | |
| // 生産者1名 vs 消費者(複数名) | |
| // | |
| // コンパイルは以下で | |
| // # g++ -pthread lockless_producer_consumer.cpp | |
| #include <pthread.h> | |
| #include <stdio.h> | |
| #include <stdlib.h> | |
| #include <unistd.h> | |
| #include "coroutine.hpp" | |
| const int num_threads = 5; // 生産者1 + 消費者(残り) | |
| const int num_data = 256; // キュー(みたいなの)の長さ | |
| const int buf_size = 256; | |
| const int word_len_max=10; | |
| const int tag_reject = -1; // 返品 | |
| const int tag_empty = 0; // 空 | |
| const int tag_done = 1; // 完了 | |
| const int tag_tid = 10; // 送付済 : tag = 10 + thread_id | |
| const int printf_interval = 4096; | |
| /* | |
| data: | |
| producerは data.tag<=tag_done なデータのみ書換え可 | |
| consumerは data.tag>=tag_tid なデータのみ書換え可 | |
| 読み出しはいずれも自由に出来る | |
| idle: | |
| 消費者スレッドが自分のスレッドの状態を書き込む | |
| 読み出しは生産者スレッド | |
| */ | |
| struct data { | |
| int tag; | |
| int size,nwords; | |
| char buf[buf_size]; | |
| }; | |
| data mem[num_data]; | |
| int check_data(int tag) { | |
| for (int i=0; i<num_data; i++) { | |
| if (mem[i].tag == tag) return i; | |
| } | |
| return -1; | |
| } | |
| bool idle[num_threads]; | |
| int random_idle() { | |
| int tmp[num_threads],num=0; | |
| for (int i=1; i<num_threads; i++) { | |
| if (idle[i]) { | |
| tmp[num++]=tag_tid+i; | |
| } | |
| } | |
| if (num>0) { | |
| int r=rand()%num; | |
| int tid=tmp[r]; | |
| return tid; | |
| } | |
| return -1; | |
| } | |
| class sentence_generator : coro { | |
| int j,tid; | |
| public: | |
| char char_gen(int word_len) { | |
| if (word_len>0) { | |
| if (word_len==word_len_max || | |
| rand()%word_len_max < word_len) | |
| return ' '; | |
| } | |
| int r = rand() % (10 + 26 + 26); | |
| #define g1(n,c) if (r<n) return c; else r-=n; | |
| g1(10, '0'+r); | |
| g1(26, 'a'+r); | |
| g1(26, 'A'+r); | |
| exit(-1); | |
| } | |
| void sentence_gen(data &d) { | |
| int len=0; | |
| for (int i=0; i<d.size; i++) { | |
| char c = d.buf[i] = char_gen(len); | |
| if (c==' ') len=0; else len++; | |
| } | |
| d.buf[d.size]='\0'; | |
| } | |
| int task() { | |
| begin; | |
| for (j=0; j<num_data; j++) { | |
| if (mem[j].tag == tag_empty) { | |
| mem[j].size = rand() % (buf_size-1); | |
| sentence_gen(mem[j]); | |
| for(;;) { | |
| tid = random_idle(); | |
| if (tid>=0) { mem[j].tag=tid; break; } | |
| yield(-1); | |
| } | |
| yield(tid); | |
| } | |
| } | |
| end; | |
| init(); | |
| return -1; | |
| } | |
| }; | |
| class word_counter : coro { | |
| int c; | |
| int i; | |
| public: | |
| bool task(data &d) { | |
| begin; | |
| c = 0; | |
| d.nwords=0; | |
| for (i=0; i<d.size; i++) { | |
| if (d.buf[i]==' ') { | |
| if (c>0) { | |
| c=0; d.nwords++; | |
| if (!(d.nwords&7)) | |
| yield(false); // 8ワード毎にチェック | |
| } | |
| } else { | |
| c++; | |
| } | |
| } | |
| end; | |
| init(); | |
| return true; | |
| } | |
| }; | |
| void *producer(void *arg) { | |
| int tid = *(static_cast<int*>(arg)); // == 0 | |
| sentence_generator sg; | |
| int loop=0,gen=0,done=0,reject=0; | |
| for(;;) { | |
| int i,tid; | |
| // 処理済みデータを回収 | |
| while((i=check_data(tag_done))>=0) { | |
| done++; | |
| mem[i].tag = tag_empty; | |
| } | |
| // 返品データを再送付 | |
| while((i=check_data(tag_reject))>=0) { | |
| tid = random_idle(); | |
| if (tid<0) break; // idleスレッドがないので即中断 | |
| reject++; | |
| mem[i].tag = tid; | |
| } | |
| tid=sg.task(); | |
| if (tid>=0) gen++; | |
| loop++; | |
| if (loop%printf_interval==0) { | |
| printf("preemption=%d, gen=%d, done=%d, reject=%d (x%d)\n", | |
| loop,gen,done,reject,gen>0 ? reject/gen : 0); | |
| } | |
| } | |
| } | |
| void *consumer(void *arg) { | |
| int tid = *(static_cast<int*>(arg)); | |
| word_counter wc; | |
| data *current=NULL; | |
| for(;;) { | |
| // 自分宛のデータをチェック | |
| int i = check_data(tid); | |
| if (i>=0) { | |
| if (current==NULL) { | |
| // 手が空いてるので取掛り | |
| idle[tid-tag_tid] = false; | |
| current = mem+i; | |
| } else { | |
| // 忙しいので返品 | |
| mem[i].tag = tag_reject; | |
| } | |
| } else { | |
| if (current==NULL) continue; // ひたすら到着を待つ | |
| } | |
| bool done = wc.task(*current); | |
| if (done) { | |
| current->tag = tag_done; | |
| current = NULL; | |
| idle[tid-tag_tid] = true; | |
| } | |
| } | |
| } | |
| int main() { | |
| pthread_t th[num_threads]; | |
| int tid[num_threads]; | |
| for (int i=0; i<num_data; i++) | |
| mem[i].tag = tag_empty; | |
| for (int i=0; i<num_threads; i++) { | |
| tid[i] = tag_tid + i; // スレッドIDの値 | |
| idle[i] = true; | |
| if (i==0) { // 生産者1名 | |
| pthread_create(th+i, NULL, producer, tid+i); | |
| } else { // 残りは全て消費者 | |
| pthread_create(th+i, NULL, consumer, tid+i); | |
| } | |
| } | |
| getchar(); | |
| //sleep(600); | |
| return 0; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment