Skip to content

Instantly share code, notes, and snippets.

@graph1zzlle
Last active December 19, 2018 12:54
Show Gist options
  • Save graph1zzlle/6ac547b938fa44580e8339c5d3b4c43f to your computer and use it in GitHub Desktop.
Save graph1zzlle/6ac547b938fa44580e8339c5d3b4c43f to your computer and use it in GitHub Desktop.
Binance Live Order Book using Akka Streams
/**
 * Akka Streams [[GraphStage]] that maintains a local Binance order book from a
 * depth-update WebSocket stream, following the official synchronisation
 * procedure:
 * https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#how-to-manage-a-local-order-book-correctly
 *
 * @param asset        trading pair whose order book is maintained
 * @param getSnaptshot asynchronously fetches a full depth snapshot for the
 *                     asset (REST endpoint). Name kept as-is (typo included)
 *                     for source compatibility with existing callers.
 */
class OrderBookFlow(
    asset: Asset,
    getSnaptshot: Asset => Future[OrderBook]
) extends GraphStage[FlowShape[OrderBookUpdateEvent, OrderBook]] {

  val in = Inlet[OrderBookUpdateEvent]("BinanceOrderBookFlow.in")
  val out = Outlet[OrderBook]("BinanceOrderBookFlow.out")

  val shape: FlowShape[OrderBookUpdateEvent, OrderBook] =
    FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {

      // Snapshot fetched over REST; None until the first fetch completes.
      var orderBookSnapshot: Option[OrderBook] = Option.empty[OrderBook]

      // Previous event, used to validate update-id continuity (step 6).
      var lastDepthUpdateEvent: Option[OrderBookUpdateEvent] =
        Option.empty[OrderBookUpdateEvent]

      // True while a snapshot request is in flight.
      val acquiringSnapshot = new AtomicBoolean(false)

      // Events buffered while waiting for the snapshot (step 2).
      val buffer: ListBuffer[OrderBookUpdateEvent] =
        mutable.ListBuffer[OrderBookUpdateEvent]()

      /** Kicks off an asynchronous snapshot fetch (step 3). */
      private def acquireSnapshot(): Unit = {
        acquiringSnapshot.set(true)
        // BUGFIX: the Future completes on an arbitrary executor thread, but
        // GraphStage state (orderBookSnapshot / failStage) must only be
        // touched from the stage's own thread. Route completions through
        // getAsyncCallback instead of mutating state directly in onComplete.
        val snapshotAcquired = getAsyncCallback[OrderBook] { value =>
          log.debug("Acquired snapshot, {}", value)
          orderBookSnapshot = Some(value)
          acquiringSnapshot.set(false)
        }
        val snapshotFailed = getAsyncCallback[Throwable](failStage)
        getSnaptshot(asset).onComplete {
          case Success(value) => snapshotAcquired.invoke(value)
          case Failure(ex)    => snapshotFailed.invoke(ex)
        }(materializer.executionContext)
      }

      /** Applies one depth event according to the current sync state. */
      private def processElement(elem: OrderBookUpdateEvent): Unit =
        orderBookSnapshot match {
          // 2. Buffer the events you receive from the stream
          case None if !acquiringSnapshot.get =>
            log.debug("Buffering {}", elem)
            buffer.append(elem)
            // 3. Get a depth snapshot from
            // https://www.binance.com/api/v1/depth?symbol=[SYMBOL]&limit=1000
            acquireSnapshot()
            pull(in)
          // 2. Buffer the events you receive from the stream
          case None if acquiringSnapshot.get() =>
            log.debug("Buffering {}", elem)
            buffer.append(elem)
            pull(in)
          case Some(snapshot) if buffer.nonEmpty && !acquiringSnapshot.get =>
            // BUGFIX: the original drained only the previously buffered
            // events and silently dropped the element just grabbed; include
            // it in the drain so no update is lost.
            buffer.append(elem)
            log.debug("Processing buffer {}", buffer)
            // 4. Drop any event where u is <= lastUpdateId in the snapshot
            val toDrain =
              buffer.filterNot(_.lastUpdateId <= snapshot.lastUpdateId)
            log.debug(
              "Dropped any event where u is <= lastUpdateId, {}",
              toDrain)
            // 5. The first processed event should have
            // firstUpdateId <= lastUpdateId+1 AND lastUpdateId >= lastUpdateId+1
            toDrain.headOption match {
              case Some(firstProcessed)
                  if !(firstProcessed.firstUpdateId <= snapshot.lastUpdateId + 1 &&
                    firstProcessed.lastUpdateId >= snapshot.lastUpdateId + 1) =>
                // BUGFIX: stop once the stage has failed (the original fell
                // through and pushed anyway), and fix `.mkString("")` — a
                // no-op on String — to the intended `.stripMargin`,
                // matching the other error message.
                failStage(
                  OrderBookSynchronisationException(
                    s"""
                       | The first processed event should have
                       | U <= lastUpdateId+1 AND u >= lastUpdateId+1.
                       | U=${firstProcessed.firstUpdateId} u=${firstProcessed.lastUpdateId}
                       | snapshotLastUpdateId=${snapshot.lastUpdateId}
                    """.stripMargin
                  ))
              case _ =>
                toDrain.foreach { value =>
                  log.debug("Updating {} with {}", snapshot, value)
                  // 7. The data in each event is the absolute quantity for a price level
                  // 8. If the quantity is 0, remove the price level
                  // 9. Receiving an event that removes a price level that is
                  //    not in your local order book can happen and is normal.
                  snapshot.update(value)
                }
                buffer.clear()
                push(out, snapshot)
            }
          case Some(snapshot) if buffer.isEmpty && !acquiringSnapshot.get =>
            // 7. The data in each event is the absolute quantity for a price level
            // 8. If the quantity is 0, remove the price level
            // 9. Receiving an event that removes a price level that is not in
            //    your local order book can happen and is normal.
            log.debug("Updating {} with {}", snapshot, elem)
            snapshot.update(elem)
            push(out, snapshot)
          case _ =>
            // BUGFIX: the original match was non-exhaustive (guards suppress
            // the exhaustiveness check) and could throw a MatchError, e.g. if
            // a snapshot were present while a re-fetch was still marked in
            // flight. Buffer defensively instead of crashing the stage.
            log.debug("Buffering {}", elem)
            buffer.append(elem)
            pull(in)
        }

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val elem = grab(in)
            // 6. While listening to the stream, each new event's firstUpdateId
            // should be equal to the previous event's lastUpdateId + 1
            lastDepthUpdateEvent match {
              case Some(last) if elem.firstUpdateId != last.lastUpdateId + 1 =>
                // BUGFIX: fail and stop — the original fell through and kept
                // pulling/pushing on an already-failed stage.
                failStage(
                  OrderBookSynchronisationException(
                    s"""
                       | There is an issue while processing order book event stream,
                       | while listening to the stream, each new event's U should
                       | be equal to the previous event's u+1.
                       | previous=$last
                       | current=$elem
                    """.stripMargin
                  ))
              case _ =>
                lastDepthUpdateEvent = Some(elem)
                processElement(elem)
            }
          }
        }
      )

      setHandler(out, new OutHandler {
        // Demand from downstream is forwarded straight upstream.
        override def onPull(): Unit = pull(in)
      })
    }
}
/**
 * Runs a restartable WebSocket stream of Binance depth updates for one asset,
 * pipes it through [[OrderBookFlow]], and hands every resulting order book to
 * `publish`.
 *
 * @param system                  actor system used for HTTP and materialization
 * @param binanceApiConfiguration source of the market-data stream URL
 * @param binancePublicRestApi    REST client used to fetch depth snapshots
 * @param asset                   trading pair to stream
 * @param publish                 callback invoked with each updated order book
 */
class OrderBookStream(
    system: ActorSystem,
    binanceApiConfiguration: BinanceApiConfiguration,
    binancePublicRestApi: BinancePublicRestApi,
    val asset: Asset,
    publish: OrderBookStreamEvent => Unit
) extends LazyLogging {

  // Completing this promise with None closes the WebSocket source
  // (materialized value of Source.maybe).
  private var _killSwitch: Option[Promise[Option[Message]]] =
    Option.empty[Promise[Option[Message]]]

  // Restart the stream on synchronisation errors (forces a fresh snapshot);
  // for any other non-fatal error, log and continue with the next element.
  private final val decider: Supervision.Decider = {
    case NonFatal(ex) =>
      logger.error(ex.getMessage, ex)
      ex match {
        case _: OrderBookSynchronisationException => Supervision.Restart
        case _ => Supervision.Resume
      }
  }

  private final implicit val materializer: ActorMaterializer =
    ActorMaterializer(
      ActorMaterializerSettings(system)
        .withSupervisionStrategy(decider)
    )(system)

  // Counts processed depth-update messages, tagged with the trading pair.
  private val updateCounter =
    AppMetrics.binanceLiveApi.refine("order-book-update-counter" -> asset.pair)

  // Parse incoming text frames, count them, deserialize, maintain the local
  // order book, and publish every updated book.
  private final val processingFlow: Sink[Message, NotUsed] = Flow[Message]
    .map(_.asScala.asTextMessage.getStrictText)
    .map(parse(_).fold(ex => throw ex, result => result))
    .map { ev =>
      updateCounter.increment()
      ev
    }
    .map(OrderBookUpdateEvent.deserialize)
    .via(new OrderBookFlow(asset, binancePublicRestApi.orderBook))
    .to(Sink.foreach(orderBook =>
      publish(OrderBookStreamEvent(asset, orderBook))))

  // WebSocket client wrapped with exponential-backoff restarts so transient
  // connection failures re-establish the stream automatically.
  private val webSocketFlow: Flow[Message, Message, NotUsed] = {
    RestartFlow.withBackoff(
      minBackoff = 1.second,
      maxBackoff = 10.seconds,
      randomFactor = 0.1
    )(
      () =>
        Http(system).webSocketClientFlow(WebSocketRequest(
          binanceApiConfiguration.marketDataStreamUrl.depth(asset))))
  }

  /**
   * Starts the stream and stores its kill switch. NOTE(review): calling
   * start() twice abandons the previous stream's kill switch; callers are
   * expected to stop() first.
   */
  def start(): Unit = {
    val killSwitch = Source.maybe
      .via(webSocketFlow)
      .to(processingFlow)
      .run()
    _killSwitch = Some(killSwitch)
  }

  /**
   * Stops a running stream by completing the WebSocket source.
   *
   * @throws IllegalStateException if the stream was never started or has
   *                               already been stopped
   */
  def stop(): Unit = {
    _killSwitch match {
      case Some(ks) =>
        ks.success(None)
        // BUGFIX: clear the switch so a second stop() reports "not running"
        // (IllegalStateException below) instead of failing inside the
        // already-completed Promise, and so start()/stop() cycles keep the
        // internal state consistent.
        _killSwitch = None
      case _ =>
        throw new IllegalStateException(
          s"Couldn't stop a non-running collector")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment