Skip to content

Instantly share code, notes, and snippets.

@deyindra
Created March 31, 2016 07:08
Show Gist options
  • Select an option

  • Save deyindra/b38d46f6860ee7f249bdf94c5d4dcd1a to your computer and use it in GitHub Desktop.

Select an option

Save deyindra/b38d46f6860ee7f249bdf94c5d4dcd1a to your computer and use it in GitHub Desktop.
TimeSeriesBasedStreamIterator
import java.util.PriorityQueue;
import java.util.function.Function;
/**
 * Base type for functions that reduce a queue of {@link TimeSeriesType} elements
 * to a single payload value of type {@code T}.
 *
 * <p>Subclasses supply {@link #apply(Object)}. Function composition is deliberately
 * disabled: both {@code andThen} and {@code compose} always throw
 * {@link UnsupportedOperationException}.
 *
 * @param <T> payload type carried by the time-series elements
 */
public abstract class AggregateFunction<T extends Comparable<T>>
        implements Function<PriorityQueue<TimeSeriesType<T>>, T> {

    /** Message carried by the exception thrown from the disabled composition methods. */
    private static final String NOT_SUPPORTED_MESSAGE = "Not supported...";

    /**
     * Always rejects composition.
     *
     * @param before ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public <V> Function<V, T> compose(
            Function<? super V, ? extends PriorityQueue<TimeSeriesType<T>>> before) {
        throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
    }

    /**
     * Always rejects chaining.
     *
     * @param after ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public <V> Function<PriorityQueue<TimeSeriesType<T>>, V> andThen(
            Function<? super T, ? extends V> after) {
        throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
    }
}
import java.util.Comparator;
/**
 * Orders {@link TimeSeriesType} elements by their payload ({@link TimeSeriesType#getObject()})
 * only; timestamps are not consulted.
 *
 * <p>Nulls sort first at both levels: a {@code null} element is less than any non-null
 * element, and an element with a {@code null} payload is less than an element with a
 * non-null payload.
 *
 * @param <T> payload type carried by the time-series elements
 */
public class CustomComparator<T extends Comparable<T>> implements Comparator<TimeSeriesType<T>> {

    /**
     * Compares two elements by payload, treating {@code null} as the smallest value.
     *
     * @param o1 left element, may be {@code null}
     * @param o2 right element, may be {@code null}
     * @return negative, zero or positive as {@code o1} is less than, equal to or greater
     *         than {@code o2}
     */
    @Override
    public int compare(TimeSeriesType<T> o1, TimeSeriesType<T> o2) {
        // Same reference (including both null) is trivially equal.
        if (o1 == o2) {
            return 0;
        }
        if (o1 == null) {
            return -1;
        }
        if (o2 == null) {
            return 1;
        }

        T obj1 = o1.getObject();
        T obj2 = o2.getObject();
        if (obj1 == obj2) {
            return 0;
        }
        if (obj1 == null) {
            return -1;
        }
        // Mirror of the branch above: without it obj1.compareTo(null) would be invoked,
        // which throws NullPointerException for standard Comparable implementations.
        if (obj2 == null) {
            return 1;
        }
        return obj1.compareTo(obj2);
    }
}
import java.util.Optional;
import java.util.PriorityQueue;
/**
 * Aggregate that yields the largest payload in a window, as ordered by
 * {@link CustomComparator}.
 *
 * @param <T> payload type carried by the time-series elements
 */
public class MaxAggregateFunction<T extends Comparable<T>> extends AggregateFunction<T> {

    /**
     * Finds the element with the greatest payload and returns that payload.
     *
     * @param timeSeriesTypes the elements of the current window
     * @return the payload of the maximum element, or {@code null} when the queue is empty
     *         or the selected element carries a {@code null} payload
     */
    @Override
    public T apply(PriorityQueue<TimeSeriesType<T>> timeSeriesTypes) {
        return timeSeriesTypes.stream()
                .max(new CustomComparator<T>())
                .map(TimeSeriesType::getObject)
                .orElse(null);
    }
}
import java.util.Optional;
import java.util.PriorityQueue;
/**
 * Aggregate that yields the smallest payload in a window, as ordered by
 * {@link CustomComparator}.
 *
 * @param <T> payload type carried by the time-series elements
 */
public class MinAggregateFunction<T extends Comparable<T>> extends AggregateFunction<T> {

    /**
     * Finds the element with the least payload and returns that payload.
     *
     * @param timeSeriesTypes the elements of the current window
     * @return the payload of the minimum element, or {@code null} when the queue is empty
     *         or the selected element carries a {@code null} payload
     */
    @Override
    public T apply(PriorityQueue<TimeSeriesType<T>> timeSeriesTypes) {
        return timeSeriesTypes.stream()
                .min(new CustomComparator<T>())
                .map(TimeSeriesType::getObject)
                .orElse(null);
    }
}
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
 * Iterator that partitions a stream of {@link TimeSeriesType} elements into consecutive
 * time windows and yields one aggregated value per window.
 *
 * <p>A window opens at the timestamp of its first element. It closes when an element
 * arrives whose timestamp is at least {@code windowSize} (measured in {@code timeUnit})
 * after the window's opening timestamp; that element becomes the first element of the
 * next window. Elements left over when the source runs dry form a final window.
 *
 * <p>The value returned by the next call to {@link #next()} is computed ahead of time,
 * so the constructor already consumes the source up to the first window boundary.
 *
 * @param <T> payload type carried by the time-series elements
 */
public class TimeSeriesBasedStreamIterator<T extends Comparable<T>> implements Iterator<T> {
    private final Iterator<TimeSeriesType<T>> iterator;
    private final PriorityQueue<TimeSeriesType<T>> priorityQueue;
    private final AggregateFunction<T> aggregateFunction;
    private final TimeUnit timeUnit;
    private final long windowSize;

    /** Whether {@link #object} currently holds a value that has not been handed out yet. */
    private boolean hasNext;
    /** Timestamp of the element that opened the window currently being collected. */
    private Timestamp lastTimestamp;
    /** Pre-computed aggregate that the next call to {@link #next()} will return. */
    private T object;

    /**
     * Creates the iterator and eagerly computes the first aggregate.
     *
     * @param stream     source of time-series elements
     * @param function   aggregate applied to the elements of each window
     * @param timeUnit   unit in which {@code windowSize} is expressed
     * @param windowSize window length in {@code timeUnit}
     */
    public TimeSeriesBasedStreamIterator(Stream<TimeSeriesType<T>> stream,
                                         AggregateFunction<T> function,
                                         TimeUnit timeUnit,
                                         long windowSize) {
        this.iterator = stream.iterator();
        this.aggregateFunction = function;
        this.priorityQueue = new PriorityQueue<>();
        this.timeUnit = timeUnit;
        this.windowSize = windowSize;
        setAdvance();
    }

    /**
     * Creates the iterator with a window length expressed in milliseconds.
     *
     * @param stream     source of time-series elements
     * @param function   aggregate applied to the elements of each window
     * @param windowSize window length in milliseconds
     */
    public TimeSeriesBasedStreamIterator(Stream<TimeSeriesType<T>> stream,
                                         AggregateFunction<T> function,
                                         long windowSize) {
        this(stream, function, TimeUnit.MILLISECONDS, windowSize);
    }

    /** @return {@code true} while an aggregated window value is still pending */
    @Override
    public boolean hasNext() {
        return hasNext;
    }

    /**
     * Returns the pending aggregate and pre-computes the one after it.
     *
     * @return the aggregate of the next window (may be {@code null} if the aggregate
     *         function produced {@code null})
     * @throws NoSuchElementException if no window is pending
     */
    @Override
    public T next() {
        if (!hasNext) {
            throw new NoSuchElementException("No More Elements...");
        }
        final T result = object;
        setAdvance();
        return result;
    }

    /**
     * Removal is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException("Method not supported...");
    }

    /**
     * Pulls elements from the source until a window boundary is crossed or the source is
     * exhausted, leaving the resulting aggregate in {@link #object}.
     */
    private void setAdvance() {
        hasNext = false;
        // The !hasNext test comes first so the source is not polled again once a
        // window has been closed in this call.
        while (!hasNext && iterator.hasNext()) {
            final TimeSeriesType<T> current = iterator.next();
            final Timestamp currentTs = current.getTs();
            if (lastTimestamp == null) {
                lastTimestamp = currentTs;
            } else if (isGreaterThan(currentTs)) {
                // Boundary crossed: the new element opens the next window, and the
                // elements collected so far are aggregated before it is queued.
                lastTimestamp = currentTs;
                flushWindow();
            }
            priorityQueue.offer(current);
        }
        // Source exhausted without crossing a boundary: emit whatever is left.
        if (!hasNext && !priorityQueue.isEmpty()) {
            flushWindow();
        }
    }

    /** Aggregates the queued elements into {@link #object}, empties the queue and marks a value as pending. */
    private void flushWindow() {
        object = aggregateFunction.apply(priorityQueue);
        priorityQueue.clear();
        hasNext = true;
    }

    /**
     * Tells whether {@code ts} lies at least one window length after the current window start.
     *
     * @param ts timestamp of the element just read
     * @return {@code true} if the elapsed time, converted to {@code timeUnit}, is greater
     *         than or equal to {@code windowSize}
     */
    private boolean isGreaterThan(Timestamp ts) {
        final long elapsedMillis = ts.getTime() - lastTimestamp.getTime();
        final long elapsedInUnit = timeUnit.convert(elapsedMillis, TimeUnit.MILLISECONDS);
        return elapsedInUnit >= windowSize;
    }
}
import java.sql.Timestamp;
public class TimeSeriesType<T extends Comparable<T>> implements Comparable<TimeSeriesType<T>> {
private Timestamp ts;
private T object;
public TimeSeriesType(T object) {
this.object = object;
ts = new Timestamp(System.currentTimeMillis());
}
public T getObject() {
return object;
}
public Timestamp getTs() {
return ts;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TimeSeriesType<?> that = (TimeSeriesType<?>) o;
if (!ts.equals(that.ts)) return false;
if (object != null ? !object.equals(that.object) : that.object != null) return false;
return true;
}
@Override
public int hashCode() {
int result = ts.hashCode();
result = 31 * result + (object != null ? object.hashCode() : 0);
return result;
}
@Override
public int compareTo(TimeSeriesType<T> o) {
if(o==null){
return 1;
}else if(this==o){
return 0;
}else{
int compare = this.ts.compareTo(o.ts);
if(compare==0){
if(this.object==null && o.object==null){
return 0;
}else{
if(this.object==null){
return -1;
}else{
return this.object.compareTo(o.object);
}
}
}
return compare;
}
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("TimeSeriesType{");
sb.append("object=").append(object);
sb.append(", ts=").append(ts);
sb.append('}');
return sb.toString();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment