Created
May 1, 2017 17:32
-
-
Save fastnsilver/20209f7bcdeaa5084b72a5e8857b66fd to your computer and use it in GitHub Desktop.
Working w/ Spring Cloud and SQS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.List; | |
import lombok.AccessLevel; | |
import lombok.AllArgsConstructor; | |
import lombok.Builder; | |
import lombok.Getter; | |
import lombok.NoArgsConstructor; | |
import org.json.JSONArray; | |
@Getter | |
@NoArgsConstructor(access = AccessLevel.PACKAGE) | |
@AllArgsConstructor(access = AccessLevel.PACKAGE) | |
@Builder | |
public class AwsMetricCollectionRequest implements HasResourceType { | |
private String accountNumber; | |
private String region; | |
private String iamRoleArn; | |
private String resourceType; | |
private List<String> tagNames; | |
private JSONArray resource; | |
public static AwsMetricCollectionRequest of(AwsSurveillanceRequest origin, JSONArray resource) { | |
return AwsMetricCollectionRequest | |
.builder() | |
.accountNumber(origin.getAccountNumber()) | |
.region(origin.getRegion()) | |
.iamRoleArn(origin.getIamRoleArn()) | |
.resourceType(origin.getResourceType()) | |
.tagNames(origin.getTagNames()) | |
.resource(resource) | |
.build(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; | |
import org.springframework.messaging.Message; | |
import org.springframework.util.Assert; | |
@Slf4j | |
public class AwsMetricCollectorMessageSender<P extends HasResourceType> implements MessageSender<P> { | |
private final QueueMessagingTemplate queueMessagingTemplate; | |
private final String queueName; | |
public AwsMetricCollectorMessageSender(String queueName, QueueMessagingTemplate queueMessagingTemplate) { | |
this.queueMessagingTemplate = queueMessagingTemplate; | |
this.queueName = queueName; | |
} | |
@Override | |
public void send(Message<P> message) { | |
Assert.notNull(message.getPayload(), "Cannot send metric collection request with a null payload!"); | |
log.debug("Sent message! queueName={}, headers={}, payload={}", queueName, message.getHeaders().toString(), | |
message.getPayload().toString()); | |
queueMessagingTemplate.convertAndSend(queueName, message.getPayload(), message.getHeaders()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Implemented by types that expose a resource type as a string.
 */
public interface HasResourceType {

    /**
     * @return the resource type of this object
     */
    String getResourceType();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Override | |
@SqsListener(value = "${sqs.inbound.name}", deletionPolicy = SqsMessageDeletionPolicy.NEVER) | |
public void onMessage(@Payload String payload, Acknowledgment ack, @Headers MessageHeaders headers) { | |
Assert.notNull(payload, "Cannot process a null metric collection request!"); | |
Assert.notNull(headers, "Message headers cannot be null"); | |
Assert.notEmpty(headers, "Message headers cannot be empty"); | |
log.info("Received metric collection request! payload={}", payload); | |
try { | |
onMessageInternal(mapper.readValue(payload, AwsMetricCollectionRequest.class), ack, headers); | |
} catch (IOException e) { | |
log.warn("Corrupt payload! Was expecting an AwsMetricCollectionRequest."); | |
ack.acknowledge(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface MessageSender<P extends HasResourceType> { | |
void send(Message<P> message); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.services.sqs.AmazonSQSAsync; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import org.springframework.beans.BeansException; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.beans.factory.config.BeanPostProcessor; | |
import org.springframework.cloud.aws.core.env.ResourceIdResolver; | |
import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs; | |
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; | |
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler; | |
import org.springframework.cloud.aws.messaging.listener.support.AcknowledgmentHandlerMethodArgumentResolver; | |
import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver; | |
import org.springframework.cloud.aws.messaging.support.NotificationSubjectArgumentResolver; | |
import org.springframework.cloud.aws.messaging.support.converter.ObjectMessageConverter; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.context.annotation.Profile; | |
import org.springframework.messaging.converter.CompositeMessageConverter; | |
import org.springframework.messaging.converter.MappingJackson2MessageConverter; | |
import org.springframework.messaging.converter.MessageConverter; | |
import org.springframework.messaging.converter.SimpleMessageConverter; | |
import org.springframework.messaging.converter.StringMessageConverter; | |
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver; | |
import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver; | |
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver; | |
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; | |
import org.springframework.validation.Errors; | |
import org.springframework.validation.Validator; | |
@Configuration | |
@EnableSqs | |
class SqsConfig { | |
@Autowired | |
private QueueMessagingTemplate queueMessagingTemplate; | |
@Bean | |
public QueueMessagingTemplate queueMessagingTemplate( | |
AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver, ObjectMapper objectMapper) { | |
return new QueueMessagingTemplate( | |
amazonSqs, resourceIdResolver, createJacksonConverter(objectMapper)); | |
} | |
@Bean("inboundMessageSender") | |
public MessageSender<AwsMetricCollectionRequest> outboundMessageSender( | |
@Value("${sqs.inbound.name}") String queueName) { | |
return new AwsMetricCollectorMessageSender<>(queueName, queueMessagingTemplate); | |
} | |
// @see https://github.com/spring-cloud/spring-cloud-aws/issues/193 | |
@Bean | |
public QueueMessageHandlerPostProcessor queueMessageHandlerPostProcessor(ObjectMapper objectMapper) { | |
return new QueueMessageHandlerPostProcessor(getMethodArgumentResolvers(objectMapper)); | |
} | |
private List<HandlerMethodArgumentResolver> getMethodArgumentResolvers(ObjectMapper objectMapper) { | |
MessageConverter jacksonMessageConverter = createJacksonConverter(objectMapper); | |
MessageConverter compositeMessageConverter = createCompositeConverter(objectMapper); | |
return | |
Arrays.asList( | |
new HeaderMethodArgumentResolver(null, null), | |
new HeadersMethodArgumentResolver(), | |
new NotificationSubjectArgumentResolver(), | |
new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment"), | |
new NotificationMessageArgumentResolver(compositeMessageConverter), | |
new PayloadArgumentResolver(jacksonMessageConverter, new NoOpValidator())); | |
} | |
private MessageConverter createJacksonConverter(ObjectMapper objectMapper) { | |
MappingJackson2MessageConverter jacksonMessageConverter = new MappingJackson2MessageConverter(); | |
jacksonMessageConverter.setObjectMapper(objectMapper); | |
jacksonMessageConverter.setSerializedPayloadClass(String.class); | |
jacksonMessageConverter.setStrictContentTypeMatch(true); | |
return jacksonMessageConverter; | |
} | |
private MessageConverter createCompositeConverter(ObjectMapper objectMapper) { | |
List<MessageConverter> payloadArgumentConverters = new ArrayList<>(); | |
payloadArgumentConverters.add(createJacksonConverter(objectMapper)); | |
StringMessageConverter stringMessageConverter = new StringMessageConverter(); | |
stringMessageConverter.setSerializedPayloadClass(String.class); | |
payloadArgumentConverters.add(stringMessageConverter); | |
ObjectMessageConverter objectMessageConverter = new ObjectMessageConverter(); | |
objectMessageConverter.setStrictContentTypeMatch(true); | |
payloadArgumentConverters.add(objectMessageConverter); | |
payloadArgumentConverters.add(new SimpleMessageConverter()); | |
return new CompositeMessageConverter(payloadArgumentConverters); | |
} | |
private static final class NoOpValidator implements Validator { | |
@Override | |
public boolean supports(Class<?> clazz) { | |
return false; | |
} | |
@Override | |
public void validate(Object target, Errors errors) { | |
} | |
} | |
private static final class QueueMessageHandlerPostProcessor implements BeanPostProcessor { | |
private List<HandlerMethodArgumentResolver> argumentResolvers; | |
public QueueMessageHandlerPostProcessor(List<HandlerMethodArgumentResolver> argumentResolvers) { | |
this.argumentResolvers = argumentResolvers; | |
} | |
@Override | |
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { | |
return bean; | |
} | |
@Override | |
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { | |
if (bean instanceof QueueMessageHandler) { | |
((QueueMessageHandler) bean).setArgumentResolvers(argumentResolvers); | |
} | |
return bean; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment