Created
March 12, 2019 13:23
-
-
Save recursivecodes/2832e09e9423b1460200377192059f0f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package codes.recursive.barn.automation.service.streaming | |
| import codes.recursive.barn.automation.event.EventEmitter | |
| import codes.recursive.barn.automation.model.BarnEvent | |
| import codes.recursive.barn.automation.service.data.OracleDataService | |
| import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider | |
| import com.oracle.bmc.streaming.StreamClient | |
| import com.oracle.bmc.streaming.model.CreateGroupCursorDetails | |
| import com.oracle.bmc.streaming.model.Message | |
| import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest | |
| import com.oracle.bmc.streaming.requests.GetMessagesRequest | |
| import groovy.json.JsonException | |
| import groovy.json.JsonOutput | |
| import groovy.json.JsonSlurper | |
| import groovy.util.logging.Slf4j | |
| import javax.enterprise.context.ApplicationScoped | |
| import javax.inject.Inject | |
| import java.util.concurrent.atomic.AtomicBoolean | |
| @ApplicationScoped | |
| @Slf4j | |
| class MessageConsumerService { | |
| String configFilePath | |
| String streamId | |
| String groupName = 'group-0' | |
| StreamClient client | |
| private final AtomicBoolean closed = new AtomicBoolean(false) | |
| @Inject private EventEmitter eventEmitter | |
| @Inject private OracleDataService oracleDataService | |
| MessageConsumerService(configFilePath=System.getProperty("ociConfigPath", "/.oci/config"), streamId=System.getProperty("streamId")) { | |
| this.configFilePath = configFilePath | |
| this.streamId = streamId | |
| def provider = new ConfigFileAuthenticationDetailsProvider(this.configFilePath, 'DEFAULT') | |
| def client = new StreamClient(provider) | |
| client.setRegion('us-phoenix-1') | |
| this.client = client | |
| } | |
| def start() { | |
| log.info("Creating cursor...") | |
| def cursorDetails = CreateGroupCursorDetails.builder() | |
| .type(CreateGroupCursorDetails.Type.TrimHorizon) | |
| .commitOnGet(true) | |
| .groupName(this.groupName) | |
| .build() | |
| def groupCursorRequest = CreateGroupCursorRequest.builder() | |
| .streamId(streamId) | |
| .createGroupCursorDetails(cursorDetails) | |
| .build() | |
| def cursorResponse = this.client.createGroupCursor(groupCursorRequest) | |
| log.info("Cursor created...") | |
| def getRequest = GetMessagesRequest.builder() | |
| .cursor(cursorResponse.cursor.value) | |
| .streamId(this.streamId) | |
| .build() | |
| while(!closed.get()) { | |
| def getResult = client.getMessages(getRequest) | |
| if( getResult.items.size() ) { | |
| getResult.items.each { Message record -> | |
| def msg | |
| try { | |
| def slurper = new JsonSlurper() | |
| msg = slurper.parseText( new String(record.value, "UTF-8") ) | |
| log.info(msg.toString()) | |
| BarnEvent evt = new BarnEvent( msg?.type, JsonOutput.toJson(msg?.data), record.timestamp ) | |
| oracleDataService.save(evt) | |
| eventEmitter.emit('incomingMessage', [message: [type: evt.type, capturedAt: evt.capturedAt, data: slurper.parseText(evt.data)], timestamp: record.timestamp]) | |
| } | |
| catch (JsonException e) { | |
| log.error("Error parsing JSON from ${record.value}") | |
| e.printStackTrace() | |
| } | |
| catch (Exception e) { | |
| log.error("Error:") | |
| e.printStackTrace() | |
| } | |
| } | |
| } | |
| else { | |
| //println '<--- no messages' | |
| } | |
| getRequest.cursor = getResult.opcNextCursor | |
| sleep(1000) | |
| } | |
| } | |
| def close() { | |
| log.info("Closing cursor...") | |
| closed.set(true) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment