Created
August 21, 2016 17:53
-
-
Save jalp/db89f95378fe88c3ed9f1089be36ef5b to your computer and use it in GitHub Desktop.
Initial configuration for the SNS, SQS and Kinesis services
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.schibsted.notification.queueconsumer.config; | |
import com.amazonaws.regions.Region; | |
import com.amazonaws.regions.Regions; | |
import com.amazonaws.services.kinesis.AmazonKinesisClient; | |
import com.amazonaws.services.sns.AmazonSNSClient; | |
import com.amazonaws.services.sqs.AmazonSQSAsync; | |
import com.amazonaws.services.sqs.AmazonSQSAsyncClient; | |
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient; | |
import com.amazonaws.services.sqs.buffered.QueueBufferConfig; | |
import com.netflix.appinfo.AmazonInfo; | |
import com.schibsted.notification.queueconsumer.QueueType; | |
import com.schibsted.notification.queueconsumer.handlers.DeleteSubscriptionMessageHandler; | |
import com.schibsted.notification.queueconsumer.handlers.NotificationMessageHandler; | |
import com.schibsted.notification.queueconsumer.handlers.NotificationMessageRouterHandler; | |
import com.schibsted.notification.queueconsumer.handlers.PostSubscriptionMessageHandler; | |
import com.schibsted.notification.queueconsumer.handlers.SendRequestMessageHandler; | |
import com.schibsted.notification.queueconsumer.pushservices.PushService; | |
import com.schibsted.notification.queueconsumer.pushservices.SNSPushService; | |
import com.schibsted.notification.queueconsumer.queueservices.QueueService; | |
import com.schibsted.notification.queueconsumer.queueservices.SQSQueueService; | |
import com.schibsted.notification.queueconsumer.readers.DeleteSubscriptionReader; | |
import com.schibsted.notification.queueconsumer.readers.NotificationAndroidReader; | |
import com.schibsted.notification.queueconsumer.readers.NotificationIOSReader; | |
import com.schibsted.notification.queueconsumer.readers.NotificationRouterReader; | |
import com.schibsted.notification.queueconsumer.readers.PostSubscriptionReader; | |
import com.schibsted.notification.queueconsumer.readers.SQSReader; | |
import com.schibsted.notification.queueconsumer.readers.SendRequestReader; | |
import com.schibsted.notification.queueconsumer.trackingservices.KinesisTrackingService; | |
import com.schibsted.notification.queueconsumer.trackingservices.TrackingService; | |
import org.apache.commons.configuration.ConfigurationException; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory; | |
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory; | |
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; | |
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler; | |
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer; | |
import org.springframework.cloud.client.loadbalancer.LoadBalanced; | |
import org.springframework.cloud.commons.util.InetUtils; | |
import org.springframework.cloud.commons.util.InetUtilsProperties; | |
import org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.context.annotation.Lazy; | |
import org.springframework.context.annotation.Profile; | |
import org.springframework.core.task.AsyncTaskExecutor; | |
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | |
import org.springframework.web.client.RestTemplate; | |
@Configuration | |
public class QueueConsumerConfig { | |
@Autowired | |
ApplicationContext appContext; | |
@Autowired | |
QueuesProperties queuesProperties; | |
@Value("${server.port}") | |
String port; | |
@Value("${cloud.region.static}") | |
private String region; | |
@Value("${notification.executor.core.pool}") | |
private int corePoolSize; | |
@Bean | |
@LoadBalanced | |
public RestTemplate restTemplate() { | |
return new RestTemplate(); | |
} | |
@Bean(name = "amazonKinesis", destroyMethod = "shutdown") | |
public AmazonKinesisClient amazonKinesisClient() { | |
AmazonKinesisClient client = new AmazonKinesisClient(); | |
client.setRegion(Region.getRegion(Regions.fromName(region))); | |
return client; | |
} | |
@Bean | |
@Profile("cloud") | |
public EurekaInstanceConfigBean eurekaInstanceConfig() { | |
EurekaInstanceConfigBean eurekaInstanceConfigBean = new EurekaInstanceConfigBean( | |
new InetUtils(new InetUtilsProperties())); | |
AmazonInfo amazonInfo = AmazonInfo.Builder.newBuilder().autoBuild("eureka"); | |
eurekaInstanceConfigBean.setDataCenterInfo(amazonInfo); | |
String baseUrl = "http://" + amazonInfo.getMetadata().get("local-hostname") + ":" + port; | |
eurekaInstanceConfigBean.setHealthCheckUrl(baseUrl + "/healthcheck"); | |
eurekaInstanceConfigBean.setStatusPageUrl(baseUrl + "/Status"); | |
return eurekaInstanceConfigBean; | |
} | |
@Bean | |
public QueueMessagingTemplate queueMessagingTemplate() { | |
return new QueueMessagingTemplate(amazonSQSClient()); | |
} | |
@Bean | |
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() { | |
SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory(); | |
msgListenerContainerFactory.setAmazonSqs(amazonSQSClient()); | |
return msgListenerContainerFactory; | |
} | |
@Bean | |
public QueueMessageHandler queueMessageHandler() { | |
QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory(); | |
queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient()); | |
return queueMsgHandlerFactory.createQueueMessageHandler(); | |
} | |
@Bean | |
public AsyncTaskExecutor notificationTaskExecutor() { | |
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); | |
threadPoolTaskExecutor.setThreadNamePrefix("NotificationExecutor"); | |
threadPoolTaskExecutor.setCorePoolSize(corePoolSize); | |
// No use of a thread pool executor queue to avoid retaining message to long in memory | |
threadPoolTaskExecutor.setQueueCapacity(0); | |
return threadPoolTaskExecutor; | |
} | |
@Bean | |
public SimpleMessageListenerContainer simpleMessageListenerContainer() { | |
SimpleMessageListenerContainer msgListenerContainer = simpleMessageListenerContainerFactory() | |
.createSimpleMessageListenerContainer(); | |
msgListenerContainer.setMessageHandler(queueMessageHandler()); | |
msgListenerContainer.setTaskExecutor(notificationTaskExecutor()); | |
msgListenerContainer.setMaxNumberOfMessages(10); | |
return msgListenerContainer; | |
} | |
@Lazy | |
@Bean(name = "amazonSQS", destroyMethod = "shutdown") | |
public AmazonSQSAsync amazonSQSClient() { | |
AmazonSQSAsyncClient awsSQSAsyncClient; | |
awsSQSAsyncClient = new AmazonSQSAsyncClient(); | |
awsSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName(region))); | |
QueueBufferConfig config = new QueueBufferConfig() | |
.withMaxInflightReceiveBatches(20) | |
.withMaxDoneReceiveBatches(15); | |
return new AmazonSQSBufferedAsyncClient(awsSQSAsyncClient, config); | |
} | |
@Lazy | |
@Bean(name = "amazonSNS") | |
public AmazonSNSClient amazonSNSClient() { | |
AmazonSNSClient client = new AmazonSNSClient(); | |
client.setRegion(Region.getRegion(Regions.fromName(region))); | |
return client; | |
} | |
@Bean | |
public SQSReader sqsReader(@Value("${notification.queueListener.active}") String queueType) | |
throws ConfigurationException { | |
QueueType qt = QueueType.fromString(queueType); | |
switch (qt) { | |
case SUBSCRIBE_REQUEST: | |
return new PostSubscriptionReader(appContext.getBean(PostSubscriptionMessageHandler.class)); | |
case UNSUBSCRIBE_REQUEST: | |
return new DeleteSubscriptionReader(appContext.getBean(DeleteSubscriptionMessageHandler.class)); | |
case SEND_REQUEST: | |
return new SendRequestReader(appContext.getBean(SendRequestMessageHandler.class)); | |
case SEND_NOTIFICATION_ANDROID: | |
return new NotificationAndroidReader( | |
appContext.getBean(NotificationMessageHandler.class)); | |
case SEND_NOTIFICATION_IOS: | |
return new NotificationIOSReader( | |
appContext.getBean(NotificationMessageHandler.class)); | |
case SEND_NOTIFICATION_ROUTER: | |
return new NotificationRouterReader( | |
appContext.getBean(NotificationMessageRouterHandler.class)); | |
default: | |
throw new ConfigurationException("Message type is not found"); | |
} | |
} | |
@Bean | |
public QueueService androidQueueService() { | |
return new SQSQueueService( | |
queueMessagingTemplate(), | |
queuesProperties.getNotificationAndroidQueue()); | |
} | |
@Bean | |
public QueueService iosQueueService() { | |
return new SQSQueueService( | |
queueMessagingTemplate(), | |
queuesProperties.getNotificationIOSQueue()); | |
} | |
@Bean | |
public QueueService routerQueueService() { | |
return new SQSQueueService( | |
queueMessagingTemplate(), | |
queuesProperties.getNotificationRouterQueue()); | |
} | |
@Bean | |
public PushService snsPushService(SNSPushService pushService) { | |
return pushService; | |
} | |
@Bean | |
public TrackingService trackingService(KinesisTrackingService trackingService) { | |
return trackingService; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment