Skip to content

Instantly share code, notes, and snippets.

@liyonghelpme
Last active September 18, 2015 05:23
Show Gist options
  • Save liyonghelpme/ebafe07178f4d98a4bfa to your computer and use it in GitHub Desktop.
Save liyonghelpme/ebafe07178f4d98a4bfa to your computer and use it in GitHub Desktop.
c# Actor Coroutine
using System;
using System.Collections.Generic;
using System.Collections;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using System.Collections.Concurrent;
namespace LuaCoroutine
{
public class DelayCall {
public List<Coroutine> queue = new List<Coroutine>();
public void Run() {
var p = queue[0];
queue.RemoveAt(0);
if(p != null) {
p.Run();
}else {
Console.WriteLine("WaitForSecond");
}
}
public void AddCall(Coroutine c, int t){
for(int i = 0; i < t; i++){
queue.Add(null);
}
Console.WriteLine ("AddCall ");
queue.Add(c);
}
public int Count(){
Console.WriteLine ("CountIs "+queue.Count);
return queue.Count;
}
}
public class Coroutine {
Coroutine waitFor;//当前等待执行的协程
Coroutine continueWhenFinished; //执行完成之后需要执行协程
IEnumerator runObj; //上下文
public void Run()
{
var ret = runObj.MoveNext();
if(!ret) {
if(continueWhenFinished != null) {
continueWhenFinished.waitFor = null;
var c = continueWhenFinished;
continueWhenFinished = null;
c.Run();
}
return;
}
ProcessCoroutineCurrent();
}
public class WaitForSecond{
public int time;
public WaitForSecond(int t) {
time = t;
}
}
void ProcessCoroutineCurrent(){
var cur = runObj.Current;
Console.WriteLine("CurrentValue "+cur +" currentActor "+Actor.actor.Value.Id+" threadId "+Thread.CurrentThread.ManagedThreadId);
if (cur == null) {
Actor.actor.Value.delayCall.AddCall(this, 0);
return;
}
HandleIEnumerableCurrentReturnValue(cur);
}
void HandleIEnumerableCurrentReturnValue(object cur){
Console.WriteLine ("CurActor " + Actor.actor.Value.Id);
if(cur.GetType() == typeof(WaitForSecond)){
Actor.actor.Value.delayCall.AddCall(this, (cur as WaitForSecond).time);
return;
}
Console.WriteLine ("after waitForsec");
if(cur.GetType() == typeof(Coroutine)){
var child = cur as Coroutine;
waitFor = child;
child.continueWhenFinished = this;
return;
}
Console.WriteLine ("AddSelf To Delay");
Actor.actor.Value.delayCall.AddCall(this, 0);
}
public static Coroutine StartCoroutine(IEnumerator ie){
var c = new Coroutine();
c.runObj = ie;
c.Run();
return c;
}
}
public static class ActorUtil {
public static void SendMsg(this Actor target, string msg) {
target.mailbox.SendAsync (msg);
}
}
public class Actor {
public int Id;
public DelayCall delayCall = new DelayCall();
public static ThreadLocal<Actor> actor;
public BufferBlock<string> mailbox = new BufferBlock<string>();
public Actor(){
//Init ();
}
private async Task<string> Dispatch() {
while (true) {
var msg = await mailbox.ReceiveAsync ();
Console.WriteLine ("threadId receive "+this.GetType()+" id "+Thread.CurrentThread.ManagedThreadId);
Console.WriteLine ("receive msg " + msg);
await ReceiveMsg (msg);
}
}
protected virtual async Task ReceiveMsg(string msg) {
}
public void Init() {
Task.Run (Dispatch);
}
}
public class Actor1 : Actor{
//Actor a2;
protected override async Task ReceiveMsg (string msg)
{
var cmds = msg.Split(' ');
var cmd = cmds [0];
if (cmd == "create") {
/*
a2 = new Actor2 ();
var id = ActorManager.Instance.AddActor (a2);
a2.Id = id;
*/
ActorManager.Instance.createMsg.Add (typeof(Actor2));
/*
while (true) {
}
*/
} else if (cmd == "ping") {
//a2.SendMsg ("who " + Id);
} else if (cmd == "pong") {
} else if (cmd == "update") {
/*
while (true) {
}
*/
}
}
}
public class Actor2 : Actor {
protected override async Task ReceiveMsg (string msg)
{
var cmds = msg.Split(' ');
var cmd = cmds [0];
if (cmd == "who") {
var sender = ActorManager.Instance.GetActor (Convert.ToInt32 (cmds [1]));
sender.SendMsg ("pong " + Id);
} else if (cmd == "update") {
await Task.Delay (300);
for (int i = 0; i < 1000000; i++) {
}
}
}
}
public class Timer : Actor {
IEnumerator tick() {
while (true) {
yield return null ;
}
}
protected override async Task ReceiveMsg (string msg)
{
var cmds = msg.Split(' ');
var cmd = cmds [0];
if (cmd == "start") {
Console.WriteLine ("timer Id "+Id);
actor = new ThreadLocal<Actor> (() => this);
Coroutine.StartCoroutine (tick());
while (delayCall.Count () > 0) {
Console.WriteLine ("before wait ");
await Task.Delay(1000);
Console.WriteLine ("after wait");
var act = ActorManager.Instance.GetActor (1);
if (act != null) {
act.SendMsg ("update");
}
var act2 = ActorManager.Instance.GetActor (2);
if (act != null) {
act2.SendMsg ("update");
}
actor = new ThreadLocal<Actor> (() => this);
delayCall.Run ();
}
Console.WriteLine ("delayCall Finish");
}
}
}
public class ActorManager {
public static ActorManager Instance;
ConcurrentDictionary<int, Actor> actorDict;
private int actId = 0;
public BlockingCollection <Type> createMsg;
public ActorManager() {
createMsg = new BlockingCollection<Type> ();
actorDict = new ConcurrentDictionary<int, Actor> ();
Instance = this;
var thread = new Thread (new ThreadStart(WaitCreateMsg));
thread.Start ();
}
void WaitCreateMsg() {
while (!createMsg.IsCompleted) {
var data = createMsg.Take ();
Console.WriteLine ("receive create Msg "+data);
var a = Activator.CreateInstance (data) as Actor;
var id = AddActor (a);
a.Init ();
}
}
public int AddActor(Actor act) {
var id = Interlocked.Increment (ref actId);
actorDict.TryAdd (id, act);
act.Id = id;
return id;
}
public Actor GetActor(int key){
Actor ret = null;
var ok = actorDict.TryGetValue (key, out ret);
return ret;
}
}
class MainClass
{
async static Task<string> Walk(float x, float y) {
Console.WriteLine (x+" p "+y);
Console.WriteLine(TaskScheduler.Current+" id "+TaskScheduler.Current.Id+" maxCur "+TaskScheduler.Current.MaximumConcurrencyLevel);
await Task.Delay (1000);
return "1";
}
async static Task<string> FlyToTarget() {
var t1 = Walk (1, 2);
var t2 = Walk (3, 4);
var ret = await t1;
var ret2 = await t2;
return ret;
}
/// <summary>
/// 1:Actor内部创建新的Actor
/// 2:Actor互相mailbox通信
/// 3:Actor修改自己的行为
/// </summary>
/// <returns>The actor.</returns>
async static Task TestActor() {
}
public static void Main (string[] args)
{
/*
var t = FlyToTarget ();
//t.RunSynchronously ();
t.Wait ();
var a1 = new Actor ();
var t1 = a1.Dispatch ();
t1.Start ();
var a2 = new Actor ();
var t2 = a2.Dispatch ();
t2.Start ();
*/
ActorManager act = new ActorManager ();
var a1 = new Actor1 ();
var id = act.AddActor (a1);
a1.Id = id;
a1.Init ();
/*
var a2 = new Actor2 ();
var id2 = act.AddActor(a2);
a2.Id = id2;
*/
a1.SendMsg ("create");
//a1.SendMsg ("ping");
var t = new Timer ();
//var idt = act.AddActor (t);
//t.Id = idt;
t.Init ();
t.SendMsg ("start");
Console.ReadKey ();
}
}
}
@liyonghelpme
Copy link
Author

由ActorManager 构建所有的Actor

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment