Skip to content

Instantly share code, notes, and snippets.

@recursivecodes
Created March 12, 2019 13:23
Show Gist options
  • Select an option

  • Save recursivecodes/2832e09e9423b1460200377192059f0f to your computer and use it in GitHub Desktop.

Select an option

Save recursivecodes/2832e09e9423b1460200377192059f0f to your computer and use it in GitHub Desktop.
package codes.recursive.barn.automation.service.streaming
import codes.recursive.barn.automation.event.EventEmitter
import codes.recursive.barn.automation.model.BarnEvent
import codes.recursive.barn.automation.service.data.OracleDataService
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider
import com.oracle.bmc.streaming.StreamClient
import com.oracle.bmc.streaming.model.CreateGroupCursorDetails
import com.oracle.bmc.streaming.model.Message
import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest
import com.oracle.bmc.streaming.requests.GetMessagesRequest
import groovy.json.JsonException
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.util.logging.Slf4j
import javax.enterprise.context.ApplicationScoped
import javax.inject.Inject
import java.util.concurrent.atomic.AtomicBoolean
/**
 * Polls an OCI stream through a group cursor and forwards every message it
 * receives: each message is parsed as JSON, wrapped in a {@link BarnEvent},
 * saved through {@link OracleDataService} and then re-emitted on the
 * {@link EventEmitter} under the 'incomingMessage' event name.
 *
 * {@link #start()} blocks the calling thread until {@link #close()} is invoked.
 */
@ApplicationScoped
@Slf4j
class MessageConsumerService {
    String configFilePath
    String streamId
    String groupName = 'group-0'
    StreamClient client
    // Flipped by close(); checked at the top of every polling iteration.
    private final AtomicBoolean closed = new AtomicBoolean(false)
    @Inject private EventEmitter eventEmitter
    @Inject private OracleDataService oracleDataService

    /**
     * Builds the stream client from an OCI config file, using its 'DEFAULT' profile.
     *
     * @param configFilePath path of the OCI config file; defaults to the
     *        "ociConfigPath" system property, or "/.oci/config" when that is unset
     * @param streamId id of the stream to read; defaults to the "streamId" system property
     */
    MessageConsumerService(configFilePath=System.getProperty("ociConfigPath", "/.oci/config"), streamId=System.getProperty("streamId")) {
        this.configFilePath = configFilePath
        this.streamId = streamId
        def provider = new ConfigFileAuthenticationDetailsProvider(this.configFilePath, 'DEFAULT')
        def streamClient = new StreamClient(provider)
        streamClient.setRegion('us-phoenix-1')
        this.client = streamClient
    }

    /**
     * Creates a TrimHorizon group cursor (commit-on-get enabled) for
     * {@code groupName} and then polls the stream roughly once per second
     * until {@link #close()} is called.
     *
     * A failure while handling a single message is logged and does not stop
     * the loop; a failure while creating the cursor or fetching messages
     * propagates to the caller.
     */
    def start() {
        log.info("Creating cursor...")
        def cursorDetails = CreateGroupCursorDetails.builder()
                .type(CreateGroupCursorDetails.Type.TrimHorizon)
                .commitOnGet(true)
                .groupName(this.groupName)
                .build()
        def groupCursorRequest = CreateGroupCursorRequest.builder()
                .streamId(streamId)
                .createGroupCursorDetails(cursorDetails)
                .build()
        def cursorResponse = this.client.createGroupCursor(groupCursorRequest)
        log.info("Cursor created...")

        def getRequest = GetMessagesRequest.builder()
                .cursor(cursorResponse.cursor.value)
                .streamId(this.streamId)
                .build()
        // One parser is enough: messages are handled sequentially on this thread.
        def slurper = new JsonSlurper()

        while (!closed.get()) {
            def getResult = client.getMessages(getRequest)
            for (Message record : getResult.items) {
                handleRecord(record, slurper)
            }
            // Continue from where this batch ended on the next poll.
            getRequest.cursor = getResult.opcNextCursor
            sleep(1000)
        }
    }

    /**
     * Parses, saves and re-emits a single stream message. Any failure is
     * logged (with its stack trace) and swallowed so the remaining messages
     * of the batch are still handled.
     */
    private void handleRecord(Message record, JsonSlurper slurper) {
        String payload = null
        try {
            payload = new String(record.value, "UTF-8")
            def msg = slurper.parseText(payload)
            log.info(msg.toString())
            BarnEvent evt = new BarnEvent(msg?.type, JsonOutput.toJson(msg?.data), record.timestamp)
            oracleDataService.save(evt)
            eventEmitter.emit('incomingMessage', [message: [type: evt.type, capturedAt: evt.capturedAt, data: slurper.parseText(evt.data)], timestamp: record.timestamp])
        }
        catch (JsonException e) {
            // Log the decoded text, not the raw byte[], so the bad payload is readable.
            log.error("Error parsing JSON from ${payload}", e)
        }
        catch (Exception e) {
            log.error("Error:", e)
        }
    }

    /**
     * Asks the polling loop in {@link #start()} to stop; the loop exits the
     * next time it checks the flag, i.e. after the current poll and sleep.
     */
    def close() {
        log.info("Closing cursor...")
        closed.set(true)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment