Skip to content

Instantly share code, notes, and snippets.

@cakper
Last active December 16, 2020 09:35
Show Gist options
  • Save cakper/688de4f199c26996d9964c4095fe3178 to your computer and use it in GitHub Desktop.
Save cakper/688de4f199c26996d9964c4095fe3178 to your computer and use it in GitHub Desktop.
/** Sends every record in `records` through the producer's asynchronous API and completes
  * once all of them have been acknowledged.
  *
  * The resulting task:
  *  - succeeds with one `RecordMetadata` per input record, in the same order as `records`;
  *  - succeeds immediately with an empty list when `records` is empty;
  *  - fails with the first error signalled by any record (a `ProducerCancelled` when the
  *    producer is found canceled). Errors that arrive after the task has already been
  *    completed are passed to the scheduler's `reportFailure` instead of being signalled
  *    a second time.
  *
  * Cancelling the task cancels the futures of the sends that are still in flight.
  *
  * @param records the records to produce
  * @return a task yielding the metadata of every record, positionally aligned with `records`
  */
private def send(records: Seq[ProducerRecord[String, V]]): Task[Seq[RecordMetadata]] =
  Task.create[Seq[RecordMetadata]] { (s, cb) =>
    val asyncCb = Callback.forked(cb)(s)
    val compositeCancelable = CompositeCancelable()
    val expectedSize = records.size
    // Keyed by the record's position in `records` rather than by `meta.offset()`:
    // offsets are only unique within a partition, so two records landing on different
    // partitions could overwrite each other and the batch would never be seen as complete.
    val results = TrieMap.empty[Int, RecordMetadata]
    // Number of successfully acknowledged records; avoids recomputing `results.size`.
    val acknowledged = Atomic(0)
    // The downstream callback must be signalled at most once for the whole batch.
    val isDone = Atomic(false)

    // Signals the first failure downstream; later ones can no longer be delivered,
    // so they are reported to the scheduler instead of being silently dropped.
    def fail(ex: Throwable): Unit =
      if (isDone.compareAndSet(expect = false, update = true)) asyncCb.onError(ex)
      else s.reportFailure(ex)

    // Cancellation is observed once per record; only the first observation is signalled.
    def failCanceled(): Unit =
      if (isDone.compareAndSet(expect = false, update = true))
        asyncCb.onError(ProducerCancelled(s"Failed to produce message. The producer was canceled"))

    // Forcing asynchronous boundary
    sc.executeAsync(() => {
      if (expectedSize == 0) {
        // Nothing will ever be acknowledged, so complete right away instead of hanging.
        if (isDone.compareAndSet(expect = false, update = true))
          asyncCb.onSuccess(List.empty[RecordMetadata])
      } else {
        records.iterator.zipWithIndex.foreach { case (record, index) =>
          val connection = StackedCancelable()
          if (isCanceled.get()) {
            failCanceled()
          } else {
            // Per-record guard: arbitrates between the producer callback and the catch block.
            val isActive = Atomic(true)
            val cancelable = SingleAssignCancelable()
            try {
              // Force evaluation
              val producer = producerRef
              // Using asynchronous API
              val future = producer.send(
                record,
                (meta: RecordMetadata, exception: Exception) =>
                  if (isActive.getAndSet(false)) {
                    connection.pop()
                    if (exception != null) {
                      fail(exception)
                    } else {
                      results.update(index, meta)
                      val allAcknowledged = acknowledged.incrementAndGet() == expectedSize
                      if (allAcknowledged && isDone.compareAndSet(expect = false, update = true))
                        asyncCb.onSuccess(List.tabulate(expectedSize)(i => results(i)))
                    }
                  } else if (exception != null) {
                    s.reportFailure(exception)
                  }
              )
              cancelable := Cancelable(() => future.cancel(false))
              connection.push(cancelable)
            } catch {
              case NonFatal(ex) =>
                // Needs synchronization, otherwise we are violating the contract
                if (isActive.compareAndSet(expect = true, update = false)) {
                  connection.pop()
                  ex match {
                    case _: IllegalStateException if isCanceled.get() =>
                      failCanceled()
                    case _ =>
                      fail(ex)
                  }
                } else {
                  s.reportFailure(ex)
                }
            }
          }
          compositeCancelable.add(connection)
        }
      }
    })
    compositeCancelable
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment