Created
June 25, 2014 07:12
-
-
Save pveentjer/4072e4b7bd14072083d6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.hazelcast.projectx.io.maggot; | |
import com.hazelcast.projectx.InvocationFuture; | |
import com.hazelcast.projectx.config.Config; | |
import com.hazelcast.projectx.io.Address; | |
import com.hazelcast.projectx.io.AddressComparator; | |
import com.hazelcast.projectx.io.Network; | |
import com.hazelcast.projectx.io.PartitionEndpoint; | |
import com.hazelcast.projectx.spi.SpiInvoker; | |
import io.netty.bootstrap.Bootstrap; | |
import io.netty.bootstrap.ServerBootstrap; | |
import io.netty.buffer.ByteBuf; | |
import io.netty.buffer.Unpooled; | |
import io.netty.channel.ChannelFuture; | |
import io.netty.channel.ChannelHandlerContext; | |
import io.netty.channel.ChannelInboundHandlerAdapter; | |
import io.netty.channel.ChannelInitializer; | |
import io.netty.channel.ChannelOption; | |
import io.netty.channel.EventLoopGroup; | |
import io.netty.channel.SimpleChannelInboundHandler; | |
import io.netty.channel.nio.NioEventLoopGroup; | |
import io.netty.channel.socket.SocketChannel; | |
import io.netty.channel.socket.nio.NioServerSocketChannel; | |
import io.netty.channel.socket.nio.NioSocketChannel; | |
import io.netty.handler.codec.ByteToMessageDecoder; | |
import io.netty.util.ReferenceCountUtil; | |
import java.net.InetSocketAddress; | |
import java.net.SocketAddress; | |
import java.nio.ByteBuffer; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.atomic.AtomicLong; | |
public class MaggotNetwork implements Network { | |
/** Configuration taken from the settings; read by the address, endpoint and start-up logic. */
private final Config config;

/** Invoker that the server-side handler calls with each decoded operation. */
private final SpiInvoker invoker;

/** Every address derived from the configuration, sorted; built once in the constructor. */
private final Address[] addresses;

/** One slot per partition; the slots stay {@code null} until {@code startEndpoints()} fills them. */
private final Endpoint[] endpoints;

/**
 * Creates the network from the given settings. Nothing is bound or connected here;
 * that happens in {@code start()}.
 *
 * @param settings source of the configuration and the invoker
 */
public MaggotNetwork(Settings settings) {
    final Config cfg = settings.config;
    this.config = cfg;
    this.invoker = settings.invoker;
    // initAddresses() reads this.config, so it must run after the assignment above.
    this.addresses = initAddresses();
    this.endpoints = new Endpoint[cfg.partitionCount];
}
private Address[] initAddresses() { | |
List<Address> addresses = new LinkedList<Address>(); | |
for (Address baseAddress : config.addresses) { | |
for (int port = baseAddress.port; port < baseAddress.port + config.partitionThreadCount; port++) { | |
addresses.add(new Address(baseAddress.hostname, port)); | |
} | |
} | |
Collections.sort(addresses, new AddressComparator()); | |
return addresses.toArray(new Address[addresses.size()]); | |
} | |
@Override | |
public PartitionEndpoint getEndpoint(int partitionId) { | |
return endpoints[partitionId]; | |
} | |
@Override | |
public void start() { | |
//start the server threads. | |
for (int k = 0; k < config.partitionThreadCount; k++) { | |
try { | |
int port = k + config.port; | |
startServerThread(port); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
startEndpoints(); | |
//start the client threads. | |
for (int k = 0; k < config.partitionThreadCount; k++) { | |
try { | |
startClientThread(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
System.out.println("Maggot Network started"); | |
} | |
private void startClientThread() throws InterruptedException { | |
Bootstrap b = new Bootstrap(); | |
EventLoopGroup workerGroup = new NioEventLoopGroup(); | |
b.group(workerGroup); | |
b.channel(NioSocketChannel.class); | |
b.option(ChannelOption.SO_KEEPALIVE, true); | |
b.handler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
public void initChannel(SocketChannel ch) throws Exception { | |
ch.pipeline().addLast(new OperationClientHandler()); | |
} | |
}); | |
// Start the client. | |
//ChannelFuture f = b.connect(host, port).sync(); // (5) | |
} | |
/** | |
* Starts a server thread on a given port. To add more parallelization, we just open more ports. Each port will | |
* be responsible for handling a set of partitions. On the client side you calculate the position, then you calculate | |
* the port of the remote machine and send the operation directly to the right cpu. | |
* | |
* @param port | |
* @throws InterruptedException | |
*/ | |
private void startServerThread(int port) throws InterruptedException { | |
//we only want to have 1 server thread per port. | |
int threadCount = 1; | |
EventLoopGroup bossGroup = new NioEventLoopGroup(threadCount); | |
EventLoopGroup workerGroup = new NioEventLoopGroup(); | |
ServerBootstrap b = new ServerBootstrap(); | |
b.group(bossGroup, workerGroup) | |
.channel(NioServerSocketChannel.class) | |
.childHandler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
public void initChannel(SocketChannel ch) throws Exception { | |
ch.pipeline().addLast(new OperationDecoder(), new OperationServerHandler()); | |
} | |
}) | |
.option(ChannelOption.SO_BACKLOG, 128) | |
.childOption(ChannelOption.SO_KEEPALIVE, true); | |
// Bind and start to accept incoming connections. | |
ChannelFuture f = b.bind(port).sync(); // (7) | |
System.out.println("started: " + port); | |
// Wait until the server socket is closed. | |
// In this example, this does not happen, but you can do that to gracefully | |
// shut down your server. | |
//f.channel().closeFuture().sync(); | |
} | |
private void startEndpoints() { | |
Map<Address, Endpoint> lookup = new HashMap<Address, Endpoint>(); | |
for (int partitionId = 0; partitionId < endpoints.length; partitionId++) { | |
Address address = getAddress(partitionId); | |
Endpoint endpoint = lookup.get(address); | |
if (endpoint == null) { | |
endpoint = new Endpoint(address); | |
} | |
endpoints[partitionId] = endpoint; | |
} | |
} | |
private Address getAddress(int partitionId) { | |
int index = partitionId % addresses.length; | |
return addresses[index]; | |
} | |
@Override | |
public void shutdown() { | |
} | |
/** | |
* Responsible for decoding an operation. This is run server-side. | |
*/ | |
public class OperationDecoder extends ByteToMessageDecoder { | |
private int size = -1; | |
@Override | |
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { | |
if (size == -1) { | |
if (in.readableBytes() < 4) { | |
return; | |
} | |
size = in.readInt(); | |
} | |
if (in.readableBytes() < size) { | |
return; | |
} | |
byte[] operation = new byte[size]; | |
in.readBytes(operation); | |
out.add(operation); | |
size = -1; | |
} | |
} | |
/** | |
* This functionality works fine. | |
* <p/> | |
* Few enhancements: | |
* - instead of creating a response buffer, we should reuse the existing one | |
* - instead op copying the operation into an array, we should pass the byte-buffer (to prevent unwanted copying). | |
*/ | |
public class OperationServerHandler extends ChannelInboundHandlerAdapter { | |
//todo: currently not used. | |
private ByteBuf buf; | |
@Override | |
public void handlerAdded(ChannelHandlerContext ctx) { | |
buf = ctx.alloc().buffer(1024); | |
// System.out.println("Channel handler added"); | |
} | |
@Override | |
public void handlerRemoved(ChannelHandlerContext ctx) { | |
buf.release(); | |
buf = null; | |
} | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) { | |
try { | |
ByteBuffer response = ByteBuffer.allocate(1024); | |
byte[] operation = (byte[]) msg; | |
invoker.invoke(operation, response); | |
response.flip(); | |
ctx.write(Unpooled.wrappedBuffer(response)); | |
} finally { | |
ReferenceCountUtil.release(msg); | |
} | |
} | |
@Override | |
public void channelReadComplete(ChannelHandlerContext ctx) { | |
ctx.flush(); | |
} | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { | |
// Close the connection when an exception is raised. | |
cause.printStackTrace(); | |
ctx.close(); | |
} | |
} | |
/** | |
* Responsible for translating a client request, e.g. map.get(foo) to an operation, sending | |
* it to the right port on the right machine and waiting for a response. | |
* | |
* The Endpoint is fixed, but the address is points to can change over time (in the future) e.g. when | |
* partitions are moving around. So you only need to get the PartitionEndpoint once and then you can | |
* stick it into field for example instead of looking it up for every call. | |
*/ | |
public class Endpoint implements PartitionEndpoint { | |
private final ConcurrentMap<Long, InvocationFuture> calls = new ConcurrentHashMap<>(); | |
private final AtomicLong nextCallId = new AtomicLong(); | |
private final SocketAddress address; | |
private final OperationClientHandler clientHandler = null; | |
public Endpoint(Address address) { | |
this.address = new InetSocketAddress(address.hostname, address.port); | |
} | |
@Override | |
public ByteBuffer newOperationByteBuffer() { | |
return ByteBuffer.allocate(1024); | |
} | |
@Override | |
public long register(InvocationFuture invocationFuture) { | |
long callId = nextCallId.incrementAndGet(); | |
calls.put(callId, invocationFuture); | |
return callId; | |
} | |
@Override | |
public void write(ByteBuffer operation) { | |
//switch to reading mode | |
operation.flip(); | |
} | |
} | |
//todo: work in progress | |
/** | |
* Responsible for receiving operations from an endpoint, and sending them to the right machine. | |
* In our nio implementation, the selection-key for a channel we give a kick after we offer | |
* some work on the work-queue. This triggers the client-thread to drain the work-queue and | |
* send the operations to the right machine. The OperationClientHandler also is responsible for receiving | |
* the result and notifying the future associated with that operation (through a call-id) | |
*/ | |
public class OperationClientHandler extends SimpleChannelInboundHandler { | |
private ChannelHandlerContext ctx; | |
private int receivedMessages; | |
private int next = 1; | |
final ConcurrentLinkedQueue answer = new ConcurrentLinkedQueue(); | |
@Override | |
public void channelActive(ChannelHandlerContext ctx) { | |
this.ctx = ctx; | |
} | |
@Override | |
public void channelRead0(ChannelHandlerContext ctx, final Object msg) { | |
} | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { | |
cause.printStackTrace(); | |
ctx.close(); | |
} | |
} | |
public static class ConnectRequest { | |
public Address address; | |
public MaggotNetwork.Endpoint endpoint; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment