Last active
June 18, 2019 15:59
-
-
Save yKimisaki/18c8cd28148936f1e9dad56c7d8396e5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using UniRx; | |
namespace Tonari.UniRx | |
{ | |
public class ContinuousObservableQueue<T> | |
{ | |
private Subject<Unit> _onAgitate; | |
private Subject<Unit> _onSettle; | |
private readonly Queue<QueueTask<T>> _queue; | |
public ContinuousObservableQueue() | |
{ | |
this._queue = new Queue<QueueTask<T>>(); | |
this._onAgitate = new Subject<Unit>(); | |
this._onSettle = new Subject<Unit>(); | |
} | |
/// <summary> | |
/// キューがない状態から、新たにキューが開始された時に呼ばれます。 | |
/// </summary> | |
public IObservable<Unit> OnAgitateAsObservable() | |
{ | |
return this._onAgitate; | |
} | |
/// <summary> | |
/// 現在積まれているキューをすべて終えた時に呼ばれます。 | |
/// </summary> | |
public IObservable<Unit> OnSettleAsObservable() | |
{ | |
return this._onSettle; | |
} | |
public IObservable<T> ReserveAsync(Func<IObservable<T>> action) | |
{ | |
// キューがあるなら待ち状態 | |
if (this._queue.Any()) | |
{ | |
var queueTask = new QueueTask<T>(action); | |
this._queue.Enqueue(queueTask); | |
return queueTask.GetResultAsync(); | |
} | |
// 外部処理 | |
this._onAgitate.OnNext(Unit.Default); | |
// 待ち状態でないときは初期化 | |
var task = new QueueTask<T>(action); | |
this._queue.Enqueue(task); | |
// そのまま実行して、終わり次第次を呼ぶ | |
this.Step(); | |
return task.GetResultAsync(); | |
} | |
private void Step() | |
{ | |
// キューにものがあったら | |
if (this._queue.Any()) | |
{ | |
// キューから引っ張り出してきて実行 | |
var process = this._queue.Peek(); | |
process.ExecuteAsync() | |
.Finally(() => | |
{ | |
// 終わったらキューから抜いて次のやつを実行するぞい | |
this._queue.Dequeue(); | |
this.Step(); | |
}) | |
.Subscribe(); | |
return; | |
} | |
// 残ってなかったら終了 | |
this._queue.Clear(); | |
// 外部処理 | |
this._onSettle.OnNext(Unit.Default); | |
} | |
private class QueueTask<TItem> | |
{ | |
private Func<IObservable<TItem>> _task; | |
private AsyncSubject<TItem> _result; | |
private bool _canceled; | |
public QueueTask(Func<IObservable<TItem>> task) | |
{ | |
_task = task; | |
_result = new AsyncSubject<TItem>(); | |
} | |
public IObservable<Unit> ExecuteAsync() | |
{ | |
// キャンセルされていたら何もしないで次に進む | |
if (_canceled) | |
{ | |
return Observable.ReturnUnit(); | |
} | |
else | |
{ | |
return _task() | |
.Do(x => | |
{ | |
_result.OnNext(x); | |
_result.OnCompleted(); | |
}) | |
.AsUnitObservable(); | |
} | |
} | |
public IObservable<TItem> GetResultAsync() | |
{ | |
if (_result.IsCompleted) return _result; | |
return _result.DoOnCancel(() => _canceled = true); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
ぶっちゃけCoroutine使いましょう、という感じではあります