Last active
September 21, 2018 19:26
-
-
Save fieldju/a0e698344f85e0284a77cabeee34986e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
protected List<Double> getTimeSeriesDataFromChannelMessages(List<ChannelMessage> channelMessages) { | |
// TODO If an error message is present at all does that mean everything is garbage? | |
channelMessages.parallelStream().filter(channelMessage -> channelMessage.getType().equals(ERROR_MESSAGE)) | |
.findAny() | |
.ifPresent(error -> { | |
ChannelMessage.ErrorMessage errorMessage = (ChannelMessage.ErrorMessage) error; | |
// TODO How do you get the errors from these objects? | |
List<Object> errros = errorMessage.getErrors(); | |
// TODO this error message sucks, how do we add more context to it, ex: get error messages from above? | |
throw new RuntimeException("Some sort of error occurred, when executing the signal flow program"); | |
}); | |
return channelMessages.parallelStream() | |
.filter(channelMessage -> channelMessage.getType().equals(DATA_MESSAGE)) | |
.map(message -> { | |
ChannelMessage.DataMessage dataMessage = (ChannelMessage.DataMessage) message; | |
Map<String, Number> data = dataMessage.getData(); | |
if (data.size() > 1) { | |
throw new IllegalStateException("There was more than one value for a given timestamp, an " + | |
"aggregation method should have been applied to the signal flow program"); | |
} | |
return data.size() == 1 ? data.values().stream().findFirst().get().doubleValue() : Double.NaN; | |
}).collect(Collectors.toList()); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment