Skip to content

Instantly share code, notes, and snippets.

@Dorus
Dorus / GroupByDistinct.cs
Last active August 29, 2015 14:25
GroupBy that closes and creates a new Group when the key changes. Useful on sorted collections.
public static IObservable<IObservable<TElement>> GroupByDistinct<TElement, TKey>(this IObservable<TElement> source, Func<TElement, TKey> keySelector)
{
return Observable.Create<IObservable<TElement>>(o =>
{
SerialDisposable dis = new SerialDisposable();
Subject<TElement> sub = new Subject<TElement>();
dis.Disposable = sub;
o.OnNext(sub.AsObservable());
bool first = true;
TKey prev = default(TKey);
@Dorus
Dorus / Additional Rx Extension Methods
Last active October 15, 2016 21:27
RxExtensions.cs
public static class Extended {
public static IObservable<TSource> Amb<TSource>(this IObservable<IObservable<TSource>> source) {
return Observable.Create<TSource>(o => {
int first = -1;
return source.TakeWhile(_ => first == -1)
.Select((el, c) => el
.DoFirst(1, _ => Interlocked.CompareExchange(ref first, c, -1))
.TakeWhile(_ => first == c))
.Merge().Subscribe(o);
using System;
using System.Collections.Concurrent;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace Server {
static class OverflowQueue_ {
/// <summary>
package RxTest.RxTest;
import rx.functions.Func2;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import rx.Observable;
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
@Dorus
Dorus / MyTransformers.java
Last active November 20, 2015 11:07
Additional RxJava transformers
/**
* Transformer to concat a new observable based on the last element of the previous observable. Use the default value
* if the first observable is empty.
*
* @param defaultValue
* the default.
* @param nextObservable
* function to create the next observable based on the current observable.
* @return the transformer.
*/
package RxTest.RxTest;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import rx.Notification;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
@Dorus
Dorus / samples.js
Created May 4, 2016 22:52
Marble samples
// Marble-diagram inputs: each stream is built by passing a marble string to Rx.Observable.marble.
// All five strings are 18 characters long — presumably one character per time frame; confirm against the marble helper.
var source$ = Rx.Observable.marble("1--23--456--7890--");
// enter$, exit$, sampler$ and full$ are declared here but not used in the lines visible below.
var enter$ = Rx.Observable.marble("-e-----e----------");
var exit$ = Rx.Observable.marble("---x-------x------");
var sampler$ = Rx.Observable.marble("a--b--c--d--e--f--");
var full$ = Rx.Observable.marble("01234567890abcdefg");
// Draw the source into '#container', then (chained off that draw call) its debounceTime(1050) result.
// NOTE(review): the labels read '(1)' while the operators receive 1050 — presumably 1050 stands for roughly one frame; confirm.
source$.draw('source', '#container')
.debounceTime(1050).draw('debounceTime(1)', '#container');
// Same source run through auditTime and throttleTime with the same 1050 argument, each drawn as its own row.
source$.auditTime(1050).draw('auditTime(1)', '#container');
source$.throttleTime(1050).draw('throttleTime(1)', '#container');
@Dorus
Dorus / index.js
Last active May 6, 2021 11:18 — forked from xgrommx/index.js
How to build Observable methods out of other Observable methods
/**
 * flatMap (with result selector) expressed through the stream's own `flatMap`
 * and the inner streams' `map`.
 *
 * @param {Function} fn - maps an outer value `x` to an inner stream exposing `map`.
 * @param {Object} stream - outer stream exposing `flatMap`.
 * @param {Function} [resultSelector] - called as `(x, y, xi, yi)` with the outer
 *   value, the inner value and their respective indices; its return value is what
 *   gets emitted. When omitted, the inner value `y` is emitted unchanged instead of
 *   failing with "resultSelector is not a function".
 * @returns {*} whatever `stream.flatMap` returns.
 */
const flatMap = (fn, stream, resultSelector = (x, y) => y) =>
  stream.flatMap((x, xi) =>
    fn(x).map((y, yi) => resultSelector(x, y, xi, yi)));
/**
 * flatMapLatest assembled from `publish`, `flatMap` and `takeUntil`.
 *
 * The source is published so the same shared stream can be used twice: as the
 * input of `flatMap` and as the notifier that ends each projected inner stream.
 *
 * @param {Function} fn - maps a source value to an inner stream exposing `takeUntil`.
 * @param {Object} stream - source stream exposing `publish(selector)`.
 * @returns {*} whatever `stream.publish` returns for the selector.
 */
const flatMapLatest = (fn, stream) => {
  const switchOnShared = shared => {
    // Each inner stream is cut off by the shared source itself.
    const projectUntilNext = value => {
      const inner = fn(value);
      return inner.takeUntil(shared);
    };
    return shared.flatMap(projectUntilNext);
  };
  return stream.publish(switchOnShared);
};
/**
 * flatMapLatest variant that forwards a result selector to `flatMap`.
 *
 * The source is published, the shared stream is flat-mapped through `fn` with
 * `resultSelector` passed as flatMap's second argument, and `takeUntil(shared)`
 * is applied to the flat-mapped output.
 *
 * @param {Function} fn - maps a source value to an inner stream.
 * @param {Object} stream - source stream exposing `publish(selector)`.
 * @param {Function} resultSelector - handed to `flatMap` unchanged.
 * @returns {*} whatever `stream.publish` returns for the selector.
 */
// NOTE(review): takeUntil is applied to the merged output rather than to each
// inner stream, so the next source value may end the whole result — confirm intent.
// NOTE(review): a `const` of the same name must not already exist in this scope.
const flatMapLatest = (fn, stream, resultSelector) =>
  stream.publish(s => s.flatMap(v => fn(v), resultSelector).takeUntil(s));
@Dorus
Dorus / GroupCache
Last active December 8, 2016 12:33
GroupCache
public sealed class GroupCache<TSource, TKey> : IConnectableObservable<TSource>
{
private IConnectableObservable<TSource> _souce;
private IObservable<TSource> _souceGrouped;
public GroupCache(IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
_souce = source.Publish();
var sub = new ReplaySubject<IObservable<TSource>>();
_souce
/**
 * Logistic curve on a base-10, 400-point scale: 1 / (1 + 10^(-x/400)).
 * (This is the form used for Elo expected scores — presumably x is a rating
 * difference; confirm against callers.)
 *
 * @param {number} x - input value.
 * @returns {number} 0.5 at x = 0, approaching 1 as x grows and 0 as x falls.
 */
function LL(x) {
  const exponent = -x / 400;
  const denominator = 1 + 10 ** exponent;
  return 1 / denominator;
}
function LLR(W, L, elo0, elo1) {
//if (W==0 || L==0) return 0;
if (!W) W=1;
if (!L) L=1;