Last active
July 12, 2018 20:24
-
-
Save kenanhancer/8a024397e985a34236b2bbe1edc921c8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using StackExchange.Redis; | |
using System; | |
using System.Threading.Tasks; | |
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub | |
{ | |
class Program | |
{ | |
static SimphonyService _simphonyService; | |
static BroccoliService _broccoliService; | |
const string _requestChannel = "SimphonyRequestChannel"; | |
const string _responseChannel = "SimphonyResponseChannel"; | |
static void Main(string[] args) | |
{ | |
IConnectionMultiplexer _connection1 = ConnectionMultiplexer.Connect("localhost:6379"); | |
IConnectionMultiplexer _connection2 = ConnectionMultiplexer.Connect("localhost:6379"); | |
_simphonyService = new SimphonyService(_connection1, _requestChannel); | |
_broccoliService = new BroccoliService(_connection2, _requestChannel, _responseChannel); | |
_simphonyService.ListenRequests(); | |
_broccoliService.ListenResponses(); | |
MakeParallelRequest(); | |
Console.WriteLine("Hello world!"); | |
Console.ReadKey(); | |
} | |
public static void MakeParallelRequest() | |
{ | |
Action<int> act1 = async (int id) => | |
{ | |
string productName1 = await _broccoliService.GetMenu(id); | |
Console.WriteLine($"Product Name : {productName1} from invocation {id}"); | |
}; | |
Parallel.For(0, 10, i => act1(i)); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Newtonsoft.Json; | |
using StackExchange.Redis; | |
using System; | |
using System.Collections.Concurrent; | |
using System.Threading.Tasks; | |
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub | |
{ | |
public class BroccoliService | |
{ | |
private IConnectionMultiplexer _connection; | |
private ISubscriber _subscriber; | |
private ConcurrentDictionary<Guid, Action<PubSubResponseMessage>> _messageCallbacks = new ConcurrentDictionary<Guid, Action<PubSubResponseMessage>>(); | |
private string _requestChannel; | |
private string _responseChannel; | |
public BroccoliService(IConnectionMultiplexer connection, string requestChannel, string responseChannel) | |
{ | |
_connection = connection; | |
_requestChannel = requestChannel; | |
_responseChannel = responseChannel; | |
_subscriber = _connection.GetSubscriber(); | |
} | |
public void ListenResponses() | |
{ | |
_subscriber.Subscribe(_responseChannel, ResponseCallback); | |
} | |
private void ResponseCallback(RedisChannel chl, RedisValue msg) | |
{ | |
PubSubResponseMessage response = JsonConvert.DeserializeObject<PubSubResponseMessage>(msg); | |
_messageCallbacks.TryGetValue(response.CorrelationId, out Action<PubSubResponseMessage> waitingCallback); | |
waitingCallback?.Invoke(response); | |
} | |
public async Task<string> GetMenu(int menuId) | |
{ | |
PubSubRequestMessage req = new PubSubRequestMessage() { Value = menuId, ResponseChannel = _responseChannel }; | |
TaskCompletionSource<string> tcs = new TaskCompletionSource<string>(); | |
_messageCallbacks.TryAdd(req.CorrelationId, response => tcs.SetResult(response.ResponseValue)); | |
long numberOfClients = await _subscriber.PublishAsync(_requestChannel, JsonConvert.SerializeObject(req)); | |
return await tcs.Task; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub | |
{ | |
public class PubSubRequestMessage : PubSubCorrelatedMessage | |
{ | |
public int Value { get; set; } | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub | |
{ | |
public class PubSubResponseMessage : PubSubCorrelatedMessage | |
{ | |
public string ResponseValue { get; set; } | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Newtonsoft.Json; | |
using StackExchange.Redis; | |
using System; | |
using System.Threading; | |
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub | |
{ | |
public class SimphonyService | |
{ | |
private IConnectionMultiplexer _connection; | |
private ISubscriber _subscriber; | |
private string _requestChannel; | |
public SimphonyService(IConnectionMultiplexer connection, string requestChannel) | |
{ | |
_connection = connection; | |
_requestChannel = requestChannel; | |
_subscriber = connection.GetSubscriber(); | |
} | |
public void ListenRequests() | |
{ | |
_subscriber.Subscribe(_requestChannel, RequestCallback); | |
} | |
private void RequestCallback(RedisChannel chl, RedisValue msg) | |
{ | |
PubSubRequestMessage requestMessage = JsonConvert.DeserializeObject<PubSubRequestMessage>(msg); | |
Thread.Sleep(100 + new Random().Next(500, 2000)); | |
PubSubResponseMessage responseMessage = new PubSubResponseMessage() | |
{ | |
CorrelationId = requestMessage.CorrelationId, | |
ResponseValue = $"Hello {requestMessage.Value}" | |
}; | |
string message = JsonConvert.SerializeObject(responseMessage); | |
_subscriber.Publish(requestMessage.ResponseChannel, message); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment