Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save kenanhancer/8a024397e985a34236b2bbe1edc921c8 to your computer and use it in GitHub Desktop.
Save kenanhancer/8a024397e985a34236b2bbe1edc921c8 to your computer and use it in GitHub Desktop.
using StackExchange.Redis;
using System;
using System.Threading.Tasks;
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub
{
class Program
{
static SimphonyService _simphonyService;
static BroccoliService _broccoliService;
const string _requestChannel = "SimphonyRequestChannel";
const string _responseChannel = "SimphonyResponseChannel";
static void Main(string[] args)
{
IConnectionMultiplexer _connection1 = ConnectionMultiplexer.Connect("localhost:6379");
IConnectionMultiplexer _connection2 = ConnectionMultiplexer.Connect("localhost:6379");
_simphonyService = new SimphonyService(_connection1, _requestChannel);
_broccoliService = new BroccoliService(_connection2, _requestChannel, _responseChannel);
_simphonyService.ListenRequests();
_broccoliService.ListenResponses();
MakeParallelRequest();
Console.WriteLine("Hello world!");
Console.ReadKey();
}
public static void MakeParallelRequest()
{
Action<int> act1 = async (int id) =>
{
string productName1 = await _broccoliService.GetMenu(id);
Console.WriteLine($"Product Name : {productName1} from invocation {id}");
};
Parallel.For(0, 10, i => act1(i));
}
}
}
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub
{
public class BroccoliService
{
private IConnectionMultiplexer _connection;
private ISubscriber _subscriber;
private ConcurrentDictionary<Guid, Action<PubSubResponseMessage>> _messageCallbacks = new ConcurrentDictionary<Guid, Action<PubSubResponseMessage>>();
private string _requestChannel;
private string _responseChannel;
public BroccoliService(IConnectionMultiplexer connection, string requestChannel, string responseChannel)
{
_connection = connection;
_requestChannel = requestChannel;
_responseChannel = responseChannel;
_subscriber = _connection.GetSubscriber();
}
public void ListenResponses()
{
_subscriber.Subscribe(_responseChannel, ResponseCallback);
}
private void ResponseCallback(RedisChannel chl, RedisValue msg)
{
PubSubResponseMessage response = JsonConvert.DeserializeObject<PubSubResponseMessage>(msg);
_messageCallbacks.TryGetValue(response.CorrelationId, out Action<PubSubResponseMessage> waitingCallback);
waitingCallback?.Invoke(response);
}
public async Task<string> GetMenu(int menuId)
{
PubSubRequestMessage req = new PubSubRequestMessage() { Value = menuId, ResponseChannel = _responseChannel };
TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();
_messageCallbacks.TryAdd(req.CorrelationId, response => tcs.SetResult(response.ResponseValue));
long numberOfClients = await _subscriber.PublishAsync(_requestChannel, JsonConvert.SerializeObject(req));
return await tcs.Task;
}
}
}
using System;
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub
{
public abstract class PubSubCorrelatedMessage
{
public Guid CorrelationId { get; set; } = Guid.NewGuid();
public string ResponseChannel { get; set; }
}
}
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub
{
public class PubSubRequestMessage : PubSubCorrelatedMessage
{
public int Value { get; set; }
}
}
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub
{
public class PubSubResponseMessage : PubSubCorrelatedMessage
{
public string ResponseValue { get; set; }
}
}
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Threading;
namespace CSharp_From_EAP_To_TAP_With_Redis_PubSub
{
public class SimphonyService
{
private IConnectionMultiplexer _connection;
private ISubscriber _subscriber;
private string _requestChannel;
public SimphonyService(IConnectionMultiplexer connection, string requestChannel)
{
_connection = connection;
_requestChannel = requestChannel;
_subscriber = connection.GetSubscriber();
}
public void ListenRequests()
{
_subscriber.Subscribe(_requestChannel, RequestCallback);
}
private void RequestCallback(RedisChannel chl, RedisValue msg)
{
PubSubRequestMessage requestMessage = JsonConvert.DeserializeObject<PubSubRequestMessage>(msg);
Thread.Sleep(100 + new Random().Next(500, 2000));
PubSubResponseMessage responseMessage = new PubSubResponseMessage()
{
CorrelationId = requestMessage.CorrelationId,
ResponseValue = $"Hello {requestMessage.Value}"
};
string message = JsonConvert.SerializeObject(responseMessage);
_subscriber.Publish(requestMessage.ResponseChannel, message);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment