Last active
December 19, 2018 12:54
-
-
Save graph1zzlle/6ac547b938fa44580e8339c5d3b4c43f to your computer and use it in GitHub Desktop.
Binance Live Order Book using Akka Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * Flow stage that turns a stream of depth-update events into successive views of a
  * locally maintained order book, following the numbered steps described at
  * https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#how-to-manage-a-local-order-book-correctly
  *
  * Incoming events are buffered until `getSnaptshot` has delivered a snapshot. Events
  * already covered by the snapshot are then dropped, the first remaining event is
  * validated against the snapshot, and from that point on every incoming event is
  * applied to the order book, which is emitted once per incoming event.
  *
  * The stage fails with an `OrderBookSynchronisationException` when two consecutive
  * events are not contiguous, or when the first applied event does not bridge the
  * snapshot. It fails with the snapshot future's exception when the snapshot cannot
  * be fetched.
  *
  * @param asset        asset handed to `getSnaptshot` when the snapshot is requested
  * @param getSnaptshot asynchronous provider of the initial order book snapshot
  */
class OrderBookFlow(
    asset: Asset,
    getSnaptshot: Asset => Future[OrderBook]
) extends GraphStage[FlowShape[OrderBookUpdateEvent, OrderBook]] {

  val in = Inlet[OrderBookUpdateEvent]("BinanceOrderBookFlow.in")
  val out = Outlet[OrderBook]("BinanceOrderBookFlow.out")

  val shape: FlowShape[OrderBookUpdateEvent, OrderBook] =
    FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {

      // Snapshot the events are applied to; None until the snapshot future succeeds.
      var orderBookSnapshot: Option[OrderBook] = Option.empty[OrderBook]

      // Previously accepted event, used for the contiguity check (step 6).
      var lastDepthUpdateEvent: Option[OrderBookUpdateEvent] =
        Option.empty[OrderBookUpdateEvent]

      // True while a snapshot request is in flight. Only touched from stage callbacks.
      var acquiringSnapshot: Boolean = false

      // True once the first event following the snapshot has been validated (step 5).
      var synchronised: Boolean = false

      // Events received before the stage is synchronised with the snapshot.
      // NOTE(review): unbounded; grows for as long as the snapshot request is pending.
      val buffer: ListBuffer[OrderBookUpdateEvent] =
        mutable.ListBuffer[OrderBookUpdateEvent]()

      // The snapshot future completes on an arbitrary thread. Routing its result through
      // async callbacks makes the state change and failStage run inside the stage.
      private val snapshotAcquired = getAsyncCallback[OrderBook] { value =>
        acquiringSnapshot = false
        log.debug("Acquired snapshot, {}", value)
        orderBookSnapshot = Some(value)
        // Buffered events are drained on the next onPush; a pull is already in flight.
      }

      private val snapshotFailed =
        getAsyncCallback[Throwable](ex => failStage(ex))

      /** Requests the snapshot and hands the outcome back to the stage. */
      private def acquireSnapshot(): Unit = {
        acquiringSnapshot = true
        getSnaptshot(asset).onComplete {
          case Success(value) => snapshotAcquired.invoke(value)
          case Failure(ex)    => snapshotFailed.invoke(ex)
        }(materializer.executionContext)
      }

      /** Applies one event to the order book. */
      private def applyUpdate(snapshot: OrderBook, value: OrderBookUpdateEvent): Unit = {
        log.debug("Updating {} with {}", snapshot, value)
        // 7. The data in each event is the absolute quantity for a price level
        // 8. If the quantity is 0, remove the price level
        // 9. Receiving an event that removes a price level that is not in your local
        //    order book can happen and is normal.
        // NOTE(review): the result of update is discarded, so this assumes OrderBook
        // is mutated in place - confirm.
        snapshot.update(value)
      }

      /**
        * Drains the buffer against a freshly acquired snapshot: drops stale events,
        * validates the first remaining one and applies the rest. Emits the order book
        * unless the validation fails, in which case the stage is failed.
        */
      private def synchronise(snapshot: OrderBook): Unit = {
        log.debug("Processing buffer {}", buffer)
        val pending = buffer.toList
        buffer.clear()
        // 4. Drop any event where u is <= lastUpdateId in the snapshot
        val toDrain = pending.filterNot(_.lastUpdateId <= snapshot.lastUpdateId)
        log.debug("Dropped any event where u is <= lastUpdateId, {}", toDrain)
        // 5. The first processed should have
        //    firstUpdateId <= lastUpdateId+1 AND lastUpdateId >= lastUpdateId+1
        toDrain.headOption match {
          case Some(firstProcessed)
              if !(firstProcessed.firstUpdateId <= snapshot.lastUpdateId + 1 &&
                firstProcessed.lastUpdateId >= snapshot.lastUpdateId + 1) =>
            failStage(
              OrderBookSynchronisationException(
                s"""
                   | The first processed event should have
                   | U <= lastUpdateId+1 AND u >= lastUpdateId+1.
                   | U=${firstProcessed.firstUpdateId} u=${firstProcessed.lastUpdateId}
                   | snapshotLastUpdateId=${snapshot.lastUpdateId}
                 """.stripMargin
              ))
          case firstProcessed =>
            // If every pending event was stale, stay unsynchronised so that the next
            // event still goes through the drop filter and the step 5 check.
            synchronised = firstProcessed.isDefined
            toDrain.foreach(applyUpdate(snapshot, _))
            push(out, snapshot)
        }
      }

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val elem = grab(in)
            lastDepthUpdateEvent match {
              // 6. While listening to the stream, each new event's firstUpdateId
              //    should be equal to the previous event's lastUpdateId + 1
              case Some(last) if elem.firstUpdateId != last.lastUpdateId + 1 =>
                // Nothing else may be pushed or pulled once the stage has failed.
                failStage(
                  OrderBookSynchronisationException(
                    s"""
                       | There is an issue while processing order book event stream,
                       | while listening to the stream, each new event's U should
                       | be equal to the previous event's u+1.
                       | previous=$last
                       | current=$elem
                     """.stripMargin
                  ))
              case _ =>
                lastDepthUpdateEvent = Some(elem)
                orderBookSnapshot match {
                  case Some(snapshot) if synchronised =>
                    applyUpdate(snapshot, elem)
                    push(out, snapshot)
                  case Some(snapshot) =>
                    // The current element is part of the backlog too; without this
                    // it would be lost when the buffer is drained.
                    buffer.append(elem)
                    synchronise(snapshot)
                  case None =>
                    // 2. Buffer the events you receive from the stream
                    log.debug("Buffering {}", elem)
                    buffer.append(elem)
                    // 3. Get a depth snapshot from
                    //    https://www.binance.com/api/v1/depth?symbol=[SYMBOL]&limit=1000
                    if (!acquiringSnapshot) acquireSnapshot()
                    pull(in)
                }
            }
          }
        }
      )

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * Runs the depth web socket stream of one asset through [[OrderBookFlow]] and hands
  * every resulting order book to `publish`.
  *
  * @param system                  actor system used for the materializer and the HTTP client
  * @param binanceApiConfiguration provides the depth stream URL for the asset
  * @param binancePublicRestApi    provides the order book snapshot (`orderBook`)
  * @param asset                   asset whose order book is streamed
  * @param publish                 callback invoked with every emitted order book
  */
class OrderBookStream(
    system: ActorSystem,
    binanceApiConfiguration: BinanceApiConfiguration,
    binancePublicRestApi: BinancePublicRestApi,
    val asset: Asset,
    publish: OrderBookStreamEvent => Unit
) extends LazyLogging {

  // Promise materialized by Source.maybe; completing it with None completes the source.
  // Defined only while a stream started by start() has not been stopped.
  private var _killSwitch: Option[Promise[Option[Message]]] =
    Option.empty[Promise[Option[Message]]]

  // Logs every non-fatal failure, then restarts on synchronisation errors and
  // resumes on anything else.
  // NOTE(review): OrderBookFlow reports synchronisation errors through failStage;
  // supervision directives may not apply to failures raised that way by a custom
  // stage - confirm that Restart is actually honoured here.
  private final val decider: Supervision.Decider = {
    case NonFatal(ex) =>
      logger.error(ex.getMessage, ex)
      ex match {
        case _: OrderBookSynchronisationException => Supervision.Restart
        case _                                    => Supervision.Resume
      }
  }

  private final implicit val materializer: ActorMaterializer =
    ActorMaterializer(
      ActorMaterializerSettings(system)
        .withSupervisionStrategy(decider)
    )(system)

  private val updateCounter =
    AppMetrics.binanceLiveApi.refine("order-book-update-counter" -> asset.pair)

  // Text frame -> parsed payload -> update event -> order book -> publish.
  private final val processingFlow: Sink[Message, NotUsed] = Flow[Message]
    .map(_.asScala.asTextMessage.getStrictText)
    .map(parse(_).fold(ex => throw ex, result => result))
    .map { ev =>
      // Counts every parsed message, before it is deserialized.
      updateCounter.increment()
      ev
    }
    .map(OrderBookUpdateEvent.deserialize)
    .via(new OrderBookFlow(asset, binancePublicRestApi.orderBook))
    .to(Sink.foreach(orderBook =>
      publish(OrderBookStreamEvent(asset, orderBook))))

  // Web socket client flow wrapped in a backoff restart (1s to 10s, 10% jitter).
  private val webSocketFlow: Flow[Message, Message, NotUsed] = {
    RestartFlow.withBackoff(
      minBackoff = 1.second,
      maxBackoff = 10.seconds,
      randomFactor = 0.1
    )(
      () =>
        Http(system).webSocketClientFlow(WebSocketRequest(
          binanceApiConfiguration.marketDataStreamUrl.depth(asset))))
  }

  /**
    * Materializes the stream and remembers the promise that stops it.
    *
    * @throws IllegalStateException if the stream is already running; starting twice
    *                               would otherwise leak the first stream
    */
  def start(): Unit = {
    if (_killSwitch.isDefined)
      throw new IllegalStateException(
        "Couldn't start an already running collector")
    // The element type is spelled out so that the materialized promise matches the
    // type of _killSwitch (Promise is invariant).
    val killSwitch = Source
      .maybe[Message]
      .via(webSocketFlow)
      .to(processingFlow)
      .run()
    _killSwitch = Some(killSwitch)
  }

  /**
    * Completes the source of the running stream and forgets its promise, so that the
    * stream can be started again.
    *
    * @throws IllegalStateException if no stream is running
    */
  def stop(): Unit = {
    _killSwitch match {
      case Some(ks) =>
        _killSwitch = None
        ks.trySuccess(None)
        ()
      case None =>
        throw new IllegalStateException(
          "Couldn't stop a non-running collector")
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment