Skip to content

Instantly share code, notes, and snippets.

@Liam0205
Last active January 16, 2018 09:24
Show Gist options
  • Save Liam0205/b45ed0344f79ed8ab6d6fafff7eaf68e to your computer and use it in GitHub Desktop.
Save Liam0205/b45ed0344f79ed8ab6d6fafff7eaf68e to your computer and use it in GitHub Desktop.
Demo of a C-style future/promise multitasking framework
#pragma once
#ifndef CFUTURE_H_
#define CFUTURE_H_
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef DEBUG
#include <limits>
#include <random>
#endif // DEBUG
#include "./type.h"
namespace liam {
namespace cstyle {
// Debug tracing helper.
// With DEBUG defined, msg(fmt, ...) printf()s one line that starts with
// "T(<pthread_self() cast to int>)|", followed by the formatted text and a
// newline. Without DEBUG it expands to nothing, so its arguments are never
// evaluated (call sites may therefore mention DEBUG-only fields).
#ifdef DEBUG
#define msg(format, ...) \
printf("T(%d)|" format "\n", static_cast<int>(pthread_self()), ##__VA_ARGS__)
#else
#define msg(format, ...)
#endif
// State for one task that runs on its own pthread.
struct future {
// Thread id; written by pthread_create() inside future_start().
pthread_t thread;
// Thread attributes; initialised in future_create(), destroyed in future_close().
pthread_attr_t attr;
// Routine handed to future_create(); executed on the new thread.
thread_routine_t func;
};
// Bundle passed to the thread entry point: the user routine plus its argument.
// Allocated with malloc() in future_start() and freed by future_func_wrapper().
struct future_arg {
// Routine to invoke on the new thread.
thread_routine_t func;
// Opaque argument forwarded to func.
void* arg;
};
// One-slot result channel between a producer and a consumer thread.
struct promise {
// Value stored by promise_set() and returned by promise_get().
int result;
// Locked around accesses to `done` (and the write to `result`).
pthread_mutex_t mutex;
// Signalled by promise_set(); waited on by promise_get().
pthread_cond_t cond;
// Set to true by promise_set(); promise_get() waits until it is true.
bool done;
#ifdef DEBUG
// Random identifier used only in msg() trace output.
int id;
#endif // DEBUG
};
// Allocates a future that will run `start_routine` once future_start() is
// called. The thread attributes are initialised as joinable; no thread is
// created here.
//
// Returns the new future, or NULL when the allocation fails. Release it with
// future_close() after it has been started.
inline future* future_create(thread_routine_t start_routine) {
  future* f = static_cast<future*>(malloc(sizeof(future)));
  if (f == NULL) {
    // Out of memory: report it instead of dereferencing a null pointer.
    return NULL;
  }
  pthread_attr_init(&f->attr);
  pthread_attr_setdetachstate(&f->attr, PTHREAD_CREATE_JOINABLE);
  f->func = start_routine;
  return f;
}
inline void* future_func_wrapper(void* arg) {
//msg("WRAP");
future_arg* f = (future_arg*) arg;
void* res = f->func(f->arg);
free(f);
//msg("WRAPPED");
pthread_exit(res);
return res;
}
// Starts the thread of `f`, running the routine given to future_create()
// with `arg` as its argument.
//
// The routine and argument are packed into a heap-allocated future_arg whose
// ownership passes to the new thread (future_func_wrapper frees it). If the
// thread cannot be created nobody else would free that bundle, so it is
// released here. The function returns void, so the failure itself is not
// reported to the caller.
inline void future_start(future* f, void* arg) {
  future_arg* farg = static_cast<future_arg*>(malloc(sizeof(future_arg)));
  farg->func = f->func;
  farg->arg = arg;
  const int rc =
      pthread_create(&f->thread, &f->attr, future_func_wrapper, farg);
  if (rc != 0) {
    // No thread took ownership of the bundle; avoid leaking it.
    free(farg);
  }
}
// Sends a cancellation request to the thread recorded in `f`.
// The return code of pthread_cancel() is discarded.
inline void future_stop(future* f) {
  const pthread_t target = f->thread;
  pthread_cancel(target);
}
// Waits for the thread of `f` to finish, then destroys its attributes and
// frees the future itself. `f` must not be used afterwards.
inline void future_close(future* f) {
  // The thread's exit value is not needed, so no result slot is supplied.
  pthread_join(f->thread, NULL);
  pthread_attr_destroy(&f->attr);
  free(f);
}
// Allocates and initialises a promise in the "not yet fulfilled" state.
//
// The memory comes from malloc(), so every field is set explicitly: without
// `done = false` promise_get() could observe an indeterminate flag and return
// a garbage result without waiting.
//
// Returns the new promise, or NULL when the allocation fails. Release it with
// promise_close().
inline promise* promise_create() {
  promise* p = static_cast<promise*>(malloc(sizeof(promise)));
  if (p == NULL) {
    return NULL;
  }
  p->result = 0;
  p->done = false;
  pthread_mutex_init(&p->mutex, NULL);
  pthread_cond_init(&p->cond, NULL);
#ifdef DEBUG
  // Give the promise a random id so trace lines can be told apart.
  std::random_device rd;
  std::mt19937 urbg(rd());
  std::uniform_int_distribution<> rint(std::numeric_limits<int>::min(),
                                       std::numeric_limits<int>::max());
  p->id = rint(urbg);
#endif // DEBUG
  msg("P(%d) created", p->id);
  return p;
}
// Fulfils `p` with `res`: stores the value and marks the promise done while
// holding the mutex, then releases the mutex and signals the condition
// variable that promise_get() waits on.
inline void promise_set(promise* p, int res) {
  pthread_mutex_t* const guard = &p->mutex;
  msg("P(%d) set LOCK", p->id);
  pthread_mutex_lock(guard);
  p->done = true;
  p->result = res;
  msg("P(%d) set UNLOCK", p->id);
  pthread_mutex_unlock(guard);
  pthread_cond_signal(&p->cond);
}
// Blocks until `p` has been fulfilled by promise_set() and returns the
// stored value.
//
// The wait is a predicate loop on `done`, so wake-ups that arrive before the
// flag is set simply wait again. The result is copied while the mutex is
// still held, so the read is synchronised with the writer in promise_set()
// (the same pattern promise_done() uses for `done`).
inline int promise_get(promise* p) {
  msg("P(%d) get LOCK", p->id);
  pthread_mutex_lock(&p->mutex);
  while (!p->done) {
    msg("P(%d) get WAIT", p->id);
    pthread_cond_wait(&p->cond, &p->mutex);
  }
  const int value = p->result;
  msg("P(%d) get UNLOCK", p->id);
  pthread_mutex_unlock(&p->mutex);
  return value;
}
// Non-blocking check: returns whether `p` has been fulfilled.
// The flag is sampled while holding the promise's mutex.
inline bool promise_done(promise* p) {
  pthread_mutex_t* const guard = &p->mutex;
  pthread_mutex_lock(guard);
  const bool finished = p->done;
  pthread_mutex_unlock(guard);
  return finished;
}
// Destroys the mutex and condition variable of `p` and frees the promise.
// `p` must not be used afterwards.
inline void promise_close(promise* p) {
  pthread_mutex_t* const guard = &p->mutex;
  pthread_cond_t* const ready = &p->cond;
  pthread_mutex_destroy(guard);
  pthread_cond_destroy(ready);
  free(p);
}
} // namespace cstyle
} // namespace liam
#endif // CFUTURE_H_
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <queue>
#include <random>
#include "cfuture.h"
namespace {
// File-local random source: a Mersenne Twister seeded once from
// std::random_device, plus a distribution over the digits 0..9.
std::random_device rd;
std::mt19937 urbg{rd()};
std::uniform_int_distribution<int> rdigit{0, 9};
// Number of tasks the demo launches.
constexpr size_t kLoop{10};
} // namespace
using namespace liam::cstyle;
// Argument bundle handed to a worker thread: the promise to fulfil and the
// three operands passed on to do_func().
struct arguments {
  promise* arg_promise = nullptr;
  int foo = 0;
  int bar = 0;
  int baz = 0;

  arguments(promise* owner, const int first, const int second, const int third)
      : arg_promise(owner), foo(first), bar(second), baz(third) {}
};
// Simulated workload: prints a progress line, sleeps for a random whole
// number of seconds in [0, 9], and returns foo + bar + baz.
//
// This runs concurrently on several worker threads while main() is still
// drawing random numbers, and std::mt19937 is not safe for unsynchronised
// concurrent use. Each thread therefore owns its own engine instead of
// sharing the file-level generator.
int do_func(const int foo, const int bar, const int baz) {
  thread_local std::mt19937 engine{std::random_device{}()};
  // Same 0..9 range the demo uses for its other random digits.
  std::uniform_int_distribution<> delay_seconds(0, 9);
  printf("\ttask running...\n");
  sleep(delay_seconds(engine));
  return foo + bar + baz;
}
// Thread routine for the demo: unpacks the `arguments` bundle behind `arg`,
// runs do_func() on its operands and publishes the sum through the bundle's
// promise.
//
// Returns nullptr as the thread result. It returns normally rather than
// calling pthread_exit(): terminating the thread from inside the routine
// would skip whatever cleanup the calling entry point performs after the
// routine returns (future_func_wrapper frees its argument bundle there).
void* func_wrapper(void* arg) {
  arguments* args = static_cast<arguments*>(arg);
  const int sum = do_func(args->foo, args->bar, args->baz);
  promise_set(args->arg_promise, sum);
  return nullptr;
}
int main() {
printf("main thread\n");
std::queue<future*> fqueue;
std::queue<promise*> pqueue;
std::queue<arguments*> aqueue;
for (size_t i = 0; i != kLoop; ++i) {
future* f = future_create(func_wrapper);
promise* p = promise_create();
arguments* a = new arguments{p, rdigit(urbg), rdigit(urbg), rdigit(urbg)};
fqueue.push(f);
pqueue.push(p);
aqueue.push(a);
future_start(f, reinterpret_cast<void*>(a));
}
for (size_t i = 0; i != kLoop; ++i) {
future* f = fqueue.front();
promise* p = pqueue.front();
arguments* a = aqueue.front();
printf("got result from future: %d\n", promise_get(p));
future_close(f);
promise_close(p);
delete a;
aqueue.pop();
pqueue.pop();
fqueue.pop();
}
return 0;
}
#pragma once
#ifndef TYPE_H_
#define TYPE_H_
#include <type_traits>
namespace liam {
inline namespace type {

// Shorthand for std::result_of<T>::type.
template <typename T>
using result_of_t = typename std::result_of<T>::type;

// Shorthand for std::decay<T>::type.
template <typename T>
using decay_t = typename std::decay<T>::type;

// Hand-written spelling of the pthread start-routine signature.
namespace funcptr_smp {
typedef void* (*thread_routine_t)(void*);
} // namespace funcptr_smp

inline namespace funcptr {

// Maps a return type and parameter list to the matching function-pointer
// type, exposed as the nested `type`.
template <typename ReturnT, typename... ArgsT>
struct function_pointer {
  typedef ReturnT (*type)(ArgsT...);
};

// Shorthand for function_pointer<ReturnT, ArgsT...>::type.
template <typename ReturnT, typename... ArgsT>
using function_pointer_t = typename function_pointer<ReturnT, ArgsT...>::type;

// Signature of a pthread start routine: void* (*)(void*).
using thread_routine_t = function_pointer_t<void*, void*>;

} // namespace funcptr (inlined)
} // namespace type (inlined)
} // namespace liam
#endif // TYPE_H_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment