Skip to content

Instantly share code, notes, and snippets.

@arturaz
Created December 19, 2013 07:30
Show Gist options
  • Save arturaz/8035632 to your computer and use it in GitHub Desktop.
Save arturaz/8035632 to your computer and use it in GitHub Desktop.
Lightweight reactive extensions implementation for Unity3D
### utils/rx/Observable.cs ###
?using System;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using utils.functional;
namespace utils.rx {
/**
 * A push-based stream of values of type T.
 *
 * The notes on each member describe how Observable<T> in this file
 * implements it; other implementations may differ.
 **/
public interface IObservable<T> {
// Registers onChange to be invoked with each emitted value. The returned
// subscription can be used to stop receiving values.
ISubscription subscribe(Action<T> onChange);
// Observable that emits mapper(value) for every value emitted here.
IObservable<A> map<A>(Func<T, A> mapper);
// Observable that re-emits only the values for which predicate returns true.
IObservable<T> filter(Func<T, bool> predicate);
// Observable of (latest own value, latest value of other). It emits whenever
// either side emits, but only once both sides have emitted at least once.
IObservable<Tuple2<T, A>> zip<A>(IObservable<A> other);
// Observable that emits the first value and afterwards only values that
// differ from the previously received one (EqualityComparer<T>.Default).
IObservable<T> onlyChanges();
}
/**
 * Sink for values of type T; the producer side of an observable pushes
 * values into it by calling next.
 **/
public interface IObserver<in T> {
// Pushes a single value into the observer.
void next(T value);
}
/**
 * IObserver implementation that forwards every pushed value to a delegate
 * supplied at construction time.
 **/
public class Observer<T> : IObserver<T> {
  // Delegate that receives each value passed to next().
  private readonly Action<T> handler;

  public Observer(Action<T> onNext) {
    handler = onNext;
  }

  // Forwards the value to the wrapped delegate.
  public void next(T value) {
    handler.Invoke(value);
  }
}
/**
 * Factory methods for observables driven by Unity coroutines.
 *
 * The coroutine is started on the given MonoBehaviour as soon as the
 * observable is constructed (not on first subscription), and this class
 * never stops it.
 **/
public static class Observable {
  // Observable that emits Unit.instance each time its coroutine is resumed;
  // the coroutine yields null between emissions.
  public static IObservable<Unit> everyFrame(
    MonoBehaviour behaviour
  ) {
    return new Observable<Unit>(observer => {
      behaviour.StartCoroutine(emitEveryFrame(observer));
    });
  }

  // Emits DateTime.Now every intervalS seconds, without an initial delay.
  public static IObservable<DateTime> interval(
    MonoBehaviour behaviour, float intervalS
  ) {
    var noDelay = F.none<float>();
    return interval(behaviour, intervalS, noDelay);
  }

  // Emits DateTime.Now every intervalS seconds, after waiting delayS seconds.
  public static IObservable<DateTime> interval(
    MonoBehaviour behaviour, float intervalS, float delayS
  ) {
    var delay = F.some(delayS);
    return interval(behaviour, intervalS, delay);
  }

  // Emits DateTime.Now every intervalS seconds; when delayS is defined the
  // first emission is preceded by a wait of that many seconds.
  public static IObservable<DateTime> interval(
    MonoBehaviour behaviour, float intervalS, Option<float> delayS
  ) {
    return new Observable<DateTime>(observer => {
      behaviour.StartCoroutine(emitAtInterval(observer, intervalS, delayS));
    });
  }

  // Coroutine body: push a Unit, yield null, repeat forever.
  private static IEnumerator emitEveryFrame(IObserver<Unit> observer) {
    for (;;) {
      observer.next(Unit.instance);
      yield return null;
    }
  }

  // Coroutine body: optional initial wait, then push the current time and
  // wait intervalS seconds, forever. One WaitForSeconds instance is reused
  // for all iterations.
  private static IEnumerator emitAtInterval(
    IObserver<DateTime> observer, float intervalS, Option<float> delayS
  ) {
    if (delayS.isDefined) {
      yield return new WaitForSeconds(delayS.get);
    }
    var pause = new WaitForSeconds(intervalS);
    for (;;) {
      observer.next(DateTime.Now);
      yield return pause;
    }
  }
}
/**
 * Basic push-based observable. Keeps a list of (subscription, callback)
 * pairs and invokes every callback when a value is pushed through next().
 *
 * The operators (map, filter, zip, onlyChanges) subscribe to this
 * observable and never release those internal subscriptions.
 **/
public class Observable<T> : IObservable<T> {
  private readonly IList<Tuple2<Subscription, Action<T>>> subscriptions =
    new List<Tuple2<Subscription, Action<T>>>();

  public Observable() { }

  public static IObservable<T> a() { return new Observable<T>(); }

  // Hands the given action an observer whose next() pushes into this
  // observable. The action is invoked immediately, inside the constructor.
  public Observable(Action<IObserver<T>> next) {
    next(new Observer<T>(this.next));
  }

  public static IObservable<T> a(Action<IObserver<T>> next) {
    return new Observable<T>(next);
  }

  // Delivers a value to all current subscribers.
  protected virtual void next(T next) {
    var count = subscriptions.Count;
    if (count == 0) return;
    // Dispatch over a snapshot: a callback may subscribe or unsubscribe
    // while we are iterating, and enumerating the live list would then
    // throw InvalidOperationException ("Collection was modified").
    var snapshot = new Tuple2<Subscription, Action<T>>[count];
    subscriptions.CopyTo(snapshot, 0);
    foreach (var t in snapshot) {
      // Skip subscribers that were unsubscribed earlier in this dispatch.
      if (t._1.isSubscribed) t._2(next);
    }
  }

  // Registers onChange; unsubscribing the returned subscription removes it.
  public virtual ISubscription subscribe(Action<T> onChange) {
    Subscription subscription = null;
    // ReSharper disable once AccessToModifiedClosure
    subscription = new Subscription(() => unsubscribe(subscription));
    subscriptions.Add(F.t2(subscription, onChange));
    return subscription;
  }

  /**
   * If this observable is a last value subject we want to propagate that
   * to the results of our operations as well - this function allows you
   * to easily do that.
   *
   * block wires the source to the emit callback. If a value was emitted
   * synchronously while block ran, the result is a LastValueSubject seeded
   * with the first such value; otherwise it is the plain observable.
   **/
  private static IObservable<A> wrapToLastValueSubject<A>(
    Observable<A> observable, Action<IObservable<A>, Action<A>> block
  ) {
    var emitted = F.none<A>();
    block(observable, value => {
      // Don't create Some every time someone emits something on this
      // observable for the rest of the time.
      if (emitted.isEmpty) emitted = F.some(value);
      observable.next(value);
    });
    // Explicit fold type argument - Unity compiler bug.
    return emitted.fold<IObservable<A>>(() => observable, value => {
      var lvs = observable.toLastValueSubject();
      lvs.next(value);
      return lvs;
    });
  }

  // Emits mapper(value) for every value of this observable.
  public virtual IObservable<A> map<A>(Func<T, A> mapper) {
    return wrapToLastValueSubject(
      new Observable<A>(),
      (mapped, emit) => subscribe(val => emit(mapper(val)))
    );
  }

  // Re-emits only the values accepted by predicate.
  public IObservable<T> filter(Func<T, bool> predicate) {
    return wrapToLastValueSubject(
      new Observable<T>(),
      (filtered, emit) => subscribe(val => { if (predicate(val)) emit(val); })
    );
  }

  // Emits (latest own value, latest other value) whenever either side
  // emits, once both sides have produced at least one value.
  public IObservable<Tuple2<T, A>> zip<A>(IObservable<A> other) {
    return wrapToLastValueSubject(new Observable<Tuple2<T, A>>(), (zipped, emit) => {
      var lastSelf = F.none<T>();
      var lastOther = F.none<A>();
      Action notify = () => lastSelf.each(aVal => lastOther.each(bVal =>
        emit(F.t2(aVal, bVal))
      ));
      subscribe(val => {
        lastSelf = F.some(val);
        notify();
      });
      other.subscribe(val => {
        lastOther = F.some(val);
        notify();
      });
    });
  }

  // Emits the first value, then only values that differ from the
  // previously received one according to EqualityComparer<T>.Default.
  public IObservable<T> onlyChanges() {
    return wrapToLastValueSubject(new Observable<T>(), (obs, emit) => {
      var lastValue = F.none<T>();
      subscribe(val => {
        lastValue.voidFold(
          () => emit(val),
          lastVal => {
            if (! EqualityComparer<T>.Default.Equals(lastVal, val))
              emit(val);
          }
        );
        lastValue = F.some(val);
      });
    });
  }

  // Removes the entry belonging to the given subscription, if present.
  private void unsubscribe(Subscription s) {
    subscriptions.IndexWhere(t => t._1 == s).each(subscriptions.RemoveAt);
  }
}
}
### utils/rx/ObservableList.cs ###
?using System.Collections;
using System.Collections.Generic;
using utils.functional;
namespace utils.rx {
/**
 * An IList whose size and individual index slots can be observed.
 **/
public interface IObservableList<A> : IList<A> {
// Observable of the list's element count.
IObservable<int> sizeObservable { get; }
// Observable of the content of the slot at the given index, as an Option.
// NOTE(review): presumably None means the index is outside the list -
// this depends on the `get` extension from utils.functional; confirm.
IObservable<Option<A>> elementObservable(int index);
}
/**
 * List backed by a List<A> that publishes its size and the content of
 * individual index slots through observables.
 *
 * Element observables are created lazily per index and cached in a
 * dictionary; mutating operations notify the cached observables of every
 * index whose content may have changed.
 **/
public class ObservableList<A> : IObservableList<A> {
  private readonly List<A> backing = new List<A>();
  // index -> (observable handed out to callers, observer used to push into it)
  private readonly Dictionary<
    int, Tuple2<IObservable<Option<A>>, IObserver<Option<A>>>
  > elementObservables = new Dictionary<
    int, Tuple2<IObservable<Option<A>>, IObserver<Option<A>>>
  >();
  private readonly IObserver<int> sizeObserver;
  private readonly IObservable<int> _sizeObservable;

  public IObservable<int> sizeObservable { get { return _sizeObservable; } }

  public ObservableList() {
    // The LastValueSubject constructor invokes the callback synchronously,
    // so the observer is captured before it is stored in the field.
    IObserver<int> captured = null;
    _sizeObservable = new LastValueSubject<int>(obs => captured = obs);
    sizeObserver = captured;
  }

  public IEnumerator<A> GetEnumerator() {
    return backing.GetEnumerator();
  }

  IEnumerator IEnumerable.GetEnumerator() {
    return GetEnumerator();
  }

  // Returns the cached observable for the given index, creating it (seeded
  // with the current content of that slot) on first request.
  public virtual IObservable<Option<A>> elementObservable(int index) {
    return elementObservables.get(index).fold(
      () => {
        IObserver<Option<A>> observer = null;
        var observable = new LastValueSubject<Option<A>>(obs => observer = obs);
        observer.next(this.get(index));
        elementObservables[index] =
          F.t2((IObservable<Option<A>>) observable, observer);
        return observable;
      },
      t => t._1
    );
  }

  public void Add(A item) {
    backing.Add(item);
    // The appended element lives at the last index, i.e. Count - 1.
    notifyElementObservable(Count - 1);
    sizeObserver.next(Count);
  }

  public void Clear() {
    backing.Clear();
    // Iterate over a snapshot: an observer callback may request a new
    // element observable, which would modify the dictionary mid-enumeration.
    var observers = new List<Tuple2<IObservable<Option<A>>, IObserver<Option<A>>>>(
      elementObservables.Values
    );
    foreach (var t in observers) {
      t._2.next(F.none<A>());
    }
    sizeObserver.next(Count);
  }

  public bool Contains(A item) {
    return backing.Contains(item);
  }

  public void CopyTo(A[] array, int arrayIndex) {
    backing.CopyTo(array, arrayIndex);
  }

  public bool Remove(A item) {
    var index = IndexOf(item);
    if (index < 0) return false;
    // RemoveAt already publishes the new size; no second notification here.
    RemoveAt(index);
    return true;
  }

  public int Count { get { return backing.Count; }}

  public bool IsReadOnly { get { return false; } }

  public int IndexOf(A item) {
    return backing.IndexOf(item);
  }

  public void Insert(int index, A item) {
    backing.Insert(index, item);
    // Everything from the insertion point to the end has shifted.
    for (var i = index; i < Count; i++) {
      notifyElementObservable(i);
    }
    sizeObserver.next(Count);
  }

  public void RemoveAt(int index) {
    backing.RemoveAt(index);
    // Count + 1 so that the former last index, which is now vacated,
    // is notified as well.
    for (var i = index; i < Count + 1; i++) {
      notifyElementObservable(i);
    }
    sizeObserver.next(Count);
  }

  public A this[int index] {
    get { return backing[index]; }
    set {
      backing[index] = value;
      notifyElementObservable(index);
    }
  }

  // Pushes the current content of the slot to its observable, if one has
  // been created for that index.
  private void notifyElementObservable(int index) {
    elementObservables.get(index).each(t => t._2.next(this.get(index)));
  }
}
}
### utils/rx/ObservableView.cs ###
?using System.Collections.Generic;
using System.Collections.ObjectModel;
using utils.functional;
namespace utils.rx {
/**
 * Non-generic companion holding a factory method, so that the element
 * type of the view can be inferred from the list argument.
 **/
public class ObservableView {
  // Creates a view over `list` covering windowSize slots from startIndex.
  public static ObservableView<A> a<A>(
    ObservableList<A> list, int startIndex, int windowSize
  ) {
    var view = new ObservableView<A>(list, startIndex, windowSize);
    return view;
  }
}
/**
 * A fixed-size window over an ObservableList. The view keeps a buffer of
 * windowSize slots mirroring list[startIndex .. startIndex + windowSize)
 * and emits the buffer (as a read-only collection) whenever one of the
 * watched slots changes or the window is moved.
 **/
public sealed class ObservableView<A>
  : LastValueSubject<ReadOnlyCollection<Option<A>>> {
  public readonly int windowSize;
  private readonly ObservableList<A> list;
  // Subscriptions to the element observables of the current window.
  private readonly IList<ISubscription> subscriptions =
    new List<ISubscription>();
  private readonly List<Option<A>> viewValues;
  private int _startIndex = -1;
  // True while the window is being rebuilt; emissions are held back so
  // subscribers never see a buffer that mixes old and new window contents.
  private bool rebuilding;

  // First list index covered by the view. Assigning a different value
  // re-subscribes the view to the new window and emits it once.
  // Assigning the current value is a no-op (note that the initial value
  // is -1, so constructing a view at index -1 subscribes to nothing).
  public int startIndex {
    get { return _startIndex; }
    set {
      if (_startIndex == value) return;
      foreach (var s in subscriptions) s.unsubscribe();
      subscriptions.Clear();
      // Store the new index first so that the window below is built from
      // the requested position rather than from the previous one.
      _startIndex = value;
      rebuilding = true;
      try {
        for (var offset = 0; offset < windowSize; offset++) {
          // Per-iteration copies so every closure captures its own slot.
          var i = offset;
          var listIndex = value + offset;
          subscriptions.Add(list.elementObservable(listIndex).subscribe(vOpt => {
            viewValues[i] = vOpt;
            next();
          }));
          viewValues[i] = list.get(listIndex);
        }
      }
      finally {
        rebuilding = false;
      }
      next();
    }
  }

  public ObservableView(ObservableList<A> list, int startIndex, int windowSize) {
    this.list = list;
    this.windowSize = windowSize;
    viewValues = new List<Option<A>>(windowSize);
    for (var i = 0; i < windowSize; i++) viewValues.Add(F.none<A>());
    this.startIndex = startIndex;
  }

  // Translates an index inside the view to the corresponding list index.
  public int mapIndex(int viewIndex) {
    return startIndex + viewIndex;
  }

  // Read-only wrapper around the view's buffer.
  public ReadOnlyCollection<Option<A>> current {
    get { return viewValues.AsReadOnly(); }
  }

  // Emits the current window unless a rebuild is in progress.
  private void next() {
    if (rebuilding) return;
    next(current);
  }
}
}
### utils/rx/Subject.cs ###
?using System;
using utils.functional;
namespace utils.rx {
/**
 * Helpers for turning any observable into a LastValueSubject.
 **/
public static class LastValueSubject {
  // Creates a subject that subscribes to `source` and forwards every value
  // it emits. The subscription to `source` is not retained by this method.
  public static LastValueSubject<T> from<T>(IObservable<T> source) {
    Action<IObserver<T>> connect = observer => {
      source.subscribe(observer.next);
    };
    return new LastValueSubject<T>(connect);
  }

  // Extension-method form of from().
  public static LastValueSubject<T> toLastValueSubject<T>(this IObservable<T> source) {
    return LastValueSubject.from<T>(source);
  }
}
/**
 * Observable that remembers the most recently emitted value and replays
 * it to every new subscriber at subscription time.
 **/
public class LastValueSubject<T> : Observable<T> {
  // Most recent value pushed through next(); none until the first emission.
  public Option<T> lastValue { get; private set; }

  public LastValueSubject() {
    lastValue = F.none<T>();
  }

  // Hands `action` an observer that pushes into this subject. The action
  // is invoked immediately, inside the constructor.
  public LastValueSubject(Action<IObserver<T>> action) : this() {
    var observer = new Observer<T>(next);
    action(observer);
  }

  // Records the value as the last one, then delivers it to subscribers.
  protected override void next(T next) {
    lastValue = F.some(next);
    base.next(next);
  }

  // Replays the remembered value (if there is one) to the new subscriber
  // before registering it for future values.
  public override ISubscription subscribe(Action<T> onChange) {
    if (lastValue.isDefined) {
      onChange(lastValue.get);
    }
    var subscription = base.subscribe(onChange);
    return subscription;
  }
}
}
### utils/rx/Subscription.cs ###
?using System;
using System.Collections.Generic;
namespace utils.rx {
/**
 * Handle representing one registration with an observable.
 **/
public interface ISubscription {
// Whether this subscription is still active.
bool isSubscribed { get; }
// Cancels the subscription. In the Subscription implementation in this
// file the result is true when this call performed the cancellation and
// false when the subscription had already been cancelled.
bool unsubscribe();
}
/**
 * ISubscription that runs a supplied action the first time it is
 * unsubscribed and ignores any later unsubscribe calls.
 **/
public class Subscription : ISubscription {
  // Action executed once, on the first unsubscribe().
  private readonly Action onUnsubscribe;
  private bool active = true;

  public Subscription(Action onUnsubscribe) {
    this.onUnsubscribe = onUnsubscribe;
  }

  public bool isSubscribed { get { return active; } }

  // Returns true if this call cancelled the subscription, false if it was
  // already cancelled. The flag is cleared before the action runs.
  public bool unsubscribe() {
    if (active) {
      active = false;
      onUnsubscribe();
      return true;
    }
    return false;
  }
}
/**
 * Collects subscriptions so that they can all be cancelled together
 * through Dispose().
 **/
public class SubscriptionTracker : IDisposable {
  private readonly List<ISubscription> tracked = new List<ISubscription>();

  // Remembers the subscription and returns it unchanged, so the call can
  // be wrapped around a subscribe() expression.
  public ISubscription track(ISubscription subscription) {
    tracked.Add(subscription);
    return subscription;
  }

  // Unsubscribes everything tracked so far and forgets it.
  public void Dispose() {
    foreach (var entry in tracked) {
      entry.unsubscribe();
    }
    tracked.Clear();
  }
}
}
@doppioslash
Copy link

I tried using it, also adding in https://gist.github.com/arturaz/8036037, but I get an error: "The type or namespace name 'Option`1' could not be found".
Missing the class 'Option'.

@arturaz
Copy link
Author

arturaz commented Mar 23, 2014

This was only a sample code for my friend :)

However I've published our full library - hope you can make use of it, extend and contribute back!

Check it out at https://github.com/tinylabproductions/tlplib

@doppioslash
Copy link

Thanks! I'll try it out

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment