Last active
August 29, 2015 14:23
-
-
Save sentinelleader/99cabbee55e2c7ec39e4 to your computer and use it in GitHub Desktop.
rmq-publish-with-header.c for Mosquitto MQTT broker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <assert.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <clem_rmq.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

#include "memory_mosq.h"
#include "mosquitto_broker.h"
/*
 * Print a printf-style formatted message followed by a newline to stderr,
 * then terminate the process with exit status 1. Never returns.
 *
 * fmt: printf-style format string; the remaining arguments are the values
 *      it consumes.
 *
 * Requires <stdarg.h> for va_list / va_start / va_end.
 */
void die(const char *fmt, ...)
{
    va_list ap;

    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);

    fprintf(stderr, "\n");
    exit(1);
}
/*
 * Abort the process when a library call reported failure.
 *
 * x:       status code returned by the call; negative values are treated
 *          as errors, anything else is a no-op.
 * context: short label printed in front of the error text.
 *
 * On error, prints "<context>: <amqp_error_string2(x)>" to stderr and exits
 * with status 1.
 */
void die_on_error(int x, char const *context)
{
    if (x >= 0) {
        return;
    }

    fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x));
    exit(1);
}
/*
 * Inspect an RPC reply and abort the process unless it is a normal response.
 *
 * x:       reply to check.
 * context: short label printed in front of the diagnostic.
 *
 * AMQP_RESPONSE_NORMAL returns immediately. Every other reply type prints a
 * diagnostic to stderr where one is defined for it and then exits with
 * status 1.
 */
void die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
{
    if (x.reply_type == AMQP_RESPONSE_NORMAL) {
        return;
    }

    if (x.reply_type == AMQP_RESPONSE_NONE) {
        fprintf(stderr, "%s: missing RPC reply type!\n", context);
    } else if (x.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
        fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
    } else if (x.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
        if (x.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
            /* The decoded payload carries the server's code and reason text. */
            amqp_connection_close_t *closed = (amqp_connection_close_t *) x.reply.decoded;

            fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
                    context,
                    closed->reply_code,
                    (int) closed->reply_text.len, (char *) closed->reply_text.bytes);
        } else if (x.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
            amqp_channel_close_t *closed = (amqp_channel_close_t *) x.reply.decoded;

            fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
                    context,
                    closed->reply_code,
                    (int) closed->reply_text.len, (char *) closed->reply_text.bytes);
        } else {
            fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
        }
    }

    /* Any reply type other than NORMAL is fatal, including unrecognised ones. */
    exit(1);
}
/*
 * Append at most `len` bytes of `src` to `dst` at offset `used`, never
 * writing past `cap - 1` so one byte always remains for the terminator.
 * Returns the new number of bytes used. Expects used <= cap - 1.
 */
static size_t rkey_append(char *dst, size_t used, size_t cap,
                          const char *src, size_t len)
{
    size_t room = cap - 1 - used;

    if (len > room) {
        len = room;
    }
    memcpy(dst + used, src, len);
    return used + len;
}

/*
 * Build a routing key from a topic by replacing EVERY occurrence of `orig`
 * with `rep` (e.g. "a/b/c" with "/" -> "." yields "a.b.c").
 *
 * mapped_topic: input string.
 * orig:         substring to look for.
 * rep:          replacement text.
 *
 * Returns:
 *  - `mapped_topic` itself (unchanged, must not be modified by the caller)
 *    when any argument is NULL, `orig` is empty, or `orig` does not occur;
 *  - otherwise a pointer to an internal static buffer holding the result.
 *    That buffer is overwritten by the next call, so the function is not
 *    reentrant, and results longer than 4095 bytes are truncated.
 */
char *gen_rkey(const char *mapped_topic, const char *orig, const char *rep)
{
    static char buffer[4096];
    const char *cursor;
    const char *hit;
    size_t orig_len;
    size_t rep_len;
    size_t used = 0;

    /* An empty pattern would match forever; treat it as "nothing to do". */
    if (mapped_topic == NULL || orig == NULL || rep == NULL || orig[0] == '\0') {
        return (char *) mapped_topic;
    }

    hit = strstr(mapped_topic, orig);
    if (hit == NULL) {
        /* Pattern absent: hand the input back without copying. */
        return (char *) mapped_topic;
    }

    orig_len = strlen(orig);
    rep_len = strlen(rep);
    cursor = mapped_topic;

    while (hit != NULL) {
        /* Copy the text before the match, then the replacement. */
        used = rkey_append(buffer, used, sizeof buffer, cursor, (size_t) (hit - cursor));
        used = rkey_append(buffer, used, sizeof buffer, rep, rep_len);
        cursor = hit + orig_len;
        hit = strstr(cursor, orig);
    }

    /* Copy whatever follows the last match. */
    used = rkey_append(buffer, used, sizeof buffer, cursor, strlen(cursor));
    buffer[used] = '\0';
    return buffer;
}
/*
 * Publish one message to the "amq.topic" exchange on channel 1.
 *
 * conn:         open, logged-in connection with channel 1 open.
 * mosq:         unused here; kept so the signature stays stable.
 * source_id:    value sent in the "clientid" message header.
 * mapped_topic: topic; "/" separators are rewritten to "." via gen_rkey()
 *               to form the routing key.
 * payload:      message body.
 *
 * NOTE(review): the signature carries no payload length, so the body is
 * taken to be a NUL-terminated string. Binary payloads or payloads without
 * a terminator need a length parameter -- confirm against callers.
 *
 * A failed publish terminates the process through die_on_error().
 */
static void send_msg(amqp_connection_state_t conn,
                     struct mosquitto *mosq,
                     const char *source_id,
                     const char *mapped_topic,
                     const void *payload)
{
    const char *routing_key;
    amqp_bytes_t message_bytes;
    amqp_basic_properties_t props;
    amqp_table_entry_t entries[1];
    amqp_table_t table;

    (void) mosq;

    routing_key = gen_rkey(mapped_topic, "/", ".");

    /* Send exactly the payload's bytes instead of a fixed-size scratch array. */
    if (payload != NULL) {
        message_bytes = amqp_cstring_bytes((const char *) payload);
    } else {
        message_bytes.len = 0;
        message_bytes.bytes = NULL;
    }

    /* Zero everything so fields not selected by _flags hold no garbage. */
    memset(&props, 0, sizeof props);
    props._flags = AMQP_BASIC_HEADERS_FLAG;

    entries[0].key = amqp_cstring_bytes("clientid");
    entries[0].value.kind = AMQP_FIELD_KIND_UTF8;
    entries[0].value.value.bytes = amqp_cstring_bytes(source_id);

    table.num_entries = 1;
    table.entries = entries;
    /* A no-op for a single entry; kept so the table stays sorted if more headers are added. */
    qsort(table.entries, table.num_entries, sizeof(amqp_table_entry_t), &amqp_table_entry_cmp);
    props.headers = table;

    die_on_error(amqp_basic_publish(conn,
                                    1,
                                    amqp_cstring_bytes("amq.topic"),
                                    amqp_cstring_bytes(routing_key),
                                    0,
                                    0,
                                    &props,
                                    message_bytes),
                 "Publishing");
}
int publish_msg_rmq(struct mosquitto *mosq, const char *source_id, const char *mapped_topic, const void *payload) | |
{ | |
char const *hostname; | |
int port, status; | |
amqp_socket_t *socket = NULL; | |
amqp_connection_state_t conn; | |
hostname = "172.16.16.16"; | |
port = 5672; | |
conn = amqp_new_connection(); | |
socket = amqp_tcp_socket_new(conn); | |
if (!socket) { | |
die("creating TCP socket"); | |
} | |
status = amqp_socket_open(socket, hostname, port); | |
if (status) { | |
die("opening TCP socket"); | |
} | |
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "admin", "changeme"), | |
"Logging in"); | |
amqp_channel_open(conn, 1); | |
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); | |
send_msg(conn, mosq, mapped_topic, source_id, payload); | |
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); | |
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); | |
die_on_error(amqp_destroy_connection(conn), "Ending connection"); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment