Last active
March 28, 2018 11:40
-
-
Save SzymonPobiega/d461c8b1e4257f754e736081c81ad885 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using NServiceBus; | |
using NServiceBus.Satellites; | |
using NServiceBus.Transports; | |
using NServiceBus.Transports.SQLServer; | |
using NServiceBus.Unicast; | |
using NServiceBus.Unicast.Transport; | |
namespace TaskStarvationTest | |
{ | |
/// <summary>
/// Console host for the repro: starts an NServiceBus endpoint on the SQL Server transport
/// and then drives the satellite's receiver through one start/stop cycle per loop iteration.
/// </summary>
class Program
{
    /// <summary>
    /// Configures and starts the bus, subscribes the delaying <see cref="Observer"/>, and then
    /// loops forever: send one message to the satellite queue, run the satellite, pause 100 ms.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var busConfiguration = new BusConfiguration();
        busConfiguration.EndpointName("TaskStarvationTest");
        busConfiguration
            .UseTransport<SqlServerTransport>()
            .ConnectionString(@"initial catalog=nservicebus; data source=.\sqlexpress; integrated security=true");
        busConfiguration.UsePersistence<InMemoryPersistence>();

        var endpoint = Bus.Create(busConfiguration).Start();

        // The concrete bus type exposes the container the satellite and notifications are resolved from.
        var builder = ((UnicastBus) endpoint).Builder;
        var satellite = builder.Build<Satellite>();
        var transportNotifications = builder.Build<TransportNotifications>();

        // Delays start of the transport every 3rd time to trigger calling stop
        // before the start sequence completes.
        transportNotifications.ReceiveTaskStarted.Subscribe(new Observer());

        for (;;)
        {
            Console.WriteLine("Processing a message");

            var satelliteQueue = Address.Parse("StartStopSatellite");
            endpoint.Send(satelliteQueue, new MyMessage());

            // Blocks until the satellite has handled the message and stopped its receiver again.
            satellite.Run();

            Thread.Sleep(100);
        }
    }
}
/// <summary>
/// Observer of <see cref="ReceiveTaskStarted"/> notifications that blocks the notifying
/// thread for one second on every third notification it receives.
/// </summary>
class Observer : IObserver<ReceiveTaskStarted>
{
    // Number of notifications seen so far; updated atomically in OnNext.
    int notificationCount;

    /// <summary>
    /// Counts the notification and sleeps for 1000 ms when the count is a multiple of three.
    /// </summary>
    /// <param name="value">The notification payload; not inspected.</param>
    public void OnNext(ReceiveTaskStarted value)
    {
        if (Interlocked.Increment(ref notificationCount) % 3 != 0)
        {
            return;
        }

        Thread.Sleep(1000);
    }

    /// <summary>Errors from the notification stream are ignored.</summary>
    public void OnError(Exception error)
    {
    }

    /// <summary>Completion of the notification stream is ignored.</summary>
    public void OnCompleted()
    {
    }
}
/// <summary>
/// Empty message type; it carries no data and only serves as something to send to the satellite queue.
/// </summary>
class MyMessage : IMessage { }
/// <summary>
/// Satellite on the "StartStopSatellite" queue whose receiver is not started by the bus.
/// Instead, each call to <see cref="Run"/> starts the real dequeuer, and the handling of a
/// message stops it again on a thread-pool task.
/// </summary>
class Satellite : IAdvancedSatellite
{
    // Assigned by the receiver customization returned from GetReceiverCustomization().
    DequeuerWrapper wrapper;

    // Signalled by StopInternal() once the real dequeuer has been asked to stop.
    readonly ManualResetEventSlim manualResetEvent = new ManualResetEventSlim(false);

    /// <summary>
    /// Starts the real dequeuer and blocks until a handled message has stopped it again.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The receiver customization has not run yet, so there is no dequeuer to start.
    /// </exception>
    public void Run()
    {
        if (wrapper == null)
        {
            throw new InvalidOperationException(
                "The receiver customization has not been applied; there is no dequeuer to start.");
        }

        manualResetEvent.Reset();
        wrapper.StartInternal();

        // Wait only after a successful start. Waiting in a finally block would hold back a
        // start-up exception while blocking on a signal that is only set from Handle().
        manualResetEvent.Wait(CancellationToken.None);
    }

    /// <summary>
    /// Logs the message and schedules the stop of the dequeuer on the thread pool.
    /// </summary>
    /// <param name="message">The received transport message; not inspected.</param>
    /// <returns>Always <c>true</c>.</returns>
    public bool Handle(TransportMessage message)
    {
        Console.WriteLine("Got message");

        // Stop from another thread rather than from the thread that is handling the message.
        Task.Run(() => StopInternal());
        return true;
    }

    /// <summary>Stops the real dequeuer and releases the thread blocked in <see cref="Run"/>.</summary>
    void StopInternal()
    {
        wrapper.StopInternal();
        manualResetEvent.Set();
    }

    /// <summary>Intentionally empty; the receiver is started from <see cref="Run"/>.</summary>
    public void Start()
    {
    }

    /// <summary>Intentionally empty; the receiver is stopped after a message is handled.</summary>
    public void Stop()
    {
    }

    /// <summary>Address of the queue this satellite receives from.</summary>
    public Address InputAddress => Address.Parse("StartStopSatellite");

    /// <summary>The satellite is always enabled.</summary>
    public bool Disabled => false;

    /// <summary>
    /// Returns a customization that replaces the receiver's dequeuer with a
    /// <see cref="DequeuerWrapper"/> around the original one and remembers the wrapper.
    /// </summary>
    public Action<TransportReceiver> GetReceiverCustomization()
    {
        return r =>
        {
            wrapper = new DequeuerWrapper(r.Receiver);
            r.Receiver = wrapper;
        };
    }

    /// <summary>
    /// Dequeuer decorator that turns <see cref="Start"/> and <see cref="Stop"/> into no-ops
    /// (apart from recording the concurrency level) and exposes
    /// <see cref="StartInternal"/> and <see cref="StopInternal"/> to control the real dequeuer.
    /// </summary>
    class DequeuerWrapper : IDequeueMessages
    {
        readonly IDequeueMessages realDequeuer;

        // Concurrency level recorded by Start(int) and used by StartInternal().
        int maximumConcurrencyLevel;

        // 0 while started, 1 once a stop has been requested; guards against stopping twice.
        int disposeSignaled;

        public DequeuerWrapper(IDequeueMessages realDequeuer)
        {
            this.realDequeuer = realDequeuer;
        }

        /// <summary>Forwards initialization unchanged to the real dequeuer.</summary>
        public void Init(Address address, TransactionSettings transactionSettings, Func<TransportMessage, bool> tryProcessMessage, Action<TransportMessage, Exception> endProcessMessage)
        {
            realDequeuer.Init(address, transactionSettings, tryProcessMessage, endProcessMessage);
        }

        /// <summary>Clears the stop flag and starts the real dequeuer with the recorded concurrency level.</summary>
        public void StartInternal()
        {
            Interlocked.Exchange(ref disposeSignaled, 0);
            realDequeuer.Start(maximumConcurrencyLevel);
        }

        /// <summary>Only records the concurrency level; the real dequeuer is not started here.</summary>
        public void Start(int maximumConcurrencyLevel)
        {
            this.maximumConcurrencyLevel = maximumConcurrencyLevel;
        }

        /// <summary>Intentionally empty; the real dequeuer is stopped via <see cref="StopInternal"/>.</summary>
        public void Stop()
        {
        }

        /// <summary>
        /// Stops the real dequeuer at most once per start. Exceptions from the stop are
        /// logged and otherwise ignored.
        /// </summary>
        public void StopInternal()
        {
            if (Interlocked.Exchange(ref disposeSignaled, 1) != 0)
            {
                // A stop has already been requested since the last StartInternal().
                return;
            }

            try
            {
                realDequeuer.Stop();
            }
            catch (Exception ex)
            {
                // We are shutting down, race condition can result in an exception in the real dequeuer.
                // Best effort: report it instead of swallowing it silently, but do not rethrow.
                Console.WriteLine($"Ignoring exception while stopping the dequeuer: {ex.Message}");
            }
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment