Skip to content

Instantly share code, notes, and snippets.

@marcosabel
Last active July 8, 2018 00:29
Show Gist options
  • Save marcosabel/39cada92369068d45f27accb9bb3dd22 to your computer and use it in GitHub Desktop.
Save marcosabel/39cada92369068d45f27accb9bb3dd22 to your computer and use it in GitHub Desktop.
@Component
public class MqttComponentPurePahoImpl implements MqttComponent {
private final String mqttUrl;
private final String mqttUser;
private final String mqttPassword;
private final String displayNameNode;
private final ObjectWriter objectWriter;
private MqttClient mqttClient;
private static final Logger log = LoggerFactory.getLogger(MqttComponentPurePahoImpl.class);
public MqttAccessorPurePahoImpl(
@Value("${displayName}") String displayName,
@Value("${node.id}") String node,
@Value("${mqtt.url}") String mqttUrl,
@Value("${mqtt.user}") String mqttUser,
@Value("${mqtt.password}") String mqttPassword) {
this.mqttUrl = mqttUrl;
this.mqttUser = mqttUser;
this.mqttPassword = mqttPassword;
this.displayNameNode = displayName + "-" + node;
this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter();
this.mqttClient = createMqttClient(mqttUser, mqttPassword, mqttUrlList, displayNameNode);
}
@Override
public void publish(String topic, Object payload, int qos) {
try {
MqttMessage mqttMessage = new MqttMessage(objectWriter.writeValueAsBytes(payload));
mqttMessage.setQos(qos);
log.debug("Publishing topic [{}] using plain paho. Message: [{}]", topic, mqttMessage);
getMqttClient().publish(topic, mqttMessage);
} catch (MqttException | NullPointerException | JsonProcessingException e) {
log.error("Cannot publish topic [{}] and message using paho", topic);
throw new TopicNotPublishedException(e);
}
}
@Override
public void subscribeToTopicWithEmptyPayload(String topic, Function<String, Void> callback, int qos) {
try {
getMqttClient().subscribe(topic, qos, new IMqttMessageListener() {
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
callback.apply(s);
}
});
} catch (MqttException e) {
log.error("Cannot subscribe to topic [{}]", topic);
}
}
private MqttClient getMqttClient() throws MqttException {
if (mqttClient == null) {
mqttClient = createMqttClient(mqttUser, mqttPassword, mqttUrlList, displayNameNode);
log.info("Paho client was null, a new one was created");
}
if (!mqttClient.isConnected()) {
log.info("Paho client is disconnected. Reconnecting");
mqttClient.reconnect();
}
return mqttClient;
}
private static synchronized MqttClient createMqttClient(String mqttUser, String mqttPassword, String mqttUrl,
String nodeName) {
MqttConnectOptions options = new MqttConnectOptions();
if (!StringUtils.isEmpty(mqttUser)) {
options.setUserName(mqttUser);
options.setPassword(mqttPassword.toCharArray());
options.setCleanSession(true);
options.setAutomaticReconnect(true);
options.setMaxInflight(5000);
}
MqttClient mqttClient;
try {
mqttClient = new MqttClient(mqttUrl, UUID.randomUUID(), new MemoryPersistence());
mqttClient.connect(options);
log.info("Paho client is connected");
} catch (MqttException e) {
log.error("Cannot connect to mqtt");
mqttClient = null;
}
return mqttClient;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment