Skip to content

Instantly share code, notes, and snippets.

@samidalouche
Created April 20, 2016 16:49
Show Gist options
  • Save samidalouche/8a317a55a888072751d95be93637bdc7 to your computer and use it in GitHub Desktop.
Save samidalouche/8a317a55a888072751d95be93637bdc7 to your computer and use it in GitHub Desktop.
/// Streams messages one at a time, starting at `startOffset`, fetching them in successive chunks.
///
/// Each chunk first waits until `lastOffsetIncludingTransient(includeTransient)` reports an
/// offset that is at least the chunk's starting offset, then calls `fetchMessages` and flattens
/// the returned collection into individual `Message` values. When a chunk completes, the next
/// chunk is requested from the offset following the last message delivered.
///
/// - Parameters:
///   - startOffset: Offset of the first message to request.
///   - endOffsetOpt: Exclusive end offset. The first message whose offset is at or past this
///     value is not delivered and completes the stream. When `nil` the stream never completes
///     on its own.
///   - includeTransient: Forwarded unchanged to `lastOffsetIncludingTransient` and `fetchMessages`.
/// - Returns: A producer that, once started, emits the messages in the order the chunks deliver them.
public func streamMessages(from startOffset: Offset = Offset(value: 0), toExclusive endOffsetOpt : Offset? = .None,
    includeTransient: Bool = true) -> SignalProducer<Message, NoError> {

    /// Builds the producer for a single chunk starting at `from`.
    func streamMessagesChunk(from: Offset) -> SignalProducer<Message, NoError> {
        /// Emits once, as soon as the reported last offset is non-nil and `>= from`.
        func waitForNewMessageAvailable(from: Offset) -> SignalProducer<Offset?, NoError> {
            return self.lastOffsetIncludingTransient(includeTransient).producer
                .filter { offsetOpt in offsetOpt.map { offset in offset >= from } ?? false }
                .take(1)
        }

        // NOTE(review): the literal 10 is presumably the chunk size; confirm against fetchMessages.
        let streamMessagesProducer = self.fetchMessages(10, from: from, includeTransientMessages: includeTransient)
            .flatMap(.Concat) { messages in SignalProducer<Message, NoError>(values: messages) }

        return waitForNewMessageAvailable(from)
            .then(streamMessagesProducer)
    }

    /// `true` when an end offset was supplied and `currentOffset` is at or beyond it.
    /// Uses `>=` rather than `==` so the stream still terminates when the exact end offset is
    /// never observed (for example when `startOffset` is already past the end).
    func hasReachedEndOffset(currentOffset: Offset) -> Bool {
        return endOffsetOpt.map { endOffset in currentOffset >= endOffset } ?? false
    }

    /// Starts one chunk, forwards its messages to `observer`, and chains the next chunk when
    /// the current one completes without having reached the end offset.
    func streamNextBatch(from: Offset, observer: Observer<Message, NoError>, observerDisposable: CompositeDisposable) -> Void {
        print("StreamNextBatch \(from)")
        streamMessagesChunk(from).startWithSignal { signal, signalDisposable in
            var lastOffset: Offset = from
            // Set once the end offset has been seen; every later event of this chunk is ignored
            // so that no further message is forwarded and no further chunk is started.
            var reachedEnd = false
            let disposableHandle = observerDisposable.addDisposable(signalDisposable)

            signal.observe { event in
                switch event {
                case let .Failed(error):
                    observer.sendFailed(error)

                case .Interrupted:
                    // Disposing the chunk below (after the end offset) interrupts it; that
                    // interruption must not be reported on an already completed stream.
                    if !reachedEnd {
                        observer.sendInterrupted()
                    }

                case .Completed:
                    disposableHandle.remove()
                    // Only chain another chunk while the stream is still open.
                    if !reachedEnd {
                        streamNextBatch(lastOffset.next, observer: observer, observerDisposable: observerDisposable)
                    }

                case .Next(let message):
                    guard !reachedEnd else { break }
                    if hasReachedEndOffset(message.offset) {
                        reachedEnd = true
                        disposableHandle.remove()
                        observer.sendCompleted()
                        // The handle was removed from the composite above, so the chunk has to be
                        // stopped explicitly; otherwise it keeps running after completion.
                        signalDisposable.dispose()
                    } else {
                        lastOffset = message.offset
                        observer.sendNext(message)
                    }
                }
            }
        }
    }

    return SignalProducer<Message, NoError> { observer, observerDisposable in
        streamNextBatch(startOffset, observer: observer, observerDisposable: observerDisposable)
    }
}
/// Publishes a first set of messages, starts streaming a range whose end lies beyond that set,
/// then publishes more messages and expects the stream to deliver the whole requested range.
func testShouldStreamMessagesWaitingForFutureMessages() {
    let expectation = self.expectationWithDescription("Test")
    let messages = (0...50000).map { value in self.createKafkaData(UInt64(value)) }
    let nextMessages = (50001...65000).map { value in self.createKafkaData(UInt64(value)) }

    // The stream is asked for offsets 45 ..< 60000 (end exclusive).
    // NOTE(review): assumes offsets are contiguous and that the first published message gets
    // offset 0, so the range holds 60000 - 45 messages — confirm against the publisher.
    let expectedCount = 60000 - 45

    try! self.sut.publishMessages(messages, persist: false).get()

    // No per-message sleep: with tens of thousands of messages it would make it impossible
    // for the stream to complete inside the timeout below.
    let messageCountFuture = self.sut
        .streamMessages(from: Offset(value: 45), toExclusive: Offset(value: 60000), includeTransient: true)
        .observeOn(QueueScheduler())
        .map { _ in 1 }
        .reduce(0, +)
        .toFuture()

    messageCountFuture.onSuccess { count in
        expect(count) == expectedCount
        expectation.fulfill()
    }

    // Published after the stream was started, so the end of the range is only reachable by
    // waiting for these messages.
    try! self.sut.publishMessages(nextMessages, persist: false).get()

    self.waitForExpectationsWithTimeout(30, handler: nil)
}
/// Builds the textual payload used for the test message with the given sequence number.
///
/// - Parameter number: Sequence number embedded in the payload.
/// - Returns: The string `"message "` followed by `number`.
func createKafkaData(number: UInt64) -> String {
    let payload = "message \(number)"
    return payload
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment