Last active
March 25, 2019 20:56
-
-
Save Teagan42/e4cf2357c0665fa147089b542ebcd9d8 to your computer and use it in GitHub Desktop.
Quick Channel Wrapped MqttClient
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rocks.teagantotally.kotqtt.domain.framework.client | |
/**
 * An MQTT client abstraction that combines [MqttCommandExecutor] and [MqttEventProducer].
 *
 * Declares no members of its own; everything comes from the two super-interfaces.
 */
interface Client : MqttCommandExecutor, MqttEventProducer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rocks.teagantotally.kotqtt.data | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.channels.* | |
import kotlinx.coroutines.launch | |
import org.eclipse.paho.client.mqttv3.MqttAsyncClient | |
import rocks.teagantotally.kotqtt.domain.framework.client.Client | |
import rocks.teagantotally.kotqtt.domain.framework.client.CommandResult | |
import rocks.teagantotally.kotqtt.domain.framework.connections.MqttBrokerConnection | |
import rocks.teagantotally.kotqtt.domain.framework.connections.MqttConnectionOptions | |
import rocks.teagantotally.kotqtt.domain.models.commands.* | |
import rocks.teagantotally.kotqtt.domain.models.events.MqttEvent | |
/**
 * Channel-backed [Client] that wraps a Paho [MqttAsyncClient].
 *
 * Commands passed to [execute] are queued on an internal rendezvous channel and handled
 * one at a time by a single consumer coroutine launched in the supplied scope. Failures
 * (command issued while disconnected, or an exception thrown by Paho) are published as
 * [CommandResult.Failure] events to subscribers obtained via [subscribe].
 *
 * @property brokerConnection supplies the broker URI and client id used to build the Paho client.
 * @property connectionOptions connection settings for this client.
 *   NOTE(review): currently not forwarded to Paho in [connect] — confirm the intended mapping.
 * @param coroutineScope scope that owns the command consumer and the event-publishing coroutines;
 *   this class delegates its own [CoroutineScope] implementation to it.
 */
class MqttClient(
    val brokerConnection: MqttBrokerConnection,
    val connectionOptions: MqttConnectionOptions,
    coroutineScope: CoroutineScope
) : Client,
    CoroutineScope by coroutineScope {

    private val client: MqttAsyncClient = MqttAsyncClient(
        brokerConnection.brokerUri,
        brokerConnection.clientId
    )

    // Typed as a full Channel (not just ReceiveChannel) so that execute() can actually send to it.
    private val commandChannel: Channel<MqttCommand> = Channel()

    // Conflated broadcast: subscribers observe the most recently sent event.
    private val eventChannel: BroadcastChannel<MqttEvent> = ConflatedBroadcastChannel()

    init {
        // Single consumer: commands are dispatched sequentially in the order they are received.
        // Declared after the properties above so they are initialized before the loop can run.
        launch {
            commandChannel.consumeEach { dispatch(it) }
        }
    }

    /**
     * Queues [command] for execution by the consumer coroutine.
     *
     * Suspends until the consumer is ready to receive the command. If the command channel
     * has been closed, the command is dropped without error.
     */
    override suspend fun execute(command: MqttCommand) {
        try {
            commandChannel.send(command)
        } catch (e: ClosedSendChannelException) {
            // Channel closed: nothing will process further commands, so drop this one.
        }
    }

    /**
     * Opens a new subscription to the events published by this client.
     *
     * @return a [ReceiveChannel] of [MqttEvent]s; the caller is responsible for cancelling it.
     */
    override fun subscribe(): ReceiveChannel<MqttEvent> =
        eventChannel.openSubscription()

    /**
     * Routes [command] to its handler. A Paho exception raised by a handler is reported as a
     * failure event instead of propagating, so one bad command does not stop the consumer loop.
     */
    private fun dispatch(command: MqttCommand) {
        try {
            when (command) {
                is MqttConnectCommand -> connect(command)
                is MqttDisconnectCommand -> disconnect(command)
                is MqttPublishCommand -> publish(command)
                is MqttSubscribeCommand -> subscribe(command)
                is MqttUnsubscribeCommand -> unsubscribe(command)
            }
        } catch (e: org.eclipse.paho.client.mqttv3.MqttException) {
            sendFailureEvent(command, e)
        }
    }

    /** Starts an asynchronous connect on the Paho client. */
    private fun connect(command: MqttConnectCommand) {
        // NOTE(review): neither `command` nor `connectionOptions` is used here; Paho defaults apply.
        client.connect()
    }

    /** Disconnects the Paho client, or reports a failure if it is not connected. */
    private fun disconnect(command: MqttDisconnectCommand) {
        if (!client.isConnected) {
            sendNotConnectedEvent(command)
            return
        }
        client.disconnect()
    }

    /** Publishes the command's message, or reports a failure if the client is not connected. */
    private fun publish(command: MqttPublishCommand) {
        if (!client.isConnected) {
            sendNotConnectedEvent(command)
            return
        }
        with(command.message) {
            client.publish(
                topic,
                payload,
                qos.value,
                retain
            )
        }
    }

    /** Subscribes to the command's topic at its QoS, or reports a failure if not connected. */
    private fun subscribe(command: MqttSubscribeCommand) {
        if (!client.isConnected) {
            sendNotConnectedEvent(command)
            return
        }
        with(command) {
            client.subscribe(
                topic,
                qos.value
            )
        }
    }

    /** Unsubscribes from the command's topic, or reports a failure if not connected. */
    private fun unsubscribe(command: MqttUnsubscribeCommand) {
        if (!client.isConnected) {
            sendNotConnectedEvent(command)
            return
        }
        with(command) {
            client.unsubscribe(topic)
        }
    }

    /** Publishes a failure event for [command] stating that the client is not connected. */
    private fun <CommandType : MqttCommand> sendNotConnectedEvent(command: CommandType) {
        sendFailureEvent(command, IllegalStateException("Client is not connected"))
    }

    /** Publishes a [CommandResult.Failure] for [command] with [cause] from a new coroutine. */
    private fun sendFailureEvent(command: MqttCommand, cause: Exception) {
        launch {
            eventChannel.send(
                CommandResult.Failure(
                    command,
                    cause
                )
            )
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rocks.teagantotally.kotqtt.domain.framework.client | |
import rocks.teagantotally.kotqtt.domain.models.commands.MqttCommand | |
/**
 * Contract for a component that can execute an [MqttCommand].
 */
interface MqttCommandExecutor { | |
/**
 * Executes [command]. Declared `suspend`, so it must be called from a coroutine
 * or another suspending function.
 */
suspend fun execute(command: MqttCommand) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rocks.teagantotally.kotqtt.domain.framework.client | |
import kotlinx.coroutines.channels.ReceiveChannel | |
import rocks.teagantotally.kotqtt.domain.models.events.MqttEvent | |
/**
 * Contract for a component that exposes a stream of [MqttEvent]s.
 */
interface MqttEventProducer { | |
/**
 * Returns a [ReceiveChannel] from which [MqttEvent]s can be received.
 */
fun subscribe(): ReceiveChannel<MqttEvent> | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment