Skip to content

Instantly share code, notes, and snippets.

@michalfapso
Last active February 13, 2021 20:34
Show Gist options
  • Save michalfapso/4bbbc9076ee08d64e03f6221a4f38e67 to your computer and use it in GitHub Desktop.
Save michalfapso/4bbbc9076ee08d64e03f6221a4f38e67 to your computer and use it in GitHub Desktop.

Build instructions:

git clone https://github.com/ReactiveX/RxCpp.git
make
./main

This is the output of the program:

===== println stream of std::string =====
s: Hello, Matthew!
s: Hello, Aaron!

===== process all items =====
async_fun() sending the first data immediately
OnNext:99
async_fun() inside async begin
OnNext:100
OnNext:101
OnNext:102
OnNext:103
async_fun() inside async end
OnCompleted

===== process all items and transform in a chain =====
async_fun() sending the first data immediately
Map: 99 -> 990
OnNext:990
async_fun() inside async begin
Map: 100 -> 1000
OnNext:1000
Map: 101 -> 1010
OnNext:1010
Map: 102 -> 1020
OnNext:1020
Map: 103 -> 1030
OnNext:1030
async_fun() inside async end
OnCompleted

===== process all items and transform between different types in a chain =====
async_fun() sending the first data immediately
Map: 99 -> 99.123
OnNext:99.123
async_fun() inside async begin
Map: 100 -> 100.123
OnNext:100.123
Map: 101 -> 101.123
OnNext:101.123
Map: 102 -> 102.123
OnNext:102.123
Map: 103 -> 103.123
OnNext:103.123
async_fun() inside async end
OnCompleted

===== process all items + exception =====
async_fun() sending the first data immediately
OnNext:199
async_fun() inside async begin
OnNext:200
OnNext:201
OnError: testing exception
async_fun() inside async end

===== process only the last item =====
async_fun() sending the first data immediately
async_fun() inside async begin
async_fun() inside async end
OnNext serves here also as OnCompleted:103

===== process all items producing sub items for each input item =====
flat_map create sub elements from i:1
flat_map create output element i:1 i_sub:101
OnNext: 1 - 101
flat_map create output element i:1 i_sub:102
OnNext: 1 - 102
flat_map create output element i:1 i_sub:103
OnNext: 1 - 103
flat_map create sub elements from i:2
flat_map create output element i:2 i_sub:201
OnNext: 2 - 201
flat_map create output element i:2 i_sub:202
OnNext: 2 - 202
flat_map create output element i:2 i_sub:203
OnNext: 2 - 203
flat_map create sub elements from i:3
flat_map create output element i:3 i_sub:301
OnNext: 3 - 301
flat_map create output element i:3 i_sub:302
OnNext: 3 - 302
flat_map create output element i:3 i_sub:303
OnNext: 3 - 303
#include <iostream>
#include <thread>
using namespace std;
#include "rxcpp/rx.hpp"
// create alias to simplify code
namespace rx=rxcpp;
namespace rxu=rxcpp::util;
// A testing function which produces data elements asynchronously. In a real world, this could be an http get request.
rx::observable<int> async_fun(int i) {
// Create an observable and return it immediately, but it's data will arrive later
return rx::observable<>::create<int>([=](rx::subscriber<int> sub){
cout << "async_fun() sending the first data immediately" << endl;
sub.on_next(i-1);
// Create a new thread
std::async(std::launch::async, [sub, i_const=i](){
int i = i_const; // because the captured i is const
cout << "async_fun() inside async begin" << endl;
for (int j=0; j<3; j++) {
if (i > 201) {
sub.on_error(rxu::make_error_ptr(std::runtime_error("testing exception")));
}
// Send data to the subscriber
sub.on_next(i++);
std::this_thread::sleep_for(1000ms);
}
sub.on_next(i++);
cout << "async_fun() inside async end" << endl;
sub.on_completed();
});
});
}
int main() {
{
cout << "===== println stream of std::string =====" << endl;
auto get_names = [](){return rx::observable<>::from<std::string>(
"Matthew",
"Aaron"
);};
auto hello_str = [&](){return get_names().map([](std::string n){
return "Hello, " + n + "!";
}).as_dynamic();};
hello_str().subscribe([&](std::string s){
cout << "s: "<<s << endl;
});
}
{
cout << endl << "===== process all items =====" << endl;
async_fun(100)
.subscribe(
[](int i) { cout << "OnNext:"<<i << endl; },
[](rxu::error_ptr e){ cout << "OnError: "<<rxu::what(e)<<endl; },
[]() { cout << "OnCompleted" << endl; }
);
}
{
cout << endl << "===== process all items and transform in a chain =====" << endl;
async_fun(100)
.map(
[](int i) { cout << "Map: "<<i<<" -> "<<i*10 << endl; return i*10; }
).subscribe(
[](int i) { cout << "OnNext:"<<i << endl; },
[](rxu::error_ptr e){ cout << "OnError: "<<rxu::what(e)<<endl; },
[]() { cout << "OnCompleted" << endl; }
);
}
{
cout << endl << "===== process all items and transform between different types in a chain =====" << endl;
async_fun(100)
.map(
[](int i) { cout << "Map: "<<i<<" -> "<<i+0.123f << endl; return i+0.123f; }
).subscribe(
[](float f) { cout << "OnNext:"<<f << endl; },
[](rxu::error_ptr e){ cout << "OnError: "<<rxu::what(e)<<endl; },
[]() { cout << "OnCompleted" << endl; }
);
}
{
cout << endl << "===== process all items + exception =====" << endl;
async_fun(200)
.subscribe(
[](int i) { cout << "OnNext:"<<i << endl; },
[](rxu::error_ptr e){ cout << "OnError: "<<rxu::what(e)<<endl; },
[]() { cout << "OnCompleted" << endl; }
);
}
{
cout << endl << "===== process only the last item =====" << endl;
async_fun(100)
.last()
.subscribe(
[](int i){ cout << "OnNext serves here also as OnCompleted:"<<i << endl; }
);
}
{
cout << endl << "===== process all items producing sub items for each input item =====" << endl;
auto values = rxcpp::observable<>::range(1, 3)
.flat_map(
// Create sub elements from each input element
[](int i){
cout << "flat_map create sub elements from i:"<<i<<endl;
return rx::observable<>::create<int>([=](rx::subscriber<int> sub){
sub.on_next(i * 100 + 1);
sub.on_next(i * 100 + 2);
sub.on_next(i * 100 + 3);
});
},
// Create output elements from each pair of input and sub element
[](int i, int i_sub){
cout << "flat_map create output element i:"<<i<<" i_sub:"<<i_sub<<endl;
return std::make_tuple(i, i_sub);
});
values.
subscribe(
[](std::tuple<int, int> t){cout<<"OnNext: "<<std::get<0>(t)<<" - "<<std::get<1>(t)<<endl;},
[]() {cout<<"OnCompleted"<<endl;});
}
return 0;
}
# First, clone the RxCpp repo somewhere:
# git clone https://github.com/ReactiveX/RxCpp.git
CXXFLAGS = -I./RxCpp/Rx/v2/src
CXXFLAGS += -pthread
main: main.cpp
$(CXX) $(CXXFLAGS) -o $@ $<
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment