Last active
May 6, 2016 22:28
-
-
Save mavam/463312c5b7ec57f1838c77b50e25c973 to your computer and use it in GitHub Desktop.
Broker API Synopsis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// --- component setup -------------------------------------------------------- | |
// Creates a broker execution context that encapsulates runtime state, such as | |
// a thread pool and message type information. | |
system sys{cfg}; | |
// Create a broker in a given system. A broker is a thin abstraction on top of | |
// a CAF actor for sending and receiving messages. There exist synchronous and | |
// aysnchronous brokers. | |
async_broker a{sys}; | |
sync_broker b{sys}; | |
// Publish broker at a TCP endpoint, allowing it to accept remote connections. | |
b.listen("127.0.0.1", 42000); | |
// --- peering between brokers ------------------------------------------------ | |
// Peer with another broker. Peering units exchange subscriptions and route | |
// messages among them. | |
sync_broker c{sys}; | |
c.peer(b); | |
b.peer(c); // idempotent | |
// Peer with a remote broker and remote broker to peers upon success. | |
b.peer("1.2.3.4", 42000); // block until connected or failed | |
b.peer<immediate>("1.2.3.4", 42000); // equivalent, but with explicit flag | |
// Try to connect automatically until successful. | |
// This version uncondionally adds the remote broker to the list of peers. | |
b.peer<lazy>("1.2.3.4", 42000); | |
b.peer<lazy>("1.2.3.4", 42000, | |
hour(1), // when to abort ultimately | |
seconds(10) // delay between attempts | |
); | |
// Inspect peers. | |
for (auto p : b.peers()) | |
cout << p.address() << ':' << p.port() << endl; // If failed: <unbound>:0 | |
// Remove a peering. | |
b.unpeer(c); | |
b.unpeer("1.2.3.4", 42000); | |
// --- publishing messages ---------------------------------------------------- | |
// Send data <foo, 42, 3.14> under topic "/foo" to all peers. | |
// The function drops the message if no peers are alive/connected. | |
b.publish("/foo", "foo", 42, 3.14); | |
// --- subscribing to messages ------------------------------------------------ | |
// Subscribes to a topic. | |
b.subscribe("/foo"); | |
b.subscribe("/bar"); | |
// For a synchronous broker, block and wait until a single message arrives. | |
// For an asynchronous broker, register the handler. | |
b.process( | |
[=](const topic& t, const message& msg) { | |
msg.apply( | |
[&](int, int) { .. }, | |
[&](double) { .. }, | |
[&](const string&) { .. } | |
); | |
}, | |
[=](const status& s) { | |
if (s == remote_peer_disconnected) { | |
auto r = s.context().get<broker>(0); // retrieve status context | |
cout << "lost connection to " << r.address() << ':' << r.port() << endl; | |
} | |
} | |
); | |
// For synchronous brokers and as an alternative to the functional message | |
// processing API, one can just receive the next <topic, data> pair. | |
// This function blocks. | |
auto msg = b.receive(); | |
msg.apply( | |
[&](const topic& t, const message& msg) { | |
// received regular data | |
}, | |
[&](const status& s) { | |
// received a status message | |
} | |
); | |
// For synchronous brokers, retrieve a descriptor suitable for integration into | |
// poll/select loops. The descriptor is "ready" when there exists at least one | |
// message that can then be extracted synchronously without waiting. | |
auto fd = b.descriptor(); | |
// --- data store API --------------------------------------------------------- | |
// TODO |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment