Last active
December 27, 2015 08:59
-
-
Save hellojinjie/7300093 to your computer and use it in GitHub Desktop.
TrafficGenerator.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.mutable.LinkedList | |
import scala.collection.mutable.ListBuffer | |
import scala.compat._ | |
import java.io._ | |
import java.net._ | |
import java.util._ | |
import scala.util.Random | |
import java.nio.charset.Charset | |
import java.util.concurrent._ | |
import java.util.concurrent.atomic._ | |
import com.typesafe.config._ | |
/**
 * Entry point and shared state of the traffic generator.
 *
 * Settings are read through Typesafe Config under the `tg.*` keys. The simulated
 * clients are spread over 30 buckets (`queue`); the configured `sender` walks all
 * buckets once per pass, and `main` paces every pass to 30 seconds.
 */
object TrafficGenerator {
  val conf = ConfigFactory.load()
  val clientCount = conf.getInt("tg.clientCount")
  val receiverURL = conf.getStringList("tg.receiverURL")
  val sendThreadNum = conf.getInt("tg.sendThreadNum")
  val mode = conf.getString("tg.mode")
  val siteIDs = conf.getStringList("tg.siteID")

  // "immortal" keeps the client population fixed; any other mode uses the sine-shaped population.
  val sender = mode match {
    case "immortal" => new ImmortalViewSender
    case _ => new SineViewSender
  }

  val rand = new Random
  // One bucket of clients per round; a pass over all 30 buckets is one call to sender.send.
  val queue = new Array[ListBuffer[Client]](30)
  // Work handed from the sender to the Grunt threads.
  val messageQueue = new LinkedBlockingQueue[Client]
  // Released by Sender.beforeRound, acquired by each Grunt before it starts draining.
  val sendPermit = new Semaphore(0)
  // Released by each Grunt when it sees a round marker, acquired by Sender.endRound.
  val roundFinish = new Semaphore(0)
  // Despite the name this holds milliseconds (Platform.currentTime).
  var roundStartSecond = Platform.currentTime
  var tStartSecond = Platform.currentTime / 1000

  /**
   * Prints the configuration, creates the initial clients, starts the worker
   * threads and then loops forever, running one pass every 30 seconds.
   */
  def main(args: Array[String]): Unit = {
    println("Run in mode " + mode + " with client count " + clientCount + " sendThreadNum " + sendThreadNum)
    println("receiverURL " + receiverURL)
    println("site " + siteIDs)
    generateClient
    for (workerId <- 1 to sendThreadNum) {
      new Thread(new Grunt(workerId)).start()
    }
    while (true) {
      tStartSecond = Platform.currentTime / 1000
      sender.send
      val finishedAt = Platform.currentTime / 1000
      // Seconds left of the 30-second budget for this pass (negative when over budget).
      val lag = tStartSecond + 30 - finishedAt
      if (lag < 0) {
        println("Oops, I'm " + (-lag) + " seconds lag, I am trying...")
      } else if (lag == 0) {
        println("Yeah, just in time.")
      } else {
        println("Ah. I'm " + lag + " seconds advanced, I'm going to have a rest.")
        TimeUnit.SECONDS.sleep(lag)
      }
    }
  }

  /**
   * Fills the 30 buckets: the first 29 receive `clientCount / 30` clients each,
   * the last one receives whatever is left of `clientCount`.
   */
  def generateClient: Unit = {
    val perBucket = clientCount / 30
    for (slot <- 0 until 29) {
      fillBucket(slot, perBucket)
    }
    fillBucket(29, clientCount - perBucket * 29)
  }

  /** Installs a fresh bucket at `slot` and adds `size` initialised clients to it. */
  private def fillBucket(slot: Int, size: Int): Unit = {
    queue(slot) = new ListBuffer[Client]
    var created = 0
    while (created < size) {
      val client = new Client
      client.init()
      queue(slot) += client
      created += 1
    }
  }

  /** Returns a dotted-quad string whose four parts are each drawn from 1 to 254. */
  def getRandomIP: String =
    Seq.fill(4)(rand.nextInt(254) + 1).mkString(".")
}
/**
 * Strategy for pushing one pass of clients to the worker threads.
 *
 * Implementations bracket every bucket with `beforeRound` / `endRound`, which
 * synchronise with the Grunt threads through the semaphores on TrafficGenerator.
 */
trait Sender {
  /** Runs one full pass over the client buckets. */
  def send: Unit

  /** Hands one permit to every worker thread and records the round's start time (ms). */
  def beforeRound: Unit = {
    val workers = TrafficGenerator.sendThreadNum
    TrafficGenerator.sendPermit.release(workers)
    TrafficGenerator.roundStartSecond = Platform.currentTime
  }

  /**
   * Queues one round marker per worker, waits until every worker has reported
   * completion, and then sleeps when the round ended within the same wall-clock
   * second in which it started.
   *
   * @param size number of clients sent in this round; only used for the log line
   */
  def endRound(size: Integer): Unit = {
    val workers = TrafficGenerator.sendThreadNum
    var queued = 0
    while (queued < workers) {
      // A client flagged with nextSecond is the end-of-round marker for one worker.
      val marker = new Client
      marker.nextSecond = true
      TrafficGenerator.messageQueue.put(marker)
      queued += 1
    }
    TrafficGenerator.roundFinish.acquire(workers)

    val finishedAt = Platform.currentTime
    val startedAt = TrafficGenerator.roundStartSecond
    if (finishedAt / 1000 == startedAt / 1000) {
      val elapsed = finishedAt - startedAt
      // Sleep for the rest of a one-second budget, less 8 ms.
      TimeUnit.MILLISECONDS.sleep(1000 - elapsed - 8)
    }
    println((Platform.currentTime / 1000).toString + " send " + size)
  }
}
// y = midline / 2 * sin(x / 600 * PI) + midline
// where midline = clientCount and x = seconds since the first call to send,
// so the target population completes one full sine period every 1200 seconds.
/**
 * Sender whose client population follows the sine curve above.
 *
 * On every pass the target bucket size is `y / 30`. Each bucket is grown with
 * fresh clients or shrunk by stopping a contiguous run of clients (they get one
 * final STOP message) before its remaining clients are queued for sending.
 */
class SineViewSender extends Sender {
  // Wall-clock second of the first pass; 0 means "not started yet".
  var firstSecond = 0L

  override def send: Unit = {
    if (firstSecond == 0) {
      firstSecond = Platform.currentTime / 1000
    }
    val x = Platform.currentTime / 1000 - firstSecond
    val y = TrafficGenerator.clientCount / 2 * math.sin(x.toDouble / 600.0 * math.Pi) + TrafficGenerator.clientCount
    val roundCount = (y / 30).toInt
    for (i <- TrafficGenerator.queue) {
      beforeRound
      val delta = roundCount - i.size
      if (delta > 0) {
        // Bucket is too small: add freshly initialised clients.
        for (j <- 1 to delta) {
          val client = new Client
          client.init()
          i += client
        }
      } else if (delta < 0) {
        // Bucket is too large: stop -delta consecutive clients starting at a random index.
        // Random.nextInt rejects a non-positive bound, so a target size of 0
        // (possible with a small clientCount) must start the removal at index 0.
        val index = if (roundCount > 0) TrafficGenerator.rand.nextInt(roundCount) else 0
        for (clientToStop <- i.slice(index, index - delta)) {
          clientToStop.stop
          TrafficGenerator.messageQueue.put(clientToStop)
        }
        i.remove(index, -delta)
      }
      for (client <- i) {
        TrafficGenerator.messageQueue.put(client)
      }
      endRound(i.size)
    }
  }
}
/**
 * Sender with a fixed client population: every pass queues each client of
 * every bucket exactly as it is, one round per bucket.
 */
class ImmortalViewSender extends Sender {
  override def send: Unit = {
    TrafficGenerator.queue.foreach { bucket =>
      beforeRound
      bucket.foreach(client => TrafficGenerator.messageQueue.put(client))
      endRound(bucket.size)
    }
  }
}
/**
 * Worker that takes clients from `TrafficGenerator.messageQueue` and POSTs each
 * one to a receiver over a single long-lived socket.
 *
 * A round starts when the worker obtains a `sendPermit` and ends when it takes a
 * client flagged `nextSecond`, at which point it releases `roundFinish`.
 *
 * @param num worker index; the receiver URL is chosen as `num % receiverURL.size`
 */
class Grunt(num: Integer) extends Runnable {
  var socket = new Socket()
  val r = new URL(TrafficGenerator.receiverURL.get(num % TrafficGenerator.receiverURL.size))
  // URL.getPort is -1 when the URL has no explicit port, which InetSocketAddress
  // rejects; fall back to the protocol's default port in that case.
  val address = new InetSocketAddress(r.getHost(), if (r.getPort() == -1) r.getDefaultPort() else r.getPort())
  // Request line and fixed headers, encoded once and reused for every message.
  // NOTE(review): lines end in a bare "\n" rather than CRLF - confirm the receiver accepts this.
  val headBytes = Array.concat(("POST " + r.getPath() + " HTTP/1.1\n").getBytes(), ("Host: " + r.getHost() + "\n").getBytes(), "Content-Type: application/x-www-form-urlencoded\n".getBytes())

  /** Connects to the receiver and then processes rounds forever. */
  def run(): Unit = {
    println("I'am sending to " + r)
    try {
      socket.connect(address)
    } catch {
      case e: Exception => {
        // Best effort: the first failed send triggers the reconnect path below.
        println("Error while connect to " + address.toString)
      }
    }
    while (true) {
      TrafficGenerator.sendPermit.acquire()
      var flag = true
      while (flag) {
        val client = TrafficGenerator.messageQueue.take()
        if (client.nextSecond) {
          // End-of-round marker: report completion and wait for the next permit.
          TrafficGenerator.roundFinish.release()
          flag = false
        } else {
          try {
            send(client)
          } catch {
            case e: Exception => {
              println("Error while sending to " + address.toString)
              // Put the client back on the shared queue so it is sent again later.
              TrafficGenerator.messageQueue.put(client)
              TimeUnit.SECONDS.sleep(1)
              reconnect()
            }
          }
        }
      }
    }
  }

  /** Closes the current socket (ignoring close errors) and opens a new connection. */
  private def reconnect(): Unit = {
    // Release the old socket first; otherwise every reconnect leaks a descriptor.
    try {
      socket.close()
    } catch {
      case e: Exception => ()
    }
    try {
      socket = new Socket()
      socket.connect(address)
    } catch {
      case e: Exception => {
        println("Error while reconnect to " + address.toString)
      }
    }
  }

  /**
   * Writes one POST for `client`, advances the client's counters, and reads the
   * start of the response.
   *
   * @param client the client whose current state is sent as the request body
   * @return the number of response bytes read (at most 1024)
   * @throws RuntimeException wrapping any failure, including the receiver
   *         closing the connection before any response byte arrives
   */
  def send(client: Client): Int = {
    try {
      val url = client.urlPartial + "&msgID=" + client.msgID + "&bytesLoaded=" + client.bytesLoaded + "&playTime=" + client.playtime
      client.msgID = client.msgID + 1
      client.playtime = client.playtime + 1
      client.bytesLoaded += client.bytesLoadedDelta
      val body = url.getBytes(Charset.forName("UTF-8"))
      val os = socket.getOutputStream()
      os.write(headBytes)
      os.write(("Content-Length: " + body.length + "\n\n").getBytes())
      os.write(body)
      os.write("\n".getBytes())
      // we should not flush here, message object is too small
      //os.flush()
      val buffer = new Array[Byte](1024)
      val bytesRead = socket.getInputStream().read(buffer)
      if (bytesRead < 0) {
        // -1 means the peer closed the connection; report it so the caller reconnects
        // and re-queues the client instead of treating the send as successful.
        throw new java.io.EOFException("Connection closed by " + address.toString)
      }
      bytesRead
    } catch {
      case e: Exception => {
        println(e.getMessage)
        throw new RuntimeException(e)
      }
    }
  }
}
/**
 * One simulated viewer. `urlPartial` holds the fixed part of its message body;
 * the counters below are the per-message part that Grunt appends and advances.
 */
class Client {
  // True only for end-of-round markers created by Sender.endRound.
  var nextSecond = false
  var urlPartial = ""
  var msgID = 0
  var bytesLoaded = 0
  var bytesLoadedDelta = 100000
  var playtime = 1

  /**
   * Builds `urlPartial` from randomly chosen dimension values, a random IP and
   * two fresh UUIDs (view and client id).
   */
  def init(): Unit = {
    val rnd = TrafficGenerator.rand
    // The draws below stay in the same order as the fields appear in the string.
    val content = Dimension.contents(rnd.nextInt(Dimension.contents.length))
    val appType = Dimension.appTypes(rnd.nextInt(Dimension.appTypes.length))
    val bitrate = Dimension.bitrates(rnd.nextInt(Dimension.bitrates.length))
    val siteID = TrafficGenerator.siteIDs.get(rnd.nextInt(TrafficGenerator.siteIDs.size))
    val mockupIp = TrafficGenerator.getRandomIP
    val viewID = UUID.randomUUID.toString
    val clientID = UUID.randomUUID.toString
    val cdnName = Dimension.cdnNames(rnd.nextInt(Dimension.cdnNames.length))

    val parts = Seq[String](
      "appType=" + appType,
      "&bitrate=" + bitrate,
      "&streamLength=11057",
      "&streamURL=" + content.streamURL,
      "&progType=home",
      "&streamDescription=" + content.streamDescription,
      "&dropFrameCount=1&os=Mac OS 10.6.8&productID=nhlgc&eventType=HEARTBEAT&gameDate=10/12/2013&",
      "siteID=" + siteID,
      "&windowMode=full normal&homeTeam=VAN&browserVersion=Safari 5.1&startupTime=1698&player=MAC 11,9,900,117&streamType=0&awayTeam=MON&bandwidth=12876&gameID=2013020073&updateInterval=30000&mockupIp=",
      mockupIp,
      "&viewID=" + viewID,
      "&clientID=" + clientID,
      "&cdnName=" + cdnName,
      "&bytesLoadedDelta=100000"
    )
    urlPartial = parts.mkString
  }

  /** Turns this client's message into a STOP event by rewriting `urlPartial`. */
  def stop: Unit = {
    val stopped = urlPartial.replace("HEARTBEAT", "STOP")
    urlPartial = stopped
  }
}
/**
 * Describes one piece of streamed content. Every field has a default, so
 * callers only name the fields they want to override.
 *
 * @param gameDate          game date string
 * @param gameID            numeric game identifier
 * @param homeTeam          home team code
 * @param awayTeam          away team code
 * @param streamURL         stream URL
 * @param streamDescription human-readable stream label
 * @param productID         product identifier
 * @param programID         numeric program identifier
 * @param streamType        numeric stream type code
 */
case class Content(
  gameDate: String = "2013-11-04",
  gameID: Long = 1L,
  homeTeam: String = "HDP",
  awayTeam: String = "CDH",
  streamURL: String = "http://CDHvsHDP.live.m3u8",
  streamDescription: String = "CDH -- HDP",
  productID: String = "hd",
  programID: Long = 3L,
  streamType: Short = 0
)
/**
 * Value pools from which Client.init draws at random. A value that is listed
 * several times in a pool is proportionally more likely to be picked.
 */
object Dimension {
  val bitrates = Array[Integer](1000, 2000, 3000, 3000, 4000, 5000)
  val appTypes = Array[String](
    "desktop", "desktop",
    "iphone",
    "android_phone",
    "ipad", "ipad", "ipad",
    "android_pad",
    "xbox"
  )
  val cdnNames = Array[String](
    "nlds12.cdnl3nl.ooo.com",
    "cdnak", "cdnak",
    "cdncd", "cdncd", "cdncd", "cdncd"
  )

  // (team code used in the description, team code used in the stream URL), in fixture order.
  // The two codes intentionally differ for most rows; they mirror the fixture data as written.
  private val pairings = Seq(
    ("JIN", "JIN"), ("JIE", "JHN"), ("JRN", "HIN"), ("JFN", "JQN"), ("DIN", "JTN"),
    ("VIN", "JQN"), ("JXN", "JWN"), ("DDN", "JGN"), ("AIN", "DIN"), ("GIN", "FFN"),
    ("RIN", "JFN"), ("EIN", "JFN"), ("HIN", "EIN")
  )

  // Program ids paired with `pairings` for the KJI and ERF groups.
  private val baseProgramIDs = Seq[Long](1, 14, 10, 15, 17, 41, 71, 81, 21, 1, 187, 144, 143)
  // Program ids paired with `pairings` for both EWD groups (live and vod).
  private val ewdProgramIDs = Seq[Long](6, 64, 60, 65, 67, 46, 76, 86, 26, 6, 687, 644, 643)

  /** Builds the 13 fixtures of one opponent / stream-kind group, in `pairings` order. */
  private def group(opponent: String, kind: String, programIDs: Seq[Long]): Seq[Content] =
    pairings.zip(programIDs).map { case ((label, urlTeam), id) =>
      Content(
        streamDescription = label + " vs " + opponent,
        streamURL = "http://" + urlTeam + "vs" + opponent + "." + kind + ".m3u8",
        programID = id
      )
    }

  // The all-defaults Content first, then the KJI, ERF, EWD-live and EWD-vod groups.
  val contents: Array[Content] = (
    Seq(Content()) ++
      group("KJI", "live", baseProgramIDs) ++
      group("ERF", "live", baseProgramIDs) ++
      group("EWD", "live", ewdProgramIDs) ++
      group("EWD", "vod", ewdProgramIDs)
  ).toArray
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment