Skip to content

Instantly share code, notes, and snippets.

@Teagan42
Last active March 25, 2019 20:56
Show Gist options
  • Save Teagan42/e4cf2357c0665fa147089b542ebcd9d8 to your computer and use it in GitHub Desktop.
Save Teagan42/e4cf2357c0665fa147089b542ebcd9d8 to your computer and use it in GitHub Desktop.
Quick Channel Wrapped MqttClient
package rocks.teagantotally.kotqtt.domain.framework.client
interface Client : MqttCommandExecutor, MqttEventProducer
package rocks.teagantotally.kotqtt.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.launch
import org.eclipse.paho.client.mqttv3.MqttAsyncClient
import rocks.teagantotally.kotqtt.domain.framework.client.Client
import rocks.teagantotally.kotqtt.domain.framework.client.CommandResult
import rocks.teagantotally.kotqtt.domain.framework.connections.MqttBrokerConnection
import rocks.teagantotally.kotqtt.domain.framework.connections.MqttConnectionOptions
import rocks.teagantotally.kotqtt.domain.models.commands.*
import rocks.teagantotally.kotqtt.domain.models.events.MqttEvent
class MqttClient(
val brokerConnection: MqttBrokerConnection,
val connectionOptions: MqttConnectionOptions,
coroutineScope: CoroutineScope
) : Client,
CoroutineScope by coroutineScope {
private val client: MqttAsyncClient = MqttAsyncClient(
brokerConnection.brokerUri,
brokerConnection.clientId
)
private val commandChannel: ReceiveChannel<MqttCommand> = Channel()
private val eventChannel: BroadcastChannel<MqttEvent> = ConflatedBroadcastChannel()
override suspend fun execute(command: MqttCommand) {
if (!commandChannel.isClosedForReceive) {
commandChannel.consumeEach {
when (it) {
is MqttConnectCommand -> connect(it)
is MqttDisconnectCommand -> disconnect(it)
is MqttPublishCommand -> publish(it)
is MqttSubscribeCommand -> subscribe(it)
is MqttUnsubscribeCommand -> unsubscribe(it)
}
}
}
}
override fun subscribe(): ReceiveChannel<MqttEvent> =
eventChannel.openSubscription()
private fun connect(command: MqttConnectCommand) {
client.connect()
}
private fun disconnect(command: MqttDisconnectCommand) {
if (!client.isConnected) {
sendNotConnectedEvent(command)
} else {
client.disconnect()
}
}
private fun publish(command: MqttPublishCommand) {
if (!client.isConnected) {
sendNotConnectedEvent(command)
} else {
with(command.message) {
client.publish(
topic,
payload,
qos.value,
retain
)
}
}
}
private fun subscribe(command: MqttSubscribeCommand) {
if (!client.isConnected) {
sendNotConnectedEvent(command)
} else {
with(command) {
client.subscribe(
topic,
qos.value
)
}
}
}
private fun unsubscribe(command: MqttUnsubscribeCommand) {
if (!client.isConnected) {
sendNotConnectedEvent(command)
} else {
with(command) {
client.unsubscribe(topic)
}
}
}
private fun <CommandType : MqttCommand> sendNotConnectedEvent(command: CommandType) {
launch {
eventChannel.send(
CommandResult.Failure(
command,
IllegalStateException("Client is not connected")
)
)
}
}
}
package rocks.teagantotally.kotqtt.domain.framework.client
import rocks.teagantotally.kotqtt.domain.models.commands.MqttCommand
interface MqttCommandExecutor {
suspend fun execute(command: MqttCommand)
}
package rocks.teagantotally.kotqtt.domain.framework.client
import kotlinx.coroutines.channels.ReceiveChannel
import rocks.teagantotally.kotqtt.domain.models.events.MqttEvent
interface MqttEventProducer {
fun subscribe(): ReceiveChannel<MqttEvent>
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment