Skip to content

Instantly share code, notes, and snippets.

@korchoon
Created June 19, 2020 14:39
Show Gist options
  • Save korchoon/8aa3902ddb55d7b8b0ebb6e43e8a2646 to your computer and use it in GitHub Desktop.
Save korchoon/8aa3902ddb55d7b8b0ebb6e43e8a2646 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using MessagePack;
using Mk.Routines;
using Unity.Collections;
using Unity.Networking.Transport;
using UnityEngine;
using Debug = UnityEngine.Debug;
// serializable for debugging purposes
[Serializable]
class BroadcastPeer : IDisposable {
const string ScriptsAssembly = "Assembly-CSharp";
// public - for debugging
public List<Routine> _incomingRoutines;
public Routine _outRoutine;
public ushort IncomingPort;
// internal state
int _count;
List<NetworkConnection> _incoming;
// arrays and their index list
List<int> _outIndices;
NetworkDriver[] _outDrivers;
NetworkConnection[] _outConnections;
public int[] __debugOutPorts;
NetworkDriver _outgoingDriver;
// Creates a peer for the loopback port range [startPort, startPort + count - 1]:
// binds the listening driver to the first port of the range that accepts a bind,
// then starts the Hub routine plus one Outgoing routine for every other port.
//   startPort - first port of the range.
//   count     - number of consecutive ports; also the size of the per-connection arrays.
public BroadcastPeer(ushort startPort, int count) {
    _outIndices = new List<int>();
    _incomingRoutines = new List<Routine>();
    _outDrivers = new NetworkDriver[count];
    _outConnections = new NetworkConnection[count];
    __debugOutPorts = new int[count];
    _count = count;
    Application.runInBackground = true;

    // Build the binding tables before any routine exists: Hub and Outgoing reach
    // GetBinding / ReceiveAndPublish, which read these tables, so they must not
    // still be null whenever the routines first touch them.
    InitBindings();

    _outgoingDriver = NetworkDriver.Create();
    var endpoint = NetworkEndPoint.LoopbackIpv4;
    var success = false;
    for (var offset = 0; offset < count; offset++) {
        endpoint.Port = (ushort) (startPort + offset);
        success = _outgoingDriver.Bind(endpoint) == 0;
        if (!success) continue;
        IncomingPort = endpoint.Port;
        break;
    }
    // The last port actually tried is startPort + count - 1.
    Asr.IsTrue(success, $"Failed to bind to ports [{startPort}, {startPort + count - 1}]");
    _outgoingDriver.Listen();

    _outRoutine = Repeat(() => Hub());
    for (var offset = 0; offset < count; offset++) {
        // Declared inside the loop body, so each lambda captures its own port.
        var port = (ushort) (startPort + offset);
        if (port == IncomingPort)
            continue; // never connect to our own listening port
        _incomingRoutines.Add(Repeat(() => Outgoing(port)));
    }
}
// Registers callback to be invoked for every received message of type T.
// T is first checked for the MessagePackObject attribute.
public void OnReceive<T>(Action<T> callback) where T : ILocalhostMsg {
    var messageType = typeof(T);
    AssertMsgPack(messageType);

    var binding = GetBinding<T>();
    binding.Subscribe(callback);
}
// Fails (via Asr.Fail) unless msgType carries the MessagePackObject attribute.
// Only an exact attribute-type match counts.
void AssertMsgPack(Type msgType) {
    var marker = typeof(MessagePackObjectAttribute);
    var isAnnotated = msgType.CustomAttributes.Any(data => data.AttributeType == marker);
    if (isAnnotated)
        return;

    Asr.Fail($"'{msgType.Name}' must have 'MessagePackObject' attribute");
}
// Serializes msg once and hands it to every slot listed in _outIndices.
// _dbg, when not null, is invoked once per slot after the send call for that slot
// (whether or not the underlying connection was in the Connected state).
public void Send<T>(T msg, Action<T> _dbg) where T : ILocalhostMsg {
    AssertMsgPack(typeof(T));
    var typeId = GetBinding<T>().TypeId;
    var payload = MessagePackSerializer.Serialize(msg);

    using (var native = new NativeArray<byte>(payload, Allocator.Temp)) {
        foreach (var slot in _outIndices) {
            Send(typeId, native, ref _outDrivers[slot], ref _outConnections[slot]);
            _dbg?.Invoke(msg);
        }
    }
}
// Adds a pair of header callbacks: write runs before the type id is written on send,
// read runs before the type id is read on receive. Callbacks accumulate across calls.
// NOTE(review): both delegates receive the stream struct by value - confirm that the
// transport version in use carries the cursor position back to the caller.
public void SetHeaders(Action<DataStreamWriter> write, Action<DataStreamReader> read) {
    _readHeader += read;
    _writeHeader += write;
}
// Writes one framed message to con through dr:
//   [optional header][typeId : 1 byte][payload size : 1 byte][payload].
// Does nothing when the connection is not in the Connected state.
// Payloads longer than 255 bytes are rejected: the size prefix is a single byte, so a
// longer payload would be announced with a truncated length and the receiver would
// read the wrong number of bytes.
void Send(byte typeId, NativeArray<byte> bytes, ref NetworkDriver dr, ref NetworkConnection con) {
    Asr.IsTrue(dr.IsCreated);
    Asr.IsTrue(con.IsCreated);

    if (bytes.Length > byte.MaxValue) {
        Asr.Fail($"Message payload is {bytes.Length} bytes, but the one-byte size prefix allows at most {byte.MaxValue}");
        return;
    }

    if (dr.GetConnectionState(con) != NetworkConnection.State.Connected)
        return;

    var writer = dr.BeginSend(con);
    try {
        _writeHeader?.Invoke(writer);
        writer.WriteByte(typeId);
        writer.WriteByte((byte) bytes.Length); // safe: length checked above
        writer.WriteBytes(bytes);
    }
    finally {
        // Always close the send that BeginSend opened, even if a header callback throws.
        dr.EndSend(writer);
    }
}
// Serializes msg and sends it over a single driver/connection pair.
void Send<T>(T msg, ref NetworkDriver dr, ref NetworkConnection con) where T : ILocalhostMsg {
    var typeId = GetBinding<T>().TypeId;
    var payload = MessagePackSerializer.Serialize(msg);

    var native = new NativeArray<byte>(payload, Allocator.Temp);
    try {
        Send(typeId, native, ref dr, ref con);
    }
    finally {
        native.Dispose();
    }
}
// Advances the Hub routine and then every Outgoing routine by one step;
// that is where new messages are fetched and published to subscribers.
public void Update() {
    _outRoutine.Update();

    // The list is only filled in the constructor, so indexing is equivalent to enumerating.
    for (var n = 0; n < _incomingRoutines.Count; n++) {
        _incomingRoutines[n].Update();
    }
}
// Runs the routine produced by retry to completion, then creates and runs a new one, forever.
static async Routine Repeat(Func<Routine> retry) {
    for (;;) {
        var attempt = retry();
        await attempt;
    }
}
// Listening side: each step updates the listening driver, accepts new connections and
// drains the events of every accepted connection, publishing Data events to subscribers.
async Routine Hub() {
    _incoming = new List<NetworkConnection>();
    (await Routine.SelfScope()).Subscribe(_Dispose);

    // Cleanup registered on the routine's scope: disconnect whatever is still tracked.
    void _Dispose() {
        foreach (var networkConnection in _incoming) {
            // Entries reset to default after a Disconnect event are not valid connections.
            if (networkConnection.IsCreated)
                networkConnection.Disconnect(_outgoingDriver);
        }
        _incoming.Clear();
    }

    while (true) {
        _outgoingDriver.ScheduleUpdate().Complete();

        // AcceptNewConnections
        NetworkConnection c;
        while ((c = _outgoingDriver.Accept()) != default) {
            _incoming.Add(c);
        }

        for (var i = 0; i < _incoming.Count; i++) {
            var conn = _incoming[i];
            if (!conn.IsCreated) continue;

            NetworkEvent.Type cmd;
            while ((cmd = _outgoingDriver.PopEventForConnection(conn, out var stream)) != NetworkEvent.Type.Empty) {
                switch (cmd) {
                    case NetworkEvent.Type.Data: {
                        ReceiveAndPublish(ref stream);
                        break;
                    }
                    case NetworkEvent.Type.Disconnect:
                        // Mark the slot dead; it is removed once this pass is over.
                        _incoming[i] = default;
                        break;
                    case NetworkEvent.Type.Connect:
                        Read(ref stream, out HandShake shake);
                        // A handshake carrying our own process id means we connected to ourselves.
                        if (shake.ProcessId == Process.GetCurrentProcess().Id) {
                            _outgoingDriver.Disconnect(conn);
                        }
                        break;
                }
            }
        }

        // Drop the slots marked dead above; otherwise the list would grow by one entry
        // for every peer that ever disconnected and each pass would rescan all of them.
        _incoming.RemoveAll(slot => !slot.IsCreated);

        await Routine.Yield;
    }
}
// Connecting side for one remote port: creates a dedicated driver, connects to
// loopback:port, sends a HandShake once the Connect event arrives, then publishes
// incoming Data events until the connection reports Disconnect.
// Returning ends this attempt; the caller wraps the routine in Repeat, which starts a new one.
async Routine Outgoing(ushort port) {
    var driver = NetworkDriver.Create(new NetworkConfigParameter() {
        maxConnectAttempts = NetworkParameterConstants.MaxConnectAttempts,
        connectTimeoutMS = NetworkParameterConstants.ConnectTimeoutMS,
        disconnectTimeoutMS = Int32.MaxValue,
        maxFrameTimeMS = 0
    });

    // Claim the lowest slot that no live routine owns. Using _outIndices.Count would
    // hand out an id that is still in use after an earlier slot has been released
    // (e.g. ids {1, 2} remain -> Count == 2), overwriting that slot's driver and connection.
    // While the ids are dense this yields the same value as Count.
    var id = 0;
    while (_outIndices.Contains(id))
        id++;
    _outIndices.Add(id);
    _outDrivers[id] = driver;

    var endpoint = NetworkEndPoint.LoopbackIpv4;
    endpoint.Port = port;
    var networkConnection = driver.Connect(endpoint);
    _outConnections[id] = networkConnection;
    __debugOutPorts[id] = port;

    (await Routine.SelfScope()).Subscribe(_Dispose);

    // Cleanup registered on the routine's scope: release the slot and the driver.
    void _Dispose() {
        _outIndices.Remove(id);
        _outDrivers[id] = default;
        _outConnections[id] = default;
        networkConnection.Disconnect(driver);
        driver.Dispose();
    }

    // wait for Connect msg type
    while (true) {
        driver.ScheduleUpdate().Complete();
        NetworkEvent.Type cmd;
        while ((cmd = driver.PopEventForConnection(networkConnection, out _)) != NetworkEvent.Type.Empty) {
            if (cmd == NetworkEvent.Type.Connect) {
                Send(new HandShake() {ProcessId = Process.GetCurrentProcess().Id}, ref driver, ref networkConnection);
                goto CONNECTED;
            }

            // The attempt failed before it ever connected: leave through the same exit the
            // connected phase uses, instead of polling a dead connection forever.
            if (cmd == NetworkEvent.Type.Disconnect)
                return;
        }
        await Routine.Yield;
    }

    CONNECTED:
    while (true) {
        NetworkEvent.Type cmd;
        while ((cmd = driver.PopEventForConnection(networkConnection, out var stream)) != NetworkEvent.Type.Empty) {
            switch (cmd) {
                case NetworkEvent.Type.Data: {
                    ReceiveAndPublish(ref stream);
                    break;
                }
                case NetworkEvent.Type.Disconnect:
                    return;
            }
        }
        await Routine.Yield;
        driver.ScheduleUpdate().Complete();
    }
}
// Stops the Hub routine and all Outgoing routines, then releases the listening driver.
// Safe to call more than once: later calls return immediately.
public void Dispose() {
    // NOTE(review): relies on NetworkDriver.IsCreated turning false after Dispose - confirm
    // for the transport version in use.
    if (!_outgoingDriver.IsCreated)
        return;

    // Routines go first: the Hub cleanup still uses the listening driver to disconnect peers.
    _outRoutine.Dispose();
    foreach (var c in _incomingRoutines)
        c.Dispose();

    _outgoingDriver.Dispose();
}
Action<DataStreamWriter> _writeHeader;
Action<DataStreamReader> _readHeader;
Dictionary<byte, TypeBindingBase> _byteToBinding;
Dictionary<Type, TypeBindingBase> _typeToBinding;
// Finds every non-interface type in the scripts assembly that implements ILocalhostMsg
// and registers a binding for it; the type id is the type's position in that list.
void InitBindings() {
    _byteToBinding = new Dictionary<byte, TypeBindingBase>();
    _typeToBinding = new Dictionary<Type, TypeBindingBase>();

    // Look the private method up on BroadcastPeer itself: GetType() would return the
    // runtime type, and reflection does not return a base class's private members
    // when asked through a derived type.
    var addBinding = typeof(BroadcastPeer).GetMethod(nameof(AddBinding), BindingFlags.Instance | BindingFlags.NonPublic);

    var types = Assembly.Load(ScriptsAssembly)
        .DefinedTypes
        .Where(info => typeof(ILocalhostMsg).IsAssignableFrom(info) && !info.IsInterface)
        .Select(info => info.AsType())
        .ToArray();
    Asr.IsTrue(types.Length > 0);

    // Type ids travel as a single byte, so at most 256 message types can be told apart.
    const int maxTypes = byte.MaxValue + 1;
    Asr.IsTrue(types.Length <= maxTypes, $"Found {types.Length} message types, but a one-byte type id allows at most {maxTypes}");

    // An int counter with an explicit bound: a byte counter would wrap back to 0 after 255
    // and fail with a duplicate-key error instead of a readable message.
    for (var index = 0; index < types.Length && index < maxTypes; index++) {
        var register = addBinding.MakeGenericMethod(types[index]);
        register.Invoke(this, new object[] {(byte) index});
    }
}
// Invoked through reflection from InitBindings, hence the Preserve attribute.
// Registers one binding for T under both its type id and its Type.
[UnityEngine.Scripting.Preserve]
void AddBinding<T>(byte typeId) where T : ILocalhostMsg {
    TypeBindingBase created = new TypeBinding<T>(typeId);
    _byteToBinding.Add(typeId, created);
    _typeToBinding.Add(typeof(T), created);
}
// Returns the binding registered for T. When none is registered, reports it through
// Asr.Fail and returns default.
TypeBinding<T> GetBinding<T>() where T : ILocalhostMsg {
    var found = _typeToBinding.TryGetValue(typeof(T), out var registered);
    if (!found) {
        Asr.Fail($"{typeof(T).Name} msg binding not found");
        return default;
    }

    return (TypeBinding<T>) registered;
}
// Reads one message of type T from reader and asserts that it really was a T.
void Read<T>(ref DataStreamReader reader, out T result) {
    // Kept as a separate statement so the read happens regardless of how the assert is evaluated.
    bool parsed = TryRead(ref reader, out result);
    Asr.IsTrue(parsed);
}
// Reads one framed message (header, type id, size, payload) from reader.
// Returns true and the decoded value when the type id maps to a binding for T;
// otherwise consumes the payload, sets value to default and returns false.
bool TryRead<T>(ref DataStreamReader reader, out T value) {
    _readHeader?.Invoke(reader);
    var typeId = reader.ReadByte();
    var size = reader.ReadByte();

    // On a miss the out variable is null, which fails the type test below as well.
    _byteToBinding.TryGetValue(typeId, out var candidate);
    if (candidate is TypeBinding<T> typed) {
        value = typed.Read(ref reader, size);
        return true;
    }

    // just read, don't publish
    using (var scratch = new NativeArray<byte>(size, Allocator.Temp)) {
        reader.ReadBytes(scratch);
    }

    value = default;
    return false;
}
// Reads one framed message from reader and passes it to the binding registered for its
// type id. Unknown type ids are logged and their payload is consumed without publishing.
void ReceiveAndPublish(ref DataStreamReader reader) {
    _readHeader?.Invoke(reader);
    var typeId = reader.ReadByte();
    var size = reader.ReadByte();

    if (!_byteToBinding.TryGetValue(typeId, out var binding)) {
        Debug.LogWarning($"unknown code: {typeId} ");
        // just read, don't publish
        using (var scratch = new NativeArray<byte>(size, Allocator.Temp)) {
            reader.ReadBytes(scratch);
        }
        return;
    }

    binding.ReadAndPublish(ref reader, size);
}
// Non-generic base of the per-message-type bindings, so bindings for different
// message types can be stored in the same dictionaries.
abstract class TypeBindingBase {
// Reads a payload of `size` bytes from reader and delivers the decoded message;
// the concrete behaviour is supplied by the derived binding.
public abstract void ReadAndPublish(ref DataStreamReader reader, byte size);
}
static byte[] _buf = new byte[0];
// Binding for one message type T: holds its wire type id and the subscribers that
// receive decoded messages of that type.
class TypeBinding<T> : TypeBindingBase, IDisposable {
    public byte TypeId;
    Action<T> _subscribers;

    public TypeBinding(byte typeId) {
        TypeId = typeId;
    }

    // Adds subscriber to the callbacks invoked by ReadAndPublish.
    public void Subscribe(Action<T> subscriber) {
        _subscribers += subscriber;
    }

    // Reads size bytes from reader and deserializes them as T.
    // The bytes pass through the shared managed buffer _buf, resized to exactly size.
    public T Read(ref DataStreamReader reader, byte size) {
        var native = new NativeArray<byte>(size, Allocator.Temp);
        try {
            reader.ReadBytes(native);
            Array.Resize(ref _buf, size);
            native.CopyTo(_buf);
            return MessagePackSerializer.Deserialize<T>(_buf);
        }
        finally {
            native.Dispose();
        }
    }

    // Decodes one message and hands it to the subscribers, if there are any.
    // The native buffer is already released by the time subscribers run.
    public override void ReadAndPublish(ref DataStreamReader reader, byte size) {
        var msg = Read(ref reader, size);
        _subscribers?.Invoke(msg);
    }

    // Drops all subscribers.
    public void Dispose() {
        _subscribers = null;
    }
}
// Message identifying the sending process. Outgoing sends it right after its
// connection reports Connect; Hub compares ProcessId with the current process id.
[MessagePackObject()]
public struct HandShake : ILocalhostMsg {
// Id of the process that sent the handshake (MessagePack key 0).
[Key(0)] public int ProcessId;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment