Last active
December 20, 2015 12:29
-
-
Save lucacervasio/6131446 to your computer and use it in GitHub Desktop.
Hercules is a CPE simulator (a multiple-client implementation in Groovy using Java NIO socket select).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Hercules is a CPE simulator built by Luca Cervasio <[email protected]> | |
within mosesacs project (see http://mosesacs.org, [email protected]) | |
*/ | |
import java.util.concurrent.ArrayBlockingQueue | |
import java.nio.channels.Selector | |
import java.nio.channels.SelectionKey | |
import java.nio.channels.SocketChannel | |
import java.nio.ByteBuffer | |
import java.nio.charset.Charset | |
import java.nio.charset.CharsetDecoder | |
import java.nio.charset.CharsetEncoder | |
import java.nio.CharBuffer | |
/**
 * CPE simulator: keeps a set of simulated CPEs, schedules CWMP Inform events
 * for them and drives the resulting HTTP sessions towards the ACS from a
 * single selector thread using non-blocking sockets.
 *
 * Threading: go() starts the selector thread (see loop()) and then runs the
 * scheduler on the calling thread. The two sides communicate only through
 * {@code queue}; {@code active_connections}, the selector and the per-CPE
 * buffers are touched only by the selector thread.
 */
class Hercules {
    // parameters from command line or configuration file
    int NUM_CPES = 20
    int NUM_PARALLEL_CPES = 8
    String acs_host = "cwmp.mosesacs.org"
    int acs_port = 80
    String acs_url = '/'
    String user_agent = "Hercules 0.3 by Luca Cervasio"
    String ip = "10.19.0.2" // CPEs ip
    int connect_port = 9600
    int periodic_interval = 120
    boolean debug = true

    // global data structures
    def cpes = [:]
    public final ArrayBlockingQueue<QueueElement> queue = new ArrayBlockingQueue<QueueElement>(2 * NUM_CPES)
    // Number of sockets currently open towards the ACS. Only the selector
    // thread reads or writes it, so no synchronization is applied.
    public int active_connections = 0
    Selector selector = null
    Charset charset = Charset.forName("UTF-8")
    CharsetDecoder decoder = charset.newDecoder()
    CharsetEncoder encoder = charset.newEncoder()

    private static final String CRLF = "\r\n"

    // Canned GetParameterValuesResponse sent whenever the ACS asks for parameter values.
    private static final String GET_PARAMETER_VALUES_RESPONSE = '<?xml version="1.0" encoding="UTF-8"?><soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><soap:Header/><soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><cwmp:GetParameterValuesResponse><ParameterList soap-enc:arrayType="cwmp:ParameterValueStruct[20]"><ParameterValueStruct xsi:type="cwmp:ParameterValueStruct"> <Name>InternetGatewayDevice.DeviceInfo.DeviceJejeEnable</Name> <Value xsi:type="xsd:int"> Superpettine</Value> </ParameterValueStruct> </ParameterList> </cwmp:GetParameterValuesResponse></soap:Body></soap:Envelope>'

    /**
     * Schedules a BOOTSTRAP event for every CPE, then keeps yielding PERIODIC
     * events for every CPE each {@code periodic_interval} ticks of one second.
     * Never returns.
     */
    def scheduler = {
        enqueueForAll('0 BOOTSTRAP')

        int elapsed = 0
        while (true) {
            elapsed++
            if (elapsed == periodic_interval) {
                elapsed = 0
                enqueueForAll('2 PERIODIC')
            }
            sleep(1000) // 1 second
        }
    }

    // check for ACS CONNECTION REQUEST requests
    //Thread.start {
    //}

    /**
     * Starts a non-blocking connection towards the ACS on behalf of the CPE
     * named by {@code work.serial} and remembers which event it has to report.
     * Must be called on the selector thread. A failure to open the connection
     * is logged and does not consume a connection slot.
     */
    def connection_starter = { QueueElement work ->
        CPE cpe = (CPE) cpes.get(work.serial)
        if (cpe == null) {
            println "Ignoring event ${work.event}: unknown CPE sn ${work.serial}"
            return
        }
        println "Issueing new connection from CPE sn ${work.serial} and event ${work.event}"

        SocketChannel sock = null
        try {
            sock = SocketChannel.open()
            sock.configureBlocking(false)
            // non blocking connect: returns true only if it completed immediately
            boolean connected = sock.connect(new InetSocketAddress(acs_host, acs_port))

            cpe.socketNum = sock.hashCode() // kept for callers that still inspect it
            cpe.events = work.event         // which event to send in the Inform

            // The CPE travels with the key as its attachment, so ready keys are
            // resolved without scanning the whole CPE map.
            sock.register(selector, connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT, cpe)
            active_connections++

            if (connected) {
                // OP_CONNECT will never fire for an already connected channel
                send(sock, buildPost(informEnvelope(cpe), null))
            }
        } catch (Exception e) {
            println "Cannot connect CPE sn ${work.serial} to ${acs_host}:${acs_port}: ${e}"
            if (sock != null) {
                boolean registered = sock.keyFor(selector) != null
                try {
                    sock.close() // also cancels the key, if one was registered
                } catch (IOException ignored) {
                    // nothing more can be done for a socket that fails to close
                }
                if (registered && active_connections > 0) {
                    active_connections-- // the slot was taken before the failure
                }
            }
        }
    }

    /**
     * Opens the selector and starts the selector thread, which alternates
     * between starting queued connections and servicing ready sockets.
     *
     * @return the started thread
     */
    def loop() {
        this.selector = Selector.open()
        // main loop checks sockets and shared queue
        Thread.start {
            while (true) {
                startQueuedConnections()
                try {
                    selector.select(200) // 200 ms timeout, then go back to check the queue
                    Iterator<SelectionKey> ready = selector.selectedKeys().iterator()
                    while (ready.hasNext()) {
                        SelectionKey key = ready.next()
                        ready.remove()
                        serviceKey(key)
                    }
                } catch (Exception e) {
                    // keep the simulator alive, but say what went wrong
                    println "Connection error: ${e}"
                }
            }
        }
    }

    /**
     * Creates the CPE table (serials 1..NUM_CPES), starts the selector thread
     * and then runs the scheduler on the calling thread. Never returns.
     */
    def go() {
        // initializing CPEs data structure; sized by NUM_CPES so every serial
        // the scheduler emits has a matching CPE
        for (int i = 1; i <= NUM_CPES; i++) {
            cpes[String.valueOf(i)] = new CPE(
                serial: String.valueOf(i),
                connectionRequest: "http://${ip}:${connect_port}/${i}",
                online: false,
                manufacturer: 'PIRELLI BROADBAND SOLUTIONS',
                softwareVersion: '3_RGF09_09_00_00_0013',
                buffer: ByteBuffer.allocateDirect(1024),
                charBuffer: CharBuffer.allocate(1024)
            )
        }
        this.loop()
        // infinite looping in scheduler function
        this.scheduler()
    }

    public static void main(String[] args) {
        Hercules hercules = new Hercules()
        hercules.go()
    }

    /**
     * Queues {@code event} for every CPE serial in 1..NUM_CPES. When the
     * bounded queue is full the event is dropped and logged instead of
     * raising, so the scheduler keeps ticking.
     */
    private void enqueueForAll(String event) {
        for (int i = 1; i <= NUM_CPES; i++) {
            QueueElement element = new QueueElement(serial: String.valueOf(i), event: event)
            if (!queue.offer(element)) {
                println "Queue full, dropping event ${event} for CPE sn ${i}"
            }
        }
    }

    /**
     * Takes work from the queue and starts connections for it.
     * With nothing open and nothing queued it blocks on the queue until work
     * arrives; otherwise it drains the queue without blocking until
     * NUM_PARALLEL_CPES connections are open, then returns so that the caller
     * can service the open sockets.
     */
    private void startQueuedConnections() {
        if (active_connections == 0 && queue.isEmpty()) {
            // no connections and nothing queued: wait on the queue indefinitely
            connection_starter(queue.take())
            return
        }
        while (active_connections < NUM_PARALLEL_CPES) {
            QueueElement work = queue.poll()
            if (work == null) {
                break
            }
            connection_starter(work)
        }
    }

    /**
     * Handles one ready key. Any failure while talking to the ACS releases
     * that connection (and its slot) instead of aborting the whole pass.
     */
    private void serviceKey(SelectionKey key) {
        CPE cpe = (CPE) key.attachment()
        if (cpe == null) {
            return // connection already released
        }
        try {
            if (key.isConnectable()) {
                handleConnect(key, cpe)
            } else if (key.isReadable()) {
                handleRead(key, cpe)
            }
        } catch (Exception e) {
            println "Connection error for CPE sn ${cpe.serial}: ${e}"
            closeConnection(key)
        }
    }

    /** Completes the pending connect and sends the Inform for the CPE's event. */
    private void handleConnect(SelectionKey key, CPE cpe) {
        SocketChannel channel = (SocketChannel) key.channel()
        if (!channel.finishConnect()) {
            return // still pending, the key will fire again
        }
        // Connected: stop asking for OP_CONNECT, from now on only responses matter.
        key.interestOps(SelectionKey.OP_READ)
        send(channel, buildPost(informEnvelope(cpe), null))
    }

    /**
     * Reads what is available from the ACS and reacts to it:
     * end of stream or "204 No Content" closes the session; a response
     * mentioning GetParameterValues is answered with the canned
     * GetParameterValuesResponse; anything else is answered with an empty POST.
     * Each read is interpreted on its own (at most one buffer, 1024 bytes);
     * a chunk without the session cookie cannot be answered and is skipped.
     */
    private void handleRead(SelectionKey key, CPE cpe) {
        SocketChannel channel = (SocketChannel) key.channel()
        try {
            int count = channel.read(cpe.buffer)
            if (count < 0) {
                // peer closed the connection: release the slot
                closeConnection(key)
                return
            }
            if (count == 0) {
                return
            }
            cpe.buffer.flip()
            decoder.decode(cpe.buffer, cpe.charBuffer, false)
            cpe.charBuffer.flip()
            String text = cpe.charBuffer.toString()

            if (text.contains('204 No Content')) {
                // session finished: one active connection less
                closeConnection(key)
                return
            }

            String cookie = sessionCookie(text)
            if (cookie == null) {
                println "No session cookie in response for CPE sn ${cpe.serial}, ignoring chunk"
                return
            }
            String body = text.contains('GetParameterValues') ? GET_PARAMETER_VALUES_RESPONSE : ''
            send(channel, buildPost(body, cookie))
        } finally {
            // Clear for next pass, also when the pass failed
            cpe.buffer.clear()
            cpe.charBuffer.clear()
        }
    }

    /**
     * Closes the channel behind {@code key} and frees its connection slot.
     * Safe to call more than once for the same key: the attachment is cleared
     * on the first call and later calls do nothing.
     */
    private void closeConnection(SelectionKey key) {
        if (key.attachment() == null) {
            return
        }
        key.attach(null)
        try {
            key.channel().close() // closing the channel cancels the key
        } catch (IOException e) {
            println "Error closing connection: ${e}"
        }
        if (active_connections > 0) {
            active_connections--
        }
    }

    /** Returns the MosesAcsSession cookie value found in {@code text}, or null. */
    private String sessionCookie(String text) {
        def m = text =~ /Set-Cookie: MosesAcsSession=([\w|\d]+);/
        return m.find() ? m.group(1) : null
    }

    /**
     * Builds an HTTP POST towards the ACS.
     *
     * @param body   request body, may be empty
     * @param cookie session cookie value to send back, or null for none
     * @return header block (CRLF separated, closed by an empty line) followed by the body
     */
    private String buildPost(String body, String cookie) {
        StringBuilder request = new StringBuilder()
        request.append("POST ${acs_url} HTTP/1.1").append(CRLF)
        request.append("Host: ${acs_host}:${acs_port}").append(CRLF)
        request.append("User-Agent: ${user_agent}").append(CRLF)
        // length in bytes of the encoded body, which is what goes on the wire
        request.append("Content-Length: ${body.getBytes(charset).length}").append(CRLF)
        request.append("SOAPAction:").append(CRLF)
        if (cookie != null) {
            request.append("Cookie: MosesAcsSession=${cookie}").append(CRLF)
        }
        request.append("Content-Type: text/xml; charset=utf-8").append(CRLF)
        request.append("Connection: Keep-Alive").append(CRLF)
        request.append(CRLF)
        request.append(body)
        return request.toString()
    }

    /**
     * Builds the Inform envelope for {@code cpe}, reporting its serial, the
     * event stored in {@code cpe.events}, its own connection request URL and
     * the configured CPE ip.
     */
    private String informEnvelope(CPE cpe) {
        return """<?xml version="1.0" encoding="UTF-8"?><soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soap-enc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:cwmp="urn:dslforum-org:cwmp-1-0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><soap:Header/><soap:Body soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><cwmp:Inform><DeviceId><Manufacturer>PIRELLI BROADBAND SOLUTIONS</Manufacturer><OUI>0013C8</OUI><ProductClass>Router</ProductClass><SerialNumber>${cpe.serial}</SerialNumber></DeviceId><Event><EventStruct><EventCode>${cpe.events}</EventCode><CommandKey/></EventStruct></Event><MaxEnvelopes>1</MaxEnvelopes><CurrentTime>2003-01-01T05:36:55Z</CurrentTime><RetryCount>0</RetryCount><ParameterList soap-enc:arrayType="cwmp:ParameterValueStruct[7]"><ParameterValueStruct xsi:type="cwmp:ParameterValueStruct"><Name>InternetGatewayDevice.DeviceInfo.HardwareVersion</Name><Value xsi:type="xsd:string">NGRG 2009</Value></ParameterValueStruct><ParameterValueStruct xsi:type="cwmp:ParameterValueStruct"><Name>InternetGatewayDevice.DeviceInfo.ProvisioningCode</Name><Value xsi:type="xsd:string">ABCD</Value></ParameterValueStruct><ParameterValueStruct xsi:type="cwmp:ParameterValueStruct"><Name>InternetGatewayDevice.DeviceInfo.SoftwareVersion</Name><Value xsi:type="xsd:string">3_RGF09_06_00_00_0013</Value></ParameterValueStruct><ParameterValueStruct xsi:type="cwmp:ParameterValueStruct"><Name>InternetGatewayDevice.DeviceInfo.SpecVersion</Name><Value xsi:type="xsd:string">1.0</Value></ParameterValueStruct><ParameterValueStruct xsi:type="cwmp:ParameterValueStruct"><Name>InternetGatewayDevice.ManagementServer.ConnectionRequestURL</Name><Value xsi:type="xsd:string">${cpe.connectionRequest}</Value></ParameterValueStruct><ParameterValueStruct xsi:type="cwmp:ParameterValueStruct"><Name>InternetGatewayDevice.ManagementServer.ParameterKey</Name><Value xsi:type="xsd:string"/></ParameterValueStruct><ParameterValueStruct xsi:type="cwmp:ParameterValueStruct"><Name>InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANIPConnection.1.ExternalIPAddress</Name><Value xsi:type="xsd:string">${ip}</Value></ParameterValueStruct></ParameterList></cwmp:Inform></soap:Body></soap:Envelope>"""
    }

    /**
     * Encodes {@code text} and writes all of it to the channel. A non-blocking
     * write may accept only part of the buffer, so it is repeated until the
     * buffer is drained; requests are a few KB at most.
     */
    private void send(SocketChannel channel, String text) {
        ByteBuffer bytes = encoder.encode(CharBuffer.wrap(text))
        while (bytes.hasRemaining()) {
            channel.write(bytes)
        }
    }
}
/**
 * One unit of work handed from the scheduler to the connection loop:
 * which CPE has to contact the ACS and which event it reports.
 */
class QueueElement {
    /** Serial of the CPE the event is for; used as the key into the CPE map. */
    String serial

    /** Event code to report, e.g. '0 BOOTSTRAP' or '2 PERIODIC'. */
    String event
}
/**
 * State kept for a single simulated CPE.
 */
class CPE {
    // --- identity and descriptive data ---

    /** Serial number of the CPE. */
    String serial

    /** Connection request URL of the CPE. */
    String connectionRequest

    /** Online flag; created as false. */
    boolean online

    /** Manufacturer name. */
    String manufacturer

    /** Software version string. */
    String softwareVersion

    // --- state of the session towards the ACS ---

    /** hashCode() of the socket channel last opened for this CPE. */
    int socketNum

    /** Event code to send in the next Inform. */
    String events

    /** Receive buffer for raw bytes read from the socket. */
    ByteBuffer buffer

    /** Buffer holding the decoded characters of the last read. */
    CharBuffer charBuffer
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment