Created
December 15, 2019 14:59
-
-
Save deeperunderstanding/68492b468bf17bae6717af26fe836095 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Reactive client for the Coinbase Pro websocket feed.
 *
 * Opens a websocket to [websocketUrl], sends a subscription request and exposes the
 * incoming frames as a [Flux] of deserialized [Message]s.
 *
 * @property websocketUrl endpoint URL, injected from the `cbpro.websocket.baseurl` property.
 */
@Component
class CoinbaseWebsocket @Autowired constructor(
    @Value("\${cbpro.websocket.baseurl}") val websocketUrl: String
) {

    // Only used to serialize the outgoing subscription request.
    private val mapper = ObjectMapper()
    private val logger = LoggerFactory.getLogger(javaClass)
    private val client = ReactorNettyWebSocketClient()

    init {
        // Raise the frame size limit to the maximum the client accepts.
        client.maxFramePayloadLength = Int.MAX_VALUE
    }

    /**
     * Connects to the websocket endpoint and subscribes to the feed described by [subscription].
     *
     * Nothing is sent over the network until the returned [Flux] is subscribed to; the
     * connection is opened from the `doOnSubscribe` hook. Failures of the connection itself
     * (connect, handshake, sending the subscription request) are forwarded to the returned
     * [Flux] as an error signal instead of being dropped.
     *
     * @param subscription the subscription request; serialized to JSON and sent as the first
     *   text frame once the session is established.
     * @return the stream of messages received on the session, deserialized by
     *   `JsonMessageReader`.
     */
    fun connect(subscription: SubscriptionMessage): Flux<Message> {
        val emitter = EmitterProcessor.create<Message>()
        val subscriptionMessage = mapper.writeValueAsString(subscription)

        logger.info("Coinbase Endpoint will connect to $websocketUrl")

        // `execute` only assembles the connection Mono; it runs when subscribed below.
        val connection = client.execute(URI(websocketUrl)) { session ->
            logger.info("Coinbase Session started ${session.handshakeInfo}")
            logger.info("Sending Subscription Message $subscriptionMessage")

            // The receive pipeline is attached to the emitter as soon as the handler runs,
            // i.e. before the subscription request has been written.
            val inbound = session.receive()
                .map { it.payloadAsText }
                .doOnNext { logger.trace(it) }
                .map { JsonMessageReader.deserialize(it) }
                .subscribeWith(emitter)

            session.send(Mono.just(session.textMessage(subscriptionMessage)))
                .thenMany(inbound)
                .then()
        }

        // NOTE(review): every subscription to the returned Flux triggers another
        // `connection.subscribe`, while the emitter can only be attached to one upstream;
        // this looks intended for a single subscriber - confirm with callers.
        // NOTE(review): the handler itself stays subscribed to the emitter (via `thenMany`),
        // so a downstream cancel presumably does not close the socket - verify.
        return emitter.doOnSubscribe {
            connection.subscribe(
                { },
                { error ->
                    // A receive-side error has already reached the emitter through
                    // `subscribeWith`; only forward failures it has not seen yet.
                    if (!emitter.isTerminated) {
                        emitter.onError(error)
                    }
                }
            )
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment