@MiloszKrajewski
Last active June 14, 2017 13:11
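// Exposes a NetMQ socket's incoming messages as a shared observable sequence.
// The socket is added to the poller on first subscription and removed
// when the last subscriber unsubscribes.
private static IObservable<T> Observe<T>(
    this NetMQSocket socket,
    ISocketPollableCollection poller,
    Func<NetMQSocket, T> reader)
{
    // Keep reading while the socket reports pending input.
    IEnumerable<T> ReadMany(NetMQSocket sckt)
    {
        while (sckt.HasIn)
            yield return reader(sckt);
    }

    return Observable.FromEventPattern<NetMQSocketEventArgs>(
            h => {
                // Subscribe: attach the handler, then start polling the socket.
                socket.ReceiveReady += h;
                poller.Add(socket);
            },
            h => {
                // Unsubscribe: stop polling first, then detach the handler.
                poller.Remove(socket);
                socket.ReceiveReady -= h;
            })
        .SelectMany(e => ReadMany(e.EventArgs.Socket))
        // Share a single underlying event subscription among all observers.
        .Publish().RefCount();
}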