Created
December 19, 2013 07:30
-
-
Save arturaz/8035632 to your computer and use it in GitHub Desktop.
Lightweigth reactive extensions implementation for Unity3D
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### utils/rx/Observable.cs ### | |
?using System; | |
using System.Collections; | |
using System.Collections.Generic; | |
using UnityEngine; | |
using utils.functional; | |
namespace utils.rx { | |
public interface IObservable<T> { | |
ISubscription subscribe(Action<T> onChange); | |
IObservable<A> map<A>(Func<T, A> mapper); | |
IObservable<T> filter(Func<T, bool> predicate); | |
IObservable<Tuple2<T, A>> zip<A>(IObservable<A> other); | |
IObservable<T> onlyChanges(); | |
} | |
public interface IObserver<in T> { | |
void next(T value); | |
} | |
public class Observer<T> : IObserver<T> { | |
private readonly Action<T> onNext; | |
public Observer(Action<T> onNext) { | |
this.onNext = onNext; | |
} | |
public void next(T value) { | |
onNext(value); | |
} | |
} | |
public static class Observable { | |
public static IObservable<Unit> everyFrame( | |
MonoBehaviour behaviour | |
) { | |
return new Observable<Unit>(observer => | |
behaviour.StartCoroutine(everyFrame(observer)) | |
); | |
} | |
public static IObservable<DateTime> interval( | |
MonoBehaviour behaviour, float intervalS | |
) { | |
return interval(behaviour, intervalS, F.none<float>()); | |
} | |
public static IObservable<DateTime> interval( | |
MonoBehaviour behaviour, float intervalS, float delayS | |
) { | |
return interval(behaviour, intervalS, F.some(delayS)); | |
} | |
public static IObservable<DateTime> interval( | |
MonoBehaviour behaviour, float intervalS, Option<float> delayS | |
) { | |
return new Observable<DateTime>(observer => | |
behaviour.StartCoroutine(interval(observer, intervalS, delayS)) | |
); | |
} | |
private static IEnumerator everyFrame(IObserver<Unit> observer) { | |
while (true) { | |
observer.next(Unit.instance); | |
yield return null; | |
} | |
} | |
private static IEnumerator interval( | |
IObserver<DateTime> observer, float intervalS, Option<float> delayS | |
) { | |
if (delayS.isDefined) yield return new WaitForSeconds(delayS.get); | |
var wait = new WaitForSeconds(intervalS); | |
while (true) { | |
observer.next(DateTime.Now); | |
yield return wait; | |
} | |
} | |
} | |
public class Observable<T> : IObservable<T> { | |
private readonly IList<Tuple2<Subscription, Action<T>>> subscriptions = | |
new List<Tuple2<Subscription, Action<T>>>(); | |
public Observable() { } | |
public static IObservable<T> a() { return new Observable<T>(); } | |
public Observable(Action<IObserver<T>> next) { | |
next(new Observer<T>(this.next)); | |
} | |
public static IObservable<T> a(Action<IObserver<T>> next) { | |
return new Observable<T>(next); | |
} | |
protected virtual void next(T next) { | |
foreach (var t in subscriptions) t._2(next); | |
} | |
public virtual ISubscription subscribe(Action<T> onChange) { | |
Subscription subscription = null; | |
// ReSharper disable once AccessToModifiedClosure | |
subscription = new Subscription(() => unsubscribe(subscription)); | |
subscriptions.Add(F.t2(subscription, onChange)); | |
return subscription; | |
} | |
/** | |
* If this observable is last value subject we want to propogate that | |
* on our operations as well - this function allows you to easily do that. | |
**/ | |
private static IObservable<A> wrapToLastValueSubject<A>( | |
Observable<A> observable, Action<IObservable<A>, Action<A>> block | |
) { | |
var emmited = F.none<A>(); | |
block(observable, value => { | |
// Don't create Some every time someone emits something on this | |
// observable for the rest ot the time | |
if (emmited.isEmpty) emmited = F.some(value); | |
observable.next(value); | |
}); | |
// fold signature - Unity compiler bug. | |
return emmited.fold<IObservable<A>>(() => observable, value => { | |
var lvs = observable.toLastValueSubject(); | |
lvs.next(value); | |
return lvs; | |
}); | |
} | |
public virtual IObservable<A> map<A>(Func<T, A> mapper) { | |
return wrapToLastValueSubject( | |
new Observable<A>(), | |
(mapped, emit) => subscribe(val => emit(mapper(val))) | |
); | |
} | |
public IObservable<T> filter(Func<T, bool> predicate) { | |
return wrapToLastValueSubject( | |
new Observable<T>(), | |
(filtered, emit) => subscribe(val => { if (predicate(val)) emit(val); }) | |
); | |
} | |
public IObservable<Tuple2<T, A>> zip<A>(IObservable<A> other) { | |
return wrapToLastValueSubject(new Observable<Tuple2<T, A>>(), (zipped, emit) => { | |
var lastSelf = F.none<T>(); | |
var lastOther = F.none<A>(); | |
Action notify = () => lastSelf.each(aVal => lastOther.each(bVal => | |
emit(F.t2(aVal, bVal)) | |
)); | |
subscribe(val => { | |
lastSelf = F.some(val); | |
notify(); | |
}); | |
other.subscribe(val => { | |
lastOther = F.some(val); | |
notify(); | |
}); | |
}); | |
} | |
public IObservable<T> onlyChanges() { | |
return wrapToLastValueSubject(new Observable<T>(), (obs, emit) => { | |
var lastValue = F.none<T>(); | |
subscribe(val => { | |
lastValue.voidFold( | |
() => emit(val), | |
lastVal => { | |
if (! EqualityComparer<T>.Default.Equals(lastVal, val)) | |
emit(val); | |
} | |
); | |
lastValue = F.some(val); | |
}); | |
}); | |
} | |
private void unsubscribe(Subscription s) { | |
subscriptions.IndexWhere(t => t._1 == s).each(subscriptions.RemoveAt); | |
} | |
} | |
} | |
### utils/rx/ObservableList.cs ### | |
?using System.Collections; | |
using System.Collections.Generic; | |
using utils.functional; | |
namespace utils.rx { | |
public interface IObservableList<A> : IList<A> { | |
IObservable<int> sizeObservable { get; } | |
IObservable<Option<A>> elementObservable(int index); | |
} | |
public class ObservableList<A> : IObservableList<A> { | |
private readonly List<A> backing = new List<A>(); | |
private readonly Dictionary< | |
int, Tuple2<IObservable<Option<A>>, IObserver<Option<A>>> | |
> elementObservables = new Dictionary< | |
int, Tuple2<IObservable<Option<A>>, IObserver<Option<A>>> | |
>(); | |
private readonly IObserver<int> sizeObserver; | |
private readonly IObservable<int> _sizeObservable; | |
public IObservable<int> sizeObservable { get { return _sizeObservable; } } | |
public ObservableList() { | |
IObserver<int> sizeObserver = null; | |
_sizeObservable = new LastValueSubject<int>(obs => sizeObserver = obs); | |
this.sizeObserver = sizeObserver; | |
} | |
public IEnumerator<A> GetEnumerator() { | |
return backing.GetEnumerator(); | |
} | |
IEnumerator IEnumerable.GetEnumerator() { | |
return GetEnumerator(); | |
} | |
public virtual IObservable<Option<A>> elementObservable(int index) { | |
return elementObservables.get(index).fold( | |
() => { | |
IObserver<Option<A>> observer = null; | |
var observable = new LastValueSubject<Option<A>>(obs => observer = obs); | |
observer.next(this.get(index)); | |
elementObservables[index] = | |
F.t2((IObservable<Option<A>>) observable, observer); | |
return observable; | |
}, | |
t => t._1 | |
); | |
} | |
public void Add(A item) { | |
backing.Add(item); | |
notifyElementObservable(Count); | |
sizeObserver.next(Count); | |
} | |
public void Clear() { | |
backing.Clear(); | |
foreach (var kv in elementObservables) { | |
kv.Value._2.next(F.none<A>()); | |
} | |
sizeObserver.next(Count); | |
} | |
public bool Contains(A item) { | |
return backing.Contains(item); | |
} | |
public void CopyTo(A[] array, int arrayIndex) { | |
backing.CopyTo(array, arrayIndex); | |
} | |
public bool Remove(A item) { | |
var index = IndexOf(item); | |
if (index < 0) return false; | |
RemoveAt(index); | |
sizeObserver.next(Count); | |
return true; | |
} | |
public int Count { get { return backing.Count; }} | |
public bool IsReadOnly { get { return false; } } | |
public int IndexOf(A item) { | |
return backing.IndexOf(item); | |
} | |
public void Insert(int index, A item) { | |
backing.Insert(index, item); | |
for (var i = index; i < Count; i++) { | |
notifyElementObservable(i); | |
} | |
sizeObserver.next(Count); | |
} | |
public void RemoveAt(int index) { | |
backing.RemoveAt(index); | |
for (var i = index; i < Count + 1; i++) { | |
notifyElementObservable(i); | |
} | |
sizeObserver.next(Count); | |
} | |
public A this[int index] { | |
get { return backing[index]; } | |
set { | |
backing[index] = value; | |
notifyElementObservable(index); | |
} | |
} | |
private void notifyElementObservable(int index) { | |
elementObservables.get(index).each(t => t._2.next(this.get(index))); | |
} | |
} | |
} | |
### utils/rx/ObservableView.cs ### | |
?using System.Collections.Generic; | |
using System.Collections.ObjectModel; | |
using utils.functional; | |
namespace utils.rx { | |
public class ObservableView { | |
public static ObservableView<A> a<A>( | |
ObservableList<A> list, int startIndex, int windowSize | |
) { return new ObservableView<A>(list, startIndex, windowSize); } | |
} | |
public sealed class ObservableView<A> | |
: LastValueSubject<ReadOnlyCollection<Option<A>>> { | |
public readonly int windowSize; | |
private readonly ObservableList<A> list; | |
private readonly IList<ISubscription> subscriptions = | |
new List<ISubscription>(); | |
private readonly List<Option<A>> viewValues; | |
private int _startIndex = -1; | |
public int startIndex { | |
get { return _startIndex; } | |
set { | |
if (_startIndex == value) return; | |
foreach (var s in subscriptions) s.unsubscribe(); | |
subscriptions.Clear(); | |
var start = _startIndex; | |
var end = start + windowSize; | |
for (var current = start; current < end; current++) { | |
var i = current - start; | |
subscriptions.Add(list.elementObservable(current).subscribe(vOpt => { | |
viewValues[i] = vOpt; | |
next(); | |
})); | |
viewValues[i] = list.get(current); | |
} | |
_startIndex = value; | |
next(); | |
} | |
} | |
public ObservableView(ObservableList<A> list, int startIndex, int windowSize) { | |
this.list = list; | |
this.windowSize = windowSize; | |
viewValues = new List<Option<A>>(windowSize); | |
for (var i = 0; i < windowSize; i++) viewValues.Add(F.none<A>()); | |
this.startIndex = startIndex; | |
} | |
public int mapIndex(int viewIndex) { | |
return startIndex + viewIndex; | |
} | |
public ReadOnlyCollection<Option<A>> current { | |
get { return viewValues.AsReadOnly(); } | |
} | |
private void next() { | |
next(current); | |
} | |
} | |
} | |
### utils/rx/Subject.cs ### | |
?using System; | |
using utils.functional; | |
namespace utils.rx { | |
public static class LastValueSubject { | |
public static LastValueSubject<T> from<T>(IObservable<T> source) { | |
return new LastValueSubject<T>(observer => | |
source.subscribe(observer.next) | |
); | |
} | |
public static LastValueSubject<T> toLastValueSubject<T>(this IObservable<T> source) { | |
return from(source); | |
} | |
} | |
public class LastValueSubject<T> : Observable<T> { | |
public Option<T> lastValue { get; private set; } | |
public LastValueSubject() { | |
lastValue = F.none<T>(); | |
} | |
public LastValueSubject(Action<IObserver<T>> action) : this() { | |
action(new Observer<T>(next)); | |
} | |
protected override void next(T next) { | |
lastValue = F.some(next); | |
base.next(next); | |
} | |
public override ISubscription subscribe(Action<T> onChange) { | |
lastValue.each(onChange); | |
return base.subscribe(onChange); | |
} | |
} | |
} | |
### utils/rx/Subscription.cs ### | |
?using System; | |
using System.Collections.Generic; | |
namespace utils.rx { | |
public interface ISubscription { | |
bool isSubscribed { get; } | |
bool unsubscribe(); | |
} | |
public class Subscription : ISubscription { | |
private readonly Action onUnsubscribe; | |
private bool _isSubscribed = true; | |
public Subscription(Action onUnsubscribe) { | |
this.onUnsubscribe = onUnsubscribe; | |
} | |
public bool isSubscribed { get { return _isSubscribed; } } | |
public bool unsubscribe() { | |
if (!isSubscribed) return false; | |
_isSubscribed = false; | |
onUnsubscribe(); | |
return true; | |
} | |
} | |
public class SubscriptionTracker : IDisposable { | |
private readonly List<ISubscription> subscriptions = | |
new List<ISubscription>(); | |
public ISubscription track(ISubscription subscription) { | |
subscriptions.Add(subscription); | |
return subscription; | |
} | |
public void Dispose() { | |
foreach (var subscription in subscriptions) | |
subscription.unsubscribe(); | |
subscriptions.Clear(); | |
} | |
} | |
} |
This was only a sample code for my friend :)
However I've published our full library - hope you can make use of it, extend and contribute back!
Check it out at https://github.com/tinylabproductions/tlplib
Thanks! I'll try it out
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I tried using it, by also adding in https://gist.github.com/arturaz/8036037 but I have an error: 'The type or namespace name
Option
1' could not be found'Missing the class 'Option'.