Created
June 5, 2017 16:03
-
-
Save TORISOUP/3e618adcd968395db7bfbfcbc3a2ac53 to your computer and use it in GitHub Desktop.
The Expire operator publishes a default value once a specified time has elapsed since the last value was published.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Sample usage of the Expire operator: logs "true" when the button is clicked
/// and "false" (the default of bool) three seconds after the most recent click.
/// </summary>
public class ExpireSample : MonoBehaviour
{
    [SerializeField]
    private Button button;

    void Start()
    {
        // When this button is clicked, publish "true",
        // and 3 seconds after the latest click, publish "false".
        button.OnClickAsObservable()
            .Select(_ => true)
            .Expire(TimeSpan.FromSeconds(3f))
            .Subscribe(x => Debug.Log(x))
            // Tie the subscription (and any pending expiration timer) to this
            // component so it is disposed when the component is destroyed.
            .AddTo(this);
    }
}
//------------------- | |
/// <summary>
/// Observable that forwards every value of the source sequence and, once
/// <c>dueTime</c> has elapsed on <c>scheduler</c> since the most recent value,
/// additionally publishes <c>defaultValue</c>.
/// </summary>
/// <typeparam name="T">Element type of the source and resulting sequence.</typeparam>
public class ExpireObservable<T> : OperatorObservableBase<T>
{
    readonly IObservable<T> source;
    readonly TimeSpan dueTime;
    readonly IScheduler scheduler;
    private readonly T defaultValue;

    /// <summary>Creates the operator.</summary>
    /// <param name="source">Sequence whose values are forwarded.</param>
    /// <param name="defaultValue">Value published when the latest source value expires.</param>
    /// <param name="dueTime">Time after a source value at which the default value is published.</param>
    /// <param name="scheduler">Scheduler used to run the expiration timer.</param>
    public ExpireObservable(IObservable<T> source, T defaultValue, TimeSpan dueTime, IScheduler scheduler)
        : base(scheduler == Scheduler.CurrentThread || source.IsRequiredSubscribeOnCurrentThread())
    {
        this.source = source;
        this.dueTime = dueTime;
        this.scheduler = scheduler;
        this.defaultValue = defaultValue;
    }

    protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
    {
        return new Expire(this, observer, cancel).Run();
    }

    /// <summary>
    /// Observer that relays source notifications downstream and (re)starts the
    /// expiration timer on every value.
    /// </summary>
    public class Expire : OperatorObserverBase<T, T>
    {
        private readonly ExpireObservable<T> parent;
        private readonly object gate = new object();
        SerialDisposable cancelable;

        // Generation counter, guarded by gate. Each scheduled expiration remembers
        // the generation it was created for; a callback whose generation is no
        // longer current is stale and must not emit.
        ulong id = 0;

        public Expire(ExpireObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel)
        {
            this.parent = parent;
        }

        /// <summary>Subscribes to the source.</summary>
        /// <returns>A disposable that stops both the pending timer and the source subscription.</returns>
        public IDisposable Run()
        {
            cancelable = new SerialDisposable();
            var subscription = parent.source.Subscribe(this);
            return StableCompositeDisposable.Create(cancelable, subscription);
        }

        /// <summary>
        /// Timer callback: publishes the default value unless a newer value or a
        /// terminal notification has arrived since this timer was scheduled.
        /// </summary>
        private void OnExpired(ulong scheduledId)
        {
            lock (gate)
            {
                // Disposing a timer cannot stop a callback that has already started
                // and is merely waiting for the gate, so re-check the generation here.
                if (id != scheduledId)
                {
                    return;
                }

                observer.OnNext(parent.defaultValue);
            }
        }

        public override void OnNext(T value)
        {
            lock (gate)
            {
                // A new value invalidates every previously scheduled expiration.
                id = unchecked(id + 1);
                var currentId = id;

                observer.OnNext(value);

                // Assigning to the SerialDisposable replaces (and thereby cancels)
                // the timer started for the previous value.
                var d = new SingleAssignmentDisposable();
                cancelable.Disposable = d;
                d.Disposable = parent.scheduler.Schedule(parent.dueTime, () => OnExpired(currentId));
            }
        }

        public override void OnError(Exception error)
        {
            cancelable.Dispose();
            lock (gate)
            {
                // Invalidate any expiration still in flight so it cannot emit
                // after the terminal notification.
                id = unchecked(id + 1);
                try { observer.OnError(error); } finally { Dispose(); }
            }
        }

        public override void OnCompleted()
        {
            cancelable.Dispose();
            lock (gate)
            {
                // Invalidate any expiration still in flight so it cannot emit
                // after the terminal notification.
                id = unchecked(id + 1);
                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }
}
//------------------------------- | |
/// <summary>
/// Extension methods exposing the Expire operator: the resulting sequence forwards
/// the source values and publishes a default value once <c>dueTime</c> has elapsed
/// since the most recent source value.
/// </summary>
public static partial class Observable
{
    /// <summary>Applies Expire with an explicit default value and scheduler.</summary>
    /// <param name="source">Sequence whose values are forwarded.</param>
    /// <param name="defaultValue">Value published when the latest source value expires.</param>
    /// <param name="dueTime">Time after a source value at which the default value is published.</param>
    /// <param name="scheduler">Scheduler used to run the expiration timer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
    public static IObservable<TSource> Expire<TSource>(this IObservable<TSource> source, TSource defaultValue, TimeSpan dueTime, IScheduler scheduler)
    {
        // Fail fast with a descriptive exception instead of a NullReferenceException
        // raised later inside the operator.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        return new ExpireObservable<TSource>(source, defaultValue, dueTime, scheduler);
    }

    /// <summary>Applies Expire with an explicit default value on the default time-based scheduler.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<TSource> Expire<TSource>(this IObservable<TSource> source, TSource defaultValue, TimeSpan dueTime)
    {
        return source.Expire(defaultValue, dueTime, Scheduler.DefaultSchedulers.TimeBasedOperations);
    }

    /// <summary>Applies Expire publishing <c>default(TSource)</c> on the default time-based scheduler.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<TSource> Expire<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
    {
        return source.Expire(default(TSource), dueTime, Scheduler.DefaultSchedulers.TimeBasedOperations);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment