Created
July 12, 2018 10:18
-
-
Save KasunDon/9d5eb911221e10b669c910fb0ff7e01a to your computer and use it in GitHub Desktop.
EmbededPubSub for Java Integration Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kasundon.pubsub; | |
import com.google.api.gax.core.CredentialsProvider; | |
import com.google.api.gax.core.NoCredentialsProvider; | |
import com.google.api.gax.grpc.GrpcTransportChannel; | |
import com.google.api.gax.rpc.AlreadyExistsException; | |
import com.google.api.gax.rpc.FixedTransportChannelProvider; | |
import com.google.api.gax.rpc.TransportChannelProvider; | |
import com.google.cloud.pubsub.v1.MessageReceiver; | |
import com.google.cloud.pubsub.v1.Publisher; | |
import com.google.cloud.pubsub.v1.Subscriber; | |
import com.google.cloud.pubsub.v1.SubscriptionAdminClient; | |
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; | |
import com.google.cloud.pubsub.v1.TopicAdminClient; | |
import com.google.cloud.pubsub.v1.TopicAdminSettings; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import com.google.protobuf.ByteString; | |
import com.google.pubsub.v1.ProjectSubscriptionName; | |
import com.google.pubsub.v1.ProjectTopicName; | |
import com.google.pubsub.v1.PubsubMessage; | |
import com.google.pubsub.v1.PushConfig; | |
import com.google.pubsub.v1.Subscription; | |
import com.google.pubsub.v1.Topic; | |
import io.grpc.ManagedChannel; | |
import io.grpc.ManagedChannelBuilder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.testcontainers.containers.GenericContainer; | |
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; | |
import java.io.IOException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Consumer; | |
public class EmbededPubSub { | |
private final static Logger LOGGER = LoggerFactory.getLogger(EmbededPubSub.class); | |
private final static int PUBSUB_PORT = 5189; | |
private final static String PROJECT_ID = "my-project-id"; | |
private final static String TOPIC_ID = "my-test-topic"; | |
private final static String SUBSCRIPTION_ID = "my-subscription-id"; | |
public static final GenericContainer PUBSUB_CONTAINER = | |
new GenericContainer("google/cloud-sdk:latest") | |
.withExposedPorts(PUBSUB_PORT) | |
.withCommand( | |
"/bin/sh", | |
"-c", | |
String.format( | |
"gcloud beta emulators pubsub start --project %s --host-port=0.0.0.0:%d", | |
PROJECT_ID, | |
PUBSUB_PORT | |
) | |
) | |
.waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*started.*$")); | |
private String serviceHostname; | |
private ManagedChannel channel; | |
private TransportChannelProvider channelProvider; | |
private CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); | |
public void start() { | |
PUBSUB_CONTAINER.start(); | |
serviceHostname = String.format("127.0.0.1:%d", PUBSUB_CONTAINER.getMappedPort(PUBSUB_PORT)); | |
channel = ManagedChannelBuilder | |
.forTarget(serviceHostname) | |
.usePlaintext(true) | |
.build(); | |
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); | |
createTopicAndSubscription(TOPIC_ID); | |
LOGGER.debug(" @@ PubSub service successfully started!"); | |
} | |
public void stop() { | |
try { | |
channel.awaitTermination(2000, TimeUnit.MILLISECONDS); | |
} catch (InterruptedException e) { | |
//noop | |
} | |
PUBSUB_CONTAINER.stop(); | |
} | |
public void createTopicAndSubscription( | |
String topicName | |
) { | |
ProjectTopicName projectTopicName = ProjectTopicName.of(PROJECT_ID, topicName); | |
ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID); | |
SubscriptionAdminSettings subscriptionAdminSettings; | |
try { | |
TopicAdminClient topicAdminClient = TopicAdminClient.create( | |
TopicAdminSettings.newBuilder() | |
.setTransportChannelProvider(channelProvider) | |
.setCredentialsProvider(credentialsProvider) | |
.build()); | |
Topic topic = topicAdminClient.createTopic(projectTopicName); | |
LOGGER.debug(" @@ Topic created : {} ", topic.getName()); | |
subscriptionAdminSettings = SubscriptionAdminSettings | |
.newBuilder() | |
.setTransportChannelProvider(channelProvider) | |
.setCredentialsProvider(credentialsProvider) | |
.build(); | |
SubscriptionAdminClient subscriptionAdminClient = | |
SubscriptionAdminClient | |
.create(subscriptionAdminSettings); | |
Subscription subscription = | |
subscriptionAdminClient | |
.createSubscription(projectSubscriptionName, projectTopicName, PushConfig.getDefaultInstance(), 0); | |
LOGGER.debug(" @@ Subscription created : {} ", subscription.getName()); | |
} catch (IOException | AlreadyExistsException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void publish( | |
String message | |
) { | |
try { | |
ProjectTopicName topic = ProjectTopicName.of(PROJECT_ID, TOPIC_ID); | |
Publisher publisher = | |
Publisher.newBuilder(topic) | |
.setChannelProvider(channelProvider) | |
.setCredentialsProvider(credentialsProvider) | |
.build(); | |
ByteString data = ByteString.copyFromUtf8(message); | |
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); | |
publisher.publish(pubsubMessage); | |
publisher.shutdown(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void subscriber( | |
Consumer<String> consumer, | |
long timeoutMs | |
) { | |
ExecutorService executorService = Executors.newSingleThreadExecutor(); | |
executorService.execute(() -> { | |
ProjectSubscriptionName subscription = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID); | |
MessageReceiver receiver = (message, ackReplyConsumer) -> { | |
consumer.accept(message.getData().toStringUtf8()); | |
//ackReplyConsumer.ack(); | |
}; | |
Subscriber subscriber = null; | |
try { | |
subscriber = | |
Subscriber | |
.newBuilder(subscription, receiver) | |
.setChannelProvider(channelProvider) | |
.setCredentialsProvider(credentialsProvider) | |
.build(); | |
subscriber.addListener(new Subscriber.Listener() { | |
@Override | |
public void failed(Subscriber.State from, Throwable failure) { | |
LOGGER.error(" @@ failure detected : ", failure); | |
} | |
}, MoreExecutors.directExecutor()); | |
subscriber.startAsync().awaitRunning(); | |
Thread.sleep(timeoutMs); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} finally { | |
if (subscriber != null) { | |
subscriber.stopAsync().awaitTerminated(); | |
} | |
} | |
}); | |
} | |
public String getServiceHostname() { | |
if (serviceHostname == null) { | |
throw new IllegalStateException("Embedded PubSub Service not started yet."); | |
} | |
return String.format("http://%s", serviceHostname); | |
} | |
public String getFullyQualifiedTopicName() { | |
return String.format("projects/%s/topics/%s", PROJECT_ID, TOPIC_ID); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment