Last active
December 16, 2020 09:35
-
-
Save cakper/688de4f199c26996d9964c4095fe3178 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Sends a batch of records through the Kafka producer's asynchronous API.
  *
  * The resulting task:
  *  - succeeds with one [[RecordMetadata]] per input record, in the same order as
  *    `records`, once every record has been acknowledged;
  *  - succeeds immediately with an empty list when `records` is empty;
  *  - fails with the first error observed (a send failure, or `ProducerCancelled`
  *    when `isCanceled` is set); later errors are reported to the scheduler
  *    instead of being signaled a second time.
  *
  * Cancelling the task cancels the pending `send` futures that were registered
  * up to that point.
  *
  * NOTE(review): `sc`, `isCanceled` and `producerRef` are defined outside this
  * method; their exact semantics should be confirmed against the enclosing class.
  *
  * @param records the records to publish
  * @return a task yielding the metadata of each acknowledged record, in input order
  */
private def send(records: Seq[ProducerRecord[String, V]]): Task[Seq[RecordMetadata]] =
  Task.create[Seq[RecordMetadata]] { (s, cb) =>
    val expectedSize = records.size
    // Keyed by the record's position in `records` rather than by Kafka offset:
    // offsets are only unique within a single topic-partition, so offset keys can
    // collide across partitions and the batch would then never reach `expectedSize`.
    val results = TrieMap.empty[Int, RecordMetadata]
    // Number of successfully acknowledged records.
    val acknowledged = Atomic(0)
    // Guards the final callback: it must be signaled at most once.
    val isDone = Atomic(false)
    val asyncCb = Callback.forked(cb)(s)
    val compositeCancelable = CompositeCancelable()
    val cancelledMessage = "Failed to produce message. The producer was canceled"

    // Signals the failure if nothing was signaled yet; otherwise optionally
    // reports it to the scheduler so that it is not silently lost.
    def signalError(ex: Throwable, reportIfLate: Boolean): Unit =
      if (isDone.compareAndSet(expect = false, update = true)) asyncCb.onError(ex)
      else if (reportIfLate) s.reportFailure(ex)

    // Signals success with the collected metadata in input order.
    def signalSuccess(): Unit =
      if (isDone.compareAndSet(expect = false, update = true))
        asyncCb.onSuccess(List.tabulate(expectedSize)(index => results(index)))

    // Forcing asynchronous boundary
    sc.executeAsync(() => {
      if (expectedSize == 0) {
        // Nothing to send: complete right away instead of waiting forever for
        // acknowledgements that will never arrive.
        signalSuccess()
      } else {
        records.iterator.zipWithIndex.foreach { case (record, index) =>
          val connection = StackedCancelable()
          if (isCanceled.get()) {
            // Every remaining record hits this branch once the producer is
            // canceled; only the first one signals, the rest are dropped quietly.
            signalError(ProducerCancelled(cancelledMessage), reportIfLate = false)
          } else {
            // Arbitrates between the Kafka callback and the `catch` block below,
            // so that only one of them handles the outcome for this record.
            val isActive = Atomic(true)
            val cancelable = SingleAssignCancelable()
            try {
              // Force evaluation
              val producer = producerRef
              // Using asynchronous API
              val future = producer.send(
                record,
                (meta: RecordMetadata, exception: Exception) =>
                  if (isActive.getAndSet(false)) {
                    connection.pop()
                    if (exception != null) {
                      signalError(exception, reportIfLate = true)
                    } else {
                      results.update(index, meta)
                      // Only the callback that observes the final count completes the task.
                      if (acknowledged.incrementAndGet() == expectedSize) signalSuccess()
                    }
                  } else if (exception != null) {
                    s.reportFailure(exception)
                  }
              )
              cancelable := Cancelable(() => future.cancel(false))
              connection.push(cancelable)
            } catch {
              case NonFatal(ex) =>
                // Needs synchronization, otherwise we are violating the contract
                if (isActive.compareAndSet(expect = true, update = false)) {
                  connection.pop()
                  ex match {
                    case _: IllegalStateException if isCanceled.get() =>
                      signalError(ProducerCancelled(cancelledMessage), reportIfLate = true)
                    case _ =>
                      signalError(ex, reportIfLate = true)
                  }
                } else {
                  s.reportFailure(ex)
                }
            }
          }
          compositeCancelable.add(connection)
        }
      }
    })
    compositeCancelable
  }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment