Skip to content

Instantly share code, notes, and snippets.

@elizarov
Last active June 21, 2017 22:45
Show Gist options
  • Save elizarov/19652e5d677ec41cecedfa828b89c5f2 to your computer and use it in GitHub Desktop.
Save elizarov/19652e5d677ec41cecedfa828b89c5f2 to your computer and use it in GitHub Desktop.
Converted Rx Code
/*
fun BluetoothSocket.asyncListen(): Flowable<String> =
flowable<String>(BackpressureStrategy.BUFFER) { emitter ->
try {
val reader = BufferedReader(InputStreamReader(inputStream))
while (!emitter.isCancelled) {
reader.readLine()?.let { emitter.onNext(it) }
}
} catch (e: IOException) {
emitter.onError(e)
}
}
.subscribeOnNewThread()
.share()
*/
// Coroutine context created by newSingleThreadContext with the name "reader";
// asyncListen() passes it to produce so its blocking readLine() loop runs there.
// For simplicity we assume a single reader thread is enough.
// NOTE(review): this is one shared context for every socket that calls asyncListen(),
// so concurrent listeners would presumably queue behind each other's blocking reads,
// and nothing visible here ever closes it — confirm both are acceptable.
val readerThreadContext = newSingleThreadContext("reader")
/**
 * Starts a producer coroutine on [readerThreadContext] that reads text lines from this
 * socket's input stream and sends each one to the returned channel.
 *
 * The producer finishes (and so the channel completes) when either:
 *  - the coroutine is no longer active, or
 *  - the stream reaches end-of-input, i.e. `readLine()` returns null.
 *
 * An [java.io.IOException] thrown by `readLine()` is not caught here; it propagates out of
 * the producer block, so consumers of the channel should be prepared to handle it.
 *
 * @return the value returned by `produce<String>`, emitting one element per line read.
 */
fun BluetoothSocket.asyncListen() = produce<String>(readerThreadContext) {
    val reader = BufferedReader(InputStreamReader(inputStream))
    while (isActive) {
        // readLine() returns null once the stream is exhausted. Previously null was just
        // skipped, which made this loop spin forever at full speed after the peer closed
        // the connection; leaving the loop lets the producer complete instead.
        val line = reader.readLine() ?: break
        send(line)
    }
}
/*
private fun configIncomingData(dataStream: Flowable<String>) {
disposables += dataStream
.observeOnComputationThread()
.filter(String::isNumber)
.map(String::toDigitalValue)
.subscribeBy(
onNext = {
val color = profile.analyteInfo.calibration.getColor(currentTime)
val coloredValue = ColoredValue(it, color)
currentTime += calibration.timeParams.increment.toFloat()
val measurement = Measurement(currentTime, it.toDouble())
if (coloredValue.color == Color.BLUE) measurements.add(measurement)
coloredDots.add(coloredValue)
},
onError = { postEvent(ConnectionEvent(DISCONNECTED)) }
)
disposables += dataStream
.observeOnNewThread()
.filter(String::isBaselineMessage)
.doOnNext {
calibration.baseline =
measurements.map { (_, digitalValue) -> digitalValue }.average()
}
.observeOnMainThread()
.subscribeBy(
onNext = { view.mainView.displaySnackbar("Baseline Analysis Completed") },
onError = { postEvent(ConnectionEvent(DISCONNECTED)) }
)
disposables += dataStream
.filter(String::isSampleMessage)
.observeOnMainThread()
.subscribeBy(
onNext = {
val correctedData = calibration.correctBaseline(measurements)
val concentration = concentrationLevel.get().toDouble()
val integral: Double = correctedData.integral.withThreeDecimalPlaces
calibration.replicates.add(Replicate(concentration, integral))
view.displayData(coloredDots)
lastMeasurementMessage.set(
"Last Measurement: L = $concentration ppm, I = $integral"
)
view.mainView.displaySnackbar("Sample Analysis Completed")
},
onError = { postEvent(ConnectionEvent(DISCONNECTED)) }
)
}
*/
/**
 * Launches a coroutine in the [UI] context that consumes every line from [dataStream]
 * and reacts to it according to its kind. The three checks are independent `if`s, exactly
 * as in the three separate Rx subscriptions this replaces:
 *
 *  - numeric line: builds a [ColoredValue] from the parsed value and the calibration color
 *    for the current time, advances `currentTime` by the calibration increment, records a
 *    [Measurement] only when the color is [Color.BLUE], and always appends the colored dot;
 *  - baseline message: sets `calibration.baseline` to the average of the recorded
 *    measurement values and shows a snackbar;
 *  - sample message: baseline-corrects the measurements, stores a [Replicate] built from the
 *    current concentration level and the integral, refreshes the view and the
 *    last-measurement message, and shows a snackbar.
 *
 * If consuming the stream fails with an [java.io.IOException] (for example the socket
 * reader failed), a `ConnectionEvent(DISCONNECTED)` is posted — the same reaction the
 * original Rx `onError` handlers had — instead of letting the exception escape the coroutine.
 * Other exception types are deliberately not caught, so cancellation still propagates.
 *
 * NOTE(review): the parameter is typed `Channel<String>`; confirm that the value returned by
 * `asyncListen()` is actually assignable to it at the call site.
 *
 * @param dataStream channel of raw text lines received from the device.
 * @return the value returned by `launch(UI)`.
 */
private fun configIncomingData(dataStream: Channel<String>) = launch(UI) {
    try {
        dataStream.consumeEach { line ->
            if (line.isNumber()) {
                val color = profile.analyteInfo.calibration.getColor(currentTime)
                // Parse once and reuse for both the colored dot and the measurement.
                val digitalValue = line.toDigitalValue()
                val coloredValue = ColoredValue(digitalValue, color)
                // The measurement is stamped with the time *after* the increment.
                currentTime += calibration.timeParams.increment.toFloat()
                val measurement = Measurement(currentTime, digitalValue.toDouble())
                if (coloredValue.color == Color.BLUE) measurements.add(measurement)
                coloredDots.add(coloredValue)
            }
            if (line.isBaselineMessage()) {
                calibration.baseline =
                    measurements.map { (_, digitalValue) -> digitalValue }.average()
                view.mainView.displaySnackbar("Baseline Analysis Completed")
            }
            if (line.isSampleMessage()) {
                val correctedData = calibration.correctBaseline(measurements)
                val concentration = concentrationLevel.get().toDouble()
                val integral: Double = correctedData.integral.withThreeDecimalPlaces
                calibration.replicates.add(Replicate(concentration, integral))
                view.displayData(coloredDots)
                lastMeasurementMessage.set(
                    "Last Measurement: L = $concentration ppm, I = $integral"
                )
                view.mainView.displaySnackbar("Sample Analysis Completed")
            }
        }
    } catch (e: java.io.IOException) {
        // Mirrors onError = { postEvent(ConnectionEvent(DISCONNECTED)) } from the Rx version.
        postEvent(ConnectionEvent(DISCONNECTED))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment