Created
August 30, 2021 23:53
-
-
Save DavidAntliff/06828b7e5e4ad85a0cb2616ba9151127 to your computer and use it in GitHub Desktop.
Support disconnection via subscription or timeout
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Working integration of mqtt_cpp with a producer thread.
 * Having the worker thread started by the connack handler is recommended by the mqtt_cpp author.
 * This program also demonstrates automatic disconnection by a timer, as well as handling disconnection by the broker.
 * Additionally, a message can be sent to the 'quit' topic to initiate disconnection.
 *
 * E.g.
 * $ mosquitto_pub -h localhost -t quit -m 1
 */
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

#include <boost/asio.hpp>
#include <mqtt_client_cpp.hpp>
using namespace std::chrono;

// Number of messages per second the publisher aims to send at a steady pace
constexpr int PUBLISH_RATE = 25'000;

// How long to run before disconnecting automatically; a zero duration disables the watchdog
constexpr std::chrono::seconds WATCHDOG_TIMER{0};
struct App { | |
boost::asio::io_context ioc {}; | |
using client_t = decltype(MQTT_NS::make_sync_client(std::declval<boost::asio::io_context &>(), "", 0)); | |
client_t c; | |
}; | |
// Shared stop flag: the publisher loop keeps going while this is true; clearing it asks the loop to exit.
std::atomic<bool> running{false};
/**
 * Publisher loop, intended to run on its own thread.
 *
 * Sets `running` to true and then, once per period (1 / PUBLISH_RATE seconds), posts a handler
 * via c->socket()->post() that publishes a fixed 16-byte payload to "mqtt_cpp_demo/topic1"
 * at QoS 0. The loop exits when `running` is observed to be false.
 *
 * @param c  MQTT client. Held by reference (also by the posted handlers), so it must outlive
 *           both this thread and any handler still queued.
 */
void task(App::client_t & c) {
    // accessing c is generally not thread-safe, but calling c->socket() and using the result to post() is OK

    // Pace the loop on the monotonic clock. Deadlines taken from system_clock move with
    // wall-clock adjustments (NTP step, manual change), which would stall the loop or make it burst.
    auto const period = std::chrono::round<std::chrono::steady_clock::duration>(
        std::chrono::duration<double>{1.0 / PUBLISH_RATE});
    auto deadline = std::chrono::steady_clock::now() + period;

    // NOTE(review): this store overwrites a stop request issued before the thread got this far;
    // closing that window needs the flag to be raised by whoever starts the thread instead.
    running = true;
    while (running) {
        c->socket()->post(
            [&c] {
                c->publish("mqtt_cpp_demo/topic1", "0123456789ABCDEF", mqtt::qos::at_most_once);
            }
        );
        // Absolute deadlines keep the average rate constant even when an iteration runs late.
        std::this_thread::sleep_until(deadline);
        deadline += period;
    }
    std::cout << "task complete" << std::endl;
}
int main() { | |
std::thread t; | |
auto app = std::make_shared<App>(); | |
app->c = mqtt::make_sync_client(app->ioc, "localhost", 1883); | |
app->c->set_client_id("mqtt_cpp_demo"); | |
app->c->set_clean_session(true); | |
app->c->set_connack_handler( | |
[&c = app->c, &t](bool sp, mqtt::connect_return_code connack_return_code){ | |
std::cout << "Connack handler called" << std::endl; | |
std::cout << "Session Present: " << std::boolalpha << sp << std::endl; | |
std::cout << "Connack Return Code: " << connack_return_code << std::endl; | |
if (connack_return_code == mqtt::connect_return_code::accepted) { | |
c->subscribe("quit", mqtt::qos::at_most_once); | |
// start the publish thread | |
t = std::thread(&task, std::ref(c)); | |
} | |
return true; | |
} | |
); | |
app->c->set_close_handler( [](){ | |
std::cout << "close connection" << std::endl; | |
running = false; | |
}); | |
app->c->set_error_handler( | |
[](boost::system::error_code const & ec) { | |
std::cerr << "error " << ec << std::endl; | |
} | |
); | |
app->c->set_puback_handler( | |
[](std::uint16_t packet_id) { return true; } | |
); | |
app->c->set_pubrec_handler( | |
[](std::uint16_t packet_id) { return true; } | |
); | |
app->c->set_pubcomp_handler( | |
[](std::uint16_t packet_id) { return true; } | |
); | |
app->c->set_suback_handler( | |
[&c = app->c](std::uint16_t packet_id, std::vector<mqtt::suback_return_code> results){ | |
std::cout << "suback received. packet_id: " << packet_id << std::endl; | |
for (auto const& e : results) { | |
std::cout << "subscribe result: " << e << std::endl; | |
} | |
return true; | |
} | |
); | |
app->c->set_publish_handler( | |
[&c = app->c] | |
(mqtt::optional<std::uint16_t> packet_id, | |
mqtt::publish_options pubopts, | |
mqtt::buffer topic_name, | |
mqtt::buffer contents) { | |
std::cout << "publish received." | |
<< " dup: " << pubopts.get_dup() | |
<< " qos: " << pubopts.get_qos() | |
<< " retain: " << pubopts.get_retain() << std::endl; | |
if (packet_id) | |
std::cout << "packet_id: " << *packet_id << std::endl; | |
std::cout << "topic_name: " << topic_name << std::endl; | |
std::cout << "contents: " << contents << std::endl; | |
if (topic_name == "quit") { | |
std::cout << "Disconnect" << std::endl; | |
c->disconnect(); | |
} | |
return true; | |
} | |
); | |
std::thread watchdog; | |
if (WATCHDOG_TIMER > system_clock::duration::zero()) { | |
watchdog = std::thread([&]{ | |
std::cout << "watchdog started (" << duration_cast<seconds>(WATCHDOG_TIMER).count() << " seconds)" << std::endl; | |
std::this_thread::sleep_for(WATCHDOG_TIMER); | |
std::cout << "watchdog expired" << std::endl; | |
app->ioc.stop(); | |
std::cout << "ioc stopped" << std::endl; | |
}); | |
} | |
app->c->connect(); | |
app->ioc.run(); | |
std::cout << "ioc finished" << std::endl; | |
// ask the thread to stop | |
running = false; | |
if (watchdog.joinable()) { | |
watchdog.join(); | |
std::cout << "Watchdog thread joined" << std::endl; | |
} | |
if (t.joinable()) { | |
t.join(); | |
std::cout << "Task thread joined" << std::endl; | |
} | |
std::cout << "Done" << std::endl; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment