Created
July 18, 2013 18:12
-
-
Save mfcollins3/6031590 to your computer and use it in GitHub Desktop.
Sample TPL Dataflow block that shows a custom dataflow block for Neuron ESB parties. The NeuronEsbBlock class can be used either as a source block or a target block in a dataflow pipeline.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
using Neuron.Esb; | |
/// <summary>
/// Dataflow block that wraps a Neuron ESB <see cref="Party"/>.
/// Messages offered to the block (target side) are passed to
/// <c>Party.SendMessage</c>; messages raised by the party's
/// <c>OnReceive</c> event are posted to an internal buffer and exposed on
/// the source side.
/// </summary>
public sealed class NeuronEsbBlock : IDisposable,
    IPropagatorBlock<ESBMessage, ESBMessage>,
    IReceivableSourceBlock<ESBMessage>
{
    // Source side: buffers messages received from the party.
    private readonly IReceivableSourceBlock<ESBMessage> innerSourceBlock;

    // Target side: sends every accepted message through the party.
    private readonly ITargetBlock<ESBMessage> innerTargetBlock;

    private readonly Party party;

    private bool disposed;

    /// <summary>
    /// Creates the underlying party and wires it to the inner target and
    /// source blocks. The party is not connected until
    /// <see cref="Connect"/> is called.
    /// </summary>
    /// <param name="clientConfig">Configuration handed to the party.</param>
    /// <param name="options">Options handed to the party.</param>
    public NeuronEsbBlock(
        SubscriberConfiguration clientConfig, SubscriberOptions options)
    {
        this.party = new Party(clientConfig, options);

        var actionBlock = new ActionBlock<ESBMessage>(
            message => this.party.SendMessage(message));
        this.innerTargetBlock = actionBlock;

        var bufferBlock = new BufferBlock<ESBMessage>();
        this.innerSourceBlock = bufferBlock;

        // Propagate the target side's outcome to the source side, so that
        // Completion (which tracks the source side) reflects the whole block.
        // The scheduler is given explicitly so the continuation does not
        // depend on whatever TaskScheduler.Current is at construction time.
        actionBlock.Completion.ContinueWith(
            t =>
            {
                if (t.IsFaulted)
                {
                    ((IDataflowBlock)bufferBlock).Fault(t.Exception);
                }
                else
                {
                    bufferBlock.Complete();
                }
            },
            TaskScheduler.Default);

        // Received messages go to the buffer; a receive error faults the block.
        this.party.OnReceive += (sender, e) =>
        {
            if (null != e.Exception)
            {
                this.Fault(e.Exception);
            }
            else
            {
                bufferBlock.Post(e.Message);
            }
        };
    }

    /// <summary>
    /// Gets the completion task of the source side of the block.
    /// </summary>
    public Task Completion
    {
        get
        {
            return this.innerSourceBlock.Completion;
        }
    }

    /// <summary>
    /// Signals that no more messages will be offered to the block. The
    /// source side is completed once the target side has finished.
    /// </summary>
    public void Complete()
    {
        this.innerTargetBlock.Complete();
    }

    /// <summary>
    /// Connects the underlying party.
    /// </summary>
    public void Connect()
    {
        this.party.Connect();
    }

    /// <summary>
    /// Disposes the underlying party. Calling this more than once is safe.
    /// </summary>
    /// <remarks>
    /// The class declares no finalizer: it owns no unmanaged resources
    /// directly, and a finalizer must not touch other managed objects such
    /// as the party, because finalization order is not deterministic.
    /// </remarks>
    public void Dispose()
    {
        if (this.disposed)
        {
            return;
        }

        this.party.Dispose();
        this.disposed = true;
    }

    /// <summary>
    /// Faults the target side of the block; the fault is then propagated to
    /// the source side by the continuation set up in the constructor.
    /// </summary>
    /// <param name="exception">The exception that caused the fault.</param>
    public void Fault(Exception exception)
    {
        this.innerTargetBlock.Fault(exception);
    }

    /// <summary>Forwards to the inner source block.</summary>
    bool IReceivableSourceBlock<ESBMessage>.TryReceive(
        Predicate<ESBMessage> filter, out ESBMessage item)
    {
        return this.innerSourceBlock.TryReceive(filter, out item);
    }

    /// <summary>Forwards to the inner source block.</summary>
    bool IReceivableSourceBlock<ESBMessage>.TryReceiveAll(out IList<ESBMessage> items)
    {
        return this.innerSourceBlock.TryReceiveAll(out items);
    }

    /// <summary>Forwards to the inner source block.</summary>
    ESBMessage ISourceBlock<ESBMessage>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<ESBMessage> target,
        out bool messageConsumed)
    {
        return this.innerSourceBlock.ConsumeMessage(
            messageHeader, target, out messageConsumed);
    }

    /// <summary>Forwards to the inner source block.</summary>
    IDisposable ISourceBlock<ESBMessage>.LinkTo(
        ITargetBlock<ESBMessage> target, DataflowLinkOptions linkOptions)
    {
        return this.innerSourceBlock.LinkTo(target, linkOptions);
    }

    /// <summary>Forwards to the inner source block.</summary>
    void ISourceBlock<ESBMessage>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<ESBMessage> target)
    {
        this.innerSourceBlock.ReleaseReservation(messageHeader, target);
    }

    /// <summary>Forwards to the inner source block.</summary>
    bool ISourceBlock<ESBMessage>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<ESBMessage> target)
    {
        return this.innerSourceBlock.ReserveMessage(messageHeader, target);
    }

    /// <summary>Forwards to the inner target block.</summary>
    DataflowMessageStatus ITargetBlock<ESBMessage>.OfferMessage(
        DataflowMessageHeader messageHeader,
        ESBMessage messageValue,
        ISourceBlock<ESBMessage> source,
        bool consumeToAccept)
    {
        return this.innerTargetBlock.OfferMessage(
            messageHeader, messageValue, source, consumeToAccept);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks.Dataflow; | |
using Neuron.Esb; | |
/// <summary>
/// Console sample that publishes each line typed by the user through one
/// <c>NeuronEsbBlock</c> and prints every message that arrives on a second
/// <c>NeuronEsbBlock</c>.
/// </summary>
internal class Program
{
    private static void Main()
    {
        var senderConfig = new SubscriberConfiguration(
            "Publisher1", "Enterprise", "net.tcp://localhost:50000", null);
        var receiverConfig = new SubscriberConfiguration(
            "Subscriber1", "Enterprise", "net.tcp://localhost:50000", null);

        using (var sender = new NeuronEsbBlock(senderConfig, SubscriberOptions.None))
        using (var receiver = new NeuronEsbBlock(receiverConfig, SubscriberOptions.None))
        {
            // Terminal stage of the pipeline: print whatever the receiver emits.
            var printer = new ActionBlock<ESBMessage>(
                received => Console.Out.WriteLine("RECEIVED: {0}", received.Text));

            using (receiver.LinkTo(printer))
            {
                // Completion is chained by hand: sender -> receiver -> printer.
                sender.Completion.ContinueWith(done => receiver.Complete());
                receiver.Completion.ContinueWith(done => printer.Complete());

                sender.Connect();
                receiver.Connect();

                Console.Error.WriteLine("Enter the messages to publish.");
                PublishConsoleLines(sender);

                // Shut the pipeline down and wait for the printer to finish.
                sender.Complete();
                printer.Completion.Wait();
            }
        }
    }

    // Posts each console line to the target until an empty line or end of input.
    private static void PublishConsoleLines(ITargetBlock<ESBMessage> target)
    {
        for (var text = Console.In.ReadLine();
             !string.IsNullOrEmpty(text);
             text = Console.In.ReadLine())
        {
            target.Post(new ESBMessage("Topic1", text));
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment