Created
August 6, 2017 04:41
-
-
Save VMuliadi/1c1bd1e2115ad84594ca62e4f29146f0 to your computer and use it in GitHub Desktop.
Latest DecisionEngineRouter for Broadcast Message
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package routing; | |
import java.util.*; | |
import core.*; | |
/** | |
* This class overrides ActiveRouter in order to inject calls to a | |
* DecisionEngine object where needed add extract as much code from the update() | |
* method as possible. | |
* | |
* <strong>Forwarding Logic:</strong> | |
* | |
* A DecisionEngineRouter maintains a List of Tuple<Message, Connection> in | |
* support of a call to ActiveRouter.tryMessagesForConnected() in | |
* DecisionEngineRouter.update(). Since update() is called so frequently, we'd | |
* like as little computation done in it as possible; hence the List that gets | |
* updated when events happen. Four events cause the List to be updated: a new | |
* message from this host, a new received message, a connection goes up, or a | |
* connection goes down. On a new message (either from this host or received | |
* from a peer), the collection of open connections is examined to see if the | |
* message should be forwarded along them. If so, a new Tuple is added to the | |
* List. When a connection goes up, the collection of messages is examined to | |
* determine if any should be sent to this new peer, adding a Tuple | |
* to the list if so. When a connection goes down, any Tuple in the list | |
* associated with that connection is removed from the List. | |
* | |
* <strong>Decision Engines</strong> | |
* | |
* Most (if not all) routing decision making is provided by a | |
* RoutingDecisionEngine object. The DecisionEngine Interface defines methods | |
* that enact computation and return decisions as follows: | |
* | |
* <ul> | |
* <li>In createNewMessage(), a call to RoutingDecisionEngine.newMessage() is | |
* made. A return value of true indicates that the message should be added to | |
* the message store for routing. A false value indicates the message should | |
* be discarded. | |
* </li> | |
* <li>changedConnection() indicates either a connection went up or down. The | |
* appropriate connectionUp() or connectionDown() method is called on the | |
* RoutingDecisionEngine object. Also, on connection up events, this first | |
* peer to call changedConnection() will also call | |
* RoutingDecisionEngine.doExchangeForNewConnection() so that the two | |
* decision engine objects can simultaneously exchange information and update | |
* their routing tables (without fear of this method being called a second | |
* time). | |
* </li> | |
* <li>Starting a Message transfer, a protocol first asks the neighboring peer | |
* if it's okay to send the Message. If the peer indicates that the Message is | |
* OLD or DELIVERED, call to RoutingDecisionEngine.shouldDeleteOldMessage() is | |
* made to determine if the Message should be removed from the message store. | |
* <em>Note: if tombstones are enabled or deleteDelivered is disabled, the | |
* Message will be deleted and no call to this method will be made.</em> | |
* </li> | |
* <li>When a message is received (in messageTransferred), a call to | |
* RoutingDecisionEngine.isFinalDest() to determine if the receiving (this) | |
* host is an intended recipient of the Message. Next, a call to | |
* RoutingDecisionEngine.shouldSaveReceivedMessage() is made to determine if | |
* the new message should be stored and attempts to forward it on should be | |
* made. If so, the set of Connections is examined for transfer opportunities | |
* as described above. | |
* </li> | |
* <li> When a message is sent (in transferDone()), a call to | |
* RoutingDecisionEngine.shouldDeleteSentMessage() is made to ask if the | |
* departed Message now residing on a peer should be removed from the message | |
* store. | |
* </li> | |
* </ul> | |
* | |
* <strong>Tombstones</strong> | |
* | |
* The ONE has the the deleteDelivered option that lets a host delete a message | |
* if it comes in contact with the message's destination. More aggressive | |
* approach lets a host remember that a given message was already delivered by | |
* storing the message ID in a list of delivered messages (which is called the | |
* tombstone list here). Whenever any node tries to send a message to a host | |
* that has a tombstone for the message, the sending node receives the | |
* tombstone. | |
* | |
* @author PJ Dillon, University of Pittsburgh | |
*/ | |
public class DecisionEngineRouter extends ActiveRouter { | |
public static final String PUBSUB_NS = "DecisionEngineRouter"; | |
public static final String ENGINE_SETTING = "decisionEngine"; | |
public static final String TOMBSTONE_SETTING = "tombstones"; | |
public static final String CONNECTION_STATE_SETTING = ""; | |
protected boolean tombstoning; | |
protected RoutingDecisionEngine decider; | |
protected List<Tuple<Message, Connection>> outgoingMessages; | |
protected Set<String> tombstones; | |
/** | |
* Used to save state machine when new connections are made. See comment in | |
* changedConnection() | |
*/ | |
protected Map<Connection, Integer> conStates; | |
public DecisionEngineRouter(Settings s) { | |
super(s); | |
Settings routeSettings = new Settings(PUBSUB_NS); | |
outgoingMessages = new LinkedList<Tuple<Message, Connection>>(); | |
decider = (RoutingDecisionEngine)routeSettings.createIntializedObject | |
("routing." + routeSettings.getSetting(ENGINE_SETTING)); | |
if (routeSettings.contains(TOMBSTONE_SETTING)) tombstoning = routeSettings.getBoolean(TOMBSTONE_SETTING); | |
else tombstoning = false; | |
if (tombstoning) tombstones = new HashSet<String>(10); | |
conStates = new HashMap<Connection, Integer>(4); | |
} | |
public DecisionEngineRouter(DecisionEngineRouter r) { | |
super(r); | |
outgoingMessages = new LinkedList<>(); | |
decider = r.decider.replicate(); | |
tombstoning = r.tombstoning; | |
if (this.tombstoning) tombstones = new HashSet<String>(10); | |
conStates = new HashMap<>(4); | |
} | |
@Override | |
public MessageRouter replicate() | |
{ | |
return new DecisionEngineRouter(this); | |
} | |
@Override | |
public boolean createNewMessage(Message m) { | |
if (decider.newMessage(m)) { | |
makeRoomForNewMessage(m.getSize()); | |
m.setTtl(this.msgTtl); | |
addToMessages(m, true); | |
findConnectionsForNewMessage(m, getHost()); | |
return true; | |
} return false; | |
} | |
@Override | |
public void changedConnection(Connection con) { | |
DTNHost myHost = getHost(); | |
DTNHost otherNode = con.getOtherNode(myHost); | |
DecisionEngineRouter otherRouter = (DecisionEngineRouter)otherNode.getRouter(); | |
if (con.isUp()) { | |
decider.connectionUp(myHost, otherNode); | |
/* | |
* This part is a little confusing because there's a problem we have to | |
* avoid. When a connection comes up, we're assuming here that the two | |
* hosts who are now connected will exchange some routing information and | |
* update their own based on what the get from the peer. So host A updates | |
* its routing table with info from host B, and vice versa. In the real | |
* world, A would send its *old* routing information to B and compute new | |
* routing information later after receiving B's *old* routing information. | |
* In ONE, changedConnection() is called twice, once for each host A and | |
* B, in a serial fashion. If it's called for A first, A uses B's old info | |
* to compute its new info, but B later uses A's *new* info to compute its | |
* new info.... and this can lead to some nasty problems. | |
* | |
* To combat this, whichever host calls changedConnection() first calls | |
* doExchange() once. doExchange() interacts with the DecisionEngine to | |
* initiate the exchange of information, and it's assumed that this code | |
* will update the information on both peers simultaneously using the old | |
* information from both peers. | |
*/ | |
if(shouldNotifyPeer(con)) { | |
this.doExchange(con, otherNode); | |
otherRouter.didExchange(con); | |
} | |
/* | |
* Once we have new information computed for the peer, we figure out if | |
* there are any messages that should get sent to this peer. | |
*/ | |
Collection<Message> msgs = getMessageCollection(); | |
for (Message m : msgs) if (decider.shouldSendMessageToHost(m, otherNode)) outgoingMessages.add(new Tuple<Message,Connection>(m, con)); | |
} else { | |
decider.connectionDown(myHost, otherNode); | |
conStates.remove(con); | |
/* | |
* If we were trying to send message to this peer, we need to remove them | |
* from the outgoing List. | |
*/ | |
for (Iterator<Tuple<Message,Connection>> i = outgoingMessages.iterator(); i.hasNext();) { | |
Tuple<Message, Connection> t = i.next(); | |
if (t.getValue() == con) i.remove(); | |
} | |
} | |
} | |
protected void doExchange(Connection con, DTNHost otherHost) { | |
conStates.put(con, 1); | |
decider.doExchangeForNewConnection(con, otherHost); | |
} | |
/** | |
* Called by a peer DecisionEngineRouter to indicated that it already | |
* performed an information exchange for the given connection. | |
* | |
* @param con Connection on which the exchange was performed | |
*/ | |
protected void didExchange(Connection con) | |
{ | |
conStates.put(con, 1); | |
} | |
@Override | |
protected int startTransfer(Message m, Connection con) { | |
int retVal; | |
if (!con.isReadyForTransfer()) return TRY_LATER_BUSY; | |
retVal = con.startTransfer(getHost(), m); | |
if (retVal == RCV_OK) addToSendingConnections(con); // started transfer | |
else if(tombstoning && retVal == DENIED_DELIVERED) { | |
this.deleteMessage(m.getId(), false); | |
tombstones.add(m.getId()); | |
} else if (deleteDelivered && (retVal == DENIED_OLD || retVal == DENIED_DELIVERED) && | |
decider.shouldDeleteOldMessage(m, con.getOtherNode(getHost()))) { | |
/* final recipient has already received the msg -> delete it */ | |
// for Diffie - Hellman protocol, this line must be commented | |
// Diffie - Hellman prohibit deleted message after sent | |
// this.deleteMessage(m.getId(), false); | |
} return retVal; | |
} | |
@Override | |
public int receiveMessage(Message m, DTNHost from) { | |
if(isDeliveredMessage(m) || (tombstoning && tombstones.contains(m.getId()))) return DENIED_DELIVERED; | |
return super.receiveMessage(m, from); | |
} | |
@Override | |
public Message messageTransferred(String id, DTNHost from) { | |
Message incoming = removeFromIncomingBuffer(id, from); | |
if (incoming == null) { // no incoming connection | |
throw new SimError("No message with ID " + id + " in the incoming buffer of " + getHost()); | |
} | |
incoming.setReceiveTime(SimClock.getTime()); | |
Message outgoing = incoming; | |
for (Application app : getApplications(incoming.getAppID())) { | |
// Note that the order of applications is significant | |
// since the next one gets the output of the previous. | |
outgoing = app.handle(outgoing, getHost()); | |
if (outgoing == null) break; // Some app wanted to drop the message | |
} | |
Message aMessage = (outgoing==null)?(incoming):(outgoing); | |
// boolean isFinalRecipient = decider.isFinalDest(aMessage, getHost()); | |
boolean isFinalRecipient = true; | |
boolean isFirstDelivery = isFinalRecipient && !isDeliveredMessage(aMessage); | |
if (outgoing!=null && decider.shouldSaveReceivedMessage(aMessage, getHost())) { | |
// not the final recipient and app doesn't want to drop the message | |
// -> put to buffer | |
addToMessages(aMessage, false); | |
// Determine any other connections to which to forward a message | |
findConnectionsForNewMessage(aMessage, from); | |
} | |
if (isFirstDelivery) this.deliveredMessages.put(id, aMessage); | |
for (MessageListener ml : this.mListeners) ml.messageTransferred(aMessage, from, getHost(), isFirstDelivery); | |
return aMessage; | |
} | |
@Override | |
protected void transferDone(Connection con) { | |
Message transferred = this.getMessage(con.getMessage().getId()); | |
if (transferred != null) { | |
for (Iterator<Tuple<Message, Connection>> i = outgoingMessages.iterator(); i.hasNext(); ) { | |
Tuple<Message, Connection> t = i.next(); | |
if (t.getKey().getId().equals(transferred.getId()) && t.getValue().equals(con)) { | |
i.remove(); | |
break; | |
} | |
} | |
} | |
if (decider.shouldDeleteSentMessage(transferred, con.getOtherNode(getHost()))) { | |
this.deleteMessage(transferred.getId(), false); | |
} // remove transferred message | |
} | |
@Override | |
public void update() { | |
super.update(); | |
if (!canStartTransfer() || isTransferring()) return; | |
tryMessagesForConnected(outgoingMessages); | |
for (Iterator<Tuple<Message, Connection>> i = outgoingMessages.iterator(); i.hasNext();) { | |
Tuple<Message, Connection> t = i.next(); | |
if (!this.hasMessage(t.getKey().getId())) i.remove(); | |
} | |
} | |
@Override | |
public void deleteMessage(String id, boolean drop) { | |
super.deleteMessage(id, drop); | |
for (Iterator<Tuple<Message, Connection>> i = outgoingMessages.iterator(); i.hasNext();) { | |
Tuple <Message, Connection> t = i.next(); | |
if (t.getKey().getId().equals(id)) i.remove(); | |
} | |
} | |
public RoutingDecisionEngine getDecisionEngine() | |
{ | |
return this.decider; | |
} | |
protected boolean shouldNotifyPeer (Connection con) { | |
Integer i = conStates.get(con); | |
return i == null || i < 1; | |
} | |
protected void findConnectionsForNewMessage(Message m, DTNHost from) { | |
for (Connection c : getConnections()) { | |
DTNHost other = c.getOtherNode(getHost()); | |
if (other != from && decider.shouldSendMessageToHost(m, other)) { | |
outgoingMessages.add(new Tuple<Message, Connection>(m, c)); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment