Last active
September 13, 2017 22:00
-
-
Save fatihky/a7053bd2a8048e9bb76c to your computer and use it in GitHub Desktop.
libmill + nanomsg
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <nanomsg/nn.h> | |
#include <nanomsg/pair.h> | |
#include <libmill.h> | |
struct nn_mill_msg { | |
int size; | |
char *msg; | |
}; | |
static int nn_mill_getfd (int s) { | |
int rc, fd; | |
size_t fdsz = sizeof fd; | |
if ( nn_getsockopt (s, NN_SOL_SOCKET, NN_RCVFD, &fd, &fdsz) != 0 ) | |
return -1; | |
/* TODO: we might as well return both FDs, NN_SNDFD too */ | |
return fd; | |
} | |
static coroutine void receiver(int s, chan inch) { | |
int fd = nn_mill_getfd(s); | |
struct nn_pollfd pfd[1]; | |
int rc; | |
for(;;) { | |
int events = fdwait(fd, FDW_IN, -1); | |
if (!(events & FDW_IN)) | |
continue; | |
pfd[0] = (struct nn_pollfd) { | |
.fd = s, | |
.events = NN_POLLIN | |
}; | |
nn_poll (pfd, 1, -1); | |
if (!(pfd[0].revents & NN_POLLIN)) | |
continue; | |
for (;;) { | |
char *buf; | |
struct nn_mill_msg msg; | |
rc = nn_recv (s, &buf, NN_MSG, NN_DONTWAIT); | |
if (rc <= 0) | |
break; | |
msg.size = rc; | |
msg.msg = buf; | |
chs(inch, struct nn_mill_msg, msg); | |
} | |
} | |
} | |
static coroutine void sender(int s, chan outch) { | |
for(;;) { | |
struct nn_mill_msg msg; | |
msg = chr(outch, struct nn_mill_msg); | |
nn_send(s, msg.msg, msg.size, NN_DONTWAIT); | |
} | |
} | |
int main(int argc, char *argv[]) { | |
// setup | |
chan rcv = chmake(struct nn_mill_msg, 0); | |
chan snd = chmake(struct nn_mill_msg, 0); | |
chan rcv2 = chmake(struct nn_mill_msg, 0); | |
chan snd2 = chmake(struct nn_mill_msg, 0); | |
int s = nn_socket(AF_SP, NN_PAIR); | |
int s2 = nn_socket(AF_SP, NN_PAIR); | |
int rc; | |
nn_bind(s, "tcp://127.0.0.1:7458"); | |
nn_connect(s2, "tcp://127.0.0.1:7458"); | |
// start | |
// receiver | |
go(receiver(s, rcv)); | |
go(sender(s2, snd2)); | |
// use | |
struct nn_mill_msg msg; | |
while (1) { | |
choose { | |
in(rcv, struct nn_mill_msg, msg): { | |
printf("received message: %.*s\n", msg.size, msg.msg); | |
} | |
deadline(now() + 1000): { | |
msg.msg = "test"; | |
msg.size = 4; | |
chs(snd2, struct nn_mill_msg, msg); | |
} | |
end | |
} | |
} | |
// stop | |
// clean exit | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <stdbool.h> | |
#include <nanomsg/nn.h> | |
#include <nanomsg/pair.h> | |
#include <libmill.h> | |
struct nn_mill_msg { | |
int size; | |
char *msg; | |
}; | |
static int nn_mill_getfd (int s) { | |
int rc, fd; | |
size_t fdsz = sizeof fd; | |
if ( nn_getsockopt (s, NN_SOL_SOCKET, NN_RCVFD, &fd, &fdsz) != 0 ) | |
return -1; | |
/* TODO: we might as well return both FDs, NN_SNDFD too */ | |
return fd; | |
} | |
static coroutine void receiver(int s, chan inch, int *stopfdwait) { | |
int fd = nn_mill_getfd(s); | |
struct nn_pollfd pfd[1]; | |
int rc; | |
for(;;) { | |
int events = fdwait(fd, FDW_IN, now() + 200); | |
if (*stopfdwait == true) { | |
printf("*stopfdwait == true\n"); | |
break; | |
} | |
if (!(events & FDW_IN)) | |
continue; | |
pfd[0] = (struct nn_pollfd) { | |
.fd = s, | |
.events = NN_POLLIN | |
}; | |
nn_poll (pfd, 1, -1); | |
if (!(pfd[0].revents & NN_POLLIN)) | |
continue; | |
for (;;) { | |
char *buf; | |
struct nn_mill_msg msg; | |
rc = nn_recv (s, &buf, NN_MSG, NN_DONTWAIT); | |
if (rc <= 0) | |
break; | |
msg.size = rc; | |
msg.msg = buf; | |
chs(inch, struct nn_mill_msg, msg); | |
} | |
} | |
printf("int stopfdwait: %d\n", *stopfdwait); | |
exit(0); | |
} | |
static coroutine void sender(int s, chan outch) { | |
for(;;) { | |
struct nn_mill_msg msg; | |
msg = chr(outch, struct nn_mill_msg); | |
nn_send(s, msg.msg, msg.size, NN_DONTWAIT); | |
} | |
} | |
int main(int argc, char *argv[]) { | |
// setup | |
chan rcv = chmake(struct nn_mill_msg, 0); | |
chan snd = chmake(struct nn_mill_msg, 0); | |
chan rcv2 = chmake(struct nn_mill_msg, 0); | |
chan snd2 = chmake(struct nn_mill_msg, 0); | |
int s = nn_socket(AF_SP, NN_PAIR); | |
int s2 = nn_socket(AF_SP, NN_PAIR); | |
int rc; | |
nn_bind(s, "tcp://127.0.0.1:7458"); | |
nn_connect(s2, "tcp://127.0.0.1:7458"); | |
// start | |
int stopfdwait = false; | |
// receiver | |
go(receiver(s, rcv, &stopfdwait)); | |
go(sender(s2, snd2)); | |
// use | |
struct nn_mill_msg msg; | |
int cnt = 0; | |
while (1) { | |
choose { | |
in(rcv, struct nn_mill_msg, msg): { | |
printf("received message: %.*s\n", msg.size, msg.msg); | |
cnt++; | |
if (cnt == 5) { | |
stopfdwait = true; | |
} | |
} | |
deadline(now() + 1000): { | |
msg.msg = "test"; | |
msg.size = 4; | |
chs(snd2, struct nn_mill_msg, msg); | |
} | |
end | |
} | |
} | |
// stop | |
// clean exit | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment