Created
August 25, 2017 23:34
-
-
Save mangalaman93/b66ca3ff09c515023ff072870ad13ac4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <mutex>
#include <thread>

using namespace std;
// Number of exchanges: the server accepts this many connections and keeps one
// queue per exchange.
#define N 4

// Converts a 64-bit value from network (big-endian) to host byte order.
// On a big-endian host (where ntohl is the identity) the value is returned
// unchanged; otherwise the two 32-bit halves are byte-swapped and exchanged.
// Guarded because some systems' headers already provide ntohll.
// NOTE: the argument is evaluated more than once; do not pass expressions
// with side effects.
#ifndef ntohll
#define ntohll(x)                                                        \
  ((1 == ntohl(1))                                                       \
       ? (uint64_t)(x)                                                   \
       : (((uint64_t)ntohl((uint32_t)((uint64_t)(x) & 0xFFFFFFFFu)) << 32) | \
          (uint64_t)ntohl((uint32_t)((uint64_t)(x) >> 32))))
#endif
/* Specialized queue implementation with the following properties:
 * - One thread only calls pop().
 * - pop() blocks until an element is available, so it always returns a number.
 * - The other thread only calls push().
 */
/* One link of the queue's singly linked list. */
struct Node {
  uint64_t elem;  // 64-bit so values read from the wire are not truncated
  Node* next;
};

/* Blocking FIFO of 64-bit numbers backed by a singly linked list.
 *
 * push() appends at the tail, pop() removes from the head and blocks while
 * the queue is empty. All list manipulation happens with `mut` held.
 */
class Queue {
  Node* first = nullptr;  // head: next element to pop, nullptr when empty
  Node* last = nullptr;   // tail: most recently pushed, nullptr when empty
  std::mutex mut;
  std::condition_variable cond;

 public:
  Queue();
  ~Queue();

  // The queue owns its nodes through raw pointers, so copying is disabled.
  Queue(const Queue&) = delete;
  Queue& operator=(const Queue&) = delete;

  // Removes and returns the oldest element, waiting until one is available.
  uint64_t pop();

  // Appends `elem` and wakes a thread blocked in pop(), if any.
  void push(uint64_t elem);
};

Queue::Queue() = default;

// Frees every node that was pushed but never popped.
// Assumes no other thread is still using the queue at this point.
Queue::~Queue() {
  Node* cur = first;
  while (cur != nullptr) {
    Node* following = cur->next;
    delete cur;
    cur = following;
  }
  first = nullptr;
  last = nullptr;
}

uint64_t Queue::pop() {
  Node* head = nullptr;
  {
    std::unique_lock<std::mutex> lk(mut);
    // The predicate form re-checks the condition after every wake-up, so
    // spurious wake-ups are handled.
    cond.wait(lk, [this] { return first != nullptr; });

    head = first;
    first = head->next;
    if (first == nullptr) {
      // The last element was just removed; the tail must not dangle.
      last = nullptr;
    }
  }

  // The node is already unlinked, so it can be released without the lock.
  const uint64_t elem = head->elem;
  delete head;
  return elem;
}

void Queue::push(uint64_t elem) {
  // Allocate before taking the lock to keep the critical section short.
  Node* node = new Node{elem, nullptr};
  {
    std::lock_guard<std::mutex> lk(mut);
    if (last != nullptr) {
      last->next = node;
    } else {
      first = node;
    }
    last = node;
  }
  cond.notify_one();
}
/* Min-heap implementation */
/* One heap entry: a number together with the exchange it came from. */
struct HeapNode {
  uint32_t exnum;  // exchange number
  uint64_t num;    // heap key
  // Parameters match the member widths so 64-bit numbers are not truncated.
  HeapNode(uint32_t e, uint64_t n) : exnum(e), num(n) {}
};

/* Fixed-capacity binary min-heap of HeapNode pointers, ordered by `num`.
 *
 * The heap stores the pointers it is given but does not delete the nodes:
 * pop() hands each node back to the caller, and the destructor releases only
 * the pointer array.
 */
class MinHeap {
  int capacity = 0;  // number of slots in `data`
  HeapNode** data;   // array-backed heap; children of i are 2i+1 and 2i+2
  int length = 0;    // number of occupied slots

 public:
  // Creates an empty heap with room for `maxelem` nodes.
  MinHeap(int maxelem);
  ~MinHeap();

  // `data` is an owning raw pointer; a copy would delete it twice.
  MinHeap(const MinHeap&) = delete;
  MinHeap& operator=(const MinHeap&) = delete;

  // Number of nodes currently stored.
  int size() const;

  // Removes and returns the node with the smallest `num`.
  // The heap must not be empty.
  HeapNode* pop();

  // Inserts `node`. The heap must not be full and `node` must not be null.
  void push(HeapNode* node);
};

MinHeap::MinHeap(int maxelem) {
  capacity = maxelem > 0 ? maxelem : 0;
  data = new HeapNode*[capacity]();  // slots start out as nullptr
}

MinHeap::~MinHeap() {
  delete[] data;
}

int MinHeap::size() const {
  return length;
}

HeapNode* MinHeap::pop() {
  assert(length > 0);

  HeapNode* root = data[0];

  // Move the last node to the root, then sift it down.
  --length;
  data[0] = data[length];
  data[length] = nullptr;

  int i = 0;
  while (true) {
    const int left = 2 * i + 1;
    const int right = left + 1;

    int least = i;
    if (left < length && data[left]->num < data[least]->num) {
      least = left;
    }
    if (right < length && data[right]->num < data[least]->num) {
      least = right;
    }
    if (least == i) {
      break;  // heap property holds again
    }

    HeapNode* tmp = data[i];
    data[i] = data[least];
    data[least] = tmp;
    i = least;
  }
  return root;
}

void MinHeap::push(HeapNode* node) {
  assert(node != nullptr);
  // Without this check a push into a full heap writes past the array.
  assert(length < capacity);

  // Sift up: shift larger parents down until the slot for `node` is found.
  int i = length;
  ++length;
  while (i > 0) {
    const int parent = (i - 1) / 2;  // Parent(i) = (i-1)/2 for 0-based indices
    if (data[parent]->num <= node->num) {
      break;
    }
    data[i] = data[parent];
    i = parent;
  }
  data[i] = node;
}
/* Shared state, socket readers, and the main function */
// One queue per exchange: read_num() pushes each record's number into the
// queue selected by the record's exchange number, and main() pops from these
// queues while merging.
Queue q[N]; | |
// Reads exactly one 12-byte record from `fd` and returns it as a HeapNode.
//
// Record layout: bytes 0-3 are the exchange number (uint32_t), bytes 4-11 are
// the number (uint64_t). read() may return fewer bytes than requested, so the
// loop keeps reading until the record is complete.
//
// The return value has no way to signal failure, and a reader that gives up
// silently would leave main() blocked on that exchange's queue forever. A read
// error or a connection closed before a full record arrived is therefore
// reported on stderr and terminates the process.
HeapNode readOne(int fd) {
  constexpr size_t kRecordSize = sizeof(uint32_t) + sizeof(uint64_t);
  char buffer[kRecordSize];

  size_t read_bytes = 0;
  while (read_bytes < kRecordSize) {
    const ssize_t n = read(fd, buffer + read_bytes, kRecordSize - read_bytes);
    if (n > 0) {
      read_bytes += static_cast<size_t>(n);
      continue;
    }
    if (n < 0 && errno == EINTR) {
      continue;  // interrupted by a signal before any data arrived; retry
    }

    if (n < 0) {
      perror("read failed");
    } else {
      fprintf(stderr, "read failed: connection closed before a full record arrived\n");
    }
    // _exit rather than exit: exit would run static destructors and tear down
    // the global queues while other threads may still be using them.
    _exit(EXIT_FAILURE);
  }

  // memcpy instead of pointer casts: the buffer is a char array with no
  // alignment guarantee for 4- or 8-byte loads.
  uint32_t exnum;
  uint64_t num;
  memcpy(&exnum, buffer, sizeof(exnum));
  memcpy(&num, buffer + sizeof(exnum), sizeof(num));

  // Byte-order conversion is currently disabled, so the sender is presumably
  // using the host's byte order -- TODO confirm.
  // exnum = ntohl(exnum);
  // num = ntohll(num);
  return HeapNode(exnum, num);
}
void read_num(int fd) { | |
while (true) { | |
HeapNode node = readOne(fd); | |
q[node.exnum].push(node.num); | |
if (node.num == 0) { | |
close(fd); | |
break; | |
} | |
} | |
} | |
int main(int argc, char* argv[]) { | |
if (argc < 2) { | |
cout<<"Usage: "<<argv[0]<<" <Port>"<<endl; | |
exit(1); | |
} | |
// TODO: error handling | |
int port; | |
port = atoi(argv[1]); | |
int server_fd; | |
struct sockaddr_in address; | |
int opt = 1; | |
int addrlen = sizeof(address); | |
// Creating socket file descriptor | |
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) { | |
perror("socket failed"); | |
exit(EXIT_FAILURE); | |
} | |
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) { | |
perror("setsockopt failed"); | |
exit(EXIT_FAILURE); | |
} | |
address.sin_family = AF_INET; | |
address.sin_addr.s_addr = INADDR_ANY; | |
address.sin_port = htons(port); | |
if(bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) { | |
perror("bind failed"); | |
exit(EXIT_FAILURE); | |
} | |
if (listen(server_fd, N) < 0) { | |
perror("listen failed"); | |
exit(EXIT_FAILURE); | |
} | |
// first accept connections from all the N exchanges | |
thread* t = new thread[N]; | |
for (int i = 0; i < N; ++i) { | |
int new_socket; | |
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) { | |
perror("accept failed"); | |
exit(EXIT_FAILURE); | |
} | |
cout<<"accepted connection "<<i<<endl; | |
t[i] = thread(read_num, new_socket); | |
} | |
cout<<"accepted all connections"<<endl; | |
// sorting algorithm | |
// first, add first element to the minheap | |
MinHeap mh(N); | |
for (int i = 0; i < N; ++i) { | |
int num = q[i].pop(); | |
// there are no elements coming from this exchange | |
if (num == 0) { | |
continue; | |
} | |
HeapNode* node = new HeapNode(i, num); | |
mh.push(node); | |
} | |
cout<<"inserted first element of all queues into MinHeap"<<endl; | |
while (mh.size() > 0) { | |
HeapNode* node = mh.pop(); | |
cout<<node->num<<endl; | |
uint32_t exnum = node->exnum; | |
delete node; | |
uint64_t num = q[exnum].pop(); | |
if (num == 0) { | |
continue; | |
} | |
mh.push(new HeapNode(exnum, num)); | |
} | |
for (int i = 0; i < N; ++i) { | |
t[i].join(); | |
} | |
close(server_fd); | |
delete[] t; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment