Skip to content

Instantly share code, notes, and snippets.

@zshannon
Created August 12, 2024 23:59
Show Gist options
  • Save zshannon/dc6addcba474d92c46edd4748c40eb6c to your computer and use it in GitHub Desktop.
Save zshannon/dc6addcba474d92c46edd4748c40eb6c to your computer and use it in GitHub Desktop.
AsyncSempaphore deadlock demo
import Semaphore
struct Example {
public func observe() async throws -> (AsyncThrowingStream<Int, Error>, AsyncSemaphore) {
let semaphore: AsyncSemaphore = .init(value: 0)
let stream: AsyncThrowingStream<Int, Error> = .init { continuation in
let forwardingTask = Task {
// simulate counter changing [0 -> 1]
continuation.yield(1)
try await Task.sleep(for: .milliseconds(50))
// simulate counter changing [1 -> 0]
print("continuation.yield(0)")
continuation.yield(0)
}
continuation.onTermination = { reason in
print("inside continuation.onTermination")
forwardingTask.cancel()
switch reason {
case .finished:
semaphore.signal()
default:
Task {
// NB: this is the goal of this pattern: want to support await on `continuation.onTermination`
// simulate doing async cleanup task by sleeping for a moment
try await Task.sleep(for: .milliseconds(20))
semaphore.signal()
}
}
}
}
return (stream, semaphore)
}
public func unreferenced() async throws {
let (stream, semaphore) = try await observe()
for try await count in stream {
if count == 0 { break }
}
print("broke out of AsyncStream")
// 1. `try await semaphore.waitUnlessCancelled()` deadlocks
try await semaphore.waitUnlessCancelled()
// 2. `try await Task.sleep(for: .milliseconds(30))` works fine
// try await Task.sleep(for: .milliseconds(30))
}
}
let example = Example()
Task {
print("try await example.unreferenced()")
try await example.unreferenced()
print("this never happens with AsyncSempahore :(")
}
@zshannon
Copy link
Author

Console output:

try await example.unreferenced()
continuation.yield(0)
broke out of AsyncStream

Program hangs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment