Created
November 17, 2014 20:53
-
-
Save NiteshKant/9cfec84d491c228b8713 to your computer and use it in GitHub Desktop.
EurekaServerWithDashboard
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.reactivex.lab.services; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.Map; | |
import java.util.stream.Collectors; | |
import com.netflix.eureka2.interests.ChangeNotification; | |
import com.netflix.eureka2.registry.InstanceInfo; | |
import io.netty.buffer.ByteBuf; | |
import io.netty.handler.codec.http.HttpHeaders; | |
import io.netty.handler.codec.http.HttpResponseStatus; | |
import io.reactivex.lab.services.common.SimpleJson; | |
import io.reactivex.netty.RxNetty; | |
import rx.Observable; | |
import rx.functions.Func1; | |
import rx.schedulers.Schedulers; | |
import com.netflix.eureka2.client.Eureka; | |
import com.netflix.eureka2.client.EurekaClient; | |
import com.netflix.eureka2.client.resolver.ServerResolver; | |
import com.netflix.eureka2.client.resolver.ServerResolvers; | |
import com.netflix.eureka2.interests.Interests; | |
import com.netflix.eureka2.registry.NetworkAddress.ProtocolType; | |
import com.netflix.eureka2.registry.ServicePort; | |
import com.netflix.eureka2.registry.datacenter.LocalDataCenterInfo; | |
import com.netflix.eureka2.server.EurekaWriteServer; | |
import com.netflix.eureka2.server.WriteServerConfig; | |
import com.netflix.eureka2.transport.EurekaTransports.Codec; | |
/** | |
* Eureka discovery server for services and gateway to use for instance discovery. | |
*/ | |
public class StartEurekaServer { | |
/* properties for overriding config */ | |
public static final String EUREKA_SERVER_HOST = System.getProperty("reactivelab.eureka.server.host", "127.0.0.1"); | |
public static final Integer EUREKA_SERVER_READ_PORT = Integer.getInteger("reactivelab.eureka.server.read.port", 7001); | |
public static final Integer EUREKA_SERVER_WRITE_PORT = Integer.getInteger("reactivelab.eureka.server.write.port", 7002); | |
public static void main(String[] args) throws Exception { | |
/* configure read/write Eureka server */ | |
System.setProperty("eureka2.eviction.allowedPercentage", "100"); // turn off eviction protection so during demos we don't hold on to hosts | |
System.setProperty("eureka2.eviction.timeoutMillis", "4000"); // set far lower than normal for demo/playground purposes | |
System.setProperty("eureka2.heartbeat.intervalMillis", "3000"); // set lower for demo/playground purposes | |
WriteServerConfig.WriteServerConfigBuilder builder = new WriteServerConfig.WriteServerConfigBuilder(); | |
builder.withReadServerPort(EUREKA_SERVER_READ_PORT).withWriteServerPort(EUREKA_SERVER_WRITE_PORT) | |
.withWriteClusterAddresses(new String[] { EUREKA_SERVER_HOST }) | |
.withCodec(Codec.Avro) | |
.withDataCenterType(LocalDataCenterInfo.DataCenterType.Basic); | |
EurekaWriteServer eurekaWriteServer = new EurekaWriteServer(builder.build()); | |
/* start the server */ | |
eurekaWriteServer.start(); | |
EurekaClient client = Eureka.newClientBuilder(ServerResolvers.from(new ServerResolver.Server(EUREKA_SERVER_HOST, EUREKA_SERVER_READ_PORT))).build(); | |
/** | |
* Background query for logging all events. | |
*/ | |
client.forInterest(Interests.forFullRegistry()) | |
.subscribeOn(Schedulers.computation()) | |
.forEach(n -> { | |
String vip = n.getData().getVipAddress(); | |
String app = n.getData().getApp(); | |
if (app == null) { | |
app = "none"; | |
} | |
String name = n.getData().getDataCenterInfo().getName(); | |
String ipAddress = n.getData().getDataCenterInfo() | |
.getAddresses().stream() | |
.filter(na -> na.getProtocolType() == ProtocolType.IPv4) | |
.collect(Collectors.toList()).get(0).getIpAddress(); | |
HashSet<ServicePort> servicePorts = n.getData().getPorts(); | |
int port = servicePorts.iterator().next().getPort(); | |
System.out.println("Eureka => " + n.getKind() + " => App: " + app + " VIP: " + vip + " Name: " + name + " IP: " + ipAddress + " Port: " + port); | |
}); | |
startEurekaDashboard(8088, client); | |
eurekaWriteServer.waitTillShutdown(); | |
} | |
private static void startEurekaDashboard(final int port, EurekaClient client) { | |
RxNetty.createHttpServer(port, (request, response) -> { | |
if (request.getUri().startsWith("/dashboard")) { | |
return Observable.empty(); | |
} else if (request.getUri().startsWith("/data")) { | |
response.getHeaders().set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream"); | |
return client.forInterest(Interests.forFullRegistry()) | |
.flatMap(notification -> { | |
ByteBuf data = response.getAllocator().buffer(); | |
data.writeBytes("data: ".getBytes()); | |
Map<String, String> dataAttributes = new HashMap<>(); | |
dataAttributes.put("type", notification.getKind().toString()); | |
dataAttributes.put("instance-id", notification.getData().getId()); | |
dataAttributes.put("vip", notification.getData().getVipAddress()); | |
HashSet<ServicePort> servicePorts = notification.getData().getPorts(); | |
int port1 = servicePorts.iterator().next().getPort(); | |
dataAttributes.put("port", String.valueOf(port1)); | |
String jsonData = SimpleJson.mapToJson(dataAttributes); | |
data.writeBytes(jsonData.getBytes()); | |
data.writeChar('\n'); | |
return response.writeBytesAndFlush(data); | |
}); | |
} else { | |
response.setStatus(HttpResponseStatus.NOT_FOUND); | |
return Observable.empty(); | |
} | |
}).start(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment