Skip to content

Instantly share code, notes, and snippets.

@gruzovator
Last active August 29, 2015 14:15
Show Gist options
  • Save gruzovator/6382a74615dc404e6992 to your computer and use it in GitHub Desktop.
Save gruzovator/6382a74615dc404e6992 to your computer and use it in GitHub Desktop.
testing parallel code execution
// build: g++ -std=c++11 futures_queue_test.cpp -lpthread
// usage (single-threaded variant): ./a.out 10 single
// usage (multi-threaded variant): ./a.out 10 parallel
#include <iostream>
#include <sstream>
#include <functional>
#include <algorithm>
#include <future>
#include <chrono>
#include <queue>
using namespace std;
//------------------------------------------------------------------------------
// Test Geocoder (converts address to int)
//------------------------------------------------------------------------------
using Coords = int;
Coords resolve(const string& address)
{
this_thread::sleep_for(chrono::milliseconds(1000));
if(hash<string>()(address)%10==0) // throw exception sometime
{
throw std::runtime_error(string("Can't resolve address ")+address);
}
return Coords{stoi(address)};
}
//------------------------------------------------------------------------------
// Main
//------------------------------------------------------------------------------
int main(int argc, char const *argv[])
try
{
// srand(time(nullptr));
if(argc!=3)
{
cout << "Usage: " << argv[0] << " <addresses number> single|parallel" << endl;
return 1;
}
const auto addressesNumber = stoi(argv[1]);
const auto resolvingType = string(argv[2]);
//---------------------------------------------
// create addresses as vector of random strings
//---------------------------------------------
vector<string> addresses(addressesNumber);
auto makeRandomAddress = []{
return string(to_string(rand()));
};
generate(addresses.begin(), addresses.end(), makeRandomAddress);
//----------------------------------------------
// resolving (different styles)
//----------------------------------------------
if(resolvingType != "parallel")
{
cout << "Doing single-threaded address resolution..." << endl;
for(const auto& address: addresses)
{
cout << address << " -> ";
try
{
cout << resolve(address);
}
catch(const runtime_error&)
{
cout << "ERROR";
}
cout << endl;
}
}
else
{
clog << "Doing parallel address resolution..." << endl;
const int POOL_SIZE = 10;
std::queue<std::future<Coords>> futuresQueue;
auto currentAddressIdx = 0;
auto endIdx = addresses.size();
std::string* addrPtrToGetResult = nullptr; // pointer to address, if not null - we have to get result from queue
while(true)
{
// This is a queue-related code
if(currentAddressIdx!=endIdx) // push task to queue, if queue is full set addrPtrToGetResult
{
futuresQueue.emplace(async(launch::async, resolve, addresses[currentAddressIdx]));
++currentAddressIdx;
if(futuresQueue.size()==POOL_SIZE)
addrPtrToGetResult = &addresses[currentAddressIdx-POOL_SIZE];
}
else // there is no more tasks, but queue may still have results to process
if(!futuresQueue.empty())
{
auto qSize = futuresQueue.size();
addrPtrToGetResult = &addresses[currentAddressIdx-qSize];
}
else // there is no more tasks and futures queue is empty - end
break;
// This is a result processing code (same as in single-threaded verison)
if(addrPtrToGetResult) // we have to get result from queue
{
cout << *addrPtrToGetResult << " -> ";
try
{
cout << futuresQueue.front().get();
}
catch(const runtime_error&)
{
cout << "ERROR";
}
cout << endl;
futuresQueue.pop();
addrPtrToGetResult = nullptr;
}
} //while
}
return 0;
}
catch(const exception& ex) {
cerr << "Exception: " << ex.what() << endl;
return 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment