Skip to content

Instantly share code, notes, and snippets.

@animeshtrivedi
Last active December 22, 2017 10:28
Show Gist options
  • Save animeshtrivedi/f51968d72c28cdcb43e4b6bf11291f19 to your computer and use it in GitHub Desktop.
Save animeshtrivedi/f51968d72c28cdcb43e4b6bf11291f19 to your computer and use it in GitHub Desktop.
example API for crail streaming API
/*
This is just a rough sketch how a the code should look like. While
impelmenting the actual logic you may find that new classes or abstractions
might be required.
*/
class CrailStreamProducerExample {
public static void main(String[] args) {
/* you need to implement CrailBroker, CrailProducer, CrailStreamWriter classes */
BorkerProducer broker = new BorkerProducer();
broker.connect("hostanme", port, someThingMore);
CrailProducer producer = new CrailProducer(...);
producer.registerBroker(broker, ...);
/*
"topic" is the name of the topic
1MB or 10ms - whatever is reached first will be used to notify the broker
1GB is the stream file size, once that is reached, a new crail file is created.
*/
CrailStreamWriter writer = producer.createTopic("topic", 1MB, 10ms, 1GB);
while(true){
/* internally writeRecord will constantly send updates to the broker every 1MB or 10ms */
writer.writeRecord(...);
/* or you can explicitly call commit to send a notification */
writer.commit();
}
writer.close();
broker.disconnect();
}
}
/////////////////////////////////////////////////////////////////////////////
class CrailStreamConsumerExample {
public static void main(String[] args) {
BrokerConsumer broker = new BrokerConsumer();
broker.connect("hostanme", port, someThingMore);
/* you need to implement CrailConsumer, CrailStreamReader classes */
CrailConsumer consumer = new CrailConsumer(...);
consumer.registerBroker(broker, ...);
CrailStreamReader reader =consumer.openTopic("topic", ...);
while(reader.hasMoreRecords()){
/* internally writeRecord will constantly send updates to the broker every 1MB or 10ms */
Record rec = reader.readRecord(...);
/* do something with the record */
}
reader.close();
broker.disconnect();
}
}
/////////////////////////////////////////////////////////////////////////////
/* Borker will have a server and client API - The server is a standalone program that runs on a server.
The client-side runs on the producer and consumer sides, and communicates with the BrokerServer. */
class BrokerServer {
int registerProducer(...);
int registerConsumer(...);
int deregisterProducer(...);
int deregisterConsumer(...);
int cleanTopic(String topic);
...
}
class BorkerProducer{
int connect(String host, int port, ...);
int sendNotificationToBroker(Notification notficiation);
int recvAckFromBroker(...);
...
}
class BorkerConsumer{
int connect(String host, int port, ...);
Notification recvNotificationFromBroker();
int sendAckToBroker(...);
...
}
// notification message format
class Notification {
int newRecords;
long numBytes;
long fileOfffset;
...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment