Created
March 31, 2016 07:08
-
-
Save deyindra/b38d46f6860ee7f249bdf94c5d4dcd1a to your computer and use it in GitHub Desktop.
TimeSeriesBasedStreamIterator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.PriorityQueue; | |
| import java.util.function.Function; | |
| public abstract class AggregateFunction<T extends Comparable<T>> implements Function<PriorityQueue<TimeSeriesType<T>>, T> { | |
| @Override | |
| public <V> Function<PriorityQueue<TimeSeriesType<T>>, V> andThen(Function<? super T, ? extends V> after) { | |
| throw new UnsupportedOperationException("Not supported..."); | |
| } | |
| @Override | |
| public <V> Function<V, T> compose(Function<? super V, ? extends PriorityQueue<TimeSeriesType<T>>> before) { | |
| throw new UnsupportedOperationException("Not supported..."); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.Comparator; | |
| public class CustomComparator<T extends Comparable<T>> implements Comparator<TimeSeriesType<T>> { | |
| @Override | |
| public int compare(TimeSeriesType<T> o1, TimeSeriesType<T> o2) { | |
| if(o1==null && o2==null){ | |
| return 0; | |
| }else if(o1==o2){ | |
| return 0; | |
| }else if(o1==null){ | |
| return -1; | |
| }else{ | |
| if(o2==null){ | |
| return 1; | |
| }else{ | |
| T obj1 = o1.getObject(); | |
| T obj2 = o2.getObject(); | |
| if(obj1==null && obj2==null){ | |
| return 0; | |
| }else if(obj1==obj2){ | |
| return 0; | |
| }else if(obj1==null){ | |
| return -1; | |
| }else{ | |
| return obj1.compareTo(obj2); | |
| } | |
| } | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.Optional; | |
| import java.util.PriorityQueue; | |
| public class MaxAggregateFunction<T extends Comparable<T>> extends AggregateFunction<T> { | |
| @Override | |
| public T apply(PriorityQueue<TimeSeriesType<T>> timeSeriesTypes) { | |
| Optional<TimeSeriesType<T>> optional = timeSeriesTypes.stream().max(new CustomComparator<T>()); | |
| if(optional.isPresent()){ | |
| return optional.get().getObject(); | |
| }else{ | |
| return null; | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.Optional; | |
| import java.util.PriorityQueue; | |
| public class MinAggregateFunction<T extends Comparable<T>> extends AggregateFunction<T> { | |
| @Override | |
| public T apply(PriorityQueue<TimeSeriesType<T>> timeSeriesTypes) { | |
| Optional<TimeSeriesType<T>> optional = timeSeriesTypes.stream().min(new CustomComparator<T>()); | |
| if(optional.isPresent()){ | |
| return optional.get().getObject(); | |
| }else{ | |
| return null; | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.sql.Timestamp; | |
| import java.util.*; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.stream.Stream; | |
| public class TimeSeriesBasedStreamIterator<T extends Comparable<T>> implements Iterator<T> { | |
| private Iterator<TimeSeriesType<T>> iterator; | |
| private PriorityQueue<TimeSeriesType<T>> priorityQueue; | |
| private boolean hasNext=false; | |
| private Timestamp lastTimestamp; | |
| private AggregateFunction<T> aggregateFunction; | |
| private TimeUnit timeUnit; | |
| private long windowSize; | |
| private T object; | |
| public TimeSeriesBasedStreamIterator(Stream<TimeSeriesType<T>> stream, AggregateFunction<T> function, TimeUnit timeUnit, long windowSize){ | |
| this.iterator = stream.iterator(); | |
| this.aggregateFunction = function; | |
| priorityQueue = new PriorityQueue<>(); | |
| this.timeUnit = timeUnit; | |
| this.windowSize = windowSize; | |
| setAdvance(); | |
| } | |
| public TimeSeriesBasedStreamIterator(Stream<TimeSeriesType<T>> stream, AggregateFunction<T> function, long windowSize){ | |
| this(stream,function,TimeUnit.MILLISECONDS,windowSize); | |
| } | |
| @Override | |
| public boolean hasNext() { | |
| return hasNext; | |
| } | |
| @Override | |
| public T next() { | |
| if(!hasNext){ | |
| throw new NoSuchElementException("No More Elements..."); | |
| } | |
| T prevObject = object; | |
| setAdvance(); | |
| return prevObject; | |
| } | |
| @Override | |
| public void remove() { | |
| throw new UnsupportedOperationException("Method not supported..."); | |
| } | |
| private void setAdvance(){ | |
| hasNext=false; | |
| while (iterator.hasNext()){ | |
| TimeSeriesType<T> type = iterator.next(); | |
| Timestamp ts = type.getTs(); | |
| if(lastTimestamp==null){ | |
| lastTimestamp = ts; | |
| }else{ | |
| if(isGreaterThan(ts)){ | |
| lastTimestamp=ts; | |
| object = aggregateFunction.apply(priorityQueue); | |
| priorityQueue.clear(); | |
| hasNext=true; | |
| } | |
| } | |
| priorityQueue.offer(type); | |
| if(hasNext){ | |
| break; | |
| } | |
| } | |
| //this is for the last iterator set... | |
| if(!hasNext){ | |
| if(!priorityQueue.isEmpty()){ | |
| object = aggregateFunction.apply(priorityQueue); | |
| priorityQueue.clear(); | |
| hasNext=true; | |
| } | |
| } | |
| } | |
| private boolean isGreaterThan(Timestamp ts){ | |
| long diff = ts.getTime() - lastTimestamp.getTime(); | |
| return (timeUnit.convert(diff,TimeUnit.MILLISECONDS)) >= windowSize; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.sql.Timestamp; | |
/**
 * A value paired with the timestamp at which it was recorded.
 *
 * <p>Natural ordering is by timestamp first and by wrapped value second; within the same
 * timestamp a {@code null} value sorts before any non-null value.
 *
 * @param <T> the comparable value type being wrapped
 */
public class TimeSeriesType<T extends Comparable<T>> implements Comparable<TimeSeriesType<T>> {
    private final Timestamp ts;
    private final T object;

    /**
     * Wraps {@code object} and stamps it with the current system time.
     *
     * @param object the value to wrap, may be {@code null}
     */
    public TimeSeriesType(T object) {
        this.object = object;
        ts = new Timestamp(System.currentTimeMillis());
    }

    /**
     * Wraps {@code object} with an explicit timestamp, e.g. for historical data.
     *
     * @param object the value to wrap, may be {@code null}
     * @param ts     the timestamp of the value; copied so later changes to the argument
     *               do not affect this instance
     * @throws NullPointerException if {@code ts} is {@code null}
     */
    public TimeSeriesType(T object, Timestamp ts) {
        if (ts == null) {
            throw new NullPointerException("ts");
        }
        this.object = object;
        // Timestamp is mutable; copy millis and nanos so equality with the source is preserved.
        Timestamp copy = new Timestamp(ts.getTime());
        copy.setNanos(ts.getNanos());
        this.ts = copy;
    }

    /** @return the wrapped value, possibly {@code null} */
    public T getObject() {
        return object;
    }

    /** @return the timestamp associated with the wrapped value */
    public Timestamp getTs() {
        return ts;
    }

    /** Two entries are equal when both their timestamps and their wrapped values are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TimeSeriesType<?> that = (TimeSeriesType<?>) o;
        if (!ts.equals(that.ts)) {
            return false;
        }
        return object == null ? that.object == null : object.equals(that.object);
    }

    @Override
    public int hashCode() {
        int result = ts.hashCode();
        result = 31 * result + (object != null ? object.hashCode() : 0);
        return result;
    }

    /**
     * Compares by timestamp, then by wrapped value with {@code null} values first.
     *
     * @param o the entry to compare with; a {@code null} argument is treated as smaller
     * @return a negative, zero, or positive number as this entry sorts before, equal to,
     *         or after {@code o}
     */
    @Override
    public int compareTo(TimeSeriesType<T> o) {
        if (o == null) {
            return 1;
        }
        if (this == o) {
            return 0;
        }
        int compare = this.ts.compareTo(o.ts);
        if (compare != 0) {
            return compare;
        }
        // Same timestamp: fall back to the wrapped values, null-safe on both sides.
        if (this.object == null) {
            return o.object == null ? 0 : -1;
        }
        if (o.object == null) {
            // Previously this fell through to object.compareTo(null), which throws for
            // the standard Comparable types and was asymmetric with the branch above.
            return 1;
        }
        return this.object.compareTo(o.object);
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("TimeSeriesType{");
        sb.append("object=").append(object);
        sb.append(", ts=").append(ts);
        sb.append('}');
        return sb.toString();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment