Skip to content

Instantly share code, notes, and snippets.

@kzk
Created December 25, 2010 16:51
Show Gist options
  • Save kzk/754942 to your computer and use it in GitHub Desktop.
Save kzk/754942 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <assert.h>
#include <pthread.h>
#include <bmi.h>
#define MAX_IDLE_TIME 10
int
bmi_comm_send(BMI_addr_t peer_addr, void *buffer, bmi_size_t buflen,
bmi_msg_tag_t tag, bmi_context_id context) {
bmi_op_id_t op_id;
int ret, outcount;
bmi_size_t actual_size;
bmi_error_code_t error_code;
/* Post the BMI send request and wait for its completion */
ret = BMI_post_send(&op_id, peer_addr, buffer, buflen, BMI_PRE_ALLOC, tag,
NULL, context, NULL);
if (ret < 0) {
fprintf(stderr, "bmi_comm_send: BMI_post_send() failed.\n");
exit(1);
} else if (ret == 0) {
do {
ret = BMI_test(op_id, &outcount, &error_code, &actual_size, NULL,
MAX_IDLE_TIME, context);
} while (ret == 0 && outcount == 0);
if (ret < 0 || error_code != 0) {
fprintf(stderr, "bmi_comm_send: Data send failed.\n");
exit(1);
}
if (actual_size != buflen) {
fprintf(stderr, "bmi_comm_send: Expected %ld but received %lu\n",
buflen, actual_size);
exit(1);
}
}
return 0;
}
/*
* bmi_comm_recv
* Synchronous call for receiving messages using BMI.
*/
int bmi_comm_recv(BMI_addr_t peer_addr, void *buffer, bmi_size_t buflen,
bmi_msg_tag_t tag, bmi_context_id context) {
bmi_op_id_t op_id;
int ret, outcount;
bmi_size_t actual_size;
bmi_error_code_t error_code;
/* Post the BMI recv request and wait for its completion */
ret = BMI_post_recv(&op_id, peer_addr, buffer, buflen, &actual_size,
BMI_PRE_ALLOC, tag, NULL, context, NULL);
if (ret < 0) {
fprintf(stderr, "bmi_comm_recv: BMI_post_recv() failed.\n");
exit(1);
} else if (ret == 0) {
do {
ret = BMI_test(op_id, &outcount, &error_code, &actual_size, NULL,
MAX_IDLE_TIME, context);
} while (ret == 0 && outcount == 0);
if (ret < 0 || error_code != 0) {
fprintf(stderr, "bmi_comm_recv: Data receive failed.\n");
exit(1);
}
}
return 0;
}
/*
* bmi_comm_sendu
* Synchronous call for sending unexpected messages using BMI.
*/
int bmi_comm_sendu(BMI_addr_t peer_addr, void *buffer, bmi_size_t buflen,
bmi_msg_tag_t tag, bmi_context_id context) {
bmi_op_id_t op_id;
int ret, outcount;
bmi_size_t actual_size;
bmi_error_code_t error_code;
/* Post the BMI unexpected send request and wait for its completion */
ret = BMI_post_sendunexpected(&op_id, peer_addr, buffer, buflen,
BMI_PRE_ALLOC, tag, NULL, context, NULL);
if (ret < 0) {
fprintf(stderr, "bmi_comm_sendu: BMI_post_sendunexpected() failed.\n");
exit(1);
} else if (ret == 0) {
do {
ret = BMI_test(op_id, &outcount, &error_code, &actual_size, NULL,
MAX_IDLE_TIME, context);
} while (ret == 0 && outcount == 0);
if (ret < 0 || error_code != 0) {
fprintf(stderr, "bmi_comm_sendu: Data send failed.\n");
exit(1);
}
if (actual_size != buflen) {
fprintf(stderr, "bmi_comm_sendu: Expected %ld but received %lu\n",
buflen, actual_size);
exit(1);
}
}
return 0;
}
/*
* bmi_comm_recvu
* Synchronous call for receiving unexpected messages using BMI.
*/
int bmi_comm_recvu(BMI_addr_t *peer_addr, void **recvbuf,
bmi_size_t *recvbuflen, bmi_msg_tag_t *tag) {
int ret, outcount = 0;
struct BMI_unexpected_info request_info;
/* Wait for an initial request from client */
do {
ret = BMI_testunexpected(1, &outcount, &request_info, MAX_IDLE_TIME);
} while (ret == 0 && outcount == 0);
if (ret < 0) {
fprintf(stderr, "bmi_comm_recvu: Request recv failure (bad state).\n");
fprintf(stderr, "bmi_comm_recvu: BMI_testunexpected failed.\n");
exit(1);
}
if (request_info.error_code != 0) {
fprintf(stderr, "bmi_comm_recvu: Request recv failure (bad state).\n");
exit(1);
}
*peer_addr = request_info.addr;
*recvbuflen = request_info.size;
*tag = request_info.tag;
*recvbuf = BMI_memalloc(*peer_addr, *recvbuflen, BMI_RECV);
if (!recvbuf) {
fprintf(stderr, "bmi_comm_recvu: BMI_memalloc() failed.\n");
exit(1);
}
memcpy(*recvbuf, request_info.buffer, *recvbuflen);
BMI_unexpected_free(*peer_addr, request_info.buffer);
return 0;
}
void do_server()
{
int ret, check = 0;
const char *hostid = "tcp://localhost:3381";
const char *network = "bmi_tcp";
bmi_context_id context;
/* Initialize BMI */
ret = BMI_initialize(network, hostid, BMI_INIT_SERVER);
assert(ret == 0);
BMI_set_info(0, BMI_TCP_CHECK_UNEXPECTED, &check);
/* Create a new BMI context */
ret = BMI_open_context(&context);
assert(ret == 0);
/* Loop forever waiting for requests from clients */
while (1) {
void *recvbuf;
bmi_msg_tag_t tag;
BMI_addr_t peer_addr;
bmi_size_t recvbuflen;
/* Wait for an unexpected request from the clients */
ret = bmi_comm_recvu(&peer_addr, &recvbuf, &recvbuflen, &tag);
assert(ret == 0);
/* Check Operation Code */
char *op = (char*)recvbuf;
if (strncmp(op, "write", 5) == 0) {
fprintf(stderr, "Body: %s\n", op + 6);
/* Send the reply (not unexpected message) */
char *sendbuf = "ok";
bmi_size_t sendbuflen = 2;
ret = bmi_comm_send(peer_addr, sendbuf, sendbuflen, tag, context);
assert(ret == 0);
}
}
}
void do_client()
{
int ret;
const char *hostid = "tcp://localhost:3381";
bmi_context_id context;
BMI_addr_t peer_addr;
/* Initialize BMI */
ret = BMI_initialize(NULL, NULL, 0);
assert(ret == 0);
/* Create a new BMI context */
ret = BMI_open_context(&context);
assert(ret == 0);
/* Perform an address lookup on the ION */
ret = BMI_addr_lookup(&peer_addr, hostid);
assert(ret == 0);
/* Send unexpected message (8k limit) */
bmi_msg_tag_t tag = pthread_self();
char *sendbuf = "write\nHello World";
bmi_size_t sendbuflen = strlen(sendbuf);
ret = bmi_comm_sendu(peer_addr, sendbuf, sendbuflen, tag, context);
assert(ret == 0);
/* Recv reply (not unexpected message) */
char recvbuf[3];
bmi_size_t recvbuflen = 2;
ret = bmi_comm_recv(peer_addr, recvbuf, recvbuflen, tag, context);
recvbuf[2] = '\0';
assert(ret == 0);
fprintf(stderr, "Reply: %s\n", recvbuf);
}
int main(int argc, char **argv)
{
if (strcmp(argv[1], "server") == 0) {
do_server();
}
if (strcmp(argv[1], "client") == 0) {
do_client();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment