Skip to content

Instantly share code, notes, and snippets.

@zregvart
Created December 10, 2020 12:45
Show Gist options
  • Save zregvart/f7697cfd3c325afa6f941dfd32087bd0 to your computer and use it in GitHub Desktop.
Save zregvart/f7697cfd3c325afa6f941dfd32087bd0 to your computer and use it in GitHub Desktop.
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.salesforce.internal.streaming;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.CamelContext;
import org.apache.camel.component.salesforce.AuthenticationType;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceConsumer;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.impl.DefaultCamelContext;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.eclipse.jetty.io.RuntimeIOException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.http.trafficlistener.DoNothingWiremockNetworkTrafficListener;
import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
@TestInstance(Lifecycle.PER_CLASS)
public class SubscriptionHelperIntegrationTest {
final CamelContext camel;
final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
final SalesforceComponent salesforce;
final SubscriptionHelper subscription;
final WireMockServer wiremock;
private final Outage outage;
static class MessageArgumentMatcher implements ArgumentMatcher<Message> {
private final String name;
public MessageArgumentMatcher(final String name) {
this.name = name;
}
@Override
public boolean matches(final Message message) {
final Map<String, Object> data = message.getDataAsMap();
@SuppressWarnings("unchecked")
final Map<String, Object> event = (Map<String, Object>) data.get("event");
@SuppressWarnings("unchecked")
final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject");
return "created".equals(event.get("type")) && name.equals(sobject.get("Name"));
}
static Message messageForAccountCreationWithName(final String name) {
return argThat(new MessageArgumentMatcher(name));
}
}
static final class Outage extends DoNothingWiremockNetworkTrafficListener {
private final ReentrantLock inOutage = new ReentrantLock();
public void create() {
inOutage.lock();
}
public void end() {
inOutage.unlock();
}
@Override
public void incoming(final Socket socket, final ByteBuffer bytes) {
if (inOutage.isLocked()) {
try {
// if we're in outage, close the socket immediately
socket.close();
} catch (final IOException e) {
throw new RuntimeIOException(e);
}
}
}
@Override
public void opened(final Socket socket) {
if (inOutage.isLocked()) {
try {
// if we're in outage, close the socket immediately
socket.close();
} catch (final IOException e) {
throw new RuntimeIOException(e);
}
}
}
@Override
public void outgoing(final Socket socket, final ByteBuffer bytes) {
try {
// if we're in outage, block
inOutage.lock();
} finally {
inOutage.unlock();
}
}
}
public SubscriptionHelperIntegrationTest() throws SalesforceException {
outage = new Outage();
final WireMockConfiguration configuration = WireMockConfiguration.wireMockConfig()
.dynamicPort()
.networkTrafficListener(outage);
wiremock = new WireMockServer(configuration);
wiremock.start();
System.out.println("Port for wireshark to filter: " + wiremock.port());
final String instanceUrl = "http://localhost:" + wiremock.port();
wiremock.givenThat(post("/services/oauth2/token").willReturn(aResponse()
.withBody("{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}").withStatus(200)));
wiremock.givenThat(get("/services/oauth2/revoke?token=token").willReturn(aResponse().withStatus(200)));
wiremock.givenThat(post("/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake")
.willReturn(aResponse().withBody("[\n"
+ " {\n"
+ " \"channel\": \"/meta/handshake\",\n"
+ " \"version\": \"1.0\",\n"
+ " \"minimumVersion\": \"1.0\",\n"
+ " \"supportedConnectionTypes\": [\"long-polling\"],\n"
+ " \"clientId\": \"client-id\",\n"
+ " \"successful\": true,\n"
+ " \"authSuccessful\": true,\n"
+ " \"advice\": { \"reconnect\": \"retry\" }\n"
+ " }\n"
+ "]").withStatus(200)));
wiremock.givenThat(post("/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect")
.willReturn(aResponse().withBody("[\n"
+ " {\n"
+ " \"channel\": \"/meta/connect\",\n"
+ " \"successful\": true,\n"
+ " \"error\": \"\",\n"
+ " \"clientId\": \"client-id\",\n"
+ " \"advice\": { \"reconnect\": \"retry\" },\n"
+ " \"id\": 2,\n"
+ " }\n"
+ "]").withStatus(200)));
wiremock.givenThat(post("/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe")
.willReturn(aResponse().withBody("[\n"
+ " {\n"
+ " \"channel\": \"/meta/subscribe\",\n"
+ " \"clientId\": \"client-id\",\n"
+ " \"subscription\": \"/topic/Account\",\n"
+ " \"successful\": true\n"
+ " },\n"
+ " {\n"
+ " \"channel\": \"/topic/Account\",\n"
+ " \"data\": {\"event\": {\"createdDate\":\"2020-12-10T12:00:00.000Z\",\"replayId\":1,\"type\":\"created\"},\"sobject\":{\"Id\":\"account-id-1\",\"Name\":\"Account Name\"},\n"
+ " },\n"
+ " \"id\": \"msg-1\"\n"
+ " }\n"
+ "]").withStatus(200)));
wiremock.givenThat(post("/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect")
.willReturn(aResponse().withBody("[\n"
+ " {\n"
+ " \"channel\": \"/meta/disconnect\",\n"
+ " \"clientId\": \"client-id\"\n"
+ " }\n"
+ "]").withStatus(200)));
camel = new DefaultCamelContext();
camel.start();
salesforce = new SalesforceComponent(camel);
salesforce.setLoginUrl(instanceUrl);
salesforce.setClientId("clientId");
salesforce.setClientSecret("clientSecret");
salesforce.setRefreshToken("refreshToken");
salesforce.setAuthenticationType(AuthenticationType.REFRESH_TOKEN);
salesforce.setConfig(config);
salesforce.start();
subscription = new SubscriptionHelper(salesforce);
}
@AfterAll
public void stop() {
subscription.stop();
salesforce.stop();
camel.stop();
wiremock.stop();
}
@Test
void shouldResubscribeOnConnectionFailures() throws InterruptedException {
// handshake and connect
subscription.start();
// subscribe
final SalesforceConsumer consumer = mock(SalesforceConsumer.class);
final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
when(consumer.getEndpoint()).thenReturn(endpoint);
when(endpoint.getTopicName()).thenReturn("Account");
when(endpoint.getConfiguration()).thenReturn(config);
when(endpoint.getComponent()).thenReturn(salesforce);
subscription.subscribe("Account", consumer);
// when subscribed we immediately expect one message, this confirms that
// WireMock is setup correctly
verify(consumer).getEndpoint();
verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class), messageForAccountCreationWithName("Account Name"));
outage.create();
// need a way to push messages here
outage.end();
verifyNoMoreInteractions(consumer);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment