Created
June 19, 2020 14:39
-
-
Save korchoon/8aa3902ddb55d7b8b0ebb6e43e8a2646 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Reflection; | |
using MessagePack; | |
using Mk.Routines; | |
using Unity.Collections; | |
using Unity.Networking.Transport; | |
using UnityEngine; | |
using Debug = UnityEngine.Debug; | |
// Serializable for debugging purposes.
/// <summary>
/// Loopback broadcast peer. Binds the first free port in
/// <c>[startPort, startPort + count)</c>, listens on it, and opens one client
/// connection to every other port of that range. <see cref="Send{T}(T, Action{T})"/>
/// pushes a MessagePack-serialized message through every outgoing connection that is
/// currently connected; messages received on any connection are deserialized and
/// handed to the callbacks registered through <see cref="OnReceive{T}"/>.
/// </summary>
/// <remarks>
/// Wire format of one message: optional header (see <see cref="SetHeaders"/>),
/// 1 byte type id, 1 byte payload length, payload bytes. Because the length is a
/// single byte, payloads larger than 255 bytes are rejected by the sender.
/// </remarks>
[Serializable]
class BroadcastPeer : IDisposable {
    const string ScriptsAssembly = "Assembly-CSharp";

    // Read once: Process.GetCurrentProcess() creates a new disposable Process object per call.
    static readonly int CurrentProcessId = ReadCurrentProcessId();

    // Cached so the per-frame purge in Hub() does not allocate a delegate each time.
    static readonly Predicate<NetworkConnection> IsDeadConnection = c => !c.IsCreated;

    // Scratch managed buffer shared by all TypeBinding<T> instances for deserialization.
    static byte[] _buf = new byte[0];

    // public - for debugging
    // NOTE: despite its name this list holds the Outgoing(port) routines (one per remote port).
    public List<Routine> _incomingRoutines;

    // NOTE: despite its name this is the Hub() routine that serves the listening driver.
    public Routine _outRoutine;

    // Port this peer managed to bind and listen on (left at 0 if binding failed).
    public ushort IncomingPort;

    // internal state
    int _count;
    List<NetworkConnection> _incoming;

    // arrays and their index list: _outIndices holds the slots of the arrays below that are in use
    List<int> _outIndices;
    NetworkDriver[] _outDrivers;
    NetworkConnection[] _outConnections;
    public int[] __debugOutPorts;

    // Driver bound to IncomingPort; accepts connections from the other peers.
    NetworkDriver _listenDriver;

    Action<DataStreamWriter> _writeHeader;
    Action<DataStreamReader> _readHeader;
    Dictionary<byte, TypeBindingBase> _byteToBinding;
    Dictionary<Type, TypeBindingBase> _typeToBinding;

    /// <summary>
    /// Creates the peer, binds the first available port of the range and starts one
    /// connection routine for every other port of the range.
    /// </summary>
    /// <param name="startPort">First port of the range shared by all peers.</param>
    /// <param name="count">Number of ports in the range.</param>
    public BroadcastPeer(ushort startPort, int count) {
        _outIndices = new List<int>();
        _incomingRoutines = new List<Routine>();
        _outDrivers = new NetworkDriver[count];
        _outConnections = new NetworkConnection[count];
        __debugOutPorts = new int[count];
        _count = count;

        // Build the type tables before any routine is created, so they exist no matter
        // when the routine bodies first execute (they are used by Send/Receive).
        InitBindings();

        Application.runInBackground = true;
        _listenDriver = NetworkDriver.Create();

        var endpoint = NetworkEndPoint.LoopbackIpv4;
        var success = false;
        for (ushort i = 0; i < count; i++) {
            endpoint.Port = (ushort) (startPort + i);
            success = _listenDriver.Bind(endpoint) == 0;
            if (!success) continue;

            IncomingPort = endpoint.Port;
            break;
        }

        // Half-open range: the ports tried are startPort .. startPort + count - 1.
        Asr.IsTrue(success, $"Failed to bind to ports [{startPort}, {startPort + _count})");
        _listenDriver.Listen();
        _outRoutine = Repeat(() => Hub());

        for (int i = 0; i < count; i++) {
            ushort port = (ushort) (startPort + i);
            if (port == IncomingPort)
                continue; // never connect to ourselves

            var copy = port; // per-iteration copy captured by the lambda
            var routine = Repeat(() => Outgoing(copy));
            _incomingRoutines.Add(routine);
        }
    }

    /// <summary>Registers <paramref name="callback"/> for every received message of type <typeparamref name="T"/>.</summary>
    public void OnReceive<T>(Action<T> callback) where T : ILocalhostMsg {
        AssertMsgPack(typeof(T));
        GetBinding<T>().Subscribe(callback);
    }

    /// <summary>Fails the assertion when <paramref name="msgType"/> lacks the <c>MessagePackObject</c> attribute.</summary>
    void AssertMsgPack(Type msgType) {
        foreach (var attribute in msgType.CustomAttributes) {
            if (attribute.AttributeType == typeof(MessagePackObjectAttribute))
                return;
        }

        Asr.Fail($"'{msgType.Name}' must have 'MessagePackObject' attribute");
    }

    /// <summary>
    /// Serializes <paramref name="msg"/> once and sends it over every outgoing slot.
    /// </summary>
    /// <param name="msg">Message to broadcast.</param>
    /// <param name="_dbg">
    /// Optional debug callback, invoked once per outgoing slot after the send attempt
    /// (also when that slot was not connected and nothing was written).
    /// </param>
    public void Send<T>(T msg, Action<T> _dbg) where T : ILocalhostMsg {
        AssertMsgPack(typeof(T));
        var binding = GetBinding<T>();
        var typeId = binding.TypeId;
        var bytes = MessagePackSerializer.Serialize(msg);
        using (var arr = new NativeArray<byte>(bytes, Allocator.Temp)) {
            foreach (var i in _outIndices) {
                Send(typeId, arr, ref _outDrivers[i], ref _outConnections[i]);
                _dbg?.Invoke(msg);
            }
        }
    }

    /// <summary>
    /// Adds callbacks that write/read a custom header in front of every message.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the writer/reader are handed to the callbacks by value. If they are
    /// structs that keep their cursor inside the struct, bytes written/read by a callback
    /// may not advance the caller's stream - confirm against the installed transport version.
    /// </remarks>
    public void SetHeaders(Action<DataStreamWriter> write, Action<DataStreamReader> read) {
        _writeHeader += write;
        _readHeader += read;
    }

    /// <summary>
    /// Writes one message (header, type id, length, payload) to <paramref name="con"/>.
    /// Does nothing when the connection is not in the Connected state.
    /// </summary>
    void Send(byte typeId, NativeArray<byte> bytes, ref NetworkDriver dr, ref NetworkConnection con) {
        Asr.IsTrue(dr.IsCreated);
        Asr.IsTrue(con.IsCreated);

        // The length travels as a single byte; a larger payload would be written with a
        // truncated length and corrupt the stream on the receiving side.
        if (bytes.Length > byte.MaxValue) {
            Asr.Fail($"Message payload is {bytes.Length} bytes, the wire format allows at most {byte.MaxValue}");
            return;
        }

        if (dr.GetConnectionState(con) != NetworkConnection.State.Connected)
            return;

        var writer = dr.BeginSend(con);
        try {
            _writeHeader?.Invoke(writer);
            writer.WriteByte(typeId);
            writer.WriteByte((byte) bytes.Length);
            writer.WriteBytes(bytes);
        }
        finally {
            // always close the send that BeginSend opened
            dr.EndSend(writer);
        }
    }

    /// <summary>Serializes <paramref name="msg"/> and sends it over a single connection.</summary>
    void Send<T>(T msg, ref NetworkDriver dr, ref NetworkConnection con) where T : ILocalhostMsg {
        var binding = GetBinding<T>();
        var typeId = binding.TypeId;
        var bytes = MessagePackSerializer.Serialize(msg);
        using (var arr = new NativeArray<byte>(bytes, Allocator.Temp))
            Send(typeId, arr, ref dr, ref con);
    }

    /// <summary>Fetch new messages, publish to subscribers. Steps every routine once.</summary>
    public void Update() {
        _outRoutine.Update();
        foreach (var c in _incomingRoutines)
            c.Update();
    }

    /// <summary>Runs <paramref name="retry"/> again every time the routine it returns completes.</summary>
    static async Routine Repeat(Func<Routine> retry) {
        while (true)
            await retry.Invoke();
    }

    /// <summary>
    /// Serves the listening driver: accepts new connections and dispatches the events
    /// of every accepted connection, once per routine step.
    /// </summary>
    async Routine Hub() {
        _incoming = new List<NetworkConnection>();
        (await Routine.SelfScope()).Subscribe(_Dispose);

        void _Dispose() {
            foreach (var networkConnection in _incoming) {
                // entries already reset to default have nothing to disconnect
                if (networkConnection.IsCreated)
                    networkConnection.Disconnect(_listenDriver);
            }

            _incoming.Clear();
        }

        while (true) {
            _listenDriver.ScheduleUpdate().Complete();

            // AcceptNewConnections
            NetworkConnection c;
            while ((c = _listenDriver.Accept()) != default) {
                _incoming.Add(c);
            }

            for (var i = 0; i < _incoming.Count; i++) {
                var conn = _incoming[i];
                if (!conn.IsCreated) continue;

                NetworkEvent.Type cmd;
                while ((cmd = _listenDriver.PopEventForConnection(conn, out var stream)) != NetworkEvent.Type.Empty) {
                    switch (cmd) {
                        case NetworkEvent.Type.Data:
                            ReceiveAndPublish(ref stream);
                            break;
                        case NetworkEvent.Type.Disconnect:
                            // mark as dead; purged below once the index loop is finished
                            _incoming[i] = default;
                            break;
                        case NetworkEvent.Type.Connect:
                            Read(ref stream, out HandShake shake);
                            // a handshake carrying our own process id is a connection to ourselves
                            if (shake.ProcessId == CurrentProcessId) {
                                _listenDriver.Disconnect(conn);
                            }

                            break;
                    }
                }
            }

            // Drop dead entries so the list does not grow with every reconnect.
            _incoming.RemoveAll(IsDeadConnection);

            await Routine.Yield;
        }
    }

    /// <summary>
    /// Client side of one link: connects to <paramref name="port"/> on loopback, sends a
    /// <see cref="HandShake"/> once connected, then publishes incoming data until the
    /// connection is lost. Returning lets <see cref="Repeat"/> start a fresh attempt.
    /// </summary>
    async Routine Outgoing(ushort port) {
        var driver = NetworkDriver.Create(new NetworkConfigParameter() {
            maxConnectAttempts = NetworkParameterConstants.MaxConnectAttempts,
            connectTimeoutMS = NetworkParameterConstants.ConnectTimeoutMS,
            disconnectTimeoutMS = Int32.MaxValue,
            maxFrameTimeMS = 0
        });

        // Must be a slot nobody is using: after an earlier routine released a lower slot,
        // "_outIndices.Count" would point at a slot that is still owned by another routine.
        var id = AllocateOutSlot();
        _outIndices.Add(id);
        _outDrivers[id] = driver;

        var endpoint = NetworkEndPoint.LoopbackIpv4;
        endpoint.Port = port;
        var networkConnection = driver.Connect(endpoint);
        _outConnections[id] = networkConnection;
        __debugOutPorts[id] = port;

        (await Routine.SelfScope()).Subscribe(_Dispose);

        void _Dispose() {
            _outIndices.Remove(id); // removes the value 'id', i.e. frees the slot
            _outDrivers[id] = default;
            _outConnections[id] = default;
            networkConnection.Disconnect(driver);
            driver.Dispose();
        }

        // wait for Connect msg type
        var connected = false;
        while (!connected) {
            driver.ScheduleUpdate().Complete();

            NetworkEvent.Type cmd;
            while ((cmd = driver.PopEventForConnection(networkConnection, out _)) != NetworkEvent.Type.Empty) {
                if (cmd == NetworkEvent.Type.Disconnect) {
                    // The attempt failed (e.g. nobody listens on that port yet). Finish so
                    // that Repeat() starts over instead of polling a dead connection forever.
                    return;
                }

                if (cmd == NetworkEvent.Type.Connect) {
                    Send(new HandShake() {ProcessId = CurrentProcessId}, ref driver, ref networkConnection);
                    connected = true;
                    break;
                }
            }

            if (!connected)
                await Routine.Yield;
        }

        // connected: drain events, yield, update the driver, repeat
        while (true) {
            NetworkEvent.Type cmd;
            while ((cmd = driver.PopEventForConnection(networkConnection, out var stream)) != NetworkEvent.Type.Empty) {
                switch (cmd) {
                    case NetworkEvent.Type.Data:
                        ReceiveAndPublish(ref stream);
                        break;
                    case NetworkEvent.Type.Disconnect:
                        return;
                }
            }

            await Routine.Yield;
            driver.ScheduleUpdate().Complete();
        }
    }

    /// <summary>Returns the lowest slot of the outgoing arrays that is not listed in <c>_outIndices</c>.</summary>
    int AllocateOutSlot() {
        for (var slot = 0; slot < _count; slot++) {
            if (!_outIndices.Contains(slot))
                return slot;
        }

        Asr.Fail($"No free outgoing slot left ({_count} slots)");
        return -1;
    }

    /// <summary>Stops all routines and releases the listening driver.</summary>
    public void Dispose() {
        _outRoutine.Dispose();
        foreach (var c in _incomingRoutines)
            c.Dispose();

        // guard so that a repeated Dispose() does not dispose the driver a second time
        if (_listenDriver.IsCreated)
            _listenDriver.Dispose();
    }

    /// <summary>
    /// Creates one binding per non-interface <c>ILocalhostMsg</c> type defined in the scripts
    /// assembly. The type id of a message is the position of its type in that enumeration.
    /// </summary>
    void InitBindings() {
        _byteToBinding = new Dictionary<byte, TypeBindingBase>();
        _typeToBinding = new Dictionary<Type, TypeBindingBase>();

        var methodInfo = GetType().GetMethod(nameof(AddBinding), BindingFlags.Instance | BindingFlags.NonPublic);
        var addBinding = methodInfo.GetGenericMethodDefinition();

        var types = Assembly.Load(ScriptsAssembly)
            .DefinedTypes
            .Where(info => typeof(ILocalhostMsg).IsAssignableFrom(info) && !info.IsInterface)
            .Select(info => info.AsType())
            .ToArray();

        Asr.IsTrue(types.Length > 0);
        // type ids are a single byte on the wire
        Asr.IsTrue(types.Length <= byte.MaxValue + 1, $"Too many message types: {types.Length}, at most {byte.MaxValue + 1} are supported");

        // int counter: a byte counter would wrap around instead of terminating at 256 types
        for (var index = 0; index < types.Length; index++) {
            var closed = addBinding.MakeGenericMethod(types[index]);
            closed.Invoke(this, new object[] {(byte) index});
        }
    }

    // used in reflection call
    [UnityEngine.Scripting.Preserve]
    void AddBinding<T>(byte typeId) where T : ILocalhostMsg {
        var binding = new TypeBinding<T>(typeId);
        _byteToBinding.Add(typeId, binding);
        _typeToBinding.Add(typeof(T), binding);
    }

    /// <summary>Looks up the binding of <typeparamref name="T"/>; fails the assertion when there is none.</summary>
    TypeBinding<T> GetBinding<T>() where T : ILocalhostMsg {
        if (_typeToBinding.TryGetValue(typeof(T), out var pubSub))
            return (TypeBinding<T>) pubSub;

        Asr.Fail($"{typeof(T).Name} msg binding not found");
        return default;
    }

    /// <summary>Reads one message that is expected to be of type <typeparamref name="T"/>.</summary>
    void Read<T>(ref DataStreamReader reader, out T result) {
        var s = TryRead(ref reader, out result);
        Asr.IsTrue(s);
    }

    /// <summary>
    /// Reads one message. Returns false (after consuming the payload) when its type id
    /// does not map to a binding of <typeparamref name="T"/>.
    /// </summary>
    bool TryRead<T>(ref DataStreamReader reader, out T value) {
        ReadEnvelope(ref reader, out var typeId, out var size);
        if (_byteToBinding.TryGetValue(typeId, out var objBind) && objBind is TypeBinding<T> bind) {
            value = bind.Read(ref reader, size);
            return true;
        }

        // just read, don't publish
        SkipPayload(ref reader, size);
        value = default;
        return false;
    }

    /// <summary>Reads one message and publishes it to the subscribers of its type.</summary>
    void ReceiveAndPublish(ref DataStreamReader reader) {
        ReadEnvelope(ref reader, out var typeId, out var size);
        if (_byteToBinding.TryGetValue(typeId, out var b)) {
            b.ReadAndPublish(ref reader, size);
        }
        else {
            Debug.LogWarning($"unknown code: {typeId} ");
            // just read, don't publish
            SkipPayload(ref reader, size);
        }
    }

    /// <summary>Runs the header callbacks, then reads the type id and the payload length.</summary>
    void ReadEnvelope(ref DataStreamReader reader, out byte typeId, out byte size) {
        _readHeader?.Invoke(reader);
        typeId = reader.ReadByte();
        size = reader.ReadByte();
    }

    /// <summary>Consumes <paramref name="size"/> payload bytes without interpreting them.</summary>
    static void SkipPayload(ref DataStreamReader reader, byte size) {
        using (var buf = new NativeArray<byte>(size, Allocator.Temp))
            reader.ReadBytes(buf);
    }

    static int ReadCurrentProcessId() {
        using (var process = Process.GetCurrentProcess())
            return process.Id;
    }

    /// <summary>Type-erased view of a binding, so bindings can be looked up by type id.</summary>
    abstract class TypeBindingBase {
        public abstract void ReadAndPublish(ref DataStreamReader reader, byte size);
    }

    /// <summary>Associates message type <typeparamref name="T"/> with its wire id and its subscribers.</summary>
    class TypeBinding<T> : TypeBindingBase, IDisposable {
        public byte TypeId;
        Action<T> _subscribers;

        public TypeBinding(byte typeId) {
            TypeId = typeId;
        }

        public void Subscribe(Action<T> subscriber) {
            _subscribers += subscriber;
        }

        /// <summary>Reads <paramref name="size"/> payload bytes and deserializes them as <typeparamref name="T"/>.</summary>
        public T Read(ref DataStreamReader reader, byte size) {
            using (var buf = new NativeArray<byte>(size, Allocator.Temp)) {
                reader.ReadBytes(buf);
                // the shared managed buffer is resized to exactly 'size' before the copy
                Array.Resize(ref _buf, size);
                buf.CopyTo(_buf);
                return MessagePackSerializer.Deserialize<T>(_buf);
            }
        }

        public override void ReadAndPublish(ref DataStreamReader reader, byte size) {
            // the native buffer is released inside Read() before subscribers run
            var msg = Read(ref reader, size);
            _subscribers?.Invoke(msg);
        }

        public void Dispose() {
            _subscribers = null;
        }
    }

    /// <summary>First message sent by the connecting side; carries the sender's process id.</summary>
    [MessagePackObject()]
    public struct HandShake : ILocalhostMsg {
        [Key(0)] public int ProcessId;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment