Last active
February 16, 2018 06:33
-
-
Save kevinlynx/ba728f2f1b33c763a6c3 to your computer and use it in GitHub Desktop.
simple read/write thread memory gc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "thread_id_pool.h" | |
#include <pthread.h> | |
#include <assert.h> | |
static pthread_key_t s_tkey; | |
static pthread_once_t s_tkey_once; | |
static void freeKey(void *p) | |
{ | |
ThreadIdPool::TVal *val = (ThreadIdPool::TVal*) p; | |
val->self->destroyVal(val); | |
} | |
static void makeKey() | |
{ | |
pthread_key_create(&s_tkey, freeKey); | |
} | |
ThreadIdPool::TVal *ThreadIdPool::createVal(size_t id) | |
{ | |
TVal *val = new TVal(); | |
val->self = this; | |
val->self->addRef(); | |
val->id = id; | |
return val; | |
} | |
void ThreadIdPool::destroyVal(TVal *val) | |
{ | |
assert(val->self == this); | |
val->self->put(val->id); | |
val->self->release(); | |
delete val; | |
} | |
ThreadIdPool::ThreadIdPool() | |
{ | |
for (int i = 0; i < TPG_TID_COUNT; ++i) { | |
_ids[i] = i + 1; | |
} | |
} | |
size_t ThreadIdPool::get() | |
{ | |
pthread_once(&s_tkey_once, makeKey); | |
TVal *val = (TVal*) pthread_getspecific(s_tkey); | |
if (val != NULL) return val->id; | |
int i = _counter.incAndReturn() - 1; | |
if (i >= TPG_TID_COUNT) return 0; | |
size_t id = _ids[i]; | |
pthread_setspecific(s_tkey, createVal(id)); | |
return id; | |
} | |
void ThreadIdPool::put(size_t id) | |
{ | |
pthread_setspecific(s_tkey, NULL); | |
int i = _counter.decAndReturn(); | |
assert(i >= 0); | |
_ids[i] = id; | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef __THREAD_ID_POOL_H_ | |
#define __THREAD_ID_POOL_H_ | |
#include <autil/AtomicCounter.h> | |
#include <cm_basic/common/ref_obj.h> | |
#define TPG_TID_COUNT 2048 | |
// internal use only | |
class ThreadIdPool : public cm_basic::RefObj | |
{ | |
public: | |
struct TVal { | |
ThreadIdPool *self; | |
size_t id; | |
}; | |
TVal *createVal(size_t id); | |
void destroyVal(TVal *val); | |
public: | |
ThreadIdPool(); | |
size_t get(); | |
void put(size_t id); | |
private: | |
autil::AtomicCounter _counter; | |
size_t _ids[TPG_TID_COUNT]; | |
}; | |
#endif | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "thread_mark_gc.h" | |
#include "thread_id_pool.h" | |
#include <string.h> | |
#define PENDING_POS(n) ((n - 1) % TPG_MAX_PENDING) | |
/////////////////////////////////////////////////////////////////////////////// | |
ThreadMarkGC::ThreadMarkGC() | |
{ | |
_idPool = new ThreadIdPool(); | |
_idPool->addRef(); | |
_head = _tail = 0; | |
memset(_pendings, 0, sizeof(_pendings)); | |
memset(_tmark, 0, sizeof(_tmark)); | |
memset(_tfreeds, 0, sizeof(_tfreeds)); | |
} | |
ThreadMarkGC::~ThreadMarkGC() | |
{ | |
_idPool->release(); | |
} | |
int ThreadMarkGC::release(void **out) | |
{ | |
return gc(out, _head); | |
} | |
int ThreadMarkGC::gc(void **out) | |
{ | |
uint64_t pos = findFreePos(); | |
return gc(out, pos); | |
} | |
int ThreadMarkGC::gc(void **out, uint64_t pos) | |
{ | |
int count = 0; | |
while (_tail != pos) { // != to avoid overflow | |
out[count++] = _pendings[PENDING_POS(_tail)]; | |
_tail ++; | |
} | |
return count; | |
} | |
bool ThreadMarkGC::update() | |
{ | |
size_t tid = _idPool->get(); | |
if (tid == 0 || tid > TPG_MAX_THREAD) return false; | |
// i'm ok now, post the signal to the update thread | |
_tmark[tid - 1] = _head; | |
return true; | |
} | |
bool ThreadMarkGC::deferFree(void *p) | |
{ | |
if (_head - _tail >= TPG_MAX_PENDING) { | |
// fatal error, memory leak | |
return false; | |
} | |
uint32_t n = PENDING_POS(_head); | |
_head ++; | |
_pendings[n] = p; | |
return true; | |
} | |
uint64_t ThreadMarkGC::findFreePos() | |
{ | |
int tcnt = TPG_MAX_THREAD; | |
// if all threads are inactive, return _head | |
uint64_t min = (uint64_t)-1, pos = _head; | |
bool found = false; | |
for (int i = 0; i < tcnt; ++i) { | |
uint64_t tpos = _tmark[i]; | |
if (tpos == _tfreeds[i]) continue; | |
// compare offset to avoid uint overflow | |
uint64_t offset = tpos - _tail; | |
if (offset < min) { | |
min = offset; | |
pos = tpos; | |
found = true; | |
} | |
} | |
if (found) { | |
// update all threads' last freed position | |
for (int i = 0; i < tcnt; ++i) { | |
// only update these active threads | |
if (_tmark[i] != _tfreeds[i]) | |
_tfreeds[i] = pos; | |
} | |
} | |
return pos; | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef __THREAD_MARK_GC_H_ | |
#define __THREAD_MARK_GC_H_ | |
#include <stdint.h> | |
#define TPG_MAX_THREAD 2048 | |
#define TPG_MAX_PENDING 8 | |
class ThreadIdPool; | |
class ThreadMarkGC | |
{ | |
public: | |
ThreadMarkGC(); | |
~ThreadMarkGC(); | |
int release(void **out); | |
// update thread calls, collect these safe to free memories | |
int gc(void **out); | |
// reader thread calls, to notify leaving the critical section | |
bool update(); | |
// update thread calls, to defer free a memory | |
bool deferFree(void *p); | |
private: | |
int gc(void **out, uint64_t pos); | |
uint64_t findFreePos(); | |
private: | |
ThreadIdPool *_idPool; | |
void *_pendings[TPG_MAX_PENDING]; | |
uint64_t _head, _tail; | |
uint64_t _tmark[TPG_MAX_THREAD]; | |
uint64_t _tfreeds[TPG_MAX_THREAD]; | |
}; | |
#define TPG_DO_GC(type, tpg) { \ | |
void *ps[TPG_MAX_PENDING]; \ | |
int cnt = (tpg).gc((void**)&ps); \ | |
while (cnt > 0) delete (type*)ps[--cnt]; \ | |
} | |
#define TPG_DO_RELEASE(type, tpg) { \ | |
void *ps[TPG_MAX_PENDING]; \ | |
int cnt = (tpg).release((void**)&ps); \ | |
while (cnt > 0) delete (type*)ps[--cnt]; \ | |
} | |
#endif | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment