Skip to content

Instantly share code, notes, and snippets.

@vklachkov
Last active March 14, 2021 13:04
Show Gist options
  • Save vklachkov/5964ef1168f7c9bb3b684c700822be0a to your computer and use it in GitHub Desktop.
Save vklachkov/5964ef1168f7c9bb3b684c700822be0a to your computer and use it in GitHub Desktop.
Coroutines over mqtt android client
package mqtt
import kotlinx.coroutines.suspendCancellableCoroutine
import org.eclipse.paho.android.service.MqttAndroidClient
import org.eclipse.paho.client.mqttv3.*
import timber.log.Timber
import kotlin.coroutines.resume
suspend inline fun MqttAndroidClient.awaitConnectWith(
crossinline options: MqttConnectOptions.() -> Unit
) = suspendCancellableCoroutine<Unit> { continuation ->
Log.d("awaitConnectWith", "Start suspendCancellableCoroutine")
connect(
MqttConnectOptions().apply { options(this) },
null,
object: IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
Log.d("awaitConnectWith", "On success")
continuation.resume(Unit)
}
override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
Log.d("awaitConnectWith", "On failure")
continuation.cancel(exception)
}
}
)
Log.d("awaitConnectWith", "End suspendCancellableCoroutine")
}
suspend inline fun MqttAndroidClient.awaitSubscribe(
topic: String
) = suspendCancellableCoroutine<Unit> { continuation ->
subscribe(topic, 0, null, object: IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
continuation.resume(Unit)
}
override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
Timber.e("Failure on await subscribe")
Timber.e(exception)
continuation.cancel(exception)
}
})
}
suspend inline fun MqttAndroidClient.awaitPublish(
topic: String,
message: Any
) = suspendCancellableCoroutine<Unit> { continuation ->
publish(topic, message.toString().toMqttMessage(), null, object: IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
continuation.resume(Unit)
}
override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
Timber.e("Failure on await publish")
Timber.e(exception)
continuation.cancel(exception)
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment