Skip to content

Instantly share code, notes, and snippets.

@alexoro
Created May 7, 2015 10:34

Revisions

  1. alexoro created this gist May 7, 2015.
    118 changes: 118 additions & 0 deletions gistfile1.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,118 @@
    package com.alexoro.rxjava_test;

    import android.app.Activity;
    import android.os.Bundle;
    import android.util.Pair;
    import android.widget.TextView;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import rx.Observable;
    import rx.Observer;
    import rx.Subscriber;
    import rx.Subscription;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.functions.Func2;
    import rx.schedulers.Schedulers;


    /**
    * Created by a.sorokin on 06.05.2015.
    */
    public class MainActivity extends Activity {

    private Subscription mTariffDataSubscriber;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.main);

    final TextView tv = (TextView) findViewById(R.id.text);

    mTariffDataSubscriber = Observable.combineLatest(
    getTariffInfoObservable(),
    getTariffChannelsObservable(),
    new Func2<TariffInfo, List<TariffChannel>, Pair<TariffInfo, List<TariffChannel>>>() {
    @Override
    public Pair<TariffInfo, List<TariffChannel>> call(TariffInfo tariffInfo, List<TariffChannel> tariffChannels) {
    return new Pair<TariffInfo, List<TariffChannel>>(tariffInfo, tariffChannels);
    }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<Pair<TariffInfo, List<TariffChannel>>>() {
    @Override
    public void onCompleted() {

    }
    @Override
    public void onError(Throwable e) {
    tv.setText("" + e);
    }
    @Override
    public void onNext(Pair<TariffInfo, List<TariffChannel>> result) {
    tv.setText(result.first.name);
    tv.append("\n");
    tv.append("" + result.second);
    }
    });
    }

    @Override
    protected void onDestroy() {
    super.onDestroy();
    if (mTariffDataSubscriber != null) {
    mTariffDataSubscriber.unsubscribe();
    mTariffDataSubscriber = null;
    }
    }

    protected Observable<TariffInfo> getTariffInfoObservable() {
    return Observable.create(new Observable.OnSubscribe<TariffInfo>() {
    @Override
    public void call(Subscriber<? super TariffInfo> subscriber) {
    try {
    Thread.sleep(1000);
    subscriber.onNext(new TariffInfo("Tariff name"));
    } catch (Exception ex) {
    subscriber.onError(ex);
    }
    subscriber.onCompleted();
    }
    });
    }

    private Observable<List<TariffChannel>> getTariffChannelsObservable() {
    return Observable.create(new Observable.OnSubscribe<List<TariffChannel>>() {
    @Override
    public void call(Subscriber<? super List<TariffChannel>> subscriber) {
    try {
    Thread.sleep(5000);
    subscriber.onNext(Arrays.asList(new TariffChannel("Channel #1"), new TariffChannel("Channel #2")));
    List<TariffChannel> r = new ArrayList<TariffChannel>();
    } catch (Exception ex) {
    subscriber.onError(ex);
    }
    subscriber.onCompleted();
    }
    });
    }

    private static class TariffInfo {
    public String name;
    public TariffInfo(String name) {
    this.name = name;
    }
    }

    private static class TariffChannel {
    public String name;
    public TariffChannel(String name) {
    this.name = name;
    }
    }

    }