Skip to content

Instantly share code, notes, and snippets.

@shvyrev
Created September 27, 2024 11:22
Show Gist options
  • Save shvyrev/20c6ccfa9f858492dd0bf347de066cb9 to your computer and use it in GitHub Desktop.
Save shvyrev/20c6ccfa9f858492dd0bf347de066cb9 to your computer and use it in GitHub Desktop.
package org.bft.bftresend.business.zags.route;
import lombok.RequiredArgsConstructor;
import org.apache.camel.DynamicRouter;
import org.apache.camel.ExchangeProperties;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.bft.bftresend.business.zags.processors.ZagsGetAllMessagesProcessor;
import org.bft.bftresend.business.zags.processors.ZagsLoadFilesForAnalyzerProcessor;
import org.bft.bftresend.business.zags.processors.ZagsReadFileByFileUUIDProcessor;
import org.bft.bftresend.business.zags.processors.ZagsReadFileByMessageIdProcessor;
import org.bft.bftresend.business.zags.processors.ZagsReadFileForAnalyzeProcessor;
import org.bft.bftresend.configuration.ZagsConfig;
import org.bft.bftresend.jobs.ResendJob;
import org.springframework.stereotype.Component;
import oshi.util.Constants;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.bft.bftresend.route.Routes.ZAGS_BY_A3;
import static org.bft.bftresend.route.Routes.ZAGS_BY_FILE_ID;
import static org.bft.bftresend.route.Routes.ZAGS_BY_MESSAGE_ID;
import static org.bft.bftresend.route.Routes.ZAGS_JOB;
import static org.bft.bftresend.route.Routes.ZAGS_TO_RABBIT;
import static org.bft.bftresend.utils.Const.CAMEL_FILE_NAME;
import static org.bft.bftresend.utils.Const.ZAGS_RESENDS_BY_FILE_ID_JOB;
import static org.bft.bftresend.utils.Const.ZAGS_RESENDS_BY_MESSAGE_ID_JOB;
import static org.bft.bftresend.utils.Const.ZAGS_RESND_BY_A_3_JOB;
import static org.bft.bftresend.utils.FileUtils.getProofreaderLogsDir;
import static org.bft.bftresend.utils.FileUtils.idempotentFileRepository;
@Component
@RequiredArgsConstructor
public class ZagsJobRouter extends RouteBuilder {
private final ZagsReadFileByMessageIdProcessor zagsReadFileByMessageIdProcessor;
private final ZagsReadFileByFileUUIDProcessor zagsReadFileByFileUUIDProcessor;
private final ZagsReadFileForAnalyzeProcessor zagsReadFileForAnalyzeProcessor;
private final ZagsLoadFilesForAnalyzerProcessor zagsLoadFilesForAnalyzerProcessor;
private final ZagsGetAllMessagesProcessor zagsGetAllMessagesProcessor;
private final ZagsConfig config;
@DynamicRouter
public String route(@ExchangeProperties Map<String, Object> properties) {
String jobName = properties.getOrDefault("job", "").toString();
switch (jobName) {
case ZAGS_RESENDS_BY_MESSAGE_ID_JOB:
return ZAGS_BY_MESSAGE_ID.getUri();
case ZAGS_RESENDS_BY_FILE_ID_JOB:
return ZAGS_BY_FILE_ID.getUri();
case ZAGS_RESND_BY_A_3_JOB:
return ZAGS_BY_A3.getUri();
// TODO to Constants
case "GetAllMessagesJob":
return "direct:get-all-messages-job";
default:
return EMPTY;
}
}
@Override
public void configure() throws Exception {
onException(Exception.class)
.handled(true)
.log(LoggingLevel.INFO, "Error: ${exception.message}");
onCompletion()
.log(LoggingLevel.INFO, "Completed: ${exchangeId}");
from(ZAGS_JOB.getUri())
.routeId(ZAGS_JOB.getId())
.process(e -> ((ResendJob) e.getIn().getBody()).enrich(e))
.bean(ZagsJobRouter.class, "route")
.end();
from("direct:get-all-messages-job")
.routeId("get-all-messages-job")
.pollEnrich()
.simple(getProofreaderLogsDir(config.getLogDir()))
.idempotentConsumer(header(CAMEL_FILE_NAME), idempotentFileRepository())
.process(zagsGetAllMessagesProcessor);
from(ZAGS_BY_MESSAGE_ID.getUri())
.routeId(ZAGS_BY_MESSAGE_ID.getId())
.pollEnrich()
.simple(getProofreaderLogsDir(config.getLogDir()))
.idempotentConsumer(header(CAMEL_FILE_NAME), idempotentFileRepository())
.process(zagsReadFileByMessageIdProcessor)
.filter(simple("${body.size} != 0"))
.split().body().threads(config.getPoolSize())
.to(ZAGS_TO_RABBIT.getUri());
from(ZAGS_BY_FILE_ID.getUri())
.routeId(ZAGS_BY_FILE_ID.getId())
.pollEnrich()
.simple(getProofreaderLogsDir(config.getLogDir()))
.idempotentConsumer(header(CAMEL_FILE_NAME), idempotentFileRepository())
.process(zagsReadFileByFileUUIDProcessor)
.filter(simple("${body.size} != 0"))
.split().body().threads(config.getPoolSize())
.to(ZAGS_TO_RABBIT.getUri());
from(ZAGS_BY_A3.getUri())
.routeId(ZAGS_BY_A3.getId())
.pollEnrich()
.simple(getProofreaderLogsDir(config.getLogDir()))
.idempotentConsumer(header(CAMEL_FILE_NAME), idempotentFileRepository())
.process(zagsReadFileForAnalyzeProcessor)
.filter(simple("${body.size} != 0"))
.split().body().threads(config.getPoolSize())
.process(zagsLoadFilesForAnalyzerProcessor);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment