Skip to content

Instantly share code, notes, and snippets.

@m2mIO-gister
Last active June 3, 2022 05:40
Show Gist options
  • Save m2mIO-gister/5275324 to your computer and use it in GitHub Desktop.
Save m2mIO-gister/5275324 to your computer and use it in GitHub Desktop.
Example MQTT Messaging in Java
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
/**
 * Minimal MQTT example: connects to a broker, optionally subscribes to one
 * topic, optionally publishes ten small JSON messages to that same topic,
 * and then disconnects.
 *
 * <p>The instance registers itself as the client's {@link MqttCallback}, so
 * connection-loss, delivery and message-arrival events are reported to the
 * methods of this class.
 */
public class SimpleMqttClient implements MqttCallback {

    MqttClient myClient;
    MqttConnectOptions connOpt;

    static final String BROKER_URL = "tcp://q.m2m.io:1883";
    static final String M2MIO_DOMAIN = "<Insert m2m.io domain here>";
    static final String M2MIO_STUFF = "things";
    static final String M2MIO_THING = "<Unique device ID>";
    static final String M2MIO_USERNAME = "<m2m.io username>";
    static final String M2MIO_PASSWORD_MD5 = "<m2m.io password (MD5 sum of password)>";

    // the following two flags control whether this example is a publisher, a subscriber or both
    static final Boolean subscriber = true;
    static final Boolean publisher = true;

    /**
     * Invoked upon losing the MQTT connection. Only reports the loss; no
     * reconnect is attempted.
     *
     * @param t the cause of the connection loss (not used here)
     */
    @Override
    public void connectionLost(Throwable t) {
        System.out.println("Connection lost!");
        // code to reconnect to the broker would go here if desired
    }

    /**
     * Invoked when a message published by this client has been successfully
     * received by the broker. Intentionally does nothing in this example.
     *
     * @param token the delivery token of the completed publish (not used here)
     */
    @Override
    public void deliveryComplete(MqttDeliveryToken token) {
        // nothing to do: runClient() already waits on the token after each publish
    }

    /**
     * Invoked when a message is received on a subscribed topic. Prints the
     * topic name and the payload to standard output.
     *
     * @param topic   the topic the message arrived on
     * @param message the received message
     * @throws Exception declared by the callback interface; not thrown explicitly here
     */
    @Override
    public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
        // Decode explicitly as UTF-8 so the output does not depend on the
        // platform default charset (runClient() encodes with UTF-8 as well).
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("-------------------------------------------------");
        System.out.println("| Topic:" + topic.getName());
        System.out.println("| Message: " + payload);
        System.out.println("-------------------------------------------------");
    }

    /**
     * Program entry point: creates a client and runs the example once.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        SimpleMqttClient smc = new SimpleMqttClient();
        smc.runClient();
    }

    /**
     * The main functionality of this simple example: create an MQTT client,
     * connect to the broker, subscribe and/or publish depending on the
     * {@code subscriber} / {@code publisher} flags, then disconnect.
     *
     * <p>If the initial connection fails, the stack trace is printed and the
     * JVM exits with status -1.
     */
    public void runClient() {
        // setup MQTT Client
        String clientID = M2MIO_THING;
        connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(true);
        connOpt.setKeepAliveInterval(30);
        connOpt.setUserName(M2MIO_USERNAME);
        connOpt.setPassword(M2MIO_PASSWORD_MD5.toCharArray());

        // Connect to Broker
        try {
            myClient = new MqttClient(BROKER_URL, clientID);
            myClient.setCallback(this);
            myClient.connect(connOpt);
        } catch (MqttException e) {
            e.printStackTrace();
            System.exit(-1);
        }
        System.out.println("Connected to " + BROKER_URL);

        // setup topic
        // topics on m2m.io are in the form <domain>/<stuff>/<thing>
        String myTopic = M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/" + M2MIO_THING;
        MqttTopic topic = myClient.getTopic(myTopic);

        // subscribe to topic if subscriber
        if (subscriber) {
            try {
                int subQoS = 0;
                myClient.subscribe(myTopic, subQoS);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // publish messages if publisher
        if (publisher) {
            for (int i = 1; i <= 10; i++) {
                String pubMsg = "{\"pubmsg\":" + i + "}";
                int pubQoS = 0;
                // Encode explicitly as UTF-8 rather than the platform default charset.
                MqttMessage message =
                        new MqttMessage(pubMsg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                message.setQos(pubQoS);
                message.setRetained(false);

                // Publish the message
                System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS);
                try {
                    // publish message to broker
                    MqttDeliveryToken token = topic.publish(message);
                    // Wait until the message has been delivered to the broker
                    token.waitForCompletion();
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop publishing; the
                    // client is still disconnected below.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // A failed publish is reported but does not abort the remaining ones.
                    e.printStackTrace();
                }
            }
        }

        // wait to ensure subscribed messages are delivered
        if (subscriber) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Keep the interrupt flag set, but fall through so that the
                // disconnect below is never skipped.
                Thread.currentThread().interrupt();
            }
        }

        // disconnect
        try {
            myClient.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
@apparao1
Copy link

I created MqttClient.java directly, but the code contains the conditions if (publisher) and if (subscriber). Where are these conditions checked, given that no such classes are provided?

@apparao1
Copy link

@m2mIO-gister; Hi gister, I need your help with this example. I am not getting any output from the class MqttApp.java. When I run Publisher.class, output is displayed, but when I run Subscriber, there is no output. Could you please check the code below and let me know if possible?

/**
 * Command-line launcher that starts either the {@code Publisher} or the
 * {@code Subscriber} example, selected by the first argument.
 */
public class MqttApp {

    /**
     * Dispatches to {@code Publisher.main} or {@code Subscriber.main}.
     *
     * @param args {@code args[0]} must be {@code "publisher"} or
     *             {@code "subscriber"}; the full array is forwarded unchanged
     *             to the selected class
     * @throws MqttException propagated from the selected example
     * @throws IllegalArgumentException if no argument is given or the first
     *                                  argument is not a recognised mode
     */
    public static void main(String[] args) throws MqttException {

        if (args.length < 1) {
            throw new IllegalArgumentException("Must have either 'publisher' or 'subscriber' as argument");
        }

        switch (args[0]) {
        case "publisher":
            Publisher.main(args);
            // Without this break, control fell through and the subscriber
            // was started as well whenever "publisher" was requested.
            break;
        case "subscriber":
            Subscriber.main(args);
            break;
        default:
            throw new IllegalArgumentException("What to pass args" + args[0]);
        }

    }

}
and

/**
 * MQTT callback that reports connection loss and prints every received
 * message payload to standard output.
 */
public class SimpleMqttCallBack implements MqttCallback {

    /**
     * Reports that the connection to the broker was lost.
     *
     * @param cause the reason for the loss (not used here)
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Connection to MQTT broker Lost!");
    }

    /**
     * Prints the payload of a message received on a subscribed topic.
     *
     * @param topic   the topic the message arrived on (not printed)
     * @param message the received message
     * @throws Exception declared by the callback interface; not thrown explicitly here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode explicitly as UTF-8 so the printed text does not depend on
        // the platform default charset of the machine running the subscriber.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("Message Received:\t" + payload);
    }

    /**
     * Delivery notifications are not needed by this example.
     *
     * @param token the delivery token of the completed publish (not used here)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // intentionally empty
    }

}

and

/**
 * Publishes a single text message to the {@code iot_data} topic on a broker
 * at {@code tcp://localhost:1883}, then disconnects.
 */
public class Publisher {

    /**
     * Connects, publishes one message and disconnects.
     *
     * @param args when exactly two arguments are given, {@code args[1]} is
     *             used as the message text; otherwise a default greeting is sent
     * @throws MqttException if connecting, publishing or disconnecting fails
     */
    public static void main(String[] args) throws MqttException {

        String messageString = "Hello World from Java";

        if (args.length == 2) {
            messageString = args[1];
        }

        System.out.println("== START PUBLISHER ==");

        MqttClient client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId());
        client.connect();
        try {
            MqttMessage msg = new MqttMessage();
            // Encode explicitly as UTF-8 rather than the platform default charset.
            msg.setPayload(messageString.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            client.publish("iot_data", msg);
            System.out.println("\tMessage '" + messageString + "' to 'iot_data'");
        } finally {
            // The original never disconnected, leaving the connection open
            // after the message was sent.
            client.disconnect();
        }
        // Release the client's remaining resources once it is disconnected.
        client.close();

    }

}

and

package com.mqtt.mqttCall;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;

/**
 * Subscribes to the {@code iot_data} topic on a broker at
 * {@code tcp://localhost:1883} and hands incoming events to a
 * {@code SimpleMqttCallBack}.
 */
public class Subscriber {

    /**
     * Creates a client with a generated id, registers the callback, connects
     * and subscribes.
     *
     * @param args command-line arguments (ignored)
     * @throws MqttException if creating the client, connecting or subscribing fails
     */
    public static void main(String[] args) throws MqttException {
        System.out.println("== START SUBSCRIBE ==");

        final String clientId = MqttClient.generateClientId();
        final MqttClient subscriberClient = new MqttClient("tcp://localhost:1883", clientId);

        // The callback is registered before connecting, as in the original order of calls.
        subscriberClient.setCallback(new SimpleMqttCallBack());
        subscriberClient.connect();
        subscriberClient.subscribe("iot_data");
    }

}

@apparao1
Copy link

Please help me with this code: what is the error in it?

@apparao1
Copy link

Please reply regarding this example. I started learning MQTT yesterday and I don't understand it yet. Please help me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment