Skip to content

Instantly share code, notes, and snippets.

@diorcety
Created September 26, 2017 12:55
Show Gist options
  • Save diorcety/50de19e866459af160e05a84b1816e42 to your computer and use it in GitHub Desktop.
Save diorcety/50de19e866459af160e05a84b1816e42 to your computer and use it in GitHub Desktop.
#include <chrono>
#include <cstdio>
#include <iostream>
#include <memory>
#include <thread>

#include <rxcpp/rx.hpp>
/// Lifetime probe: announces its own construction and destruction on stdout.
///
/// The type carries no state; its only purpose is the trace it prints.
struct Abcd {
    /// Prints "Abcd" followed by a newline.
    Abcd() {
        std::printf("Abcd\n");
    }

    /// Prints "~Abcd" followed by a newline.
    ~Abcd() {
        std::printf("~Abcd\n");
    }
};
/// Plain pair of integers used as the input event of the pipeline.
///
/// @tparam A  element type used only to name `observable_type`.
template<typename A>
struct Context1 {
    /// Observable type associated with the template parameter.
    using observable_type = ::rxcpp::observable<A>;

    int a;
    int x;

    /// Stores both integers as given.
    Context1(int a, int x) : a(a), x(x) {}
};
template<typename A>
struct Context2 {
typedef ::rxcpp::observable<A> observable_type;
Context2(const observable_type &observable, const ::std::shared_ptr<Abcd> &abcd): observable(observable), abcd(abcd) {
}
observable_type observable;
::std::shared_ptr<Abcd> abcd;
const observable_type &get() const {
return observable;
}
};
/// Flattens a stream of Context2<A> wrappers into a single stream of A.
///
/// Each incoming wrapper is expanded into the observable returned by its
/// get() accessor, and every value produced by that inner observable is
/// passed downstream unchanged.
///
/// @tparam A  element type carried by the wrapped observables.
/// @param a   observable emitting Context2<A> wrappers.
/// @return    observable emitting the values of the wrapped observables.
template<typename A>
::rxcpp::observable<A> flat_map(const ::rxcpp::observable<Context2<A>> &a) {
    return a.flat_map(
        // First selector: expand a wrapper into its inner observable.
        [](const auto &context) { return context.get(); },
        // Second selector: ignore the source wrapper, keep only the inner value.
        [](const auto &, const auto &value) { return value; }
    );
}
/// Groups Context1<int> events and wraps each group in a Context2<Type>.
///
/// Events are grouped by their `a` field and each group emits the `x` field
/// of its events. Every emitted Context2 shares one Abcd instance that is
/// created once per call to this function.
///
/// @tparam Type       element type of the Context2 wrappers that are produced.
/// @param observable  source of Context1<int> events.
/// @return            observable emitting one Context2<Type> per group.
template<typename Type>
::rxcpp::observable<Context2<Type>> abcd(::rxcpp::observable<Context1<int>> observable) {
    // Single Abcd shared by every wrapper this pipeline emits.
    auto hook = ::std::make_shared<Abcd>();

    auto select_key = [](const auto &event) { return event.a; };
    auto select_value = [](const auto &event) { return event.x; };
    // Captures the shared pointer by value so each wrapper gets its own copy.
    auto attach_hook = [hook](const auto &group) { return Context2<Type>(group, hook); };

    return observable.group_by(select_key, select_value).map(attach_hook);
}
// Demo driver: prints a banner, wires the abcd<int>() pipeline to a subject,
// pushes two events into it, waits one second, unsubscribes, and prints a
// closing banner. Always returns 0.
int main() {
std::cout << "Hello, World!" << std::endl;
// Set this to 0 to compile the program without the rxcpp part.
#if 1
// Source of Context1<int> events; values are pushed into it manually below.
::rxcpp::subjects::subject<Context1<int>> subject;
// Coordination handed to subscribe_on() in the pipeline below.
auto thread = ::rxcpp::serialize_new_thread();
// Pipeline: abcd<int>() output -> subscribe_on(thread) -> flat_map -> subscribe.
// The whole chain is built from temporaries in one expression; only the
// resulting subscription is kept in `sub`.
// NOTE(review): subscribe_on presumably defers the actual subscription to the
// new thread, so the on_next calls below may race with it — confirm whether
// that is intended.
auto sub = abcd<int>(subject.get_observable()).subscribe_on(thread).flat_map(
// First selector: prints "TEST" and returns the observable held by the Context2.
[](const auto &a) { printf("TEST\n"); return a.get(); },
// Second selector: drops the Context2 (a) and keeps the inner value (b).
[](const auto &a, const auto &b) { return b; }).subscribe([](const auto &a) {
// %d requires an int; with abcd<int> the flattened values are expected to be int.
printf("Value: %d\n", a);
});
auto subscriber = subject.get_subscriber();
// Both events carry a == 1, the field abcd() groups on; x is 0 and then 20.
subscriber.on_next(Context1<int>(1, 0));
subscriber.on_next(Context1<int>(1, 20));
// Completion is disabled: the subject is never completed in this program,
// and the pipeline is torn down only by the unsubscribe() call below.
//subscriber.on_completed();
// Wait one second before tearing the subscription down.
::std::this_thread::sleep_for (std::chrono::seconds(1));
std::cout << "Unsubscribe" << std::endl;
sub.unsubscribe();
#endif
std::cout << "End of World!" << std::endl;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment