Last active
November 23, 2019 16:19
-
-
Save daschl/e32e05e6abc31e450f67c23fe30c3826 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.testcontainers.couchbase; | |
import com.couchbase.client.java.Bucket; | |
import com.couchbase.client.java.Cluster; | |
import com.couchbase.client.java.CouchbaseCluster; | |
import com.couchbase.client.java.document.JsonDocument; | |
import com.couchbase.client.java.document.json.JsonObject; | |
import com.couchbase.client.java.env.CouchbaseEnvironment; | |
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; | |
import org.junit.ClassRule; | |
import org.junit.Test; | |
public class BaseCouchbaseContainerTest { | |
@ClassRule | |
public static CouchbaseContainer container = new CouchbaseContainer(); | |
@Test | |
public void shouldInsertAndGet() { | |
CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder() | |
.bootstrapCarrierDirectPort(container.getKvPort()) | |
.bootstrapHttpDirectPort(container.getHttpPort()) | |
.build(); | |
Cluster cluster = CouchbaseCluster.create(env); | |
cluster.authenticate(container.getUsername(), container.getPassword()); | |
Bucket bucket = cluster.openBucket("foobar"); | |
bucket.insert(JsonDocument.create("foo", JsonObject.empty())); | |
System.err.println(bucket.get("foo")); | |
cluster.disconnect(); | |
env.shutdown(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.testcontainers.couchbase; | |
import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;
/**
 * Immutable description of the cluster setup: the set of enabled services and the memory
 * quota. Instances are created through {@link #builder()} or {@link #fromDefaults()}.
 */
public class ClusterSpec {

    private final Set<Service> enabledServices;
    private final int memoryQuota;

    private ClusterSpec(Builder builder) {
        // Copy again here so that reusing or further mutating the builder cannot change
        // a spec that has already been built.
        this.enabledServices = Collections.unmodifiableSet(copyOf(builder.enabledServices));
        this.memoryQuota = builder.memoryQuota;
    }

    /**
     * Returns the enabled services.
     *
     * @return an unmodifiable view of the enabled services, never {@code null}
     */
    public Set<Service> getEnabledServices() {
        return enabledServices;
    }

    /**
     * Returns the configured memory quota.
     *
     * @return the memory quota value given to the builder (300 by default)
     */
    public int getMemoryQuota() {
        return memoryQuota;
    }

    /**
     * Creates a new builder pre-populated with the defaults (only {@link Service#KV}
     * enabled, memory quota 300).
     *
     * @return a fresh builder
     */
    public static ClusterSpec.Builder builder() {
        return new Builder();
    }

    /**
     * Creates a spec that uses the builder defaults unchanged.
     *
     * @return the default cluster spec
     */
    public static ClusterSpec fromDefaults() {
        return builder().build();
    }

    /** Copies the given services into a new set, accepting empty input. */
    private static Set<Service> copyOf(Set<Service> services) {
        // EnumSet.copyOf rejects an empty collection that is not itself an EnumSet,
        // so the empty case is handled explicitly.
        return services.isEmpty() ? EnumSet.noneOf(Service.class) : EnumSet.copyOf(services);
    }

    /** Mutable builder for {@link ClusterSpec}. */
    public static class Builder {

        private Set<Service> enabledServices = EnumSet.of(Service.KV);
        private int memoryQuota = 300;

        /**
         * Sets the services to enable, replacing the default.
         *
         * @param enabled the services to enable; copied, so later changes to the passed
         *                set have no effect
         * @return this builder
         * @throws NullPointerException if {@code enabled} is {@code null}
         */
        public Builder enabledServices(Set<Service> enabled) {
            this.enabledServices = copyOf(Objects.requireNonNull(enabled, "enabled"));
            return this;
        }

        /**
         * Sets the memory quota.
         *
         * @param memoryQuota the memory quota value
         * @return this builder
         */
        public Builder memoryQuota(int memoryQuota) {
            this.memoryQuota = memoryQuota;
            return this;
        }

        /**
         * Builds an immutable spec from the current builder state.
         *
         * @return the new spec
         */
        public ClusterSpec build() {
            return new ClusterSpec(this);
        }
    }

    /**
     * Services that can be enabled. Public because it is part of the signatures of
     * {@link ClusterSpec#getEnabledServices()} and {@link Builder#enabledServices(Set)}.
     */
    public enum Service {
        KV("kv");

        private final String identifier;

        Service(String identifier) {
            this.identifier = identifier;
        }

        /**
         * Returns the string identifier of this service.
         *
         * @return the identifier, e.g. {@code "kv"}
         */
        public String identifier() {
            return identifier;
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2019 Couchbase, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.testcontainers.couchbase; | |
import org.testcontainers.containers.ContainerLaunchException; | |
import org.testcontainers.containers.GenericContainer; | |
import org.testcontainers.containers.Network; | |
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; | |
import org.testcontainers.images.builder.Transferable; | |
import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode; | |
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; | |
import org.testcontainers.shaded.org.apache.commons.io.IOUtils; | |
import org.testcontainers.utility.ThrowingFunction; | |
import java.io.DataOutputStream; | |
import java.io.IOException; | |
import java.net.HttpURLConnection; | |
import java.net.URL; | |
import java.net.URLEncoder; | |
import java.nio.charset.StandardCharsets; | |
import java.util.Arrays; | |
import java.util.Base64; | |
import java.util.Optional; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
import static java.net.HttpURLConnection.HTTP_OK; | |
public class CouchbaseContainer<SELF extends CouchbaseContainer<SELF>> extends GenericContainer<SELF> { | |
public static final ObjectMapper MAPPER = new ObjectMapper(); | |
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0); | |
private static final String STATIC_CONFIG = "/opt/couchbase/etc/couchbase/static_config"; | |
private static final String CAPI_CONFIG = "/opt/couchbase/etc/couchdb/default.d/capi.ini"; | |
private static final String CACHED_CONFIG = "/opt/couchbase/var/lib/couchbase/config/config.dat"; | |
private static final String IMAGE = "couchbase/server"; | |
private static final String VERSION = "enterprise-6.0.3"; | |
private int containerPortOffset; | |
private ClusterSpec clusterSpec = ClusterSpec.fromDefaults(); | |
public CouchbaseContainer() { | |
this(IMAGE + ":" + VERSION); | |
} | |
public CouchbaseContainer(final String imageName) { | |
super(imageName); | |
containerPortOffset = INSTANCE_COUNT.getAndIncrement(); | |
withNetwork(Network.SHARED); | |
Arrays.stream(Ports.values()).map(p -> p.getOriginalPort(containerPortOffset)).forEach(this::addExposedPort); | |
setWaitStrategy(new HttpWaitStrategy() | |
.forPort(Ports.REST.getOriginalPort(containerPortOffset)) | |
.forPath("/pools") | |
.forStatusCode(200) | |
); | |
} | |
public int getKvPort() { | |
return getMappedPort(Ports.MEMCACHED.getOriginalPort(containerPortOffset)); | |
} | |
public int getHttpPort() { | |
return getMappedPort(Ports.REST.getOriginalPort(containerPortOffset)); | |
} | |
@Override | |
protected void doStart() { | |
super.doStart(); | |
try { | |
initializeCluster(); | |
createBuckets(); | |
} catch (Exception ex) { | |
throw new ContainerLaunchException("Could not launch couchbase container", ex); | |
} | |
} | |
String urlBase; | |
String clusterUsername = "Administrator"; | |
String clusterPassword = "password"; | |
public String getUsername() { | |
return clusterUsername; | |
} | |
public String getPassword() { | |
return clusterPassword; | |
} | |
private void initializeCluster() throws Exception { | |
urlBase = String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(Ports.REST.getOriginalPort(containerPortOffset))); | |
String poolURL = "/pools/default"; | |
String poolPayload = "memoryQuota=" | |
+ URLEncoder.encode(Integer.toString(clusterSpec.getMemoryQuota()), "UTF-8"); | |
String setupServicesURL = "/node/controller/setupServices"; | |
StringBuilder servicePayloadBuilder = new StringBuilder(); | |
for (ClusterSpec.Service service : clusterSpec.getEnabledServices()) { | |
servicePayloadBuilder.append(service.identifier()).append(","); | |
} | |
String setupServiceContent = "services=" + URLEncoder.encode(servicePayloadBuilder.toString(), "UTF-8"); | |
String webSettingsURL = "/settings/web"; | |
String webSettingsContent = "username=" + URLEncoder.encode(clusterUsername, "UTF-8") + "&password=" + URLEncoder.encode(clusterPassword, "UTF-8") + "&port=8091"; | |
callCouchbaseRestAPI(poolURL, poolPayload); | |
callCouchbaseRestAPI(setupServicesURL, setupServiceContent); | |
callCouchbaseRestAPI(webSettingsURL, webSettingsContent); | |
createNodeWaitStrategy().waitUntilReady(this); | |
} | |
private void createBuckets() { | |
} | |
private HttpWaitStrategy createNodeWaitStrategy() { | |
return new HttpWaitStrategy() | |
.forPath("/pools/default/") | |
.withBasicCredentials(clusterUsername, clusterPassword) | |
.forPort(Ports.REST.getOriginalPort(containerPortOffset)) | |
.forStatusCode(HTTP_OK) | |
.forResponsePredicate(response -> { | |
try { | |
return Optional.of(MAPPER.readTree(response)) | |
.map(n -> n.at("/nodes/0/status")) | |
.map(JsonNode::asText) | |
.map("healthy"::equals) | |
.orElse(false); | |
} catch (IOException e) { | |
//logger().error("Unable to parse response {}", response); | |
return false; | |
} | |
}); | |
} | |
public void callCouchbaseRestAPI(String url, String payload) throws IOException { | |
String fullUrl = urlBase + url; | |
HttpURLConnection httpConnection = (HttpURLConnection) ((new URL(fullUrl).openConnection())); | |
httpConnection.setDoOutput(true); | |
httpConnection.setRequestMethod("POST"); | |
httpConnection.setRequestProperty("Content-Type", | |
"application/x-www-form-urlencoded"); | |
String encoded = Base64.getEncoder().encodeToString((clusterUsername + ":" + clusterPassword).getBytes(StandardCharsets.UTF_8)); | |
httpConnection.setRequestProperty("Authorization", "Basic " + encoded); | |
DataOutputStream out = new DataOutputStream(httpConnection.getOutputStream()); | |
out.writeBytes(payload); | |
out.flush(); | |
httpConnection.getResponseCode(); | |
} | |
@Override | |
protected void containerIsCreated(String containerId) { | |
patchConfig(STATIC_CONFIG, this::addMappedPorts); | |
// capi needs a special configuration, see https://developer.couchbase.com/documentation/server/current/install/install-ports.html | |
patchConfig(CAPI_CONFIG, this::replaceCapiPort); | |
copyFileToContainer(Transferable.of(new byte[]{}), CACHED_CONFIG); | |
} | |
private void patchConfig(String configLocation, ThrowingFunction<String, String> patchFunction) { | |
String patchedConfig = copyFileFromContainer(configLocation, | |
inputStream -> patchFunction.apply(IOUtils.toString(inputStream, StandardCharsets.UTF_8))); | |
copyFileToContainer(Transferable.of(patchedConfig.getBytes(StandardCharsets.UTF_8)), configLocation); | |
} | |
private String addMappedPorts(String originalConfig) { | |
String portConfig = Stream.of(Ports.values()) | |
.map(port -> String.format("{%s, %d}.", port.name, port.getOriginalPort(containerPortOffset))) | |
.collect(Collectors.joining("\n")); | |
return String.format("%s\n%s", originalConfig, portConfig); | |
} | |
private String replaceCapiPort(String originalConfig) { | |
return Arrays.stream(originalConfig.split("\n")) | |
.map(s -> (s.matches("port\\s*=\\s*" + Ports.CAPI.getOriginalPort())) ? "port = " + Ports.CAPI.getOriginalPort(containerPortOffset) : s) | |
.collect(Collectors.joining("\n")); | |
} | |
private enum Ports { | |
REST("rest_port", 8091), | |
CAPI("capi_port", 8092), | |
QUERY("query_port", 8093), | |
FTS("fts_http_port", 8094), | |
CBAS("cbas_http_port", 8095), | |
EVENTING("eventing_http_port", 8096), | |
MEMCACHED_SSL("memcached_ssl_port", 11207), | |
MEMCACHED("memcached_port", 11210), | |
REST_SSL("ssl_rest_port", 18091), | |
CAPI_SSL("ssl_capi_port", 18092), | |
QUERY_SSL("ssl_query_port", 18093), | |
FTS_SSL("fts_ssl_port", 18094), | |
CBAS_SSL("cbas_ssl_port", 18095), | |
EVENTING_SSL("eventing_ssl_port", 18096); | |
private final String name; | |
private final int originalPort; | |
Ports(String name, int originalPort) { | |
this.name = name; | |
this.originalPort = originalPort; | |
} | |
public int getOriginalPort(int offset) { | |
return originalPort + offset; | |
} | |
public int getOriginalPort() { | |
return originalPort; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment