Skip to content

Instantly share code, notes, and snippets.

@ivcn
Created May 9, 2017 22:07
Show Gist options
  • Select an option

  • Save ivcn/d34edcdcaec91873d26f67e685cec045 to your computer and use it in GitHub Desktop.

Select an option

Save ivcn/d34edcdcaec91873d26f67e685cec045 to your computer and use it in GitHub Desktop.
An example of a service that pre-initializes resources, stores them, and provides concurrent access to them.
#ifndef _RESOURCE_H_
#define _RESOURCE_H_
#include <thread>
/// A resource that is slow to create and slow to destroy.
///
/// Both make() and dismiss() block the calling thread for `partNumber`
/// seconds. Instances are heap-only: the constructor is private, so the only
/// way to obtain one is make(), and the only intended way to release one is
/// dismiss().
class Resource {
public:
    /// Creates a resource for `partNumber`.
    ///
    /// Blocks the calling thread for `partNumber` seconds before allocating.
    /// @return a heap-allocated Resource; the caller must release it with
    ///         dismiss().
    static Resource *make(int partNumber) {
        const int timeToSleep = partNumber;
        std::this_thread::sleep_for(std::chrono::seconds(timeToSleep));
        return new Resource(partNumber);
    }

    /// Blocks for `partNumber` seconds, then destroys this object.
    ///
    /// The object is deleted before this function returns, so the pointer it
    /// was called through must not be used afterwards.
    void dismiss() {
        // Read the member before `delete this`; nothing may touch *this after.
        const int timeToSleep = partNumber;
        std::this_thread::sleep_for(std::chrono::seconds(timeToSleep));
        delete this;
    }

    // dismiss() ends with `delete this`, which is only valid for objects that
    // came from `new` in make(). A copy could live on the stack or inside
    // another object, where dismissing it would be undefined behaviour, so
    // copying is forbidden.
    Resource(const Resource&) = delete;
    Resource& operator=(const Resource&) = delete;

private:
    int partNumber;

    explicit Resource(int partNumber) : partNumber(partNumber) {}
};
#endif _RESOURCE_H_
#include "supplier.h"
// Creates new task for allocating certain amount of the resources
// Function can be blocked in case when task queue is full
// i.e. clients sends requests very rapidly.
// This can be handled by properly selection of task queue size
void Supplier::stock(int partNumber, int amountToStock) {
Logger::log("stock request. resource ", partNumber, " amount ", amountToStock);
threadPool.addTask([this, partNumber, amountToStock]() {
if (resources.count(partNumber) == 0) {
resources[partNumber] = std::make_unique<Supplier::ResourceStock>();
for (int i = 0; i < amountToStock; ++i) {
resources[partNumber]->push(Resource::make(partNumber));
Logger::log("resource ", partNumber, " allocated");
}
}
});
}
// Queues a thread-pool task that dismisses every stocked resource for
// `partNumber` and then removes the part from the stock map.
//
// Returns right after logging when the part has no map entry. Otherwise the
// work is enqueued; addTask() can block when the task queue is full (i.e.
// clients send requests faster than the workers drain them).
//
// The entry is erased rather than left behind holding a null pointer, so
// later lookups see the part as "not stocked" instead of finding a null
// stock.
//
// NOTE(review): `resources` is accessed both here and inside the queued task
// with no lock taken in this function. That looks like a data race on the
// map itself — confirm and add synchronization at class level.
void Supplier::destock(int partNumber) {
    Logger::log("destock request for resource ", partNumber);
    if (resources.count(partNumber) == 0)
        return;

    threadPool.addTask([this, partNumber]() {
        for (;;) {
            // Look the stock up on every round: dismiss() is slow, and the
            // entry may be gone (or null) by the time it returns, e.g. when
            // two destock tasks for the same part were queued.
            const auto slot = resources.find(partNumber);
            if (slot == resources.end() || !slot->second)
                break;
            Resource* r = slot->second->pop();
            if (r == nullptr)
                break; // drained
            r->dismiss();
            Logger::log("resource ", partNumber, " released");
        }
        // Destroys the stock object (if any) and drops the key.
        resources.erase(partNumber);
    });
}
// Hands out one resource for `partNumber`.
//
// If the part has a live stock, one resource is popped from it and a
// thread-pool task is queued to allocate a replacement. When the pop yields
// nothing, or the part has no live stock at all, a fresh resource is
// allocated synchronously on the calling thread.
//
// @return a Resource the caller now owns and must release with dismiss().
//
// NOTE(review): `resources` is accessed both here and inside the queued task
// with no lock taken in this function. That looks like a data race on the
// map itself — confirm and add synchronization at class level.
Resource* Supplier::request(int partNumber) {
    Logger::log("request for resource ", partNumber);
    // A part is served from stock only if its entry holds a live stock
    // object; a null pointer in the map is treated like a missing entry.
    const auto stocked = resources.find(partNumber);
    if (stocked != resources.end() && stocked->second) {
        Resource* taken = stocked->second->pop();
        // Queue allocation of a replacement resource.
        threadPool.addTask([this, partNumber]() {
            Resource* made = Resource::make(partNumber);
            const auto slot = resources.find(partNumber);
            if (slot != resources.end() && slot->second) {
                slot->second->push(made);
            } else {
                // The stock went away while the replacement was being made;
                // nobody would ever own it, so release it here.
                made->dismiss();
            }
        });
        if (taken != nullptr)
            return taken;
    }
    Logger::log("Resource ", partNumber, " not found in stock. Making");
    return Resource::make(partNumber);
}
int main()
{
Supplier s(5);
auto f1 = std::async(&Supplier::request, &s, 2);
for (int i = 0; i < 10; ++i) {
s.stock(i, 5);
}
s.stock(2, 5);
auto f3 = std::async(&Supplier::stock, &s, 3, 5);
auto f4 = std::async(&Supplier::request, &s, 2);
auto f5 = std::async(&Supplier::destock, &s, 2);
auto f6 = std::async(&Supplier::request, &s, 2);
auto f7 = std::async(&Supplier::stock, &s, 100, 2);
return 0;
}
#ifndef _SUPPLIER_H_
#define _SUPPLIER_H_
#include "resource.h"
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
// Minimal logger: each call writes its arguments, followed by a newline, to
// std::cout while holding a mutex shared by all callers.
class Logger {
public:
    // Streams every argument to std::cout in the order given, then '\n'.
    // The lock is held for the whole line, so calls made through Logger from
    // different threads do not interleave their output.
    template<typename... Args>
    static void log(Args&&... args) {
        std::lock_guard<std::mutex> lk(mutex());
        // Pack expansion inside a braced array initializer evaluates
        // `std::cout << arg` once per argument, left to right. The leading 0
        // keeps the array non-empty when log() is called with no arguments.
        using dummy = int[];
        (void)dummy {
            0, (void(std::cout << std::forward<Args>(args)), 0)...
        };
        std::cout << '\n';
    }
private:
    // The mutex is a function-local static inside an in-class (implicitly
    // inline) function, so this header can be included from several
    // translation units without producing multiple definitions of the mutex.
    static std::mutex& mutex() {
        static std::mutex m;
        return m;
    }
};
// Bounded FIFO queue on top of a fixed-size circular buffer, intended for
// the thread pool's task queue.
//
// push() blocks while the queue is full; pop() never blocks and returns a
// value-initialized ElementType when the queue is empty. ElementType must be
// default-constructible and move-assignable.
template<typename ElementType>
class Queue {
public:
    // Creates a queue that holds at most `size` elements. A size of 0 is
    // raised to 1, because a zero-length buffer has no slot to index.
    Queue(size_t size) :
        front(0),
        end(0),
        empty(true),
        buffer(size == 0 ? 1 : size) {}

    // Neither movable nor copyable: the mutex and condition variable members
    // cannot be moved or copied.
    Queue(Queue&& other) = delete;
    Queue& operator=(Queue&& other) = delete;
    Queue(const Queue& other) = delete;
    Queue& operator=(const Queue& other) = delete;

    // Appends `newElement`, waiting until a slot is free if the queue is full.
    void push(ElementType newElement) {
        std::unique_lock<std::mutex> lk(m);
        // wait until there is a free slot
        cv.wait(lk, [this]() { return !isFull(); });
        buffer[end] = std::move(newElement);
        end = (end + 1) % buffer.size();
        empty = false;
    }

    // Removes and returns the oldest element, or ElementType() when the
    // queue is empty. Wakes one thread blocked in push().
    ElementType pop() {
        std::lock_guard<std::mutex> lk(m);
        if (isEmpty())
            return ElementType();
        auto v = std::move(buffer[front]);
        front = (front + 1) % buffer.size();
        // front catching up with end after a removal means nothing is left
        if (front == end)
            empty = true;
        // a slot was freed: let a waiting producer continue
        cv.notify_one();
        return v;
    }

    // True when the queue holds no elements. Takes no lock itself.
    inline bool isEmpty() const {
        return empty;
    }

    // True when every slot is occupied. front == end alone is ambiguous
    // (empty or full), so the `empty` flag disambiguates. Takes no lock
    // itself.
    inline bool isFull() const {
        return !empty && front == end;
    }

    // Internal state; push() and pop() access it while holding `m`.
    std::mutex m;
    std::condition_variable cv;
    size_t front;   // index of the oldest element
    size_t end;     // index of the next free slot
    bool empty;
    std::vector<ElementType> buffer;
};
// Fixed-size pool of worker threads that execute tasks taken from a bounded
// queue.
//
// Workers are only started by run(). stop() asks them to exit; tasks still
// in the queue at that point are not executed. The destructor requests a
// stop as well and joins every worker that was started.
class ThreadPool {
public:
    // Reserves `numThreads` worker slots (no thread is started yet) and a
    // task queue with room for `taskQueueSize` tasks.
    ThreadPool(size_t numThreads, size_t taskQueueSize) :
        workers(numThreads),
        taskQueue(taskQueueSize),
        stopFlag(false) {
    }

    // Starts a thread in every worker slot that is not already running, so
    // calling run() more than once is harmless.
    void run() {
        for (auto& w : workers) {
            if (w.joinable())
                continue; // assigning to a running std::thread would terminate
            w = std::thread([this]() {
                // Stop pool when asked. All tasks left in the queue will not
                // be completed.
                while (!stopFlag) {
                    auto task = taskQueue.pop();
                    if (task)
                        task();
                    else
                        // pop() does not block on an empty queue; give up the
                        // time slice instead of spinning flat out.
                        std::this_thread::yield();
                }
            });
        }
    }

    // Asks all workers to exit once their current task, if any, is finished.
    void stop() {
        stopFlag = true;
    }

    // Enqueues `t` for execution by a worker. Blocks while the queue is full.
    void addTask(std::function<void()> t) {
        taskQueue.push(std::move(t));
    }

    ~ThreadPool() {
        // Without this the joins below would never return when stop() was
        // not called beforehand.
        stopFlag = true;
        // Wait for all started threads. Slots that never got a thread
        // (run() not called) are not joinable and must be skipped.
        for (auto& w : workers) {
            if (w.joinable())
                w.join();
        }
    }
private:
    std::vector<std::thread> workers;
    Queue<std::function<void()>> taskQueue;
    // Written by stop() / the destructor and read by every worker thread,
    // hence atomic.
    std::atomic<bool> stopFlag;
};
class Supplier {
private:
// class that manages storage
// for the particular pre-initialized resource
struct ResourceStock {
std::vector<Resource*> storage;
std::mutex m;
void push(Resource* r) {
std::lock_guard<std::mutex> lk(m);
storage.push_back(r);
}
Resource* pop() {
std::lock_guard<std::mutex> lk(m);
if (storage.empty())
return nullptr;
Resource* r = storage.back();
storage.pop_back();
return r;
}
~ResourceStock() {
Logger::log("Dismissing unclaimed resource");
for (auto r : storage) {
r->dismiss();
}
}
};
// max number of threads in pool
int maxConcurrentOps;
// descriptor for thread pool's thread
std::future<void> serviceThread;
// map from partNumber to the resource storage object
std::unordered_map<int, std::unique_ptr<ResourceStock>> resources;
ThreadPool threadPool;
public:
Supplier(int maxConcurrentOps, size_t taskQueueSize = 64) :
maxConcurrentOps(maxConcurrentOps),
threadPool(maxConcurrentOps, taskQueueSize) {
// create asynchronous task for thread pool
serviceThread = std::async(std::launch::async, [&]() {
threadPool.run();
Logger::log("Service up");
});
}
virtual ~Supplier() {
threadPool.stop();
serviceThread.get();
Logger::log("Service down. Waiting for remaining jobs");
// Waiting for workers and destroying unclaimed objects
// left in storage
}
/**
* Asks supplier to set up a stock of products for provided
partNumber.
* Method returns immediately and doesn't wait until operation
completes.
*
* @param partNumber
* @param amountToStock
*/
virtual void stock(int partNumber, int amountToStock);
/**
* Ask supplier to terminate stock of products with this partNumber.
* Method returns immediately and doesn't wait until operation
completes.
*
* @param partNumber
*/
virtual void destock(int partNumber);
/**
* If the requested partNumber was stocked, method returns an instance
of a Resource
* from the stock. If the requested partNumber was not stocked, method
produces it
* and returns to caller.
*
* @param partNumber
* @return
*/
virtual Resource *request(int partNumber);
};
#endif _SUPPLIER_H_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment