I hereby claim:
- I am bbejeck on github.
- I am terpsdad (https://keybase.io/terpsdad) on keybase.
- I have a public key ASCyvUZxKkXxViUw26zdemp5hRPxzRAdg1PCBTJw42TU5Ao
To claim this, I am signing this object:
| echo "Enter the name for your cluster" | |
| read CLUSTER_NAME; | |
| echo "Enter the cloud (aws, azure, gcp)" | |
| read CLOUD; | |
| echo "Enter the region" | |
| read REGION; | |
| echo "Enter the geo location (us for now)" | |
| read GEO; |
I hereby claim:
To claim this, I am signing this object:
| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * |
| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * |
| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * |
| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * |
| // imports and license left out for clarity | |
| public class OptimizedStreams { | |
| public static void main(String[] args) { | |
| final Properties properties = new Properties(); | |
| properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); | |
| properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092 "); | |
| properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); | |
| properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
| // process() is omitted for clarity; it collects airline data into a | |
| // list and stores that list in the state store, keyed by airport code | |
| @Override | |
| public void punctuate(long timestamp) { | |
| KeyValueIterator<String, List<String>> allFlights = flights.all(); | |
| while (allFlights.hasNext()) { | |
| KeyValue<String, List<String>> kv = allFlights.next(); | |
| List<String> flightList = kv.value; | |
| String key = kv.key; | |
| if(flightList.size() >= 100) { |
| public static String predict(DataRegression dataRegression) { | |
| try (OnlineLogisticRegression logisticRegression = new OnlineLogisticRegression()) { | |
| FlightData flightData = new FlightData(dataRegression.data); | |
| logisticRegression.readFields(new DataInputStream(new ByteArrayInputStream(dataRegression.coefficients))); | |
| double prediction = logisticRegression.classifyScalar(flightData.vector); | |
| String arrivalPrediction = prediction > 0.5 ? "on-time" : "late"; | |
| return String.format("%s predicted to be %s", new Flight(dataRegression.data), arrivalPrediction); | |
| } catch (Exception e) { | |
| LOG.error("Problems with predicting " + dataRegression.data, e); | |
| return null; |
| dataByAirportStream.join(regressionsByAirPortTable, | |
| (k, v) -> k, | |
| DataRegression::new) | |
| .mapValues(Predictor::predict) |