-
-
Save bennage/c60ed468e99234d493b3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Net.Sockets; | |
using System.Data; | |
using System.Net; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
namespace RxWeb | |
{ | |
/// <summary>
/// Console host for <see cref="NetActor"/>: listens on port 8081, echoes whatever the
/// remote peer sends to the console, and forwards one line typed on the console to the peer.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Sets up the actor on a background task and then parks the main thread;
    /// the process runs until it is terminated externally.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var server = Task.Factory.StartNew(() =>
        {
            //create a netactor on port 8081
            NetActor a = new NetActor(8081);

            //write out to the console when you get sent data; without an error handler
            //an OnError from the actor would be rethrown on the socket callback thread
            a.Incoming.Subscribe(
                i => Console.Write(i),
                error => Console.Error.WriteLine(error));

            //create an observable from the console and bind it to the netactor.
            //Observable.Start invokes ReadLine exactly once, so a single line is produced.
            //ReadLine returns null at end of input; drop it rather than push null to the actor.
            var o = Observable.Start<string>(() => Console.ReadLine())
                              .Where(line => line != null);

            //publish from the console to the actor
            o.Subscribe<string>(a.Outgoing.OnNext);
        });

        // Setup only registers asynchronous callbacks, so this returns promptly. Waiting
        // surfaces startup failures (e.g. the port is already in use) instead of leaving
        // them as an unobserved task exception.
        server.Wait();

        // Keep the process alive without burning a CPU core in a spin loop.
        Thread.Sleep(Timeout.Infinite);
    }
}
/// <summary>
/// Binds to localhost. Pretty much 100% based on msdn code.
/// Listens on a loopback TCP port, accepts one client, publishes the text received from
/// it on <see cref="Incoming"/> and sends every string pushed into <see cref="Outgoing"/>
/// to that client. As in the MSDN sample, the client connection is shut down and closed
/// once an outgoing send completes.
/// </summary>
/// <seealso href="http://msdn.microsoft.com/en-us/library/fx6588te.aspx"/>
public class NetActor
{
    // Marker that ends the incoming stream (same marker as the MSDN sample).
    private const string EndOfMessage = "<EOF>";

    //this socket is us
    readonly Socket _Listener;
    //this socket is them
    Socket _Client;

    //Incoming messages from the client
    private ISubject<string> _Incoming { get; set; }

    /// <summary>
    /// Text received from the client, one notification per chunk read from the socket.
    /// Completes when the end-of-message marker is seen or the connection is closed;
    /// errors when accepting or receiving fails.
    /// </summary>
    public IObservable<string> Incoming
    {
        get
        {
            // AsObservable hides the subject so subscribers cannot push into it.
            return _Incoming.AsObservable();
        }
    }

    /// <summary>
    /// Outgoing messages: each value pushed here is ASCII-encoded and sent to the client
    /// once one has connected.
    /// </summary>
    public ISubject<string> Outgoing { get; private set; }

    /// <summary>
    /// Creates the actor, binds to 127.0.0.1 on <paramref name="port"/> and starts
    /// accepting a client asynchronously.
    /// </summary>
    /// <param name="port">TCP port to listen on.</param>
    public NetActor(int port)
    {
        _Incoming = new Subject<string>();
        Outgoing = new Subject<string>();
        _Listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _Listener.Bind(new IPEndPoint(IPAddress.Loopback, port));
        _Listener.Listen(20);
        _Listener.BeginAccept(BeginAccept, _Listener);
    }

    /// <summary>
    /// Accept callback: completes the accept, wires <see cref="Outgoing"/> to the new
    /// client socket and starts the first asynchronous receive.
    /// </summary>
    /// <param name="ar">Async result whose state is the listening socket.</param>
    private void BeginAccept(IAsyncResult ar)
    {
        try
        {
            var listener = (Socket)ar.AsyncState;
            //create a state that contains the ability to listen
            var state = new StateObject()
            {
                workSocket = listener.EndAccept(ar)
            };
            _Client = state.workSocket;

            //create the ability to push data out
            Outgoing.Subscribe((s) => Send(state.workSocket, s));

            //classic microsoft code, gotta love [object] state
            //(Observable.FromAsyncPattern was tried here and abandoned by the original author)
            state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
        }
        catch (Exception exc)
        {
            // This runs on a socket callback thread; report the failure through the
            // stream, the same way BeginRead does, instead of letting it escape.
            _Incoming.OnError(exc);
        }
    }

    /// <summary>
    /// Receive callback: publishes the chunk that just arrived and either completes the
    /// stream (end-of-message marker seen, or connection closed) or issues the next receive.
    /// </summary>
    /// <param name="ar">Async result whose state is the connection's <see cref="StateObject"/>.</param>
    private void BeginRead(IAsyncResult ar)
    {
        try
        {
            var state = (StateObject)ar.AsyncState;
            int bytes = state.workSocket.EndReceive(ar);

            if (bytes <= 0)
            {
                // EndReceive returns 0 when the remote side has shut the connection down;
                // receiving again would only return 0 again, so end the stream here.
                _Incoming.OnCompleted();
                return;
            }

            var chunk = Encoding.ASCII.GetString(state.buffer, 0, bytes);
            state.sb.Append(chunk);

            // Publish only the new data; the builder is kept solely so that a marker
            // split across two reads is still detected.
            _Incoming.OnNext(chunk);

            if (state.sb.ToString().IndexOf(EndOfMessage, StringComparison.Ordinal) != -1)
            {
                _Incoming.OnCompleted();
                return;
            }

            // No marker yet: keep just enough trailing characters to match a marker that
            // straddles the next read, so the builder does not grow without bound.
            int keep = EndOfMessage.Length - 1;
            if (state.sb.Length > keep)
            {
                state.sb.Remove(0, state.sb.Length - keep);
            }

            state.workSocket.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(BeginRead), state);
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed locally (SendCallback closes it after a send) while a
            // receive was pending; that is a normal end of the stream, not a failure.
            _Incoming.OnCompleted();
        }
        catch (Exception exc)
        {
            _Incoming.OnError(exc);
        }
    }

    /// <summary>
    /// Starts an asynchronous send of <paramref name="data"/> to <paramref name="handler"/>.
    /// </summary>
    /// <param name="handler">Connected client socket.</param>
    /// <param name="data">Text to send; encoded as ASCII.</param>
    private void Send(Socket handler, String data)
    {
        // Convert the string data to byte data using ASCII encoding.
        byte[] byteData = Encoding.ASCII.GetBytes(data);
        // Begin sending the data to the remote device.
        handler.BeginSend(byteData, 0, byteData.Length, 0,
            new AsyncCallback(SendCallback), handler);
    }

    /// <summary>
    /// Send callback: completes the send, then shuts down and closes the client socket.
    /// Failures are written to the console.
    /// </summary>
    /// <param name="ar">Async result whose state is the client socket.</param>
    private void SendCallback(IAsyncResult ar)
    {
        try
        {
            // Retrieve the socket from the state object.
            Socket handler = (Socket)ar.AsyncState;
            // Complete sending the data to the remote device.
            handler.EndSend(ar);
            handler.Shutdown(SocketShutdown.Both);
            handler.Close();
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    // State object for reading client data asynchronously
    private class StateObject
    {
        // Client socket.
        public Socket workSocket = null;
        // Size of receive buffer.
        public const int BufferSize = 1024;
        // Receive buffer.
        public byte[] buffer = new byte[BufferSize];
        // Received data string (tail kept for end-of-message detection).
        public StringBuilder sb = new StringBuilder();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment