Last active
June 14, 2019 20:41
-
-
Save swarog/0abfd72a12ba6f109875094a9e8de925 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ru.exceedscm.crawler.monitoring; | |
import lombok.NonNull; | |
import lombok.RequiredArgsConstructor; | |
import org.springframework.stereotype.Service; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import reactor.core.publisher.UnicastProcessor; | |
import ru.exceedscm.crawler.task.TaskJobFactory; | |
import ru.exceedscm.crawler.task.dto.TaskDescription; | |
import ru.exceedscm.crawler.task.dto.TaskJob; | |
import java.util.Collection; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.stream.Collectors; | |
@Service | |
@RequiredArgsConstructor | |
public class QueueService { | |
private final TaskJobFactory jobFactory; | |
private UnicastProcessor<TaskJob> processor = UnicastProcessor.create(); | |
private Map<String, TaskJob> taskJobs = new ConcurrentHashMap<>(); //TODO возможно стоит переписать на какую-то другую структуру данных что-бы гарантировать парвильнуб работу в многопоточном режиме | |
synchronized public TaskDescription add(TaskDescription task) { | |
if (getIfInProgress(task) != null) { | |
throw new TaskAlreadyExist(); | |
} | |
TaskJob newJob = jobFactory.createTaskJob(task); | |
addJob(newJob); | |
processor.onNext(newJob); | |
return task; | |
} | |
private TaskJob getIfInProgress(TaskDescription newTask) { | |
return taskJobs.values().stream() | |
.filter(oldTask -> oldTask.getBaseUrl().equals(newTask.getUrl())) | |
.findFirst() | |
.orElse(null); | |
} | |
public Flux<TaskJob> getChannel() { | |
return this.processor; | |
} | |
private void addJob(TaskJob job) { | |
taskJobs.put(job.getTaskId(), job); | |
} | |
public void removeJob(String id) { | |
taskJobs.remove(id); | |
} | |
public Mono<TaskJob> getJob(@NonNull String id) { | |
return Mono.fromCallable(() -> taskJobs.get(id)); | |
} | |
public Collection<TaskJob> getInactiveTasks(final int timeoutMinutes) { | |
return taskJobs.values().stream() | |
.filter(taskJob -> taskJob.isInactive(timeoutMinutes)) | |
.collect(Collectors.toSet()); | |
} | |
public Mono<TaskJob> getWrappedIfInProgress(final TaskDescription newTask) { | |
return Mono.from(getIfInProgress(newTask)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment