Last active
September 18, 2015 05:23
-
-
Save liyonghelpme/ebafe07178f4d98a4bfa to your computer and use it in GitHub Desktop.
c# Actor Coroutine
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Collections; | |
using System.Threading.Tasks; | |
using System.Threading; | |
using System.Threading.Tasks.Dataflow; | |
using System.Collections.Concurrent; | |
namespace LuaCoroutine | |
{ | |
/// <summary>
/// A slot queue of coroutines. Each <see cref="Run"/> call consumes exactly one slot;
/// a null slot is an empty placeholder that only delays the entries behind it.
/// </summary>
public class DelayCall
{
    /// <summary>Pending slots, oldest first. Null entries are empty placeholders.</summary>
    public List<Coroutine> queue = new List<Coroutine>();

    /// <summary>
    /// Removes the slot at the head of the queue. Resumes the coroutine stored there,
    /// or prints "WaitForSecond" when the slot is an empty placeholder.
    /// Indexing an empty queue throws, so callers check <see cref="Count"/> first.
    /// </summary>
    public void Run()
    {
        Coroutine head = queue[0];
        queue.RemoveAt(0);

        if (head == null)
        {
            Console.WriteLine("WaitForSecond");
            return;
        }

        head.Run();
    }

    /// <summary>
    /// Appends <paramref name="t"/> empty slots followed by the coroutine itself.
    /// </summary>
    /// <param name="c">Coroutine to resume once the slots ahead of it are consumed.</param>
    /// <param name="t">Number of empty slots to insert before <paramref name="c"/>;
    /// zero or a negative value inserts none.</param>
    public void AddCall(Coroutine c, int t)
    {
        int remaining = t;
        while (remaining > 0)
        {
            queue.Add(null);
            remaining--;
        }

        Console.WriteLine("AddCall ");
        queue.Add(c);
    }

    /// <summary>Prints and returns the number of slots currently queued.</summary>
    /// <returns>The current length of <see cref="queue"/>.</returns>
    public int Count()
    {
        int pending = queue.Count;
        Console.WriteLine("CountIs " + pending);
        return pending;
    }
}
/// <summary>
/// Drives an <see cref="IEnumerator"/> one step at a time. After each step the yielded
/// value decides how the coroutine gets resumed: through the current actor's
/// <c>delayCall</c> queue, or by a yielded child coroutine once that child has finished.
/// </summary>
public class Coroutine
{
    /// <summary>
    /// Yield instruction: re-queue the coroutine behind <c>time</c> empty slots of the
    /// current actor's <c>delayCall</c> queue.
    /// </summary>
    public class WaitForSecond
    {
        public int time;

        public WaitForSecond(int t)
        {
            time = t;
        }
    }

    Coroutine waitFor;              // child coroutine this one is currently waiting on
    Coroutine continueWhenFinished; // coroutine to resume once this one has finished
    IEnumerator runObj;             // iterator that holds this coroutine's execution context

    /// <summary>
    /// Wraps <paramref name="ie"/> in a new coroutine and immediately runs its first step.
    /// </summary>
    /// <param name="ie">Iterator to drive.</param>
    /// <returns>The coroutine wrapping <paramref name="ie"/>.</returns>
    public static Coroutine StartCoroutine(IEnumerator ie)
    {
        var started = new Coroutine { runObj = ie };
        started.Run();
        return started;
    }

    /// <summary>
    /// Advances the iterator by one step. While the iterator still produces values the
    /// yielded value is dispatched; once it is exhausted, the coroutine that was waiting
    /// on this one (if any) is detached and resumed.
    /// </summary>
    public void Run()
    {
        if (runObj.MoveNext())
        {
            ProcessCoroutineCurrent();
            return;
        }

        Coroutine parent = continueWhenFinished;
        if (parent == null)
        {
            return;
        }

        // Unlink both directions before resuming the parent.
        parent.waitFor = null;
        continueWhenFinished = null;
        parent.Run();
    }

    /// <summary>
    /// Logs the value just yielded; a null value re-queues this coroutine with no
    /// delay slots, anything else is handed to the type-based dispatch.
    /// </summary>
    void ProcessCoroutineCurrent()
    {
        object yielded = runObj.Current;
        Console.WriteLine("CurrentValue " + yielded + " currentActor " + Actor.actor.Value.Id + " threadId " + Thread.CurrentThread.ManagedThreadId);

        if (yielded != null)
        {
            HandleIEnumerableCurrentReturnValue(yielded);
            return;
        }

        Actor.actor.Value.delayCall.AddCall(this, 0);
    }

    /// <summary>
    /// Dispatches on the exact runtime type of a non-null yielded value:
    /// <see cref="WaitForSecond"/> re-queues behind its slot count, a
    /// <see cref="Coroutine"/> makes this coroutine wait for that child, and any
    /// other value re-queues with no delay slots.
    /// </summary>
    /// <param name="cur">The non-null value produced by the iterator.</param>
    void HandleIEnumerableCurrentReturnValue(object cur)
    {
        Console.WriteLine("CurActor " + Actor.actor.Value.Id);

        Type yieldedType = cur.GetType();
        if (yieldedType == typeof(WaitForSecond))
        {
            Actor.actor.Value.delayCall.AddCall(this, ((WaitForSecond)cur).time);
            return;
        }

        Console.WriteLine("after waitForsec");
        if (yieldedType == typeof(Coroutine))
        {
            // Not re-queued here: the child resumes this coroutine from its own Run()
            // when its iterator is exhausted.
            var child = (Coroutine)cur;
            waitFor = child;
            child.continueWhenFinished = this;
            return;
        }

        Console.WriteLine("AddSelf To Delay");
        Actor.actor.Value.delayCall.AddCall(this, 0);
    }
}
/// <summary>Messaging helpers for <see cref="Actor"/>.</summary>
public static class ActorUtil
{
    /// <summary>
    /// Offers <paramref name="msg"/> to the target actor's mailbox. The task returned
    /// by the dataflow send is not observed.
    /// </summary>
    /// <param name="target">Actor whose mailbox receives the message.</param>
    /// <param name="msg">Message text.</param>
    public static void SendMsg(this Actor target, string msg)
    {
        DataflowBlock.SendAsync(target.mailbox, msg);
    }
}
/// <summary>
/// Base actor: owns a string mailbox and, once <see cref="Init"/> is called, a loop
/// that hands each received message to <see cref="ReceiveMsg"/>.
/// </summary>
public class Actor
{
    /// <summary>Identifier of this actor; assigned by code outside this class.</summary>
    public int Id;

    /// <summary>Per-actor queue used by coroutines to schedule their next step.</summary>
    public DelayCall delayCall = new DelayCall();

    /// <summary>The actor that running coroutines treat as "current".</summary>
    public static ThreadLocal<Actor> actor;

    /// <summary>Incoming messages for this actor.</summary>
    public BufferBlock<string> mailbox = new BufferBlock<string>();

    /// <summary>Creates the actor without starting its dispatch loop.</summary>
    public Actor()
    {
        // Init() is deliberately not called here; callers start the loop themselves.
    }

    /// <summary>Starts the dispatch loop through <c>Task.Run</c>.</summary>
    public void Init()
    {
        Task.Run(() => Dispatch());
    }

    /// <summary>
    /// Message hook for subclasses; the base implementation does nothing.
    /// </summary>
    /// <param name="msg">The message taken from the mailbox.</param>
    protected virtual async Task ReceiveMsg(string msg)
    {
    }

    /// <summary>
    /// Receives messages one at a time and awaits <see cref="ReceiveMsg"/> for each
    /// before receiving the next. The loop has no exit condition.
    /// </summary>
    private async Task<string> Dispatch()
    {
        for (;;)
        {
            string incoming = await mailbox.ReceiveAsync();
            Console.WriteLine("threadId receive " + this.GetType() + " id " + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("receive msg " + incoming);
            await ReceiveMsg(incoming);
        }
    }
}
/// <summary>
/// Actor that reacts to "create" by asking <see cref="ActorManager"/> to build an
/// <see cref="Actor2"/>. "ping", "pong" and "update" are recognised but do nothing.
/// </summary>
public class Actor1 : Actor
{
    /// <summary>Handles one message; the command is the first space-separated token.</summary>
    /// <param name="msg">Message text, e.g. "create".</param>
    protected override async Task ReceiveMsg(string msg)
    {
        string command = msg.Split(' ')[0];

        switch (command)
        {
            case "create":
                // Creation is delegated to ActorManager via its create queue.
                // (An earlier, disabled variant constructed and registered Actor2 here.)
                ActorManager.Instance.createMsg.Add(typeof(Actor2));
                break;

            case "ping":
                // Disabled: used to forward "who <Id>" to a locally held Actor2.
                break;

            case "pong":
            case "update":
                // Intentionally empty.
                break;
        }
    }
}
/// <summary>
/// Actor that answers "who &lt;senderId&gt;" with "pong &lt;own Id&gt;" and simulates
/// work on "update".
/// </summary>
public class Actor2 : Actor
{
    /// <summary>
    /// Handles one message; the command is the first space-separated token.
    /// Malformed or unanswerable "who" requests are logged and ignored. An exception
    /// thrown here would propagate through the awaiting dispatch loop in
    /// <see cref="Actor"/> and stop this actor from receiving further messages.
    /// </summary>
    /// <param name="msg">Message text, e.g. "who 1" or "update".</param>
    protected override async Task ReceiveMsg(string msg)
    {
        var cmds = msg.Split(' ');
        var cmd = cmds[0];

        if (cmd == "who")
        {
            // The sender id must be present and numeric.
            int senderId;
            if (cmds.Length < 2 || !int.TryParse(cmds[1], out senderId))
            {
                Console.WriteLine("who ignored, missing or invalid sender id: " + msg);
                return;
            }

            // GetActor returns null for ids that are not registered.
            var sender = ActorManager.Instance.GetActor(senderId);
            if (sender == null)
            {
                Console.WriteLine("who ignored, unknown sender id " + senderId);
                return;
            }

            sender.SendMsg("pong " + Id);
        }
        else if (cmd == "update")
        {
            await Task.Delay(300);
            // Empty counting loop kept from the original (placeholder for CPU work).
            for (int i = 0; i < 1000000; i++)
            {
            }
        }
    }
}
/// <summary>
/// Actor that, on "start", launches a coroutine and then steps its own
/// <c>delayCall</c> queue once per second, sending "update" to actors 1 and 2
/// on every step.
/// </summary>
public class Timer : Actor
{
    /// <summary>
    /// Coroutine body that yields null forever. A null yield makes the coroutine
    /// re-queue itself with no delay slots, so the queue never runs empty.
    /// </summary>
    IEnumerator tick()
    {
        while (true)
        {
            yield return null;
        }
    }

    /// <summary>
    /// Handles one message; only "start" is recognised. The loop runs for as long as
    /// <c>delayCall</c> has pending slots.
    /// </summary>
    /// <param name="msg">Message text; the command is the first space-separated token.</param>
    protected override async Task ReceiveMsg(string msg)
    {
        var cmds = msg.Split(' ');
        var cmd = cmds[0];
        if (cmd != "start")
        {
            return;
        }

        Console.WriteLine("timer Id " + Id);

        // Coroutines look up their owning actor through Actor.actor, so publish
        // this timer before starting one.
        actor = new ThreadLocal<Actor>(() => this);
        Coroutine.StartCoroutine(tick());

        while (delayCall.Count() > 0)
        {
            Console.WriteLine("before wait ");
            await Task.Delay(1000);
            Console.WriteLine("after wait");

            var act = ActorManager.Instance.GetActor(1);
            if (act != null)
            {
                act.SendMsg("update");
            }

            // Each actor is checked on its own: the original tested `act` here and
            // then dereferenced `act2`, which throws when actor 2 is not registered.
            var act2 = ActorManager.Instance.GetActor(2);
            if (act2 != null)
            {
                act2.SendMsg("update");
            }

            // Re-publish this timer before stepping the queue.
            // NOTE(review): presumably because the static may have been replaced by
            // another actor during the delay - confirm.
            actor = new ThreadLocal<Actor>(() => this);
            delayCall.Run();
        }

        Console.WriteLine("delayCall Finish");
    }
}
/// <summary>
/// Registry of actors keyed by an auto-incremented id. It also owns a worker thread
/// that instantiates, registers and starts actors whose types are posted to
/// <see cref="createMsg"/>.
/// </summary>
public class ActorManager
{
    /// <summary>The most recently constructed manager.</summary>
    public static ActorManager Instance;

    ConcurrentDictionary<int, Actor> actorDict;
    private int actId = 0;

    /// <summary>Queue of actor types waiting to be instantiated by the worker thread.</summary>
    public BlockingCollection<Type> createMsg;

    /// <summary>
    /// Creates the registry, publishes it as <see cref="Instance"/> and starts the
    /// creation worker.
    /// </summary>
    public ActorManager()
    {
        createMsg = new BlockingCollection<Type>();
        actorDict = new ConcurrentDictionary<int, Actor>();
        Instance = this;

        var thread = new Thread(new ThreadStart(WaitCreateMsg));
        // The worker blocks on the queue indefinitely; as a foreground thread it
        // would keep the process alive after Main returns.
        thread.IsBackground = true;
        thread.Start();
    }

    /// <summary>
    /// Worker loop: takes requested types from <see cref="createMsg"/> until adding
    /// is completed and the queue is drained. Requests that are null or not an
    /// <see cref="Actor"/> type are logged and skipped.
    /// </summary>
    void WaitCreateMsg()
    {
        // GetConsumingEnumerable ends cleanly when adding is completed, whereas
        // checking IsCompleted and then calling Take() can throw if completion
        // happens in between.
        foreach (var data in createMsg.GetConsumingEnumerable())
        {
            Console.WriteLine("receive create Msg " + data);

            if (data == null || !typeof(Actor).IsAssignableFrom(data))
            {
                Console.WriteLine("create Msg ignored, not an Actor type: " + data);
                continue;
            }

            var a = (Actor)Activator.CreateInstance(data);
            AddActor(a);
            a.Init();
        }
    }

    /// <summary>
    /// Registers <paramref name="act"/> under the next id and stores that id on the actor.
    /// </summary>
    /// <param name="act">Actor to register.</param>
    /// <returns>The id assigned to <paramref name="act"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="act"/> is null.</exception>
    public int AddActor(Actor act)
    {
        // Reject null before consuming an id or storing a null entry.
        if (act == null)
        {
            throw new ArgumentNullException("act");
        }

        var id = Interlocked.Increment(ref actId);
        actorDict.TryAdd(id, act);
        act.Id = id;
        return id;
    }

    /// <summary>Looks up an actor by id.</summary>
    /// <param name="key">Id returned by <see cref="AddActor"/>.</param>
    /// <returns>The registered actor, or null when the id is unknown.</returns>
    public Actor GetActor(int key)
    {
        Actor ret;
        actorDict.TryGetValue(key, out ret);
        return ret;
    }
}
/// <summary>Console entry point that wires up the demo actors.</summary>
class MainClass
{
    /// <summary>
    /// Prints the coordinates and the current task scheduler, waits one second and
    /// returns "1".
    /// </summary>
    static async Task<string> Walk(float x, float y)
    {
        Console.WriteLine(x + " p " + y);

        TaskScheduler scheduler = TaskScheduler.Current;
        Console.WriteLine(scheduler + " id " + scheduler.Id + " maxCur " + scheduler.MaximumConcurrencyLevel);

        await Task.Delay(1000);
        return "1";
    }

    /// <summary>
    /// Starts two walks before awaiting either, then awaits both and returns the
    /// result of the first.
    /// </summary>
    static async Task<string> FlyToTarget()
    {
        Task<string> firstLeg = Walk(1, 2);
        Task<string> secondLeg = Walk(3, 4);

        string firstResult = await firstLeg;
        await secondLeg;
        return firstResult;
    }

    /// <summary>
    /// 1: an Actor creates new Actors from inside itself.
    /// 2: Actors communicate with each other through their mailboxes.
    /// 3: an Actor modifies its own behaviour.
    /// </summary>
    /// <returns>The actor.</returns>
    static async Task TestActor()
    {
    }

    /// <summary>
    /// Registers and starts an <see cref="Actor1"/>, asks it to create another actor,
    /// starts an unregistered <see cref="Timer"/>, then blocks until a key is pressed.
    /// </summary>
    public static void Main(string[] args)
    {
        // Earlier experiments (awaiting FlyToTarget, starting Dispatch tasks by hand,
        // registering an Actor2 directly) are disabled.
        ActorManager manager = new ActorManager();

        var first = new Actor1();
        first.Id = manager.AddActor(first);
        first.Init();
        first.SendMsg("create");

        // The timer is started without being added to the manager.
        var timer = new Timer();
        timer.Init();
        timer.SendMsg("start");

        Console.ReadKey();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
由ActorManager 构建所有的Actor