Last active
December 22, 2017 10:28
-
-
Save animeshtrivedi/f51968d72c28cdcb43e4b6bf11291f19 to your computer and use it in GitHub Desktop.
example API for crail streaming API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
This is just a rough sketch how a the code should look like. While | |
impelmenting the actual logic you may find that new classes or abstractions | |
might be required. | |
*/ | |
class CrailStreamProducerExample { | |
public static void main(String[] args) { | |
/* you need to implement CrailBroker, CrailProducer, CrailStreamWriter classes */ | |
BorkerProducer broker = new BorkerProducer(); | |
broker.connect("hostanme", port, someThingMore); | |
CrailProducer producer = new CrailProducer(...); | |
producer.registerBroker(broker, ...); | |
/* | |
"topic" is the name of the topic | |
1MB or 10ms - whatever is reached first will be used to notify the broker | |
1GB is the stream file size, once that is reached, a new crail file is created. | |
*/ | |
CrailStreamWriter writer = producer.createTopic("topic", 1MB, 10ms, 1GB); | |
while(true){ | |
/* internally writeRecord will constantly send updates to the broker every 1MB or 10ms */ | |
writer.writeRecord(...); | |
/* or you can explicitly call commit to send a notification */ | |
writer.commit(); | |
} | |
writer.close(); | |
broker.disconnect(); | |
} | |
} | |
///////////////////////////////////////////////////////////////////////////// | |
class CrailStreamConsumerExample { | |
public static void main(String[] args) { | |
BrokerConsumer broker = new BrokerConsumer(); | |
broker.connect("hostanme", port, someThingMore); | |
/* you need to implement CrailConsumer, CrailStreamReader classes */ | |
CrailConsumer consumer = new CrailConsumer(...); | |
consumer.registerBroker(broker, ...); | |
CrailStreamReader reader =consumer.openTopic("topic", ...); | |
while(reader.hasMoreRecords()){ | |
/* internally writeRecord will constantly send updates to the broker every 1MB or 10ms */ | |
Record rec = reader.readRecord(...); | |
/* do something with the record */ | |
} | |
reader.close(); | |
broker.disconnect(); | |
} | |
} | |
///////////////////////////////////////////////////////////////////////////// | |
/* Borker will have a server and client API - The server is a standalone program that runs on a server. | |
The client-side runs on the producer and consumer sides, and communicates with the BrokerServer. */ | |
class BrokerServer { | |
int registerProducer(...); | |
int registerConsumer(...); | |
int deregisterProducer(...); | |
int deregisterConsumer(...); | |
int cleanTopic(String topic); | |
... | |
} | |
class BorkerProducer{ | |
int connect(String host, int port, ...); | |
int sendNotificationToBroker(Notification notficiation); | |
int recvAckFromBroker(...); | |
... | |
} | |
class BorkerConsumer{ | |
int connect(String host, int port, ...); | |
Notification recvNotificationFromBroker(); | |
int sendAckToBroker(...); | |
... | |
} | |
// notification message format | |
class Notification { | |
int newRecords; | |
long numBytes; | |
long fileOfffset; | |
... | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment