Skip to content

Instantly share code, notes, and snippets.

@trajano
Last active February 9, 2019 19:30
Show Gist options
  • Save trajano/4bfeec46566d2ea2d98553d31a76e889 to your computer and use it in GitHub Desktop.
Save trajano/4bfeec46566d2ea2d98553d31a76e889 to your computer and use it in GitHub Desktop.
Docker Swarm Discovery Client
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.annotation.Order;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* This is a discovery client that queries the services and then listens to the event stream for changes.
* Presently this just relies on hard coded values, but will change to use Docker Swarm later.
*/
@Component
@Slf4j
@Order(-1)
public class DockerSwarmDiscoveryClient implements DiscoveryClient, ApplicationListener<RefreshRoutesEvent> {
/**
* This is the network name to get the host aliases from. The network is expected to be unique.
*/
@Value("${swarm.discovery.network}")
private String network = "ljbackend";
/**
* This is the Docker daemon endpoint to connect to. This is left as a string in order to support {@code /var/run/docker.sock} in the future.
*/
@Value("${swarm.discovery.endpoint}")
private String daemonEndpoint = "http://daemon:2375/";
/**
* If set to true, the discovery will be required and will throw an exception if the system is not running in a swarm.
*/
@Value("${swarm.discovery.required:false}")
private boolean required;
/**
* Map from service ID to a list of service instances.
*/
private final Map<String, List<ServiceInstance>> serviceInstanceMap = new ConcurrentHashMap<>();
/**
* The network ID that is determined in {@link #init()}.
*/
private String networkId;
/**
* Initializes the discovery client.
*/
@PostConstruct
public void init() {
final WebClient client = WebClient.create(daemonEndpoint);
try {
client.get().uri("/_ping").retrieve().bodyToMono(String.class).block();
} catch (RuntimeException e) {
log.error("Unable to ping {}", daemonEndpoint, e);
if (required) {
throw e;
} else {
log.warn("Ignoring error as discovery is not required");
return;
}
}
final List<Map<String, Object>> networks = client.get()
.uri("/networks?name" + network)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
})
.block();
assert networks != null;
networkId = networks
.parallelStream()
.filter(n -> network.equals(n.get("Name")))
.map(n -> (String) n.get("Id"))
.findAny()
.orElseThrow(IllegalStateException::new);
}
/**
* {@inheritDoc}
*/
@Override
public List<ServiceInstance> getInstances(String serviceId) {
List<ServiceInstance> serviceInstances = serviceInstanceMap.get(serviceId);
log.debug("{}={}", serviceId, serviceInstances);
return serviceInstances;
}
/**
* {@inheritDoc}
*/
@Override
public String description() {
return "Docker Swarm Discovery Client";
}
/**
* {@inheritDoc}
*/
@Override
public List<String> getServices() {
Set<String> services = serviceInstanceMap.keySet();
log.info("services={}", services);
return new ArrayList<>(services);
}
/**
* Updates the service instance map. This performs calls to the daemon endpoint to get the current state of the
* services map.
*
* @param event event
*/
@Override
public void onApplicationEvent(RefreshRoutesEvent event) {
final WebClient client = WebClient.create(daemonEndpoint);
client.get()
.uri("/services?label=com.docker.stack.namespace")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
})
.subscribe(services -> services.parallelStream()
.map(e -> {
final String id = (String) e.get("ID");
@SuppressWarnings("unchecked") final Map<String, Object> spec = (Map<String, Object>) e.get("Spec");
log.debug("{}={}", id, spec);
@SuppressWarnings("unchecked") final Map<String, String> labels = (Map<String, String>) spec.get("Labels");
final String serviceId = labels.get(Labels.SERVICE_ID);
if (serviceId == null) {
return null;
}
final int servicePort = Integer.parseInt(labels.getOrDefault(Labels.SERVICE_PORT, "8080"));
final boolean serviceSecure = Boolean.parseBoolean(labels.get(Labels.SERVICE_SECURE));
labels.putIfAbsent(Labels.SERVICE_PATH, "/" + serviceId + "/**");
labels.putIfAbsent(Labels.SERVICE_PATH_REGEXP, labels.get(Labels.SERVICE_PATH).replace("/**", "/(?<remaining>.*)"));
labels.putIfAbsent(Labels.SERVICE_PATH_REPLACEMENT, "/${remaining}");
@SuppressWarnings("unchecked") final Map<String, Object> taskTemplate = (Map<String, Object>) spec.get("TaskTemplate");
@SuppressWarnings("unchecked") final List<Map<String, Object>> taskNetworks = (List<Map<String, Object>>) taskTemplate.get("Networks");
Optional<String> networkAlias = taskNetworks
.parallelStream()
.filter(n -> networkId.startsWith((String) n.get("Target")))
.map(n -> {
@SuppressWarnings("unchecked") final List<String> aliases = (List<String>) n.get("Aliases");
return aliases.get(0);
})
.findAny();
if (networkAlias.isEmpty()) {
return null;
}
log.info("labels={}", labels);
return new DefaultServiceInstance(
id,
serviceId,
networkAlias.get(),
servicePort,
serviceSecure,
labels
);
})
.filter(Objects::nonNull)
.sequential()
.forEach(instance -> {
final List<ServiceInstance> serviceInstances = serviceInstanceMap.computeIfAbsent(instance.getServiceId(), v -> new ArrayList<>());
serviceInstances.add(instance);
serviceInstanceMap.put(instance.getServiceId(), serviceInstances);
}));
}
}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This watches the {@code /events} endpoint for service create, update and remove events and if it detects a change it will publish
*/
@Component
@Slf4j
public class DockerSwarmEventWatcher {
/**
* This is the network name to get the host aliases from. The network is expected to be unique.
*/
@Value("${swarm.discovery.network}")
private String network = "ljbackend";
/**
* This is the Docker daemon endpoint to connect to. This is left as a string in order to support {@code /var/run/docker.sock} in the future.
*/
@Value("${swarm.discovery.endpoint}")
private String daemonEndpoint;
@Autowired
private ApplicationEventPublisher publisher;
@Autowired
private ObjectMapper mapper;
@PostConstruct
public void setUpStreamer() throws JsonProcessingException {
final Map<String, List<String>> filters = new HashMap<>();
filters.put("scope", Collections.singletonList("swarm"));
filters.put("type", Collections.singletonList("service"));
WebClient.create(daemonEndpoint)
.get()
.uri("/events?filters={filters}", mapper.writeValueAsString(filters))
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.flatMap(Mono::justOrEmpty)
.map(s -> {
try {
return mapper.readValue(s, Map.class);
} catch (IOException e) {
log.warn("unable to parse {} as JSON", s);
return null;
}
})
.flatMap(Mono::justOrEmpty)
.bufferTimeout(20, Duration.ofSeconds(1))
.subscribe(
events -> {
log.info("{} service event(s) detected, starting update", events.size());
log.trace("events={}", events);
publisher.publishEvent(new RefreshRoutesEvent(this));
},
throwable -> log.error("Error on event stream: {}", throwable.getMessage(), throwable),
() -> log.warn("event stream completed")
);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment