Last active
July 8, 2018 00:29
-
-
Save marcosabel/39cada92369068d45f27accb9bb3dd22 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Component | |
public class MqttComponentPurePahoImpl implements MqttComponent { | |
private final String mqttUrl; | |
private final String mqttUser; | |
private final String mqttPassword; | |
private final String displayNameNode; | |
private final ObjectWriter objectWriter; | |
private MqttClient mqttClient; | |
private static final Logger log = LoggerFactory.getLogger(MqttComponentPurePahoImpl.class); | |
public MqttAccessorPurePahoImpl( | |
@Value("${displayName}") String displayName, | |
@Value("${node.id}") String node, | |
@Value("${mqtt.url}") String mqttUrl, | |
@Value("${mqtt.user}") String mqttUser, | |
@Value("${mqtt.password}") String mqttPassword) { | |
this.mqttUrl = mqttUrl; | |
this.mqttUser = mqttUser; | |
this.mqttPassword = mqttPassword; | |
this.displayNameNode = displayName + "-" + node; | |
this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter(); | |
this.mqttClient = createMqttClient(mqttUser, mqttPassword, mqttUrlList, displayNameNode); | |
} | |
@Override | |
public void publish(String topic, Object payload, int qos) { | |
try { | |
MqttMessage mqttMessage = new MqttMessage(objectWriter.writeValueAsBytes(payload)); | |
mqttMessage.setQos(qos); | |
log.debug("Publishing topic [{}] using plain paho. Message: [{}]", topic, mqttMessage); | |
getMqttClient().publish(topic, mqttMessage); | |
} catch (MqttException | NullPointerException | JsonProcessingException e) { | |
log.error("Cannot publish topic [{}] and message using paho", topic); | |
throw new TopicNotPublishedException(e); | |
} | |
} | |
@Override | |
public void subscribeToTopicWithEmptyPayload(String topic, Function<String, Void> callback, int qos) { | |
try { | |
getMqttClient().subscribe(topic, qos, new IMqttMessageListener() { | |
@Override | |
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { | |
callback.apply(s); | |
} | |
}); | |
} catch (MqttException e) { | |
log.error("Cannot subscribe to topic [{}]", topic); | |
} | |
} | |
private MqttClient getMqttClient() throws MqttException { | |
if (mqttClient == null) { | |
mqttClient = createMqttClient(mqttUser, mqttPassword, mqttUrlList, displayNameNode); | |
log.info("Paho client was null, a new one was created"); | |
} | |
if (!mqttClient.isConnected()) { | |
log.info("Paho client is disconnected. Reconnecting"); | |
mqttClient.reconnect(); | |
} | |
return mqttClient; | |
} | |
private static synchronized MqttClient createMqttClient(String mqttUser, String mqttPassword, String mqttUrl, | |
String nodeName) { | |
MqttConnectOptions options = new MqttConnectOptions(); | |
if (!StringUtils.isEmpty(mqttUser)) { | |
options.setUserName(mqttUser); | |
options.setPassword(mqttPassword.toCharArray()); | |
options.setCleanSession(true); | |
options.setAutomaticReconnect(true); | |
options.setMaxInflight(5000); | |
} | |
MqttClient mqttClient; | |
try { | |
mqttClient = new MqttClient(mqttUrl, UUID.randomUUID(), new MemoryPersistence()); | |
mqttClient.connect(options); | |
log.info("Paho client is connected"); | |
} catch (MqttException e) { | |
log.error("Cannot connect to mqtt"); | |
mqttClient = null; | |
} | |
return mqttClient; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment