Created
May 9, 2017 22:07
-
-
Save ivcn/d34edcdcaec91873d26f67e685cec045 to your computer and use it in GitHub Desktop.
An example of a service that provides pre-initialization and storage of, and concurrent access to, some resources.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #ifndef _RESOURCE_H_ | |
| #define _RESOURCE_H_ | |
| #include <thread> | |
// A heap-only object whose creation and destruction are deliberately slow:
// both block the calling thread for `partNumber` seconds.
//
// Instances are created with make() and must be released with dismiss(),
// which deletes the object. Copying is disabled because a copy could live
// on the stack, where dismiss() (`delete this`) would be undefined behavior.
class Resource {
public:
    // Sleeps for `partNumber` seconds, then returns a newly allocated
    // Resource. The caller owns the result and releases it via dismiss().
    static Resource *make(int partNumber) {
        std::this_thread::sleep_for(std::chrono::seconds(partNumber));
        return new Resource(partNumber);
    }

    // Sleeps for `partNumber` seconds, then destroys this object.
    // The pointer must not be used after this call returns.
    void dismiss() {
        std::this_thread::sleep_for(std::chrono::seconds(partNumber));
        delete this;
    }

    Resource(const Resource&) = delete;
    Resource& operator=(const Resource&) = delete;

private:
    int partNumber;

    // Private so that make() is the only way to obtain an instance.
    explicit Resource(int partNumber) : partNumber(partNumber) {}
};
| #endif _RESOURCE_H_ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include "supplier.h" | |
| // Creates new task for allocating certain amount of the resources | |
| // Function can be blocked in case when task queue is full | |
| // i.e. clients sends requests very rapidly. | |
| // This can be handled by properly selection of task queue size | |
| void Supplier::stock(int partNumber, int amountToStock) { | |
| Logger::log("stock request. resource ", partNumber, " amount ", amountToStock); | |
| threadPool.addTask([this, partNumber, amountToStock]() { | |
| if (resources.count(partNumber) == 0) { | |
| resources[partNumber] = std::make_unique<Supplier::ResourceStock>(); | |
| for (int i = 0; i < amountToStock; ++i) { | |
| resources[partNumber]->push(Resource::make(partNumber)); | |
| Logger::log("resource ", partNumber, " allocated"); | |
| } | |
| } | |
| }); | |
| } | |
| // Creates new task for destroying all | |
| // resources with particular partNumber. Function can be blocked in case when | |
| // task queue is full i.e. clients sends requests very rapidly. | |
| void Supplier::destock(int partNumber) { | |
| Logger::log("destock request for resource ", partNumber); | |
| if (resources.count(partNumber) == 0) | |
| return; | |
| else { | |
| threadPool.addTask([this, partNumber]() { | |
| Resource* r = nullptr; | |
| while ((r = resources[partNumber]->pop()) != nullptr) { | |
| r->dismiss(); | |
| Logger::log("resource ", partNumber, " released"); | |
| } | |
| resources[partNumber].reset(); | |
| }); | |
| } | |
| } | |
| // Returns pre-initialized resource if it present in storage | |
| // and sends task for allocating new one to replace the taken resource. | |
| // If there is no such pre-initialized resource in stock it will be allocated. | |
| Resource* Supplier::request(int partNumber) { | |
| Logger::log("request for resource ", partNumber); | |
| // check if there was request for pre-initialize such resource | |
| if (resources.count(partNumber) > 0) { | |
| auto r = resources[partNumber]->pop(); | |
| // initialize new resource instead of taken one | |
| threadPool.addTask([this, partNumber]() { | |
| auto r = Resource::make(partNumber); | |
| resources[partNumber]->push(r); | |
| }); | |
| if (r != nullptr) | |
| return r; | |
| } | |
| Logger::log("Resource ", partNumber, " not found in stock. Making"); | |
| return Resource::make(partNumber); | |
| } | |
| int main() | |
| { | |
| Supplier s(5); | |
| auto f1 = std::async(&Supplier::request, &s, 2); | |
| for (int i = 0; i < 10; ++i) { | |
| s.stock(i, 5); | |
| } | |
| s.stock(2, 5); | |
| auto f3 = std::async(&Supplier::stock, &s, 3, 5); | |
| auto f4 = std::async(&Supplier::request, &s, 2); | |
| auto f5 = std::async(&Supplier::destock, &s, 2); | |
| auto f6 = std::async(&Supplier::request, &s, 2); | |
| auto f7 = std::async(&Supplier::stock, &s, 100, 2); | |
| return 0; | |
| } | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #ifndef _SUPPLIER_H_ | |
| #define _SUPPLIER_H_ | |
#include "resource.h"

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
// Minimal logger: writes all arguments to std::cout followed by a newline.
// A single mutex serializes calls so that lines from different threads do
// not interleave.
class Logger {
public:
    // Streams every argument to std::cout in order, then '\n'.
    // Each argument must be streamable with operator<<.
    template<typename... Args>
    static void log(Args&&... args) {
        std::lock_guard<std::mutex> lk(outputMutex());
        // Pack expansion inside a braced array initializer: evaluates the
        // stream insertions left to right. The leading 0 keeps the array
        // non-empty when log() is called without arguments.
        using expand = int[];
        (void)expand {
            0, (void(std::cout << std::forward<Args>(args)), 0)...
        };
        std::cout << '\n';
    }

private:
    // The mutex is a function-local static rather than a static data member
    // defined at namespace scope: a non-inline definition in a header would
    // be duplicated in every translation unit that includes it.
    static std::mutex& outputMutex() {
        static std::mutex m;
        return m;
    }
};
// Simple fixed-capacity circular buffer used as the thread pool's task queue.
//
// push() blocks while the buffer is full; pop() never blocks and returns a
// value-initialized ElementType when the buffer is empty. Both take the
// internal mutex. ElementType must be default-constructible and
// move-assignable.
template<typename ElementType>
class Queue {
public:
    // Creates a queue holding at most `size` elements. A requested size of 0
    // is raised to 1: a zero-length buffer could never accept an element and
    // would be indexed out of bounds.
    Queue(size_t size) :
        front(0),
        end(0),
        empty(true),
        buffer(size == 0 ? 1 : size) {}

    // The mutex and condition variable members can be neither copied nor
    // moved, so the queue cannot be either.
    Queue(Queue&& other) = delete;
    Queue& operator=(Queue&& other) = delete;
    Queue(const Queue& other) = delete;
    Queue& operator=(const Queue& other) = delete;

    // Appends `newElement`, waiting until a slot is free.
    void push(ElementType newElement) {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this]() { return !isFull(); });
        buffer[end] = std::move(newElement);
        end = (end + 1) % buffer.size();
        empty = false;
    }

    // Removes and returns the oldest element, or ElementType() when the
    // queue is empty.
    ElementType pop() {
        std::lock_guard<std::mutex> lk(m);
        if (isEmpty())
            return ElementType();
        ElementType v = std::move(buffer[front]);
        front = (front + 1) % buffer.size();
        // After a removal, equal indices can only mean "empty"
        // (the same condition means "full" after an insertion).
        empty = (front == end);
        // A slot was freed: wake one producer blocked in push().
        cv.notify_one();
        return v;
    }

    // Unsynchronized snapshot; does not take the mutex.
    inline bool isEmpty() const {
        return empty;
    }

    // Unsynchronized snapshot; does not take the mutex.
    inline bool isFull() const {
        return !empty && front == end;
    }

    std::mutex m;
    std::condition_variable cv;
    size_t front;  // index of the oldest element
    size_t end;    // index of the next free slot
    bool empty;    // disambiguates front == end (empty vs. full)
    std::vector<ElementType> buffer;
};
| class ThreadPool { | |
| public: | |
| ThreadPool(size_t numThreads, size_t taskQueueSize) : | |
| workers(std::vector<std::thread>(numThreads)), | |
| taskQueue(taskQueueSize), | |
| stopFlag(false) { | |
| } | |
| void run() { | |
| for (auto& w : workers) { | |
| w = std::thread([this]() { | |
| while (true) { | |
| if (stopFlag) | |
| //stop pool. All tasks left in queue | |
| // will not be completed. | |
| break; | |
| auto task = taskQueue.pop(); | |
| if (task) | |
| task(); | |
| } | |
| }); | |
| } | |
| } | |
| void stop() { | |
| stopFlag = true; | |
| } | |
| void addTask(std::function<void()> t) { | |
| taskQueue.push(t); | |
| } | |
| ~ThreadPool() { | |
| // wait for all threads | |
| for (auto& w : workers) | |
| w.join(); | |
| } | |
| private: | |
| std::vector < std::thread > workers; | |
| Queue<std::function<void()>> taskQueue; | |
| bool stopFlag; | |
| std::mutex m; | |
| }; | |
| class Supplier { | |
| private: | |
| // class that manages storage | |
| // for the particular pre-initialized resource | |
| struct ResourceStock { | |
| std::vector<Resource*> storage; | |
| std::mutex m; | |
| void push(Resource* r) { | |
| std::lock_guard<std::mutex> lk(m); | |
| storage.push_back(r); | |
| } | |
| Resource* pop() { | |
| std::lock_guard<std::mutex> lk(m); | |
| if (storage.empty()) | |
| return nullptr; | |
| Resource* r = storage.back(); | |
| storage.pop_back(); | |
| return r; | |
| } | |
| ~ResourceStock() { | |
| Logger::log("Dismissing unclaimed resource"); | |
| for (auto r : storage) { | |
| r->dismiss(); | |
| } | |
| } | |
| }; | |
| // max number of threads in pool | |
| int maxConcurrentOps; | |
| // descriptor for thread pool's thread | |
| std::future<void> serviceThread; | |
| // map from partNumber to the resource storage object | |
| std::unordered_map<int, std::unique_ptr<ResourceStock>> resources; | |
| ThreadPool threadPool; | |
| public: | |
| Supplier(int maxConcurrentOps, size_t taskQueueSize = 64) : | |
| maxConcurrentOps(maxConcurrentOps), | |
| threadPool(maxConcurrentOps, taskQueueSize) { | |
| // create asynchronous task for thread pool | |
| serviceThread = std::async(std::launch::async, [&]() { | |
| threadPool.run(); | |
| Logger::log("Service up"); | |
| }); | |
| } | |
| virtual ~Supplier() { | |
| threadPool.stop(); | |
| serviceThread.get(); | |
| Logger::log("Service down. Waiting for remaining jobs"); | |
| // Waiting for workers and destroying unclaimed objects | |
| // left in storage | |
| } | |
| /** | |
| * Asks supplier to set up a stock of products for provided | |
| partNumber. | |
| * Method returns immediately and doesn't wait until operation | |
| completes. | |
| * | |
| * @param partNumber | |
| * @param amountToStock | |
| */ | |
| virtual void stock(int partNumber, int amountToStock); | |
| /** | |
| * Ask supplier to terminate stock of products with this partNumber. | |
| * Method returns immediately and doesn't wait until operation | |
| completes. | |
| * | |
| * @param partNumber | |
| */ | |
| virtual void destock(int partNumber); | |
| /** | |
| * If the requested partNumber was stocked, method returns an instance | |
| of a Resource | |
| * from the stock. If the requested partNumber was not stocked, method | |
| produces it | |
| * and returns to caller. | |
| * | |
| * @param partNumber | |
| * @return | |
| */ | |
| virtual Resource *request(int partNumber); | |
| }; | |
| #endif _SUPPLIER_H_ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment