Last active
February 9, 2019 19:30
-
-
Save trajano/4bfeec46566d2ea2d98553d31a76e889 to your computer and use it in GitHub Desktop.
Docker Swarm Discovery Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.cloud.client.DefaultServiceInstance; | |
import org.springframework.cloud.client.ServiceInstance; | |
import org.springframework.cloud.client.discovery.DiscoveryClient; | |
import org.springframework.cloud.gateway.event.RefreshRoutesEvent; | |
import org.springframework.context.ApplicationListener; | |
import org.springframework.core.ParameterizedTypeReference; | |
import org.springframework.core.annotation.Order; | |
import org.springframework.http.MediaType; | |
import org.springframework.stereotype.Component; | |
import org.springframework.web.reactive.function.client.WebClient; | |
import javax.annotation.PostConstruct; | |
import java.util.*; | |
import java.util.concurrent.ConcurrentHashMap; | |
/** | |
* This is a discovery client that queries the services and then listens to the event stream for changes. | |
* Presently this just relies on hard coded values, but will change to use Docker Swarm later. | |
*/ | |
@Component | |
@Slf4j | |
@Order(-1) | |
public class DockerSwarmDiscoveryClient implements DiscoveryClient, ApplicationListener<RefreshRoutesEvent> { | |
/** | |
* This is the network name to get the host aliases from. The network is expected to be unique. | |
*/ | |
@Value("${swarm.discovery.network}") | |
private String network = "ljbackend"; | |
/** | |
* This is the Docker daemon endpoint to connect to. This is left as a string in order to support {@code /var/run/docker.sock} in the future. | |
*/ | |
@Value("${swarm.discovery.endpoint}") | |
private String daemonEndpoint = "http://daemon:2375/"; | |
/** | |
* If set to true, the discovery will be required and will throw an exception if the system is not running in a swarm. | |
*/ | |
@Value("${swarm.discovery.required:false}") | |
private boolean required; | |
/** | |
* Map from service ID to a list of service instances. | |
*/ | |
private final Map<String, List<ServiceInstance>> serviceInstanceMap = new ConcurrentHashMap<>(); | |
/** | |
* The network ID that is determined in {@link #init()}. | |
*/ | |
private String networkId; | |
/** | |
* Initializes the discovery client. | |
*/ | |
@PostConstruct | |
public void init() { | |
final WebClient client = WebClient.create(daemonEndpoint); | |
try { | |
client.get().uri("/_ping").retrieve().bodyToMono(String.class).block(); | |
} catch (RuntimeException e) { | |
log.error("Unable to ping {}", daemonEndpoint, e); | |
if (required) { | |
throw e; | |
} else { | |
log.warn("Ignoring error as discovery is not required"); | |
return; | |
} | |
} | |
final List<Map<String, Object>> networks = client.get() | |
.uri("/networks?name" + network) | |
.accept(MediaType.APPLICATION_JSON) | |
.retrieve() | |
.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() { | |
}) | |
.block(); | |
assert networks != null; | |
networkId = networks | |
.parallelStream() | |
.filter(n -> network.equals(n.get("Name"))) | |
.map(n -> (String) n.get("Id")) | |
.findAny() | |
.orElseThrow(IllegalStateException::new); | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public List<ServiceInstance> getInstances(String serviceId) { | |
List<ServiceInstance> serviceInstances = serviceInstanceMap.get(serviceId); | |
log.debug("{}={}", serviceId, serviceInstances); | |
return serviceInstances; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public String description() { | |
return "Docker Swarm Discovery Client"; | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public List<String> getServices() { | |
Set<String> services = serviceInstanceMap.keySet(); | |
log.info("services={}", services); | |
return new ArrayList<>(services); | |
} | |
/** | |
* Updates the service instance map. This performs calls to the daemon endpoint to get the current state of the | |
* services map. | |
* | |
* @param event event | |
*/ | |
@Override | |
public void onApplicationEvent(RefreshRoutesEvent event) { | |
final WebClient client = WebClient.create(daemonEndpoint); | |
client.get() | |
.uri("/services?label=com.docker.stack.namespace") | |
.accept(MediaType.APPLICATION_JSON) | |
.retrieve() | |
.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() { | |
}) | |
.subscribe(services -> services.parallelStream() | |
.map(e -> { | |
final String id = (String) e.get("ID"); | |
@SuppressWarnings("unchecked") final Map<String, Object> spec = (Map<String, Object>) e.get("Spec"); | |
log.debug("{}={}", id, spec); | |
@SuppressWarnings("unchecked") final Map<String, String> labels = (Map<String, String>) spec.get("Labels"); | |
final String serviceId = labels.get(Labels.SERVICE_ID); | |
if (serviceId == null) { | |
return null; | |
} | |
final int servicePort = Integer.parseInt(labels.getOrDefault(Labels.SERVICE_PORT, "8080")); | |
final boolean serviceSecure = Boolean.parseBoolean(labels.get(Labels.SERVICE_SECURE)); | |
labels.putIfAbsent(Labels.SERVICE_PATH, "/" + serviceId + "/**"); | |
labels.putIfAbsent(Labels.SERVICE_PATH_REGEXP, labels.get(Labels.SERVICE_PATH).replace("/**", "/(?<remaining>.*)")); | |
labels.putIfAbsent(Labels.SERVICE_PATH_REPLACEMENT, "/${remaining}"); | |
@SuppressWarnings("unchecked") final Map<String, Object> taskTemplate = (Map<String, Object>) spec.get("TaskTemplate"); | |
@SuppressWarnings("unchecked") final List<Map<String, Object>> taskNetworks = (List<Map<String, Object>>) taskTemplate.get("Networks"); | |
Optional<String> networkAlias = taskNetworks | |
.parallelStream() | |
.filter(n -> networkId.startsWith((String) n.get("Target"))) | |
.map(n -> { | |
@SuppressWarnings("unchecked") final List<String> aliases = (List<String>) n.get("Aliases"); | |
return aliases.get(0); | |
}) | |
.findAny(); | |
if (networkAlias.isEmpty()) { | |
return null; | |
} | |
log.info("labels={}", labels); | |
return new DefaultServiceInstance( | |
id, | |
serviceId, | |
networkAlias.get(), | |
servicePort, | |
serviceSecure, | |
labels | |
); | |
}) | |
.filter(Objects::nonNull) | |
.sequential() | |
.forEach(instance -> { | |
final List<ServiceInstance> serviceInstances = serviceInstanceMap.computeIfAbsent(instance.getServiceId(), v -> new ArrayList<>()); | |
serviceInstances.add(instance); | |
serviceInstanceMap.put(instance.getServiceId(), serviceInstances); | |
})); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.core.JsonProcessingException; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.cloud.gateway.event.RefreshRoutesEvent; | |
import org.springframework.context.ApplicationEventPublisher; | |
import org.springframework.http.MediaType; | |
import org.springframework.stereotype.Component; | |
import org.springframework.web.reactive.function.client.WebClient; | |
import reactor.core.publisher.Mono; | |
import javax.annotation.PostConstruct; | |
import java.io.IOException; | |
import java.time.Duration; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
/** | |
* This watches the {@code /events} endpoint for service create, update and remove events and if it detects a change it will publish | |
*/ | |
@Component | |
@Slf4j | |
public class DockerSwarmEventWatcher { | |
/** | |
* This is the network name to get the host aliases from. The network is expected to be unique. | |
*/ | |
@Value("${swarm.discovery.network}") | |
private String network = "ljbackend"; | |
/** | |
* This is the Docker daemon endpoint to connect to. This is left as a string in order to support {@code /var/run/docker.sock} in the future. | |
*/ | |
@Value("${swarm.discovery.endpoint}") | |
private String daemonEndpoint; | |
@Autowired | |
private ApplicationEventPublisher publisher; | |
@Autowired | |
private ObjectMapper mapper; | |
@PostConstruct | |
public void setUpStreamer() throws JsonProcessingException { | |
final Map<String, List<String>> filters = new HashMap<>(); | |
filters.put("scope", Collections.singletonList("swarm")); | |
filters.put("type", Collections.singletonList("service")); | |
WebClient.create(daemonEndpoint) | |
.get() | |
.uri("/events?filters={filters}", mapper.writeValueAsString(filters)) | |
.accept(MediaType.TEXT_EVENT_STREAM) | |
.retrieve() | |
.bodyToFlux(String.class) | |
.flatMap(Mono::justOrEmpty) | |
.map(s -> { | |
try { | |
return mapper.readValue(s, Map.class); | |
} catch (IOException e) { | |
log.warn("unable to parse {} as JSON", s); | |
return null; | |
} | |
}) | |
.flatMap(Mono::justOrEmpty) | |
.bufferTimeout(20, Duration.ofSeconds(1)) | |
.subscribe( | |
events -> { | |
log.info("{} service event(s) detected, starting update", events.size()); | |
log.trace("events={}", events); | |
publisher.publishEvent(new RefreshRoutesEvent(this)); | |
}, | |
throwable -> log.error("Error on event stream: {}", throwable.getMessage(), throwable), | |
() -> log.warn("event stream completed") | |
); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment