Skip to content

Instantly share code, notes, and snippets.

@shvyrev
Created September 27, 2024 14:15
Show Gist options
  • Save shvyrev/be2a69e6df19a8b4e51b4d618045dd33 to your computer and use it in GitHub Desktop.
Save shvyrev/be2a69e6df19a8b4e51b4d618045dd33 to your computer and use it in GitHub Desktop.
package org.bft.bftresend.business.zags.route;
import lombok.RequiredArgsConstructor;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.bft.bftresend.business.zags.processors.ZagsGetAllMessagesProcessor;
import org.bft.bftresend.business.zags.processors.ZagsLoadFilesForAnalyzerProcessor;
import org.bft.bftresend.business.zags.processors.ZagsReadFileByFileUUIDProcessor;
import org.bft.bftresend.business.zags.processors.ZagsReadFileByMessageIdProcessor;
import org.bft.bftresend.business.zags.processors.ZagsReadFileForAnalyzeProcessor;
import org.bft.bftresend.configuration.ZagsConfig;
import org.bft.bftresend.jobs.ResendJob;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import static org.bft.bftresend.route.Routes.ZAGS_BY_A3;
import static org.bft.bftresend.route.Routes.ZAGS_BY_FILE_ID;
import static org.bft.bftresend.route.Routes.ZAGS_BY_MESSAGE_ID;
import static org.bft.bftresend.route.Routes.ZAGS_JOB;
import static org.bft.bftresend.route.Routes.ZAGS_TO_RABBIT;
import static org.bft.bftresend.utils.Const.ZAGS_RESENDS_BY_FILE_ID_JOB;
import static org.bft.bftresend.utils.Const.ZAGS_RESENDS_BY_MESSAGE_ID_JOB;
import static org.bft.bftresend.utils.Const.ZAGS_RESND_BY_A_3_JOB;
@Component
@RequiredArgsConstructor
public class ZagsJobRouter extends RouteBuilder {
private final ZagsReadFileByMessageIdProcessor zagsReadFileByMessageIdProcessor;
private final ZagsReadFileByFileUUIDProcessor zagsReadFileByFileUUIDProcessor;
private final ZagsReadFileForAnalyzeProcessor zagsReadFileForAnalyzeProcessor;
private final ZagsLoadFilesForAnalyzerProcessor zagsLoadFilesForAnalyzerProcessor;
private final ZagsGetAllMessagesProcessor zagsGetAllMessagesProcessor;
private final ZagsConfig config;
@Override
public void configure() throws Exception {
onException(Exception.class)
.handled(true)
.log(LoggingLevel.INFO, "Error: ${exception.message}");
onCompletion()
.log(LoggingLevel.INFO, "Completed: ${exchangeId}");
from(ZAGS_JOB.getUri())
.routeId(ZAGS_JOB.getId())
.process(e -> ((ResendJob) e.getIn().getBody()).enrich(e))
.process(this::nextRouteByJobNameNew)
// .process(e -> e.getIn().setHeader("nextRouteUri", getRouteByJobName(e.getProperty("job").toString())))
.toD("${header.nextRouteUri}")
.end();
from("direct:get-all-messages-job")
.routeId("get-all-messages-job")
.process(this::listLogFiles)
.split().body().threads(config.getPoolSize())
.process(zagsGetAllMessagesProcessor);
from(ZAGS_BY_MESSAGE_ID.getUri())
.routeId(ZAGS_BY_MESSAGE_ID.getId())
.process(this::listLogFiles)
.split().body().threads(config.getPoolSize())
.process(zagsReadFileByMessageIdProcessor)
.split().body().threads(config.getPoolSize())
.to(ZAGS_TO_RABBIT.getUri());
from(ZAGS_BY_FILE_ID.getUri())
.routeId(ZAGS_BY_FILE_ID.getId())
.process(this::listLogFiles)
.split().body().threads(config.getPoolSize())
.process(zagsReadFileByFileUUIDProcessor)
.split().body().threads(config.getPoolSize())
.to(ZAGS_TO_RABBIT.getUri());
from(ZAGS_BY_A3.getUri())
.routeId(ZAGS_BY_A3.getId())
.process(this::listLogFiles)
.split().body().threads(config.getPoolSize())
.process(zagsReadFileForAnalyzeProcessor)
.split().body().threads(config.getPoolSize())
.process(zagsLoadFilesForAnalyzerProcessor)
;
}
private void nextRouteByJobNameNew(Exchange exchange) {
exchange.getIn().setHeader("nextRouteUri", getRouteByJobName(exchange.getProperty("job").toString()));
}
private void listLogFiles(Exchange exchange) {
try (var stream = Files.list(Paths.get(config.getLogDir()))
.map(Path::toAbsolutePath)) {
exchange.getIn().setBody(stream.collect(Collectors.toList()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private String getRouteByJobName(String jobName) {
String routeUri = "stub:unknown";
switch (jobName) {
case ZAGS_RESENDS_BY_MESSAGE_ID_JOB:
routeUri = ZAGS_BY_MESSAGE_ID.getUri();
break;
case ZAGS_RESENDS_BY_FILE_ID_JOB:
routeUri = ZAGS_BY_FILE_ID.getUri();
break;
case ZAGS_RESND_BY_A_3_JOB:
routeUri = ZAGS_BY_A3.getUri();
break;
// TODO to Constants
case "GetAllMessagesJob":
routeUri = "direct:get-all-messages-job";
break;
}
return routeUri;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment