Created
July 23, 2014 15:42
-
-
Save sixeyed/adb156ae3b91091e98a2 to your computer and use it in GitHub Desktop.
SoakController - a simple WebAPI controller for remote-controlling a Kafka soak test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="utf-8"?>
<!-- NuGet package manifest: the kafka-net client library, targeting .NET 4.5. -->
<packages>
  <package id="kafka-net" version="0.8.0.31-beta" targetFramework="net45" />
</packages>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using KafkaNet; | |
using KafkaNet.Model; | |
using KafkaNet.Protocol; | |
using System; | |
using System.Collections.Generic; | |
using System.Configuration; | |
using System.Diagnostics; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Web.Http; | |
namespace KafkaScratchpad.Api.Controllers | |
{ | |
/// <summary>
/// Web API controller for remote-controlling a Kafka soak test.
/// Each POST to <c>soak/start</c> launches one background producer that sends a
/// message roughly every 100 ms until <c>soak/stopall</c> is called.
/// All state is static, so it is shared by every request handled in this process.
/// </summary>
public class SoakController : ApiController
{
    // Guards _SoakTasks and _CancellationTokenSource, which are replaced wholesale by
    // StopAll and may be touched by concurrent requests. List<T> is not thread-safe.
    private static readonly object _SyncRoot = new object();
    private static List<Task> _SoakTasks = new List<Task>();
    private static CancellationTokenSource _CancellationTokenSource = new CancellationTokenSource();

    // Counters are only ever modified through Interlocked so producers never lose updates.
    private static int _MessageCount;
    private static int _ProducerCount;

    /// <summary>
    /// Starts one additional background producer.
    /// </summary>
    /// <returns>200 OK once the producer task has been scheduled.</returns>
    [Route("soak/start")]
    [HttpPost]
    public IHttpActionResult Start()
    {
        Debug.WriteLine("SoakController.Start called");

        lock (_SyncRoot)
        {
            // Capture the token now. Reading the static field lazily inside the lambda
            // could pick up the *next* token source if StopAll runs in between, leaving
            // a producer that the stop request never cancels.
            var token = _CancellationTokenSource.Token;

            // LongRunning: the producer blocks in a sleep loop for its whole lifetime,
            // so it should not occupy a regular thread-pool worker.
            _SoakTasks.Add(Task.Factory.StartNew(
                () => SendMessages(token),
                token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default));
        }

        return Ok();
    }

    /// <summary>
    /// Reports the current counters.
    /// </summary>
    /// <returns>200 OK with <c>MessagesSent</c> and <c>ProducerCount</c>.</returns>
    [Route("soak/stats")]
    [HttpGet]
    public IHttpActionResult GetStats()
    {
        return Ok(new
        {
            MessagesSent = Volatile.Read(ref _MessageCount),
            ProducerCount = Volatile.Read(ref _ProducerCount)
        });
    }

    /// <summary>
    /// Signals every running producer to stop and resets the counters.
    /// Does not wait for the producers to finish; each one exits after its current
    /// sleep, so a producer that is mid-iteration may still bump the message counter
    /// once after the reset.
    /// </summary>
    /// <returns>200 OK once cancellation has been requested.</returns>
    [Route("soak/stopall")]
    [HttpPost]
    public IHttpActionResult StopAll()
    {
        lock (_SyncRoot)
        {
            Debug.WriteLine("SoakController.StopAll called. Task Count: {0}, MessageCount: {1}", _SoakTasks.Count, Volatile.Read(ref _MessageCount));

            _CancellationTokenSource.Cancel();

            // Fresh source and task list so that later Start calls are not born cancelled.
            _CancellationTokenSource = new CancellationTokenSource();
            _SoakTasks = new List<Task>();
        }

        Interlocked.Exchange(ref _MessageCount, 0);
        Interlocked.Exchange(ref _ProducerCount, 0);
        return Ok();
    }

    /// <summary>
    /// Producer loop: connects to the broker named by the <c>KafkaUrl</c> app setting and
    /// sends a uniquely tagged message to the <c>KafkaTopic</c> topic every 100 ms until
    /// <paramref name="token"/> is cancelled. Any exception is logged and ends the loop;
    /// nothing is rethrown.
    /// </summary>
    /// <param name="token">Cancellation token checked after each send/sleep cycle.</param>
    private static void SendMessages(CancellationToken token)
    {
        try
        {
            // Use the value returned by Increment: re-reading the field afterwards can
            // observe another producer's increment and hand out duplicate ids.
            var producerId = Interlocked.Increment(ref _ProducerCount);
            Debug.WriteLine("SoakController.SendMessages - starting");

            var options = new KafkaOptions(new Uri(ConfigurationManager.AppSettings["KafkaUrl"]))
            {
                Log = new DebugLog()
            };
            var topic = ConfigurationManager.AppSettings["KafkaTopic"];

            var router = new BrokerRouter(options);
            try
            {
                var client = new Producer(router);
                try
                {
                    Debug.WriteLine("SoakController.SendMessages - client initialised");

                    while (true)
                    {
                        var messageNumber = Interlocked.Increment(ref _MessageCount);
                        var message = string.Format("producer-{0}:msg-{1}:{2}", producerId, messageNumber, Guid.NewGuid());
                        Debug.WriteLine("SoakController.SendMessages - sending: " + message);

                        // Fire-and-forget: the result is only logged, never awaited, so
                        // the send rate is governed by the sleep below.
                        client.SendMessageAsync(topic, new[] { new Message { Value = message } })
                            .ContinueWith(x =>
                            {
                                if (x.IsFaulted)
                                {
                                    // Reading Exception marks the failure as observed.
                                    Debug.WriteLine("SoakController.SendMessages - send failed, ex: {0}", x.Exception);
                                    return;
                                }

                                if (x.IsCanceled)
                                {
                                    Debug.WriteLine("SoakController.SendMessages - send cancelled");
                                    return;
                                }

                                var responses = x.Result;
                                if (responses == null || responses.Count == 0)
                                {
                                    Debug.WriteLine("SoakController.SendMessages - got result, response count: 0");
                                    return;
                                }

                                Debug.WriteLine("SoakController.SendMessages - got result, response count: {0}, first -partitionId: {1}, offset: {2}", responses.Count, responses[0].PartitionId, responses[0].Offset);
                            });

                        Thread.Sleep(100);

                        if (token.IsCancellationRequested)
                        {
                            break;
                        }
                    }
                }
                finally
                {
                    // Dispose the producer before the router it was built on.
                    client.Dispose();
                }
            }
            finally
            {
                router.Dispose();
            }
        }
        catch (Exception ex)
        {
            Debug.WriteLine("SoakController.SendMessages errored. MessageCount: {0}, ex: {1}", Volatile.Read(ref _MessageCount), ex);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment