Created
February 27, 2017 22:45
-
-
Save CodaFi/0c2d2c94c708ebb2634bd20fafd5a113 to your computer and use it in GitHub Desktop.
[WIP] Session Types as good as I could get em. Ported from the Rust implementation by Philip Munksgaard.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Channel.swift | |
// Sessions | |
// | |
// Created by Robert Widmann on 2/27/17. | |
// Copyright © 2017 TypeLift. All rights reserved. | |
// | |
/// Placeholder error used internally to abort an `MVar` modification so that
/// the previous contents are restored.
enum FakeError: Error {
  /// The only failure this type distinguishes.
  case Error
}
/// Channels are unbounded FIFO streams of values whose read and write
/// terminals are built from `MVar`s.
public struct Channel<A> {
  /// Holds the hole that the next read will consume from.
  fileprivate let readEnd : MVar<MVar<ChItem<A>>>
  /// Holds the (empty) hole that the next write will fill.
  fileprivate let writeEnd : MVar<MVar<ChItem<A>>>

  private init(read : MVar<MVar<ChItem<A>>>, write: MVar<MVar<ChItem<A>>>) {
    self.readEnd = read
    self.writeEnd = write
  }

  /// Creates and returns a new empty channel.
  public init() {
    // Both ends start out referring to the same empty hole.
    let hole : MVar<ChItem<A>> = MVar()
    let readVar = MVar(initial: hole)
    let writeVar : MVar<MVar<ChItem<A>>> = MVar(initial: hole)
    self.init(read: readVar, write: writeVar)
  }

  /// Reads a value from the channel, waiting until one has been written.
  ///
  /// - Returns: The oldest value that has not yet been read from this channel.
  public func read() -> A {
    do {
      return try self.readEnd.modify { readEnd in
        let item : ChItem<A> = readEnd.read()
        // Advance the read head to the next hole and hand back the value.
        return (item.stream(), item.val())
      }
    } catch _ {
      fatalError("Fatal: Could not modify read head.")
    }
  }

  /// Attempts to read a value from the channel without waiting for a writer.
  ///
  /// The method is declared `throws` but every error raised internally is
  /// caught here and reported as `nil`.
  ///
  /// - Returns: The oldest unread value, or `nil` if the hole at the read head
  ///   has not been filled yet.
  public func tryRead() throws -> A? {
    do {
      return try self.readEnd.modify { readEnd in
        let item : Optional<ChItem<A>> = readEnd.tryRead()
        guard let x = item else {
          // Throwing makes `modify` put the old read head back untouched.
          throw FakeError.Error
        }
        return (x.stream(), x.val())
      }
    } catch _ {
      return nil
    }
  }

  /// Writes a value to a channel.
  ///
  /// - Parameter x: The value to append to the stream.
  public func write(_ x : A) {
    self.writeEnd.modify_ { old_hole in
      // Fill the current hole and leave a fresh empty one for the next write.
      let new_hole : MVar<ChItem<A>> = MVar()
      old_hole.put(ChItem(x, new_hole))
      return new_hole
    }
  }

  /// Writes a list of values to a channel, in order.
  ///
  /// - Parameter xs: The values to append to the stream.
  public func writeList(_ xs : [A]) {
    xs.forEach(self.write)
  }

  /// Returns whether the channel is empty.
  ///
  /// This function is just a snapshot of the state of the Chan at that point in
  /// time. In heavily concurrent computations, this may change out from under
  /// you without warning, or even by the time it can be acted on. It is better
  /// to use one of the direct actions above.
  public var isEmpty : Bool {
    do {
      return try self.readEnd.withMVar { r in
        let w = r.tryRead()
        return w == nil
      }
    } catch _ {
      fatalError("Fatal: Could not determine emptiness; read of underlying MVar failed.")
    }
  }

  /// Duplicates a channel.
  ///
  /// The duplicate channel begins empty, but data written to either channel
  /// from then on will be available from both. Because both channels share the
  /// same write end, data inserted into one channel may be read by both
  /// channels.
  public func duplicate() -> Channel<A> {
    let hole = self.writeEnd.read()
    let newReadVar = MVar(initial: hole)
    return Channel(read: newReadVar, write: self.writeEnd)
  }

  /// Reads the entirety of the channel into an array.
  ///
  /// Values are drained one at a time for as long as `isEmpty` reports that
  /// something is available.
  ///
  /// - Returns: The drained values in the order they were read.
  public func contents() -> [A] {
    // Iterative rather than recursive: the recursive form used one stack frame
    // per element and rebuilt the array on every step.
    var drained : [A] = []
    while !self.isEmpty {
      drained.append(self.read())
    }
    return drained
  }
}
/// One link of a channel's stream: a value together with the hole that will
/// hold the following link. Both are stored as thunks and are only evaluated
/// when called.
internal struct ChItem<A> {
  /// Produces the value carried by this link.
  let val : () -> A
  /// Produces the hole for the next link in the stream.
  let stream : () -> MVar<ChItem<A>>

  /// Creates a link from a value expression and the next hole.
  ///
  /// Both arguments are captured as autoclosures rather than evaluated here.
  init(_ value : @autoclosure @escaping () -> A, _ next : @autoclosure @escaping () -> MVar<ChItem<A>>) {
    self.stream = next
    self.val = value
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// MVar.swift | |
// Sessions | |
// | |
// Created by Robert Widmann on 2/27/17. | |
// Copyright © 2017 TypeLift. All rights reserved. | |
// | |
/// `MVar`s (literally "Mutable Variables") are mutable references that are
/// either empty or contain a value of type `A`. In this way, they are a form of
/// synchronization primitive that can be used to make threads wait on a value
/// before proceeding with a computation.
///
/// - Taking from or reading an empty `MVar` causes the caller to block.
///
/// - Taking from a filled `MVar` empties it, returns a value, and potentially
///   wakes up a blocked writer.
///
/// - Writing to an empty `MVar` fills it with a value and potentially wakes up
///   a blocked reader.
///
/// - Writing to a filled `MVar` causes the writer to block.
public final class MVar<A> {
  private var val : A?
  /// Guards every access to `val`.
  private let lock : UnsafeMutablePointer<pthread_mutex_t>
  /// Signalled when a value becomes available.
  private let takeCond : UnsafeMutablePointer<pthread_cond_t>
  /// Signalled when the `MVar` becomes empty.
  private let putCond : UnsafeMutablePointer<pthread_cond_t>

  /// Creates a new empty `MVar`.
  public init() {
    self.val = .none
    // `capacity` counts instances, not bytes: exactly one of each is needed.
    self.lock = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    self.takeCond = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
    self.putCond = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
    pthread_mutex_init(self.lock, nil)
    pthread_cond_init(self.takeCond, nil)
    pthread_cond_init(self.putCond, nil)
  }

  /// Creates a new `MVar` containing the supplied value.
  public convenience init(initial : A) {
    self.init()
    self.put(initial)
  }

  /// Returns the contents of the `MVar`.
  ///
  /// If the `MVar` is empty, this will block until a value is put into the
  /// `MVar`. If the `MVar` is full, the value is returned and the `MVar` is
  /// emptied.
  public func take() -> A {
    pthread_mutex_lock(self.lock)
    defer { pthread_mutex_unlock(self.lock) }
    while true {
      if let value = self.val {
        self.val = .none
        pthread_cond_signal(self.putCond)
        return value
      }
      pthread_cond_wait(self.takeCond, self.lock)
    }
  }

  /// Atomically reads the contents of the `MVar`.
  ///
  /// If the `MVar` is currently empty, this will block until a value is put
  /// into it. If the `MVar` is full, the value is returned, but the `MVar`
  /// remains full.
  public func read() -> A {
    pthread_mutex_lock(self.lock)
    defer { pthread_mutex_unlock(self.lock) }
    while true {
      if let value = self.val {
        // The value stays in place, so pass the wake-up on to the next thread
        // waiting for a value; a writer has nothing to gain from being woken.
        pthread_cond_signal(self.takeCond)
        return value
      }
      pthread_cond_wait(self.takeCond, self.lock)
    }
  }

  /// Puts a value into the `MVar`.
  ///
  /// If the `MVar` is currently full, the function will block until it becomes
  /// empty again.
  public func put(_ x : A) {
    pthread_mutex_lock(self.lock)
    defer { pthread_mutex_unlock(self.lock) }
    while self.val != nil {
      pthread_cond_wait(self.putCond, self.lock)
    }
    self.val = x
    pthread_cond_signal(self.takeCond)
  }

  /// Attempts to return the contents of the `MVar` without waiting for a value.
  ///
  /// If the `MVar` is empty, this will immediately return .none. If the `MVar`
  /// is full, the value is returned and the `MVar` is emptied.
  public func tryTake() -> Optional<A> {
    pthread_mutex_lock(self.lock)
    // `defer` releases the mutex on the early-return path as well.
    defer { pthread_mutex_unlock(self.lock) }
    guard let value = self.val else {
      return .none
    }
    self.val = .none
    pthread_cond_signal(self.putCond)
    return value
  }

  /// Attempts to put a value into the `MVar` without waiting for it to empty.
  ///
  /// If the `MVar` is empty, the value is stored and true is returned. If the
  /// `MVar` is full, nothing occurs and false is returned.
  public func tryPut(_ x : A) -> Bool {
    pthread_mutex_lock(self.lock)
    defer { pthread_mutex_unlock(self.lock) }
    if self.val != nil {
      return false
    }
    self.val = x
    pthread_cond_signal(self.takeCond)
    return true
  }

  /// Attempts to read the contents of the `MVar` without waiting for a value.
  ///
  /// If the `MVar` is empty, this function returns .none. If the `MVar` is full,
  /// this function wraps the value in .some and returns; the `MVar` stays full.
  public func tryRead() -> Optional<A> {
    pthread_mutex_lock(self.lock)
    defer { pthread_mutex_unlock(self.lock) }
    return self.val
  }

  /// Returns whether the `MVar` is empty.
  ///
  /// This function is just a snapshot of the state of the `MVar` at that point in
  /// time. In heavily concurrent computations, this may change out from under
  /// you without warning, or even by the time it can be acted on. It is better
  /// to use one of the direct actions above.
  public var isEmpty : Bool {
    pthread_mutex_lock(self.lock)
    defer { pthread_mutex_unlock(self.lock) }
    return (self.val == nil)
  }

  /// Atomically, take a value from the `MVar`, put a given new value in the
  /// `MVar`, then return the `MVar`'s old value.
  ///
  /// Blocks while the `MVar` is empty. The exchange happens under a single
  /// hold of the lock, so no writer can slip in between the take and the put.
  public func swap(_ x : A) -> A {
    pthread_mutex_lock(self.lock)
    defer { pthread_mutex_unlock(self.lock) }
    while true {
      if let old = self.val {
        self.val = x
        // Still full afterwards: let another thread waiting for a value proceed.
        pthread_cond_signal(self.takeCond)
        return old
      }
      pthread_cond_wait(self.takeCond, self.lock)
    }
  }

  /// An exception-safe way of using the value in the `MVar` in a computation.
  ///
  /// The value is taken out for the duration of `f` and put back afterwards.
  /// On exception, the value previously stored in the `MVar` is put back into it
  /// and the exception is rethrown.
  public func withMVar<B>(_ f : (A) throws -> B) throws -> B {
    let a = self.take()
    do {
      let b = try f(a)
      self.put(a)
      return b
    } catch let e {
      self.put(a)
      throw e
    }
  }

  /// An exception-safe way to modify the contents of the `MVar`.
  ///
  /// `f` receives the current value and returns the replacement value together
  /// with a result; the replacement is stored and the result is returned.
  ///
  /// On exception, the value previously stored in the `MVar` is put back into it
  /// and the exception is rethrown.
  public func modify<B>(_ f : (A) throws -> (A, B)) throws -> B {
    let a = self.take()
    do {
      let t = try f(a)
      self.put(t.0)
      return t.1
    } catch let e {
      self.put(a)
      throw e
    }
  }

  /// An exception-safe way to modify the contents of the `MVar`.
  ///
  /// On exception, the value previously stored in the `MVar` is put back into it
  /// and the error is discarded.
  public func modify_(_ f : (A) throws -> A) {
    let a = self.take()
    do {
      let a1 = try f(a)
      self.put(a1)
    } catch _ {
      self.put(a)
    }
  }

  deinit {
    // Tear down the pthread objects before releasing their storage.
    pthread_mutex_destroy(self.lock)
    pthread_cond_destroy(self.takeCond)
    pthread_cond_destroy(self.putCond)
    self.lock.deinitialize(count: 1)
    self.takeCond.deinitialize(count: 1)
    self.putCond.deinitialize(count: 1)
    // NOTE(review): `deallocate()` is the Swift 4.1+ spelling; on older
    // toolchains this is `deallocate(capacity: 1)`.
    self.lock.deallocate()
    self.takeCond.deallocate()
    self.putCond.deallocate()
  }
}
/// Equality over `MVar`s.
///
/// Two `MVar`s are equal if they both contain no value or if the values they
/// contain are equal. This particular definition of equality is time-dependent
/// and fundamentally unstable. By the time two `MVar`s can be read and
/// compared for equality, one may have already lost its value, or may have had
/// its value swapped out from under you. It is better to `take()` the values
/// yourself if you need a stricter equality.
public func ==<A : Equatable>(lhs : MVar<A>, rhs : MVar<A>) -> Bool {
  // Sample each side's emptiness once so both checks agree with each other.
  let lhsEmpty = lhs.isEmpty
  let rhsEmpty = rhs.isEmpty
  if lhsEmpty && rhsEmpty {
    // Both empty: equal by definition, and nothing to read.
    return true
  }
  if lhsEmpty != rhsEmpty {
    return false
  }
  return lhs.read() == rhs.read()
}
#if os(Linux) | |
import Glibc | |
#else | |
import Darwin | |
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Sessions.swift | |
// Sessions | |
// | |
// Created by Robert Widmann on 2/27/17. | |
// Copyright © 2017 TypeLift. All rights reserved. | |
// | |
import Dispatch | |
//: This is an implementation of *session types* in Swift. | |
//: | |
//: Ordinary channels, such as the `Channel<A>` type that accompanies this file,
//: are useful for a great many things, but they're restricted to a single type.
//: Session types allow one to use a single channel for transferring values of
//: different types, depending on the context in which it is used. Specifically,
//: a session-typed channel always carries a *protocol*, which dictates how
//: communication is to take place.
//: | |
//: For example, imagine that two threads, `A` and `B` want to communicate with | |
//: the following pattern: | |
//: | |
//: 1. `A` sends an integer to `B`. | |
//: 2. `B` sends a Boolean to `A` depending on the integer received. | |
//: | |
//: With session types, this could be done by sharing a single channel. From
//: `A`'s point of view, it would have the type `int ! (Bool ? eps)`, where
//: `t ! r` is the protocol "send something of type `t`, then proceed with
//: protocol `r`", `t ? r` is the protocol "receive something of type `t`, then
//: proceed with protocol `r`", and `eps` is a special marker indicating the end
//: of a communication session.
//: | |
//: Our session type library allows the user to create channels that adhere to a
//: specified protocol. For example, a channel like the above would have the type
//: `AnyChannel<(), Send<Int64, Recv<Bool, Eps>>>`, and the full program could look like this:
//: | |
//: ``` | |
//: typealias Server = Recv<Int64, Send<Bool, Eps>>; | |
//: typealias Client = Send<Int64, Recv<Bool, Eps>>; | |
//: | |
//: func srv(_ c: AnyChannel<(), Server>) {
//: let (c, n) = c.recv(); | |
//: if n % 2 == 0 { | |
//: c.send(true).close() | |
//: } else { | |
//: c.send(false).close() | |
//: } | |
//: } | |
//: | |
//: func cli(_ c: AnyChannel<(), Client>) {
//: let n = 42; | |
//: let c = c.send(n); | |
//: let (c, b) = c.recv(); | |
//: | |
//: if b { | |
//: print("\(n) is even"); | |
//: } else { | |
//: print("\(n) is odd"); | |
//: } | |
//: | |
//: c.close(); | |
//: } | |
//: | |
//: connect(srv, cli); | |
//: ``` | |
/// A session-typed channel. `P` is the protocol and `E` is the environment,
/// containing potential recursion targets.
///
/// Values travel type-erased as `Any`; `E` and `P` exist only at the type
/// level and are never stored.
struct AnyChannel<E, P> {
  /// Read head of the incoming stream.
  fileprivate let r : MVar<MVar<ChItem<Any>>>
  /// Write head of the outgoing stream.
  fileprivate let s : MVar<MVar<ChItem<Any>>>

  fileprivate init(read : MVar<MVar<ChItem<Any>>>, write: MVar<MVar<ChItem<Any>>>) {
    self.s = write
    self.r = read
  }

  /// Creates a channel whose read and write heads start at the same empty hole.
  public init() {
    let hole = MVar<ChItem<Any>>()
    let writeVar : MVar<MVar<ChItem<Any>>> = MVar(initial: hole)
    let readVar : MVar<MVar<ChItem<Any>>> = MVar(initial: hole)
    self.init(read: readVar, write: writeVar)
  }

  /// Reinterprets this channel under a different environment and protocol.
  ///
  /// The underlying read and write heads are shared with the result.
  func transmute<F, T>(as : (F.Type, T.Type)) -> AnyChannel<F, T> {
    let retyped = AnyChannel<F, T>(read: self.r, write: self.s)
    return retyped
  }

  /// Reads a value from the channel.
  ///
  /// The received value is force-cast to `A`; a mismatch is a fatal error.
  public func read<A>() -> A {
    let received : Any
    do {
      received = try self.r.modify { head in
        let link : ChItem<Any> = head.read()
        return (link.stream(), link.val())
      }
    } catch _ {
      fatalError("Fatal: Could not modify read head.")
    }
    return received as! A
  }

  /// Writes a value to a channel.
  public func write<A>(_ x : A) {
    self.s.modify_ { filled in
      // Fill the current hole and advance the write head to a fresh one.
      let next : MVar<ChItem<Any>> = MVar()
      filled.put(ChItem(x as Any, next))
      return next
    }
  }
}
/// Marker protocol adopted by every session-type operator.
protocol Op {}

/// Peano numbers: Zero
struct Z {}

/// Peano numbers: Increment
struct S<N> {}

/// End of communication session (epsilon)
struct Eps : Op {}

/// Receive `A`, then `P`
protocol _Recv : Op {
  associatedtype _A
  associatedtype _P
}

struct Recv<A, P : Op> : _Recv {
  typealias _A = A
  typealias _P = P
}

/// Send `A`, then `P`
protocol _Send : Op {
  associatedtype _A
  associatedtype _P
}

struct Send<A, P : Op> : _Send {
  typealias _A = A
  typealias _P = P
}

/// Active choice between `P` and `Q`
protocol _Choose : Op {
  associatedtype _P
  associatedtype _Q
}

struct Choose<P : Op, Q : Op> : _Choose {
  typealias _P = P
  typealias _Q = Q
}

/// Passive choice (offer) between `P` and `Q`
protocol _Offer : Op {
  associatedtype _P
  associatedtype _Q
}

struct Offer<P : Op, Q : Op> : _Offer {
  typealias _P = P
  typealias _Q = Q
}

/// Enter a recursive environment
protocol _Rec : Op {
  associatedtype _P
}

struct Rec<P : Op> : _Rec {
  typealias _P = P
}

/// Recurse. N indicates how many layers of the recursive environment we recurse
/// out of.
struct Var<N> : Op {}

/// A type-level pair, used to model the environment stack.
protocol _Tuple2 {
  associatedtype _L
  associatedtype _R
}

struct Tuple2<L, R> : _Tuple2 {
  typealias _L = L
  typealias _R = R
}
/// A session-type operator that has a dual: the protocol seen from the other
/// end of the channel.
protocol HasDual : Op {
  associatedtype Dual : Op
}

// NOTE(review): the conformances below are conditional conformances, which
// need Swift 4.1 or later. Without the `: HasDual` clause the `Dual` aliases
// are plain member typealiases and only `Eps` satisfies `P : HasDual`.

/// The end of a session is its own dual.
extension Eps : HasDual {
  typealias Dual = Eps
}

/// Sending is dual to receiving.
extension Send : HasDual where P : HasDual {
  typealias Dual = Recv<A, P.Dual>
}

/// Receiving is dual to sending.
extension Recv : HasDual where P : HasDual {
  typealias Dual = Send<A, P.Dual>
}

/// Choosing is dual to offering.
extension Choose : HasDual where P : HasDual, Q : HasDual {
  typealias Dual = Offer<P.Dual, Q.Dual>
}

/// Offering is dual to choosing.
extension Offer : HasDual where P : HasDual, Q : HasDual {
  typealias Dual = Choose<P.Dual, Q.Dual>
}

/// Recursing to the innermost environment is its own dual.
extension Var : HasDual where N == Z {
  typealias Dual = Var<Z>
}

/// The dual of a recursive protocol recurses over the dual body.
extension Rec : HasDual where P : HasDual {
  typealias Dual = Rec<P.Dual>
}
/// The outcome of a binary choice: either the left alternative or the right.
indirect enum Branch<L, R> {
  /// The first alternative was selected.
  case Left(L)
  /// The second alternative was selected.
  case Right(R)
}
/* | |
unsafe impl <N> HasDual for Var<S<N>> { | |
typealias Dual = Var<S<N>>; | |
} | |
*/ | |
extension AnyChannel where P == Eps {
  /// Close a channel. Should always be used at the end of your program.
  ///
  /// Only available once the protocol has reached `Eps`. The body is currently
  /// empty; the Rust implementation this was ported from is kept below for
  /// reference.
  func close() {
    /*
     // This method cleans up the channel without running the panicky destructor
     // In essence, it calls the drop glue bypassing the `Drop::drop` method
     use std::mem;
     // Create some dummy values to place the real things inside
     // This is safe because nobody will read these
     // mem::swap uses a similar technique (also paired with `forget()`)
     let mut sender = unsafe { mem::uninitialized() };
     let mut receiver = unsafe { mem::uninitialized() };
     // Extract the internal sender/receiver so that we can drop them
     // We cannot drop directly since moving out of a type
     // that implements `Drop` is disallowed
     mem::swap(&mut self.0, &mut sender);
     mem::swap(&mut self.1, &mut receiver);
     drop(sender);drop(receiver); // drop them
     // Ensure Chan destructors don't run so that we don't panic
     // This also ensures that the uninitialized values don't get
     // read at any point
     mem::forget(self);
     */
  }
}
extension AnyChannel where P : _Send {
  /// Sends a value of the protocol's payload type over the channel.
  ///
  /// - Parameter v: The value to write.
  /// - Returns: The same channel, retyped to the protocol's continuation.
  func send(_ v : P._A) -> AnyChannel<E, P._P> {
    self.write(v)
    let continuation : AnyChannel<E, P._P> = self.transmute(as: (E.self, P._P.self))
    return continuation
  }
}
extension AnyChannel where P : _Recv {
  /// Receives a value of the protocol's payload type from the channel.
  ///
  /// - Returns: A tuple of the channel retyped to the protocol's continuation
  ///   and the received value.
  func recv() -> (AnyChannel<E, P._P>, P._A) {
    let received : P._A = self.read()
    let continuation : AnyChannel<E, P._P> = self.transmute(as: (E.self, P._P.self))
    return (continuation, received)
  }
}
extension AnyChannel where P : _Choose {
  /// Perform an active choice, selecting the first protocol.
  ///
  /// Writes `true` to the channel as the selection marker.
  func sel1() -> AnyChannel<E, P._P> {
    let pickFirst = true
    self.write(pickFirst)
    return self.transmute(as: (E.self, P._P.self))
  }

  /// Perform an active choice, selecting the second protocol.
  ///
  /// Writes `false` to the channel as the selection marker.
  func sel2() -> AnyChannel<E, P._Q> {
    let pickFirst = false
    self.write(pickFirst)
    return self.transmute(as: (E.self, P._Q.self))
  }
}
extension AnyChannel where P : _Offer {
  /// Passive choice. This allows the other end of the channel to select one
  /// of two options for continuing the protocol.
  ///
  /// Reads a `Bool` marker from the channel: `true` yields `.Left`, `false`
  /// yields `.Right`.
  func offer() -> Branch<AnyChannel<E, P._P>, AnyChannel<E, P._Q>> {
    let pickedFirst : Bool = self.read()
    guard pickedFirst else {
      return .Right(self.transmute(as: (E.self, P._Q.self)))
    }
    return .Left(self.transmute(as: (E.self, P._P.self)))
  }

  /// Performs `offer()` and hands the selected branch to `f`.
  ///
  /// - Returns: Whatever `f` returns.
  func withOffer<T>(_ f : (Branch<AnyChannel<E, P._P>, AnyChannel<E, P._Q>>) -> T) -> T {
    let selected = self.offer()
    return f(selected)
  }
}
extension AnyChannel where P : _Rec {
  /// Enter a recursive environment, putting the body of the recursive protocol
  /// on the top of the environment stack.
  ///
  /// - Returns: The same channel, retyped to the recursion body with the body
  ///   pushed onto the environment.
  func enter() -> AnyChannel<Tuple2<P._P, E>, P._P> {
    let entered : AnyChannel<Tuple2<P._P, E>, P._P> =
      self.transmute(as: (Tuple2<P._P, E>.self, P._P.self))
    return entered
  }
}
/* | |
impl<E, P> Chan<E, Rec<P>> { | |
#[must_use] | |
pub fn enter(self) -> Chan<(P, E), P> { | |
unsafe { transmute(self) } | |
} | |
} | |
*/ | |
/// Returns the two endpoints of a session: one following `P`, the other
/// following its dual.
///
/// Each endpoint reads from the stream the other endpoint writes to.
func session_channel<P : HasDual>() -> (AnyChannel<(), P>, AnyChannel<(), P.Dual>) {
  // Two independent streams, one per direction.
  let c1 = AnyChannel<(), P>()
  let c2 = AnyChannel<(), P.Dual>()
  // Cross the ends over; returning `c1` and `c2` as-is would hand back two
  // channels that never see each other's writes.
  return (
    AnyChannel<(), P>(read: c1.r, write: c2.s),
    AnyChannel<(), P.Dual>(read: c2.r, write: c1.s)
  )
}
/// Connect two functions using a session typed channel.
///
/// `srv` runs on a global dispatch queue while `cli` runs on the calling
/// thread. The call returns once both have finished.
///
/// - Parameters:
///   - srv: Receives the endpoint following protocol `P`.
///   - cli: Receives the endpoint following `P.Dual`.
func connect<P : HasDual>(_ srv : (AnyChannel<(), P>) -> (), _ cli : (AnyChannel<(), P.Dual>) -> ()) {
  let (c1, c2) : (AnyChannel<(), P>, AnyChannel<(), P.Dual>) = session_channel()
  let serverDone = DispatchGroup()
  withoutActuallyEscaping(srv) { escapableSrv in
    DispatchQueue.global().async(group: serverDone) {
      escapableSrv(c1)
    }
    cli(c2)
    // The escapable copy must not outlive this scope, so join the server here.
    serverDone.wait()
  }
}
/// Returns the two endpoints of a session without requiring that `U` is the
/// dual of `P`; the caller is responsible for picking compatible protocols.
///
/// Each endpoint reads from the stream the other endpoint writes to.
func _session_channel<P : Op, U : Op>() -> (AnyChannel<(), P>, AnyChannel<(), U>) {
  // Two independent streams, one per direction.
  let c1 = AnyChannel<(), P>()
  let c2 = AnyChannel<(), U>()
  // The first endpoint reads stream 1 and writes stream 2, so the second must
  // read stream 2 via its read head and write stream 1 via its write head.
  // Sharing one `MVar` as both read head and write head loses items.
  return (
    AnyChannel<(), P>(read: c1.r, write: c2.s),
    AnyChannel<(), U>(read: c2.r, write: c1.s)
  )
}
/// Connect two functions using a session typed channel, without requiring
/// that `U` is the dual of `P`.
///
/// `srv` runs on a global dispatch queue while `cli` runs on the calling
/// thread. The call returns once both have finished.
///
/// - Parameters:
///   - srv: Receives the endpoint following protocol `P`.
///   - cli: Receives the endpoint following protocol `U`.
func _connect<P : Op, U : Op>(_ srv : (AnyChannel<(), P>) -> (), _ cli : (AnyChannel<(), U>) -> ()) {
  let (c1, c2) : (AnyChannel<(), P>, AnyChannel<(), U>) = _session_channel()
  let serverDone = DispatchGroup()
  withoutActuallyEscaping(srv) { escapableSrv in
    DispatchQueue.global().async(group: serverDone) {
      escapableSrv(c1)
    }
    cli(c2)
    // The escapable copy must not outlive this scope, so join the server here.
    serverDone.wait()
  }
}
extension AnyChannel where E : _Tuple2, P == Var<Z> {
  /// Recurse to the protocol on the top of the environment stack.
  ///
  /// - Returns: The same channel, retyped to the environment's first component.
  func zero() -> AnyChannel<E, E._L> {
    let recursed : AnyChannel<E, E._L> = self.transmute(as: (E.self, E._L.self))
    return recursed
  }
}
/* | |
impl<E, P> Chan<(P, E), Var<Z>> { | |
/// Recurse to the environment on the top of the environment stack. | |
#[must_use] | |
pub fn zero(self) -> Chan<(P, E), P> { | |
unsafe { transmute(self) } | |
} | |
} | |
impl<E, P, N> Chan<(P, E), Var<S<N>>> { | |
/// Pop the top environment from the environment stack. | |
#[must_use] | |
pub fn succ(self) -> Chan<E, Var<N>> { | |
unsafe { transmute(self) } | |
} | |
} | |
/// Convenience function. This is identical to `.sel2()` | |
impl<Z, A, B> Chan<Z, Choose<A, B>> { | |
#[must_use] | |
pub fn skip(self) -> Chan<Z, B> { | |
self.sel2() | |
} | |
} | |
*/ | |
// The environment is left unconstrained: in the Rust original `Z` is just the
// name of a generic parameter, not the Peano zero type.
extension AnyChannel where P : _Choose, P._Q : _Choose {
  /// Convenience function. This is identical to `.sel2().sel2()`
  func skip2() -> AnyChannel<E, P._Q._Q> {
    return self.sel2().sel2()
  }
}
// The environment is left unconstrained: in the Rust original `Z` is just the
// name of a generic parameter, not the Peano zero type.
extension AnyChannel where P : _Choose, P._Q : _Choose, P._Q._Q : _Choose {
  /// Convenience function. This is identical to `.sel2().sel2().sel2()`
  func skip3() -> AnyChannel<E, P._Q._Q._Q> {
    return self.sel2().sel2().sel2()
  }
}
// The environment is left unconstrained: in the Rust original `Z` is just the
// name of a generic parameter, not the Peano zero type.
extension AnyChannel where P : _Choose, P._Q : _Choose, P._Q._Q : _Choose, P._Q._Q._Q : _Choose {
  /// Convenience function. This is identical to `.sel2().sel2().sel2().sel2()`
  func skip4() -> AnyChannel<E, P._Q._Q._Q._Q> {
    return self.sel2().sel2().sel2().sel2()
  }
}
/* | |
/// Convenience function. This is identical to `.sel2().sel2().sel2().sel2().sel2()` | |
impl<Z, A, B, C, D, E, F> Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D, | |
Choose<E, F>>>>>> { | |
#[must_use] | |
pub fn skip5(self) -> Chan<Z, F> { | |
self.sel2().sel2().sel2().sel2().sel2() | |
} | |
} | |
/// Convenience function. | |
impl<Z, A, B, C, D, E, F, G> Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D, | |
Choose<E, Choose<F, G>>>>>>> { | |
#[must_use] | |
pub fn skip6(self) -> Chan<Z, G> { | |
self.sel2().sel2().sel2().sel2().sel2().sel2() | |
} | |
} | |
/// Convenience function. | |
impl<Z, A, B, C, D, E, F, G, H> Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D, | |
Choose<E, Choose<F, Choose<G, H>>>>>>>> { | |
#[must_use] | |
pub fn skip7(self) -> Chan<Z, H> { | |
self.sel2().sel2().sel2().sel2().sel2().sel2().sel2() | |
} | |
} | |
/// Homogeneous select. We have a vector of channels, all obeying the same | |
/// protocol (and in the exact same point of the protocol), wait for one of them | |
/// to receive. Removes the receiving channel from the vector and returns both | |
/// the channel and the new vector. | |
#[cfg(feature = "chan_select")] | |
#[must_use] | |
pub fn hselect<E, P, A>(mut chans: Vec<Chan<E, Recv<A, P>>>) | |
-> (Chan<E, Recv<A, P>>, Vec<Chan<E, Recv<A, P>>>) | |
{ | |
let i = iselect(&chans); | |
let c = chans.remove(i); | |
(c, chans) | |
} | |
/// An alternative version of homogeneous select, returning the index of the Chan | |
/// that is ready to receive. | |
#[cfg(feature = "chan_select")] | |
pub fn iselect<E, P, A>(chans: &Vec<Chan<E, Recv<A, P>>>) -> usize { | |
let mut map = HashMap::new(); | |
let id = { | |
let sel = Select::new(); | |
let mut handles = Vec::with_capacity(chans.len()); // collect all the handles | |
for (i, chan) in chans.iter().enumerate() { | |
let &Chan(_, ref rx, _) = chan; | |
let handle = sel.handle(rx); | |
map.insert(handle.id(), i); | |
handles.push(handle); | |
} | |
for handle in handles.iter_mut() { // Add | |
unsafe { handle.add(); } | |
} | |
let id = sel.wait(); | |
for handle in handles.iter_mut() { // Clean up | |
unsafe { handle.remove(); } | |
} | |
id | |
}; | |
map.remove(&id).unwrap() | |
} | |
/// Heterogeneous selection structure for channels | |
/// | |
/// This builds a structure of channels that we wish to select over. This is | |
/// structured in a way such that the channels selected over cannot be | |
/// interacted with (consumed) as long as the borrowing ChanSelect object | |
/// exists. This is necessary to ensure memory safety. | |
/// | |
/// The typealias parameter T is a return type, ie we store a value of some typealias T | |
/// that is returned in case its associated channels is selected on `wait()` | |
#[cfg(feature = "chan_select")] | |
struct ChanSelect<'c, T> { | |
chans: Vec<(&'c Chan<(), ()>, T)>, | |
} | |
#[cfg(feature = "chan_select")] | |
impl<'c, T> ChanSelect<'c, T> { | |
pub fn new() -> ChanSelect<'c, T> { | |
ChanSelect { | |
chans: Vec::new() | |
} | |
} | |
/// Add a channel whose next step is `Recv` | |
/// | |
/// Once a channel has been added it cannot be interacted with as long as it | |
/// is borrowed here (by virtue of borrow checking and lifetimes). | |
pub fn add_recv_ret<E, P, A: marker::Send>(&mut self, | |
chan: &'c Chan<E, Recv<A, P>>, | |
ret: T) | |
{ | |
self.chans.push((unsafe { transmute(chan) }, ret)); | |
} | |
pub fn add_offer_ret<E, P, Q>(&mut self, | |
chan: &'c Chan<E, Offer<P, Q>>, | |
ret: T) | |
{ | |
self.chans.push((unsafe { transmute(chan) }, ret)); | |
} | |
/// Find a Receiver (and hence a Chan) that is ready to receive. | |
/// | |
/// This method consumes the ChanSelect, freeing up the borrowed Receivers | |
/// to be consumed. | |
pub fn wait(self) -> T { | |
let sel = Select::new(); | |
let mut handles = Vec::with_capacity(self.chans.len()); | |
let mut map = HashMap::new(); | |
for (chan, ret) in self.chans.into_iter() { | |
let &Chan(_, ref rx, _) = chan; | |
let h = sel.handle(rx); | |
let id = h.id(); | |
map.insert(id, ret); | |
handles.push(h); | |
} | |
for handle in handles.iter_mut() { | |
unsafe { handle.add(); } | |
} | |
let id = sel.wait(); | |
for handle in handles.iter_mut() { | |
unsafe { handle.remove(); } | |
} | |
map.remove(&id).unwrap() | |
} | |
/// How many channels are there in the structure? | |
pub fn len(&self) -> usize { | |
self.chans.len() | |
} | |
} | |
/// Default use of ChanSelect works with usize and returns the index | |
/// of the selected channel. This is also the implementation used by | |
/// the `chan_select!` macro. | |
#[cfg(feature = "chan_select")] | |
impl<'c> ChanSelect<'c, usize> { | |
pub fn add_recv<E, P, A: marker::Send>(&mut self, | |
c: &'c Chan<E, Recv<A, P>>) | |
{ | |
let index = self.chans.len(); | |
self.add_recv_ret(c, index); | |
} | |
pub fn add_offer<E, P, Q>(&mut self, | |
c: &'c Chan<E, Offer<P, Q>>) | |
{ | |
let index = self.chans.len(); | |
self.add_offer_ret(c, index); | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SessionsTests.swift | |
// SessionsTests | |
// | |
// Created by Robert Widmann on 2/27/17. | |
// Copyright © 2017 TypeLift. All rights reserved. | |
// | |
import XCTest | |
@testable import Sessions | |
import Dispatch | |
/// Wrapper that supplies the generic placeholders `R`, `S` and `T` used by the
/// client protocol aliases in the tests below.
class Generics<R : Op, S : Op, T : Op> {
  class SessionsTests: XCTestCase {
    /// Runs the arithmetic server against the add, sqrt and function clients.
    func testSession() {
      do {
        let (c1, c2) : (AnyChannel<(), Rec<Server>>, AnyChannel<(), Rec<AddCli<R>>>) = _session_channel()
        DispatchQueue.global().async(execute: {
          server(c1)
        })
        add_client(c2)
      }

      do {
        let (c1, c2) : (AnyChannel<(), Rec<Server>>, AnyChannel<(), Rec<SqrtCli<R, S, T>>>) = _session_channel()
        DispatchQueue.global().async(execute: {
          server(c1)
        })
        sqrt_client(c2)
      }

      do {
        let (c1, c2) : (AnyChannel<(), Rec<Server>>, AnyChannel<(), Rec<PrimeCli<R, S, T>>>) = _session_channel()
        DispatchQueue.global().async(execute: {
          server(c1)
        })
        fn_client(c2)
      }

      // connect(server, neg_client)
      // _connect(server, sqrt_client)
      // _connect(server, fn_client)

      // NOTE(review): work in progress. `c1_` is never used, so this server is
      // started without a client; the delegation clients below are disabled.
      let (c1, c1_) : (AnyChannel<(), Rec<Server>>, AnyChannel<(), Rec<Server>>) = _session_channel()
      DispatchQueue.global().async {
        server(c1)
      }
      // DispatchQueue.global().async {
      //   ask_neg(c1_, c2)
      // }
      // DispatchQueue.global().async {
      //   get_neg(c2_)
      // }
    }

    /// Connects a server and a client whose protocol is just `Eps`.
    func testConnection() {
      func server(c: AnyChannel<(), Eps>) {
        c.close()
      }
      func client(c: AnyChannel<(), Eps>) {
        c.close()
      }
      connect(server, client);
    }

    /// Spawns a random number of clients that each ask the server to add 42.
    func testManyClients() {
      typealias Server = Recv<UInt32, Choose<Send<UInt32, Eps>, Eps>>
      // typealias Client = Server.Dual
      typealias Client = Send<UInt32, Offer<Recv<UInt32, Eps>, Eps>>

      /// Replies with `n + 42`, or selects the second branch on overflow.
      func server_handler(_ c: AnyChannel<(), Server>) {
        let (c, n) = c.recv();
        // Wrapping add: `advanced(by:)` would trap on overflow before the
        // overflow check below could ever run.
        let r = n &+ 42
        let overflow = r < n
        if overflow {
          return c.sel2().close()
        } else {
          return c.sel1().send(r).close()
        }
      }

      /// Hands every connection currently queued on `rx` to `server_handler`.
      func server(_ rx: Channel<AnyChannel<(), Server>>) {
        var count = 0;
        repeat {
          do {
            guard let c = try rx.tryRead() else {
              break
            }
            DispatchQueue.global().async {
              server_handler(c)
            }
            count += 1;
          } catch _ {
            break
          }
        } while true
        print("Handled \(count) connections");
      }

      /// Sends a random number and prints the server's answer.
      func client_handler(_ c: AnyChannel<(), Client>) {
        let n = arc4random()
        switch c.send(n).offer() {
        case let .Left(c):
          let (c, n2) = c.recv();
          c.close()
          // `n2` already is the sum computed by the server.
          print("\(n) + 42 = \(n2)");
        case let .Right(c):
          c.close()
          print("\(n) + 42 is an overflow :(");
        }
      }

      let tx = Channel<AnyChannel<(), Server>>();
      for _ in (0...(arc4random() % 32)) {
        let tmp = tx.duplicate();
        DispatchQueue.global().async {
          let (c1, c2) : (AnyChannel<(), Server>, AnyChannel<(), Client>) = _session_channel();
          tmp.write(c1)
          client_handler(c2)
        }
      }
      server(tx);
    }
  }
}
/// Protocol of the arithmetic server: close, add two integers, negate an
/// integer, take a square root, or evaluate a predicate on an integer. Every
/// operation except close recurses back to the start.
typealias Server =
  Offer<Eps,
  Offer<Recv<Int64, Recv<Int64, Send<Int64, Var<Z>>>>,
  Offer<Recv<Int64, Send<Int64, Var<Z>>>,
  Offer<Recv<Double, Choose<Send<Double, Var<Z>>, Var<Z>>>,
  Recv<(Int64) -> Bool, Recv<Int64, Send<Bool, Var<Z>>>>>>>>

/// Serves requests on `c` until the client selects the close branch.
///
/// - Parameter c: The server endpoint of a session following `Rec<Server>`.
func server(_ c: AnyChannel<(), Rec<Server>>) {
  var session = c.enter()
  while true {
    switch session.offer() {
    case let .Left(done): // CLOSE
      // Return normally; the session is over and nothing has gone wrong.
      done.close()
      return
    case let .Right(afterClose):
      switch afterClose.offer() {
      case let .Left(add): // ADD
        let (afterFirst, n) = add.recv()
        let (afterSecond, m) = afterFirst.recv()
        session = afterSecond.send(n + m).zero()
      case let .Right(afterAdd):
        switch afterAdd.offer() {
        case let .Left(negate): // NEGATE
          let (reply, n) = negate.recv()
          session = reply.send(-n).zero()
        case let .Right(afterNegate):
          switch afterNegate.offer() {
          case let .Left(root): // SQRT
            let (reply, x) = root.recv()
            if x >= 0.0 {
              session = reply.sel1().send(x.squareRoot()).zero()
            } else {
              // No real square root: pick the branch that sends nothing.
              session = reply.sel2().zero()
            }
          case let .Right(evaluate): // EVALUATE
            let (afterFunction, f) = evaluate.recv()
            let (afterArgument, n) = afterFunction.recv()
            session = afterArgument.send(f(n)).zero()
          }
        }
      }
    }
  }
}
// `add_client`, `neg_client` and `sqrt_client` are all pretty straightforward
// uses of session types, but they do showcase subtyping, recursion and how to
// work the types in general.

/// Client protocol that only spells out the add operation; `R` stands in for
/// the remaining choices.
typealias AddCli<R : Op> =
  Choose<Eps,
  Choose<Send<Int64, Send<Int64, Recv<Int64, Var<Z>>>>, R>>

/// Selects the add operation, sends 42 and 1, prints the reply, then closes.
func add_client<R : Op>(_ c: AnyChannel<(), Rec<AddCli<R>>>) {
  let adding = c.enter().sel2().sel1()
  let (next, sum) = adding.send(42).send(1).recv()
  print("\(sum)")
  let closing = next.zero().sel1()
  closing.close()
}
/// Client protocol that only spells out the negate operation; `R` and `S`
/// stand in for the other choices.
typealias NegCli<R : Op, S : Op> =
  Choose<Eps,
  Choose<R,
  Choose<Send<Int64, Recv<Int64, Var<Z>>>,
  S>>>

/// Selects the negate operation, sends 42, prints the reply, then closes.
func neg_client<R : Op, S : Op>(c: AnyChannel<(), Rec<NegCli<R, S>>>) {
  let negating = c.enter().sel2().sel2().sel1()
  let (next, negated) = negating.send(42).recv()
  print("\(negated)")
  let closing = next.zero().sel1()
  closing.close()
}
/// Client protocol that only spells out the square-root operation; `R`, `S`
/// and `T` stand in for the other choices.
typealias SqrtCli<R : Op, S : Op, T : Op> =
  Choose<Eps,
  Choose<R,
  Choose<S,
  Choose<Send<Double, Offer<Recv<Double, Var<Z>>, Var<Z>>>,
  T>>>>

/// Selects the square-root operation, sends 42.0, prints the outcome, then
/// closes.
func sqrt_client<R : Op, S : Op, T : Op>(_ c: AnyChannel<(), Rec<SqrtCli<R, S, T>>>) {
  switch c.enter().sel2().sel2().sel2().sel1().send(42.0).offer() {
  case let .Left(c):
    let (c, n) = c.recv()
    // Print the result once; the extra argument printed it a second time.
    print("\(n)")
    c.zero().sel1().close()
  case let .Right(c):
    print("Couldn't take square root!")
    c.zero().sel1().close()
  }
}
// `fn_client` sends a function over the channel

/// Client protocol that only spells out the evaluate operation; `R`, `S` and
/// `T` stand in for the other choices.
typealias PrimeCli<R : Op, S : Op, T : Op> =
  Choose<Eps,
  Choose<R,
  Choose<S,
  Choose<T,
  Send<(Int64) -> Bool, Send<Int64, Recv<Bool, Var<Z>>>>>>>>

/// Selects the evaluate operation, sends an evenness predicate and 42, prints
/// the reply, then closes.
func fn_client<R : Op, S : Op, T : Op>(_ c: AnyChannel<(), Rec<PrimeCli<R, S, T>>>) {
  func even(n: Int64) -> Bool {
    return n % 2 == 0
  }

  let evaluating = c.enter().sel2().sel2().sel2().sel2()
  let (next, verdict) = evaluating.send(even).send(42).recv()
  print("\(verdict)")
  let closing = next.zero().sel1()
  closing.close()
}
// `ask_neg` and `get_neg` use delegation, that is, sending a channel over | |
// another channel. | |
// `ask_neg` selects the negation operation and sends an integer, whereafter it | |
// sends the whole channel to `get_neg`. `get_neg` then receives the negated | |
// integer and prints it. | |
//typealias AskNeg<R : Op, S : Op> = | |
// Choose<Eps, | |
// Choose<R, | |
// Choose<Send<Int64, Recv<Int64, Var<Z>>>, | |
// S>>>; | |
// | |
// | |
//func ask_neg<R : Op, S : Op>(c1: AnyChannel<(), Rec<AskNeg<R, S>>>, | |
// c2: AnyChannel<(), Send<AnyChannel<(AskNeg<R, S>, ()), Recv<Int64, Var<Z>>>, Eps>>) { | |
// let c1 = c1.enter().sel2().sel2().sel1().send(42); | |
// c2.send(c1).close(); | |
//} | |
// | |
//func get_neg<R: Op, S : Op>(c1: AnyChannel<(), Recv<AnyChannel<(AskNeg<R, S>, ()), Recv<Int64, Var<Z>>>, Eps>>) { | |
// let (c1, c2) = c1.recv(); | |
// let (c3, n) = c2.recv(); | |
// print("\(n)"); | |
// c3.zero().sel1().close(); | |
// c1.close(); | |
//} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment