Skip to content

Instantly share code, notes, and snippets.

@jnbdz
Forked from sentinelleader/rmq-publish-with-header.c
Last active August 29, 2015 14:26
Show Gist options
  • Save jnbdz/b10c52c7a032a1e8afd7 to your computer and use it in GitHub Desktop.
Save jnbdz/b10c52c7a032a1e8afd7 to your computer and use it in GitHub Desktop.
rmq-publish-with-header.c for Mosquitto MQTT broker
#include <assert.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <clem_rmq.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>

#include "memory_mosq.h"
#include "mosquitto_broker.h"
/*
 * Print a printf-style message followed by a newline to stderr, then
 * terminate the process with exit status 1. Never returns.
 *
 * fmt - printf-style format string; the remaining arguments are its operands.
 */
void die(const char *fmt, ...)
{
    va_list args;

    va_start(args, fmt);
    vfprintf(stderr, fmt, args);
    va_end(args);

    fputs("\n", stderr);
    exit(1);
}
/*
 * Abort the process when a library status code signals failure.
 *
 * x       - status code; any value >= 0 is treated as success.
 * context - short label printed in front of the error text.
 *
 * On a negative status the text from amqp_error_string2(x) is written to
 * stderr as "<context>: <text>" and the process exits with status 1.
 */
void die_on_error(int x, char const *context)
{
    if (x >= 0) {
        return;
    }

    fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x));
    exit(1);
}
/*
 * Inspect an RPC reply and abort the process unless it is a normal response.
 *
 * x       - reply to examine.
 * context - short label printed in front of every diagnostic.
 *
 * AMQP_RESPONSE_NORMAL returns to the caller. Every other reply type ends in
 * exit(1); the recognised ones first write a diagnostic to stderr, while an
 * unrecognised reply type exits without printing anything.
 */
void die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
{
    if (x.reply_type == AMQP_RESPONSE_NORMAL) {
        return;
    }

    if (x.reply_type == AMQP_RESPONSE_NONE) {
        fprintf(stderr, "%s: missing RPC reply type!\n", context);
    } else if (x.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
        fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
    } else if (x.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
        /* The decoded payload type depends on which close method was received. */
        if (x.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
            amqp_connection_close_t *conn_close = (amqp_connection_close_t *) x.reply.decoded;

            fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
                    context,
                    conn_close->reply_code,
                    (int) conn_close->reply_text.len, (char *) conn_close->reply_text.bytes);
        } else if (x.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
            amqp_channel_close_t *chan_close = (amqp_channel_close_t *) x.reply.decoded;

            fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
                    context,
                    chan_close->reply_code,
                    (int) chan_close->reply_text.len, (char *) chan_close->reply_text.bytes);
        } else {
            fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
        }
    }

    exit(1);
}
/*
 * Append up to `len` bytes of `src` to `dst`, which has capacity `cap` and
 * already holds `used` bytes. One byte is always kept free for the
 * terminating NUL, so excess input is silently dropped.
 * Returns the new number of bytes held in `dst`.
 */
static size_t rkey_append(char *dst, size_t cap, size_t used, const char *src, size_t len)
{
    size_t room = cap - 1 - used;

    if (len > room) {
        len = room;
    }
    memcpy(dst + used, src, len);
    return used + len;
}

/*
 * Build a routing key from `mapped_topic` by replacing every occurrence of
 * `orig` with `rep` (e.g. "a/b/c" with "/" -> "." yields "a.b.c").
 *
 * mapped_topic - input string.
 * orig         - substring to look for.
 * rep          - text substituted for each match.
 *
 * Returns:
 *   - `mapped_topic` itself when `orig` does not occur in it (or when any
 *     argument is NULL); the caller must treat the result as read-only.
 *   - otherwise a pointer to a static 4096-byte buffer holding the rewritten
 *     string, truncated to 4095 bytes if it would not fit. The buffer is
 *     overwritten by the next call, so the function is not reentrant and
 *     `mapped_topic` must not point into a previous result.
 *
 * An empty `orig` matches at the start only: the result is `rep` followed by
 * `mapped_topic`.
 */
char *gen_rkey(const char *mapped_topic, const char *orig, const char *rep)
{
    static char buffer[4096];
    const char *cursor = mapped_topic;
    const char *hit;
    size_t orig_len;
    size_t rep_len;
    size_t used = 0;

    if (!mapped_topic || !orig || !rep || !strstr(mapped_topic, orig)) {
        /* Nothing to rewrite: hand the input back untouched. */
        return (char *) mapped_topic;
    }

    orig_len = strlen(orig);
    rep_len = strlen(rep);

    while ((hit = strstr(cursor, orig)) != NULL) {
        used = rkey_append(buffer, sizeof buffer, used, cursor, (size_t) (hit - cursor));
        used = rkey_append(buffer, sizeof buffer, used, rep, rep_len);
        cursor = hit + orig_len;
        if (orig_len == 0) {
            /* An empty pattern matches at every position; substitute once and stop. */
            break;
        }
    }

    /* Copy whatever follows the last match. */
    used = rkey_append(buffer, sizeof buffer, used, cursor, strlen(cursor));
    buffer[used] = '\0';
    return buffer;
}
/*
 * Publish one message to the "amq.topic" exchange on channel 1 of `conn`.
 *
 * conn         - connection passed to amqp_basic_publish().
 * mosq         - not used by this function; kept so callers need no change.
 * source_id    - sent as the value of the "clientid" message header
 *                (an empty string is sent when NULL).
 * mapped_topic - topic string; gen_rkey() rewrites "/" to "." in it to form
 *                the routing key.
 * payload      - message body (an empty body is sent when NULL).
 *
 * A negative result from amqp_basic_publish() terminates the process through
 * die_on_error().
 */
static void send_msg(amqp_connection_state_t conn,
                     struct mosquitto *mosq,
                     const char *source_id,
                     const char *mapped_topic,
                     const void *payload)
{
    const char *routing_key;
    amqp_bytes_t message_bytes;
    amqp_table_entry_t entries[1];
    amqp_basic_properties_t props;

    (void) mosq;

    routing_key = gen_rkey(mapped_topic, "/", ".");

    /*
     * NOTE(review): the signature carries no payload length, so the body is
     * taken to be NUL-terminated text and its length comes from strlen().
     * Confirm with the caller; binary payloads need a length parameter.
     */
    message_bytes = amqp_cstring_bytes(payload != NULL ? (const char *) payload : "");

    entries[0].key = amqp_cstring_bytes("clientid");
    entries[0].value.kind = AMQP_FIELD_KIND_UTF8;
    entries[0].value.value.bytes = amqp_cstring_bytes(source_id != NULL ? source_id : "");

    /* Only the headers table is populated; zero the rest so no field is left indeterminate. */
    memset(&props, 0, sizeof props);
    props._flags = AMQP_BASIC_HEADERS_FLAG;
    props.headers.num_entries = 1;
    props.headers.entries = entries;

    die_on_error(amqp_basic_publish(conn,
                                    1,
                                    amqp_cstring_bytes("amq.topic"),
                                    amqp_cstring_bytes(routing_key),
                                    0,
                                    0,
                                    &props,
                                    message_bytes),
                 "Publishing");
}
/*
 * Open a fresh AMQP connection, publish a single message through send_msg(),
 * then close the channel and the connection again.
 *
 * mosq         - forwarded to send_msg().
 * source_id    - forwarded to send_msg() as the message's source id.
 * mapped_topic - forwarded to send_msg() as the topic for the routing key.
 * payload      - forwarded to send_msg() as the message body.
 *
 * Returns 0. Every failure path goes through die()/die_on_error()/
 * die_on_amqp_error(), which terminate the process instead of returning.
 *
 * NOTE(review): the server address and the login credentials are hard-coded
 * below; they should come from configuration rather than source.
 */
int publish_msg_rmq(struct mosquitto *mosq, const char *source_id, const char *mapped_topic, const void *payload)
{
    char const *hostname = "172.16.16.16";
    int port = 5672;
    amqp_connection_state_t conn;
    amqp_socket_t *socket;

    conn = amqp_new_connection();
    if (!conn) {
        die("creating connection");
    }

    socket = amqp_tcp_socket_new(conn);
    if (!socket) {
        die("creating TCP socket");
    }

    if (amqp_socket_open(socket, hostname, port)) {
        die("opening TCP socket");
    }

    die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "admin", "changeme"),
                      "Logging in");

    amqp_channel_open(conn, 1);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

    /* Argument order must match send_msg(): source id first, then topic. */
    send_msg(conn, mosq, source_id, mapped_topic, payload);

    die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
    die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
    die_on_error(amqp_destroy_connection(conn), "Ending connection");
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment