Skip to content

Instantly share code, notes, and snippets.

@swarog
Last active June 14, 2019 20:41
Show Gist options
  • Save swarog/0abfd72a12ba6f109875094a9e8de925 to your computer and use it in GitHub Desktop.
Save swarog/0abfd72a12ba6f109875094a9e8de925 to your computer and use it in GitHub Desktop.
package ru.exceedscm.crawler.monitoring;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import ru.exceedscm.crawler.task.TaskJobFactory;
import ru.exceedscm.crawler.task.dto.TaskDescription;
import ru.exceedscm.crawler.task.dto.TaskJob;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class QueueService {
private final TaskJobFactory jobFactory;
private UnicastProcessor<TaskJob> processor = UnicastProcessor.create();
private Map<String, TaskJob> taskJobs = new ConcurrentHashMap<>(); //TODO возможно стоит переписать на какую-то другую структуру данных что-бы гарантировать парвильнуб работу в многопоточном режиме
synchronized public TaskDescription add(TaskDescription task) {
if (getIfInProgress(task) != null) {
throw new TaskAlreadyExist();
}
TaskJob newJob = jobFactory.createTaskJob(task);
addJob(newJob);
processor.onNext(newJob);
return task;
}
private TaskJob getIfInProgress(TaskDescription newTask) {
return taskJobs.values().stream()
.filter(oldTask -> oldTask.getBaseUrl().equals(newTask.getUrl()))
.findFirst()
.orElse(null);
}
public Flux<TaskJob> getChannel() {
return this.processor;
}
private void addJob(TaskJob job) {
taskJobs.put(job.getTaskId(), job);
}
public void removeJob(String id) {
taskJobs.remove(id);
}
public Mono<TaskJob> getJob(@NonNull String id) {
return Mono.fromCallable(() -> taskJobs.get(id));
}
public Collection<TaskJob> getInactiveTasks(final int timeoutMinutes) {
return taskJobs.values().stream()
.filter(taskJob -> taskJob.isInactive(timeoutMinutes))
.collect(Collectors.toSet());
}
public Mono<TaskJob> getWrappedIfInProgress(final TaskDescription newTask) {
return Mono.from(getIfInProgress(newTask));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment