Last active
February 15, 2017 14:12
-
-
Save 5alamander/d06df06f008f918e236ee0b9c53f328d to your computer and use it in GitHub Desktop.
初版csp_for_unity。TODO: 1,处理异常时,在unity线程和系统线程处理方式不一致,2.系统线程中无法使用 timeout,3.外部无法手动停止系统线程中的某个coroutine,4.只使用了1个外部线程。更新:使用linkedlist替代queue,闲置时挂起线程。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections; | |
using System.Collections.Generic; | |
using UnityEngine; | |
using System.Threading; | |
namespace Sa1 { | |
/// <summary> | |
/// ins: do not use timeout in the csp.subthread | |
/// </summary> | |
public class csp : MonoBehaviour { | |
public class CSPException : Exception { | |
public CSPException(string str) : base("[Sa1.csp] " + str) { } | |
} | |
public class ChannelCloseException : CSPException { | |
public ChannelCloseException(string fstr) : base("Channel is closed: " + fstr) { } | |
} | |
public class CoroutineExitException : CSPException { | |
public CoroutineExitException() : base("Coroutine is sub-thread is closed") { } | |
} | |
private static bool _initialized = false; | |
private static void initialize () { | |
if (!_initialized && Application.isPlaying) { | |
_initialized = true; | |
var g = new GameObject("[Sa1.csp]"); | |
_instance = g.AddComponent<csp>(); | |
} | |
} | |
private static csp _instance; | |
public static csp Instance { | |
get { | |
initialize(); | |
return _instance; | |
} | |
} | |
private int _mainThreadId; | |
private readonly Queue _queuedUnityFunctor = Queue.Synchronized(new Queue()); | |
private bool _isRunning; | |
private int _runningCount; | |
private readonly Queue _queuedCoroutines = new Queue(); | |
private readonly LinkedList<IEnumerator> _runningCoroutinesList = new LinkedList<IEnumerator>(); | |
private void Awake () { | |
_isRunning = true; | |
_mainThreadId = Thread.CurrentThread.ManagedThreadId; | |
DontDestroyOnLoad(gameObject); | |
} | |
private void Start () { | |
go(_ => { | |
try { | |
excuteCoroutines(); | |
} | |
catch (Exception e) { | |
Debug.LogError("[csp.SubThread]" + e); | |
} | |
}); | |
} | |
private void Update () { | |
// do the functor on the delay queue, from sub to unity | |
while (_queuedUnityFunctor.Count > 0) { | |
var data = (BackToUnityData) _queuedUnityFunctor.Dequeue(); | |
go(data.behaviour, data.functor); | |
} | |
} | |
private void OnApplicationQuit () { | |
_isRunning = false; | |
} | |
private void addToSubCoroutineList (IEnumerator enumerator) { | |
lock (_queuedCoroutines) { | |
_queuedCoroutines.Enqueue(enumerator); | |
} | |
} | |
private void excuteCoroutines () { | |
while (_isRunning) { | |
_runningCount = _runningCoroutinesList.Count; // show in inspector | |
if (_runningCount > 0 || _queuedCoroutines.Count > 0) { | |
// adding -> running | |
lock (_queuedCoroutines) { | |
foreach (var functor in _queuedCoroutines) { | |
_runningCoroutinesList.AddLast(new LinkedListNode<IEnumerator>((IEnumerator) functor)); | |
} | |
_queuedCoroutines.Clear(); | |
} | |
for (var currentNode = _runningCoroutinesList.First; | |
currentNode != null; | |
currentNode = currentNode.Next) { | |
try { | |
IEnumerator functor = currentNode.Value; | |
if (functor.MoveNext()) { | |
var func = functor.Current as Func<MonoBehaviour>; | |
if (func != null) { | |
// if yield return csp.toUnity(behaviour), set to unity | |
var theBehaviour = func(); | |
Instance._queuedUnityFunctor.Enqueue( | |
new BackToUnityData(theBehaviour ? theBehaviour : Instance, functor)); | |
_runningCoroutinesList.Remove(currentNode); | |
// continue; // skip next tick | |
} | |
// _nextTempCoroutines.Enqueue(functor); // not skip next tick | |
} | |
else { | |
_runningCoroutinesList.Remove(currentNode); | |
} | |
} | |
catch (Exception e) { | |
Debug.LogError(e); | |
Debug.LogError(new CoroutineExitException()); | |
} | |
} | |
} | |
else { | |
Thread.Sleep(1); // suspend this thread, when this thread is idle | |
} | |
} | |
} | |
public static bool isInMainThread { | |
get { return Instance._mainThreadId == Thread.CurrentThread.ManagedThreadId; } | |
} | |
private static readonly object _toSub = new object(); | |
public static object toSub () { | |
return _toSub; | |
} | |
public static Func<MonoBehaviour> toUnity (MonoBehaviour behaviour = null) { | |
return () => behaviour; | |
} | |
private struct BackToUnityData { | |
public readonly MonoBehaviour behaviour; | |
public readonly IEnumerator functor; | |
public BackToUnityData (MonoBehaviour theBehaviour, IEnumerator thefunctor) { | |
behaviour = theBehaviour; | |
functor = thefunctor; | |
} | |
} | |
/// <summary> | |
/// the take action and alt action result wrapper | |
/// </summary> | |
public class Result { | |
private object _value; | |
public object value { | |
get { | |
if (exception != null) | |
throw exception; | |
return _value; | |
} | |
set { _value = value; } | |
} | |
public bool isTimeout { get; set; } | |
public bool closed { get; set; } // channel closed | |
public CSPException exception; | |
public void copy (Result other) { | |
_value = other._value; | |
isTimeout = other.isTimeout; | |
closed = other.closed; | |
exception = other.exception; | |
} | |
} | |
/// <summary> | |
/// Chan the specified n as the default buffer length, | |
/// the default buffer size is 0 | |
/// </summary> | |
public static Channel chan (int n = 0) { | |
return chan(n, Channel.BufferType.Fixed); | |
} | |
/// <summary> | |
/// Chan the specified n and bufferType. Type maybe changed at any time | |
/// </summary> | |
public static Channel chan (int n, Channel.BufferType bufferType) { | |
return new Channel(n, bufferType); | |
} | |
public static void go (Action<object> action, object state = null) { | |
ThreadPool.QueueUserWorkItem(new WaitCallback(action), state); | |
} | |
public static void go (IEnumerator functor) { | |
initialize(); | |
Instance.addToSubCoroutineList(new SubCoroutineWrapper(functor)); | |
} | |
/// <summary> | |
/// set to the global event loop and return the running channel | |
/// </summary> | |
public static Coroutine go (MonoBehaviour theBehaviour, IEnumerator functor) { | |
initialize(); | |
if (isInMainThread) { | |
// startCoroutine to run this functor | |
return theBehaviour.StartCoroutine(startCspGoroutine(functor)); | |
} | |
else { | |
Instance._queuedUnityFunctor.Enqueue(new BackToUnityData(theBehaviour, functor)); | |
return null; | |
} | |
} | |
private static IEnumerator startCspGoroutine (IEnumerator functor) { | |
while (functor.MoveNext()) { | |
if (functor.Current == _toSub) { | |
go(functor); | |
yield break; | |
} | |
if (functor.Current is Yield) { | |
var y = (Yield) functor.Current; | |
while (true) { | |
try { | |
if (!y.MoveNext()) { | |
break; // break this while in Yield | |
} | |
} | |
catch (Exception e) { | |
Debug.LogError(e); | |
yield break; // kill this coroutine | |
} | |
yield return y.Current; | |
} | |
} | |
else { | |
yield return functor.Current; | |
} | |
} | |
} | |
/// <summary> | |
/// Take from the specified chan. | |
/// </summary> | |
public static Yield take (out Result result, Channel channel) { | |
return new Channel.TakeYield(out result, channel); | |
} | |
/// <summary> | |
/// Put value to the channel | |
/// </summary> | |
public static Yield put (out Result result, Channel channel, object value) { | |
return new Channel.PutYield(out result, channel, value); | |
} | |
public static Yield put (Channel channel, object value) { | |
Result ret; | |
return new Channel.PutYield(out ret, channel, value); | |
} | |
public static Yield timeout (float time) { | |
return new Channel.TimeoutYield(time); | |
} | |
public static Yield alts (out Result result, params Yield[] ienumerators) { | |
return new Channel.AltsYield(out result, ienumerators); | |
} | |
public static Yield altsInTime (out Result result, float time, params Yield[] ienumerators) { | |
var list = new List<Yield>(ienumerators) {timeout(time)}; | |
return new Channel.AltsYield(out result, list); | |
} | |
/// <summary> | |
/// Asyncs the take. | |
/// </summary> | |
/// <param name="obj">Object.</param> | |
/// <param name="channel">Chan.</param> | |
/// <param name="cb">Cb.</param> | |
public static void takeAsync (MonoBehaviour obj, Channel channel, Action<Result> cb = null) { | |
go(obj, takeAsyncFunction(channel, cb)); | |
} | |
private static IEnumerator takeAsyncFunction (Channel channel, Action<Result> cb) { | |
Result ret; | |
yield return take(out ret, channel); | |
if (cb != null) cb(ret); | |
} | |
/// <summary> | |
/// Asyncs the put. | |
/// </summary> | |
/// <param name="obj">Object.</param> | |
/// <param name="channel">Chan.</param> | |
/// <param name="value">Value.</param> | |
/// <param name="cb">Cb.</param> | |
public static void putAsync (MonoBehaviour obj, Channel channel, object value, Action<Result> cb = null) { | |
go(obj, putAsyncFunctor(channel, value, cb)); | |
} | |
private static IEnumerator putAsyncFunctor (Channel channel, object value, Action<Result> cb) { | |
Result ret; | |
yield return put(out ret, channel, value); | |
if (cb != null) cb(ret); | |
} | |
public static bool offer (Channel channel, object value) { | |
return channel.offerValue(value); | |
} | |
public static bool poll (Channel channel, out object outValue) { | |
return channel.pollValue(out outValue); | |
} | |
/// <summary> | |
/// Yield. instruction for channel to use | |
/// </summary> | |
public class Yield : IEnumerator { | |
public Result yieldResult { get; private set; } | |
protected Channel _channel; | |
public Yield (Channel chan) { | |
_channel = chan; | |
yieldResult = new Result(); | |
} | |
public Yield (out Result result, Channel chan) { | |
_channel = chan; | |
yieldResult = result = new Result(); | |
} | |
public object Current { get; private set; } | |
public virtual bool MoveNext () { | |
return false; // default end the IEnumerator | |
} | |
public void yieldResultApply () { | |
Current = yieldResult.value; | |
} | |
public void Reset () {} | |
} | |
public class Channel : Yield { | |
public enum BufferType { | |
Fixed, Dropping, Sliding | |
} | |
//public int dirtyGet { get; private set; } | |
//public int dirtyPut { get; private set; } | |
public readonly bool buffered; | |
public readonly BufferType bufferType; | |
private readonly Queue _buffer; | |
private readonly int _maxBufferSize; | |
#region lock these data | |
private readonly object thisLock = new object(); | |
public object fixedValue { get; private set; } | |
public Yield thePutYield { get; private set; } | |
public bool closed { get; private set; } | |
#endregion | |
public Channel (int n, BufferType type) : base(null) { | |
closed = false; | |
_maxBufferSize = n; | |
bufferType = type; | |
if (n > 0) { | |
buffered = true; | |
_buffer = Queue.Synchronized(new Queue(n)); // thread safe | |
} | |
} | |
public override bool MoveNext () { | |
try { | |
object value; | |
var hasNext = !pollValue(out value); | |
yieldResult.value = value; | |
yieldResultApply(); | |
return hasNext; | |
} | |
catch (ChannelCloseException e) { | |
yieldResult.closed = true; | |
yieldResult.exception = e; | |
} | |
yieldResultApply(); | |
return false; | |
} | |
public bool isEmpty { | |
get { return _buffer.Count == 0; } | |
} | |
public bool isfull { | |
get { return _buffer.Count >= _maxBufferSize; } | |
} | |
public void close () { | |
lock (thisLock) { | |
closed = true; | |
} | |
} | |
private bool tryOfferValue (object value) { | |
if (closed) | |
throw new ChannelCloseException("tryOfferValue"); | |
if (buffered) { | |
if (isfull) { | |
return false; // block | |
} | |
_buffer.Enqueue(value); | |
return true; | |
} | |
if (fixedValue != null) { | |
return false; // block | |
} | |
lock (thisLock) { | |
fixedValue = value; | |
} | |
return true; | |
} | |
public bool offerValue (object value) { | |
if (closed) | |
throw new ChannelCloseException("offerValue"); | |
var tryOffered = tryOfferValue(value); | |
if (tryOffered) return true; | |
if (bufferType == BufferType.Sliding) { | |
if (buffered) { | |
_buffer.Dequeue(); | |
_buffer.Enqueue(value); | |
} | |
else { | |
lock (thisLock) { | |
fixedValue = value; | |
} | |
} | |
return true; | |
} | |
// else if Dropping, fixed | |
return false; | |
} | |
public bool pollValue (out object outValue) { | |
bool ret; | |
outValue = null; | |
if (buffered) { | |
if (isEmpty) { | |
ret = false; // no value | |
} | |
else { | |
outValue = _buffer.Dequeue(); | |
ret = true; | |
} | |
} | |
else { | |
if (fixedValue == null) { | |
ret = false; // no value | |
} | |
else { | |
lock (thisLock) { | |
outValue = fixedValue; | |
fixedValue = null; | |
thePutYield = null; | |
} | |
ret = true; | |
} | |
} | |
if (!ret && closed) { | |
throw new ChannelCloseException("pollValue"); | |
} | |
return ret; | |
} | |
public void setFixedValue (object value, PutYield putYield) { | |
lock (thisLock) { | |
fixedValue = value; | |
thePutYield = putYield; | |
} | |
} | |
// *** the yields *** | |
public class TakeYield : Yield { | |
public TakeYield (out Result result, Channel chan) | |
: base(out result, chan) {} | |
public override bool MoveNext () { | |
object takeResult; | |
bool polled; | |
try { | |
polled = _channel.pollValue(out takeResult); | |
} | |
catch (ChannelCloseException e) { | |
yieldResult.closed = true; | |
yieldResult.exception = e; | |
yieldResultApply(); | |
return false; | |
} | |
yieldResult.value = takeResult; | |
yieldResultApply(); | |
return !polled; | |
} | |
} | |
public class PutYield : Yield { | |
private readonly object _value; | |
private bool _isWaitingTake; | |
public PutYield (out Result result, Channel chan, object value) | |
: base(out result, chan) { | |
_value = value; | |
} | |
public override bool MoveNext () { | |
if (_channel.closed) { | |
yieldResult.closed = true; | |
yieldResult.value = null; | |
yieldResult.exception = new ChannelCloseException("PutYield.MoveNext"); | |
yieldResultApply(); | |
return false; | |
} | |
if (!_channel.buffered) { | |
if (_isWaitingTake) { | |
return _channel.thePutYield == this; // false - no block | |
} | |
_channel.setFixedValue(_value, this); | |
_isWaitingTake = true; // to wait this to be took | |
return true; | |
} | |
// else _channel is buffered | |
var offered = _channel.offerValue(_value); | |
if (offered) { | |
yieldResult.value = _value; | |
yieldResultApply(); | |
return false; // no block | |
} | |
if (_channel.bufferType == BufferType.Dropping) { | |
return false; | |
} | |
return true; // block | |
} | |
} | |
public class TimeoutYield : Yield { | |
private readonly float _startTime; | |
private readonly float _lifeTime; | |
public TimeoutYield (float time) : base(null) { | |
if (isInMainThread) { | |
_startTime = Time.time; | |
_lifeTime = time; | |
} | |
else { | |
Debug.LogError("timeout only work in unity"); | |
} | |
} | |
public override bool MoveNext () { | |
if (Time.time - _startTime < _lifeTime) { | |
return true; | |
} | |
yieldResult.isTimeout = true; | |
return false; | |
} | |
} | |
/// <summary> | |
/// Alts yield. normally used with a timeout yield | |
/// </summary> | |
public class AltsYield : Yield { | |
private readonly IList<Yield> _ienumerators; | |
public AltsYield (out Result result, IList<Yield> ienumerators) | |
: base(out result, null) { | |
_ienumerators = ienumerators; | |
} | |
public override bool MoveNext () { | |
//csp.ret = null; // TODO check if the goroutine is return immediat | |
foreach (var cspYeild in _ienumerators) { | |
var hasNext = cspYeild.MoveNext(); | |
if (!hasNext) { | |
yieldResult.copy(cspYeild.yieldResult); | |
yieldResult.value = cspYeild.Current; | |
base.MoveNext(); | |
return false; | |
} | |
} | |
base.MoveNext(); | |
return true; | |
} | |
} | |
} | |
} | |
/// <summary> | |
/// hold a call stack to apply nested IEnumerator | |
/// </summary> | |
internal class SubCoroutineWrapper : IEnumerator { | |
private readonly Stack<IEnumerator> _functorStack = new Stack<IEnumerator>(); | |
public SubCoroutineWrapper (IEnumerator functor) { | |
_functorStack.Push(functor); | |
} | |
public bool MoveNext () { | |
if (_functorStack.Count == 0) { | |
return false; | |
} | |
var currentEnumerator = _functorStack.Peek(); | |
if (currentEnumerator.MoveNext()) { | |
// if there is a sub ienumerator | |
var ie = currentEnumerator.Current as IEnumerator; | |
if (ie != null) { | |
_functorStack.Push(ie); | |
return MoveNext(); | |
} | |
Current = currentEnumerator.Current; | |
return true; | |
} | |
// move next false | |
_functorStack.Pop(); | |
return MoveNext(); | |
} | |
public void Reset () {} | |
public object Current { get; private set; } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public IEnumerator usage () { | |
yield return csp.toSub(); // 跳出unity线程 | |
// do something ... | |
yield return csp.toUnity(); // 跳回unity线程 | |
// yield return csp.toUnity(this); // 跳回unity线程时指定某个monobehaviour来运行 | |
var ch = csp.chan(); // 默认阻塞的管道 | |
var ch0 = csp.chan(2); // 缓存为2的管道 | |
var ch1 = csp.chan(3, csp.Channel.BufferType.Sliding); // 管道选项 | |
csp.Result ret; // 由于c# 3.5 没有 async 和 await,此处用一个变量当做yield的返回值 | |
yield return csp.put(ch0, 1); // 放入管道, 如果被阻塞则等待 | |
yield return csp.put(out ret, ch1, "asdf"); // 放入管道, 获得返回值,可以看看异常之类的 | |
csp.putAsync(this, ch0, 1, _ => { Debug.Log("put succeed"); }); // 放入管道,不等待 | |
yield return csp.take(out ret, ch0); // 从管道取出一个值 | |
Debug.Log("take the result" + ret.value); | |
csp.takeAsync(this, ch1, result => { | |
Debug.Log("take async: " + result.value); | |
}); | |
yield return csp.timeout(1f); // 等待1秒, 【注:timeout只能在unity线程中使用】 | |
yield return csp.alts(out ret, csp.timeout(1f), ch, ch0, ch1); | |
// 从3个管道中取出一个值,或者1秒后没有值则超时 | |
if (ret.isTimeout) { | |
Debug.Log("timeout"); | |
} | |
else { | |
Debug.Log("value" + ret.value); | |
} | |
yield return csp.altsInTime(out ret, 1f, ch, ch0, ch1); // 效果同上, 少写一个timeout | |
} | |
// 调用 | |
csp.go(this, usage()); // this处需要一个monobehaviour来运行初始coroutine, | |
//csp.go(usage()); // 或者不提供monobehaviour参数,则初始就在系统线程中 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment