Skip to content

Instantly share code, notes, and snippets.

@CodaFi
Created February 27, 2017 22:45
Show Gist options
  • Save CodaFi/0c2d2c94c708ebb2634bd20fafd5a113 to your computer and use it in GitHub Desktop.
Save CodaFi/0c2d2c94c708ebb2634bd20fafd5a113 to your computer and use it in GitHub Desktop.
[WIP] Session Types as good as I could get em. Ported from the Rust implementation by Philip Munksgaard.
//
// Channel.swift
// Sessions
//
// Created by Robert Widmann on 2/27/17.
// Copyright © 2017 TypeLift. All rights reserved.
//
/// A payload-free error type with a single case.
///
/// It exists so that code which needs to bail out of a throwing closure has
/// something to throw when no more specific error applies.
enum FakeError : Error {
    /// The one and only failure this type can express.
    case Error
}
/// Channels are unbounded FIFO streams of values with read and write
/// terminals comprised of `MVar`s.
///
/// The stream is a linked chain of `ChItem`s. `readEnd` holds the hole that
/// the next read will consume from and `writeEnd` holds the hole that the
/// next write will fill. On a fresh channel both point at the same hole.
public struct Channel<A> {
    /// Pointer to the hole the next `read()` consumes.
    fileprivate let readEnd : MVar<MVar<ChItem<A>>>
    /// Pointer to the hole the next `write(_:)` fills.
    fileprivate let writeEnd : MVar<MVar<ChItem<A>>>

    /// Builds a channel from explicit read and write pointers.
    private init(read : MVar<MVar<ChItem<A>>>, write: MVar<MVar<ChItem<A>>>) {
        self.readEnd = read
        self.writeEnd = write
    }

    /// Creates and returns a new empty channel.
    public init() {
        // Both ends start out pointing at the same, still empty, hole.
        let hole : MVar<ChItem<A>> = MVar()
        let readVar = MVar(initial: hole)
        let writeVar : MVar<MVar<ChItem<A>>> = MVar(initial: hole)
        self.init(read: readVar, write: writeVar)
    }

    /// Reads a value from the channel, blocking until one is available.
    ///
    /// The read pointer is held (emptied) for the whole duration of the wait,
    /// so other operations that go through the read pointer wait as well.
    public func read() -> A {
        do {
            return try self.readEnd.modify { hole in
                let item : ChItem<A> = hole.read()
                // Advance the read pointer to the next hole and hand out the value.
                return (item.stream(), item.val())
            }
        } catch {
            fatalError("Fatal: Could not modify read head.")
        }
    }

    /// Attempts to read a value from the channel without waiting for a writer.
    ///
    /// - Returns: The next value, or `nil` when the channel currently has
    ///   nothing to read. Although the method is declared `throws`, every
    ///   error raised internally is converted to `nil`.
    public func tryRead() throws -> A? {
        do {
            let value : A = try self.readEnd.modify { hole in
                // Throwing makes `modify` put the unchanged hole back.
                guard let item = hole.tryRead() else {
                    throw FakeError.Error
                }
                return (item.stream(), item.val())
            }
            return value
        } catch {
            return nil
        }
    }

    /// Writes a value to a channel.
    public func write(_ x : A) {
        self.writeEnd.modify_ { old_hole in
            // Fill the current hole and make a fresh one the new write target.
            let new_hole : MVar<ChItem<A>> = MVar()
            old_hole.put(ChItem(x, new_hole))
            return new_hole
        }
    }

    /// Writes a list of values to a channel, in order.
    public func writeList(_ xs : [A]) {
        for x in xs {
            self.write(x)
        }
    }

    /// Returns whether the channel is empty.
    ///
    /// This function is just a snapshot of the state of the channel at that
    /// point in time. In heavily concurrent computations, this may change out
    /// from under you without warning, or even by the time it can be acted on.
    /// It is better to use one of the direct actions above.
    public var isEmpty : Bool {
        do {
            return try self.readEnd.withMVar { hole in
                return hole.tryRead() == nil
            }
        } catch {
            fatalError("Fatal: Could not determine emptiness; read of underlying MVar failed.")
        }
    }

    /// Duplicates a channel.
    ///
    /// The duplicate channel begins empty, but data written to either channel
    /// from then on will be available from both. Because both channels share
    /// the same write end, data inserted into one channel may be read by both
    /// channels.
    public func duplicate() -> Channel<A> {
        // The duplicate starts reading at the current write position.
        let hole = self.writeEnd.read()
        let newReadVar = MVar(initial: hole)
        return Channel(read: newReadVar, write: self.writeEnd)
    }

    /// Reads the entirety of the channel into an array.
    ///
    /// Values are drained one at a time until `isEmpty` reports `true`. The
    /// loop is iterative, so the call uses constant stack space and appends in
    /// place instead of rebuilding the array for every element.
    public func contents() -> [A] {
        var drained : [A] = []
        while !self.isEmpty {
            drained.append(self.read())
        }
        return drained
    }
}
/// One link of a channel's stream: a value together with the hole that will
/// hold the following link.
///
/// Both parts are stored as closures. The initializer takes them as
/// `@autoclosure`s, so the expressions passed in are only evaluated when
/// `val()` or `stream()` is called.
internal struct ChItem<A> {
    /// Produces the value carried by this link.
    let val : () -> A
    /// Produces the hole for the next link of the stream.
    let stream : () -> MVar<ChItem<A>>

    init(_ value : @autoclosure @escaping () -> A, _ next : @autoclosure @escaping () -> MVar<ChItem<A>>) {
        self.stream = next
        self.val = value
    }
}
//
// MVar.swift
// Sessions
//
// Created by Robert Widmann on 2/27/17.
// Copyright © 2017 TypeLift. All rights reserved.
//
/// `MVar`s (literally "Mutable Variables") are mutable references that are
/// either empty or contain a value of type `A`. In this way, they are a form of
/// synchronization primitive that can be used to make threads wait on a value
/// before proceeding with a computation.
///
/// - Taking from an empty `MVar` causes the caller to block.
///
/// - Taking from a filled `MVar` empties it, returns a value, and potentially
///   wakes up a blocked writer.
///
/// - Writing to an empty `MVar` fills it with a value and potentially wakes up
///   a blocked reader.
///
/// - Writing to a filled `MVar` causes the writer to block.
///
/// All state is guarded by one pthread mutex. `takeCond` is signalled when a
/// value becomes available, `putCond` when the `MVar` becomes empty.
public final class MVar<A> {
    private var val : A?
    private let lock : UnsafeMutablePointer<pthread_mutex_t>
    private let takeCond : UnsafeMutablePointer<pthread_cond_t>
    private let putCond : UnsafeMutablePointer<pthread_cond_t>

    /// Creates a new empty `MVar`.
    public init() {
        self.val = .none
        // `capacity` counts elements, not bytes: exactly one of each is needed.
        self.lock = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
        self.takeCond = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
        self.putCond = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
        pthread_mutex_init(self.lock, nil)
        pthread_cond_init(self.takeCond, nil)
        pthread_cond_init(self.putCond, nil)
    }

    /// Creates a new `MVar` containing the supplied value.
    public convenience init(initial : A) {
        self.init()
        self.put(initial)
    }

    /// Blocks on `takeCond` until a value is present and returns it without
    /// removing it. Must be called with `lock` held.
    private func waitForValue() -> A {
        while true {
            if let value = self.val {
                return value
            }
            pthread_cond_wait(self.takeCond, self.lock)
        }
    }

    /// Returns the contents of the `MVar`.
    ///
    /// If the `MVar` is empty, this will block until a value is put into the
    /// `MVar`. If the `MVar` is full, the value is returned and the `MVar` is
    /// emptied.
    public func take() -> A {
        pthread_mutex_lock(self.lock)
        defer { pthread_mutex_unlock(self.lock) }

        let value = self.waitForValue()
        self.val = .none
        pthread_cond_signal(self.putCond)
        return value
    }

    /// Atomically reads the contents of the `MVar`.
    ///
    /// If the `MVar` is currently empty, this will block until a value is put
    /// into it. If the `MVar` is full, the value is returned, but the `MVar`
    /// remains full.
    public func read() -> A {
        pthread_mutex_lock(self.lock)
        defer { pthread_mutex_unlock(self.lock) }

        let value = self.waitForValue()
        // The value is still there, so pass the wake-up on: a `put` signals
        // only one waiter, and this reader may have been the one it woke.
        pthread_cond_signal(self.takeCond)
        return value
    }

    /// Puts a value into the `MVar`.
    ///
    /// If the `MVar` is currently full, the function will block until it
    /// becomes empty again.
    public func put(_ x : A) {
        pthread_mutex_lock(self.lock)
        defer { pthread_mutex_unlock(self.lock) }

        while self.val != nil {
            pthread_cond_wait(self.putCond, self.lock)
        }
        self.val = x
        pthread_cond_signal(self.takeCond)
    }

    /// Attempts to return the contents of the `MVar` without blocking.
    ///
    /// If the `MVar` is empty, this will immediately return `.none`. If the
    /// `MVar` is full, the value is returned and the `MVar` is emptied.
    public func tryTake() -> A? {
        pthread_mutex_lock(self.lock)
        // `defer` releases the mutex on the early-return path as well.
        defer { pthread_mutex_unlock(self.lock) }

        guard let value = self.val else {
            return .none
        }
        self.val = .none
        pthread_cond_signal(self.putCond)
        return value
    }

    /// Attempts to put a value into the `MVar` without blocking.
    ///
    /// If the `MVar` is empty, the value is stored and `true` is returned. If
    /// the `MVar` is full, nothing occurs and `false` is returned.
    public func tryPut(_ x : A) -> Bool {
        pthread_mutex_lock(self.lock)
        defer { pthread_mutex_unlock(self.lock) }

        guard self.val == nil else {
            return false
        }
        self.val = x
        pthread_cond_signal(self.takeCond)
        return true
    }

    /// Attempts to read the contents of the `MVar` without blocking.
    ///
    /// If the `MVar` is empty, this function returns `.none`. If the `MVar` is
    /// full, this function wraps the value in `.some` and returns it; the
    /// `MVar` remains full.
    public func tryRead() -> A? {
        pthread_mutex_lock(self.lock)
        defer { pthread_mutex_unlock(self.lock) }

        // Nothing changes state here, so no condition needs signalling.
        return self.val
    }

    /// Returns whether the `MVar` is empty.
    ///
    /// This function is just a snapshot of the state of the `MVar` at that
    /// point in time. In heavily concurrent computations, this may change out
    /// from under you without warning, or even by the time it can be acted on.
    /// It is better to use one of the direct actions above.
    public var isEmpty : Bool {
        pthread_mutex_lock(self.lock)
        defer { pthread_mutex_unlock(self.lock) }

        return self.val == nil
    }

    /// Atomically, take a value from the `MVar`, put a given new value in the
    /// `MVar`, then return the `MVar`'s old value.
    ///
    /// Blocks while the `MVar` is empty. The exchange happens under a single
    /// lock acquisition, so no other writer can slip in between.
    public func swap(_ x : A) -> A {
        pthread_mutex_lock(self.lock)
        defer { pthread_mutex_unlock(self.lock) }

        let old = self.waitForValue()
        self.val = x
        // Still full: forward the wake-up to any other waiting reader or taker.
        pthread_cond_signal(self.takeCond)
        return old
    }

    /// An exception-safe way of using the value in the `MVar` in a computation.
    ///
    /// The value is taken for the duration of `f` and put back afterwards. On
    /// exception, the value previously stored in the `MVar` is put back into
    /// it and the exception is rethrown.
    public func withMVar<B>(_ f : (A) throws -> B) throws -> B {
        let a = self.take()
        do {
            let b = try f(a)
            self.put(a)
            return b
        } catch {
            self.put(a)
            throw error
        }
    }

    /// An exception-safe way to modify the contents of the `MVar`.
    ///
    /// `f` receives the current value and returns the replacement value
    /// together with a result, which is handed back to the caller.
    ///
    /// On exception, the value previously stored in the `MVar` is put back
    /// into it and the exception is rethrown.
    public func modify<B>(_ f : (A) throws -> (A, B)) throws -> B {
        let a = self.take()
        do {
            let (replacement, result) = try f(a)
            self.put(replacement)
            return result
        } catch {
            self.put(a)
            throw error
        }
    }

    /// An exception-safe way to modify the contents of the `MVar`.
    ///
    /// On exception, the value previously stored in the `MVar` is put back
    /// into it. The error itself is discarded because this method is not
    /// declared `throws`.
    public func modify_(_ f : (A) throws -> A) {
        let a = self.take()
        do {
            let a1 = try f(a)
            self.put(a1)
        } catch {
            self.put(a)
        }
    }

    /// Deinitializes and frees one pointee allocated with
    /// `allocate(capacity: 1)`.
    private static func release<T>(_ pointer : UnsafeMutablePointer<T>) {
        pointer.deinitialize(count: 1)
        #if swift(>=4.1)
        pointer.deallocate()
        #else
        pointer.deallocate(capacity: 1)
        #endif
    }

    deinit {
        // Tear down the pthread objects before giving their storage back.
        pthread_cond_destroy(self.putCond)
        pthread_cond_destroy(self.takeCond)
        pthread_mutex_destroy(self.lock)
        MVar.release(self.putCond)
        MVar.release(self.takeCond)
        MVar.release(self.lock)
    }
}
/// Equality over `MVar`s.
///
/// Two `MVar`s are equal if they both contain no value or if the values they
/// contain are equal. This particular definition of equality is time-dependent
/// and fundamentally unstable. By the time two `MVar`s can be read and
/// compared for equality, one may have already lost its value, or may have had
/// its value swapped out from under you. It is better to `take()` the values
/// yourself if you need a stricter equality.
///
/// - Returns: `true` when both sides are empty, `false` when exactly one side
///   is empty, and otherwise the result of comparing the two stored values.
public func ==<A : Equatable>(lhs : MVar<A>, rhs : MVar<A>) -> Bool {
    let lhsEmpty = lhs.isEmpty
    let rhsEmpty = rhs.isEmpty
    // Both empty: equal by definition, and nothing to read.
    if lhsEmpty && rhsEmpty {
        return true
    }
    // Exactly one side empty: never equal.
    if lhsEmpty != rhsEmpty {
        return false
    }
    return lhs.read() == rhs.read()
}
#if os(Linux)
import Glibc
#else
import Darwin
#endif
//
// Sessions.swift
// Sessions
//
// Created by Robert Widmann on 2/27/17.
// Copyright © 2017 TypeLift. All rights reserved.
//
import Dispatch
//: This is an implementation of *session types* in Swift.
//:
//: Ordinary channels, such as the `Channel` type in this package, are useful
//: for a great many things, but they're restricted to a single type. Session
//: types allow one to use a single channel for transferring values of different
//: types, depending on the context in which it is used. Specifically, a
//: session-typed channel always carries a *protocol*, which dictates how
//: communication is to take place.
//:
//: For example, imagine that two threads, `A` and `B` want to communicate with
//: the following pattern:
//:
//: 1. `A` sends an integer to `B`.
//: 2. `B` sends a Boolean to `A` depending on the integer received.
//:
//: With session types, this could be done by sharing a single channel. From
//: `A`'s point of view, it would have the type `int ! (Bool ? eps)` where `t ! r`
//: is the protocol "send something of type `t` then proceed with
//: protocol `r`", the protocol `t ? r` is "receive something of type `t` then
//: proceed with protocol `r`", and `eps` is a special marker indicating the end
//: of a communication session.
//:
//: Our session type library allows the user to create channels that adhere to a
//: specified protocol. For example, a channel like the above would have the type
//: `AnyChannel<(), Send<Int64, Recv<Bool, Eps>>>`, and the full program could look like this:
//:
//: ```
//: typealias Server = Recv<Int64, Send<Bool, Eps>>;
//: typealias Client = Send<Int64, Recv<Bool, Eps>>;
//:
//: func srv(_ c: AnyChannel<(), Server>) {
//: let (c, n) = c.recv();
//: if n % 2 == 0 {
//: c.send(true).close()
//: } else {
//: c.send(false).close()
//: }
//: }
//:
//: func cli(_ c: AnyChannel<(), Client>) {
//: let n : Int64 = 42;
//: let c = c.send(n);
//: let (c, b) = c.recv();
//:
//: if b {
//: print("\(n) is even");
//: } else {
//: print("\(n) is odd");
//: }
//:
//: c.close();
//: }
//:
//: connect(srv, cli);
//: ```
/// A session typed channel. `P` is the protocol and `E` is the environment,
/// containing potential recursion targets.
///
/// Neither type parameter is stored: the channel only holds a read pointer
/// and a write pointer into streams of type-erased (`Any`) items.
struct AnyChannel<E, P> {
    /// Read pointer: holds the hole the next `read()` consumes.
    fileprivate let r : MVar<MVar<ChItem<Any>>>
    /// Write pointer: holds the hole the next `write(_:)` fills.
    fileprivate let s : MVar<MVar<ChItem<Any>>>

    /// Wraps existing read and write pointers.
    fileprivate init(read : MVar<MVar<ChItem<Any>>>, write: MVar<MVar<ChItem<Any>>>) {
        self.r = read
        self.s = write
    }

    /// Creates a channel whose read and write pointers share one fresh hole.
    public init() {
        let firstHole : MVar<ChItem<Any>> = MVar()
        let writePointer : MVar<MVar<ChItem<Any>>> = MVar(initial: firstHole)
        let readPointer = MVar(initial: firstHole)
        self.init(read: readPointer, write: writePointer)
    }

    /// Re-labels this channel with a different environment and protocol.
    ///
    /// The returned channel shares this channel's read and write pointers;
    /// only the phantom type parameters change.
    func transmute<F, T>(as : (F.Type, T.Type)) -> AnyChannel<F, T> {
        return AnyChannel<F, T>(read: r, write: s)
    }

    /// Reads a value from the channel.
    ///
    /// Blocks until an item is available, then force-casts it to `A`; the
    /// cast traps if the stored item is not an `A`.
    public func read<A>() -> A {
        let erased : Any
        do {
            erased = try r.modify { hole in
                let next : ChItem<Any> = hole.read()
                // Advance the read pointer and hand back the stored value.
                return (next.stream(), next.val())
            }
        } catch {
            fatalError("Fatal: Could not modify read head.")
        }
        return erased as! A
    }

    /// Writes a value to a channel.
    public func write<A>(_ x : A) {
        s.modify_ { currentHole in
            // Fill the current hole; a fresh hole becomes the new write target.
            let freshHole : MVar<ChItem<Any>> = MVar()
            currentHole.put(ChItem(x as Any, freshHole))
            return freshHole
        }
    }
}
/// Peano numbers: Zero
struct Z {}

/// Peano numbers: Increment
struct S<N> {}

/// End of communication session (epsilon)
struct Eps : Op {}

/// Protocol view of `Recv`, exposing its type parameters as associated types.
protocol _Recv : Op {
    associatedtype _A
    associatedtype _P
}

/// Receive `A`, then `P`
struct Recv<A, P : Op> : _Recv {
    typealias _A = A
    typealias _P = P
}

/// Protocol view of `Send`, exposing its type parameters as associated types.
protocol _Send : Op {
    associatedtype _A
    associatedtype _P
}

/// Send `A`, then `P`
struct Send<A, P : Op> : _Send {
    typealias _A = A
    typealias _P = P
}

/// Protocol view of `Choose`, exposing both branches as associated types.
protocol _Choose : Op {
    associatedtype _P
    associatedtype _Q
}

/// Active choice between `P` and `Q`
struct Choose<P : Op, Q : Op> : _Choose {
    typealias _P = P
    typealias _Q = Q
}

/// Protocol view of `Offer`, exposing both branches as associated types.
protocol _Offer : Op {
    associatedtype _P
    associatedtype _Q
}

/// Passive choice (offer) between `P` and `Q`
struct Offer<P : Op, Q : Op> : _Offer {
    typealias _P = P
    typealias _Q = Q
}

/// Protocol view of `Rec`, exposing its body as an associated type.
protocol _Rec : Op {
    associatedtype _P
}

/// Enter a recursive environment
struct Rec<P : Op> : _Rec {
    typealias _P = P
}

/// Recurse. N indicates how many layers of the recursive environment we recurse
/// out of.
struct Var<N> : Op {}

/// Protocol view of `Tuple2`, exposing both components as associated types.
protocol _Tuple2 {
    associatedtype _L
    associatedtype _R
}

/// A type-level pair of `L` and `R`.
struct Tuple2<L, R> : _Tuple2 {
    typealias _L = L
    typealias _R = R
}

/// Marker protocol adopted by every session protocol step.
protocol Op {}
/// A protocol step that names its dual: the protocol the other endpoint of
/// the channel has to follow.
protocol HasDual : Op {
associatedtype Dual : Op
}
/// The end of a session is its own dual.
extension Eps : HasDual {
typealias Dual = Eps
}
// NOTE(review): the constrained extensions below each add a `Dual` typealias
// but, unlike the `Eps` extension above, do not declare `HasDual` in their
// inheritance clause. Confirm whether conformance was intended.
/// Sending `A` is mirrored by receiving `A`, followed by the dual of `P`.
extension Send where P : HasDual {
typealias Dual = Recv<A, P.Dual>
}
/// Receiving `A` is mirrored by sending `A`, followed by the dual of `P`.
extension Recv where P : HasDual {
typealias Dual = Send<A, P.Dual>
}
/// An active choice is mirrored by an offer between the dual branches.
extension Choose where P : HasDual, Q : HasDual {
typealias Dual = Offer<P.Dual, Q.Dual>
}
/// An offer is mirrored by an active choice between the dual branches.
extension Offer where P : HasDual, Q : HasDual {
typealias Dual = Choose<P.Dual, Q.Dual>
}
/// `Var<Z>` is mirrored by `Var<Z>` itself.
extension Var where N == Z {
typealias Dual = Var<Z>
}
/// A recursive protocol is mirrored by recursion over the dual body.
extension Rec where P : HasDual {
typealias Dual = Rec<P.Dual>
}
/// A value that is either a `Left` wrapping an `L` or a `Right` wrapping an
/// `R`. Declared `indirect`.
indirect enum Branch<L, R> {
    /// The first alternative.
    case Left(L)
    /// The second alternative.
    case Right(R)
}
/*
unsafe impl <N> HasDual for Var<S<N>> {
typealias Dual = Var<S<N>>;
}
*/
/// Operations available once the protocol has reached its end (`Eps`).
extension AnyChannel where P == Eps {
/// Close a channel. Should always be used at the end of your program.
///
/// The body currently executes no statements; it contains only the
/// commented-out Rust implementation this method was ported from.
func close() {
/*
// This method cleans up the channel without running the panicky destructor
// In essence, it calls the drop glue bypassing the `Drop::drop` method
use std::mem;
// Create some dummy values to place the real things inside
// This is safe because nobody will read these
// mem::swap uses a similar technique (also paired with `forget()`)
let mut sender = unsafe { mem::uninitialized() };
let mut receiver = unsafe { mem::uninitialized() };
// Extract the internal sender/receiver so that we can drop them
// We cannot drop directly since moving out of a type
// that implements `Drop` is disallowed
mem::swap(&mut self.0, &mut sender);
mem::swap(&mut self.1, &mut receiver);
drop(sender);drop(receiver); // drop them
// Ensure Chan destructors don't run so that we don't panic
// This also ensures that the uninitialized values don't get
// read at any point
mem::forget(self);
*/
}
}
extension AnyChannel where P : _Send {
    /// Sends a value of the protocol's payload type over the channel.
    ///
    /// - Parameter v: The value to write.
    /// - Returns: This channel, re-typed to the protocol that follows the send.
    func send(_ v : P._A) -> AnyChannel<E, P._P> {
        write(v)
        let continuation : AnyChannel<E, P._P> = transmute(as: (E.self, P._P.self))
        return continuation
    }
}
extension AnyChannel where P : _Recv {
    /// Receives a value of the protocol's payload type from the channel.
    ///
    /// - Returns: A tuple of this channel, re-typed to the protocol that
    ///   follows the receive, and the value that was read.
    func recv() -> (AnyChannel<E, P._P>, P._A) {
        let received : P._A = read()
        let continuation : AnyChannel<E, P._P> = transmute(as: (E.self, P._P.self))
        return (continuation, received)
    }
}
extension AnyChannel where P : _Choose {
    /// Writes the selection flag and re-types the channel to `Next`.
    private func announceChoice<Next>(_ pickFirst : Bool, continuingAs _ : Next.Type) -> AnyChannel<E, Next> {
        write(pickFirst)
        return transmute(as: (E.self, Next.self))
    }

    /// Perform an active choice, selecting protocol `P`.
    ///
    /// The choice is transmitted as the Boolean `true`.
    func sel1() -> AnyChannel<E, P._P> {
        return announceChoice(true, continuingAs: P._P.self)
    }

    /// Perform an active choice, selecting protocol `Q`.
    ///
    /// The choice is transmitted as the Boolean `false`.
    func sel2() -> AnyChannel<E, P._Q> {
        return announceChoice(false, continuingAs: P._Q.self)
    }
}
extension AnyChannel where P : _Offer {
    /// Passive choice. This allows the other end of the channel to select one
    /// of two options for continuing the protocol: either `P` or `Q`.
    ///
    /// Reads one Boolean from the channel: `true` yields `.Left` (protocol
    /// `P`), `false` yields `.Right` (protocol `Q`).
    func offer() -> Branch<AnyChannel<E, P._P>, AnyChannel<E, P._Q>> {
        let peerPickedFirst : Bool = read()
        guard peerPickedFirst else {
            return .Right(transmute(as: (E.self, P._Q.self)))
        }
        return .Left(transmute(as: (E.self, P._P.self)))
    }

    /// Waits for the peer's choice and passes the resulting branch to `f`.
    ///
    /// - Returns: Whatever `f` returns.
    func withOffer<T>(_ f : (Branch<AnyChannel<E, P._P>, AnyChannel<E, P._Q>>) -> T) -> T {
        let chosen = offer()
        return f(chosen)
    }
}
extension AnyChannel where P : _Rec {
    /// Enter a recursive environment, putting the current environment on the
    /// top of the environment stack.
    ///
    /// - Returns: This channel, re-typed so that the recursion body is both
    ///   the active protocol and the head of the environment.
    func enter() -> AnyChannel<Tuple2<P._P, E>, P._P> {
        let entered : AnyChannel<Tuple2<P._P, E>, P._P> =
            transmute(as: (Tuple2<P._P, E>.self, P._P.self))
        return entered
    }
}
/*
impl<E, P> Chan<E, Rec<P>> {
#[must_use]
pub fn enter(self) -> Chan<(P, E), P> {
unsafe { transmute(self) }
}
}
*/
/// Returns the two connected endpoints of a session.
///
/// Two underlying streams are created, one per direction. The first endpoint
/// reads from the first stream and writes to the second; the second endpoint
/// does the opposite, so whatever one endpoint writes the other one reads.
func session_channel<P : HasDual>() -> (AnyChannel<(), P>, AnyChannel<(), P.Dual>) {
    let towardsFirst = AnyChannel<(), P>()
    let towardsSecond = AnyChannel<(), P.Dual>()
    return (
        AnyChannel<(), P>(read: towardsFirst.r, write: towardsSecond.s),
        AnyChannel<(), P.Dual>(read: towardsSecond.r, write: towardsFirst.s)
    )
}
/// Connect two functions using a session typed channel.
///
/// `srv` runs on a global dispatch queue while `cli` runs on the calling
/// thread. The call returns only after both have finished.
///
/// - Parameters:
///   - srv: Receives the endpoint following protocol `P`.
///   - cli: Receives the endpoint following `P.Dual`.
func connect<P : HasDual>(_ srv : (AnyChannel<(), P>) -> (), _ cli : (AnyChannel<(), P.Dual>) -> ()) {
    let (c1, c2) : (AnyChannel<(), P>, AnyChannel<(), P.Dual>) = session_channel()
    withoutActuallyEscaping(srv) { escapableSrv in
        let serverDone = DispatchGroup()
        DispatchQueue.global().async(group: serverDone) {
            escapableSrv(c1)
        }
        cli(c2)
        // `escapableSrv` must not outlive this scope, so wait for the server
        // before leaving it.
        serverDone.wait()
    }
}
/// Returns the two connected endpoints of a session whose protocols are
/// chosen independently by the caller; `P` and `U` are not checked to be
/// dual to each other.
///
/// Two underlying streams are created, one per direction. The first endpoint
/// reads from the first stream and writes to the second; the second endpoint
/// reads from the second stream and writes to the first.
func _session_channel<P : Op, U : Op>() -> (AnyChannel<(), P>, AnyChannel<(), U>) {
    let towardsFirst = AnyChannel<(), P>()
    let towardsSecond = AnyChannel<(), U>()
    // Each stream must be read through its own read pointer and written
    // through its own write pointer; mixing them makes both endpoints fight
    // over the same pointer.
    return (
        AnyChannel<(), P>(read: towardsFirst.r, write: towardsSecond.s),
        AnyChannel<(), U>(read: towardsSecond.r, write: towardsFirst.s)
    )
}
/// Connect two functions using a session typed channel whose endpoint
/// protocols are chosen independently by the caller.
///
/// `srv` runs on a global dispatch queue while `cli` runs on the calling
/// thread. The call returns only after both have finished.
///
/// - Parameters:
///   - srv: Receives the endpoint following protocol `P`.
///   - cli: Receives the endpoint following protocol `U`.
func _connect<P : Op, U : Op>(_ srv : (AnyChannel<(), P>) -> (), _ cli : (AnyChannel<(), U>) -> ()) {
    let (c1, c2) : (AnyChannel<(), P>, AnyChannel<(), U>) = _session_channel()
    withoutActuallyEscaping(srv) { escapableSrv in
        let serverDone = DispatchGroup()
        DispatchQueue.global().async(group: serverDone) {
            escapableSrv(c1)
        }
        cli(c2)
        // `escapableSrv` must not outlive this scope, so wait for the server
        // before leaving it.
        serverDone.wait()
    }
}
extension AnyChannel where E : _Tuple2, P == Var<Z> {
    /// Recurse to the environment on the top of the environment stack.
    ///
    /// - Returns: This channel, re-typed so that the head of the environment
    ///   becomes the active protocol again.
    func zero() -> AnyChannel<E, E._L> {
        let restarted : AnyChannel<E, E._L> = transmute(as: (E.self, E._L.self))
        return restarted
    }
}
/*
impl<E, P> Chan<(P, E), Var<Z>> {
/// Recurse to the environment on the top of the environment stack.
#[must_use]
pub fn zero(self) -> Chan<(P, E), P> {
unsafe { transmute(self) }
}
}
impl<E, P, N> Chan<(P, E), Var<S<N>>> {
/// Pop the top environment from the environment stack.
#[must_use]
pub fn succ(self) -> Chan<E, Var<N>> {
unsafe { transmute(self) }
}
}
/// Convenience function. This is identical to `.sel2()`
impl<Z, A, B> Chan<Z, Choose<A, B>> {
#[must_use]
pub fn skip(self) -> Chan<Z, B> {
self.sel2()
}
}
*/
extension AnyChannel where P : _Choose, P._Q : _Choose {
    /// Convenience function. This is identical to `.sel2().sel2()`
    ///
    /// Available for any environment `E`; the environment is carried through
    /// unchanged.
    func skip2() -> AnyChannel<E, P._Q._Q> {
        return self.sel2().sel2()
    }
}
extension AnyChannel where P : _Choose, P._Q : _Choose, P._Q._Q : _Choose {
    /// Convenience function. This is identical to `.sel2().sel2().sel2()`
    ///
    /// Available for any environment `E`; the environment is carried through
    /// unchanged.
    func skip3() -> AnyChannel<E, P._Q._Q._Q> {
        return self.sel2().sel2().sel2()
    }
}
extension AnyChannel where P : _Choose, P._Q : _Choose, P._Q._Q : _Choose, P._Q._Q._Q : _Choose {
    /// Convenience function. This is identical to `.sel2().sel2().sel2().sel2()`
    ///
    /// Available for any environment `E`; the environment is carried through
    /// unchanged.
    func skip4() -> AnyChannel<E, P._Q._Q._Q._Q> {
        return self.sel2().sel2().sel2().sel2()
    }
}
/*
/// Convenience function. This is identical to `.sel2().sel2().sel2().sel2().sel2()`
impl<Z, A, B, C, D, E, F> Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D,
Choose<E, F>>>>>> {
#[must_use]
pub fn skip5(self) -> Chan<Z, F> {
self.sel2().sel2().sel2().sel2().sel2()
}
}
/// Convenience function.
impl<Z, A, B, C, D, E, F, G> Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D,
Choose<E, Choose<F, G>>>>>>> {
#[must_use]
pub fn skip6(self) -> Chan<Z, G> {
self.sel2().sel2().sel2().sel2().sel2().sel2()
}
}
/// Convenience function.
impl<Z, A, B, C, D, E, F, G, H> Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D,
Choose<E, Choose<F, Choose<G, H>>>>>>>> {
#[must_use]
pub fn skip7(self) -> Chan<Z, H> {
self.sel2().sel2().sel2().sel2().sel2().sel2().sel2()
}
}
/// Homogeneous select. We have a vector of channels, all obeying the same
/// protocol (and in the exact same point of the protocol), wait for one of them
/// to receive. Removes the receiving channel from the vector and returns both
/// the channel and the new vector.
#[cfg(feature = "chan_select")]
#[must_use]
pub fn hselect<E, P, A>(mut chans: Vec<Chan<E, Recv<A, P>>>)
-> (Chan<E, Recv<A, P>>, Vec<Chan<E, Recv<A, P>>>)
{
let i = iselect(&chans);
let c = chans.remove(i);
(c, chans)
}
/// An alternative version of homogeneous select, returning the index of the Chan
/// that is ready to receive.
#[cfg(feature = "chan_select")]
pub fn iselect<E, P, A>(chans: &Vec<Chan<E, Recv<A, P>>>) -> usize {
let mut map = HashMap::new();
let id = {
let sel = Select::new();
let mut handles = Vec::with_capacity(chans.len()); // collect all the handles
for (i, chan) in chans.iter().enumerate() {
let &Chan(_, ref rx, _) = chan;
let handle = sel.handle(rx);
map.insert(handle.id(), i);
handles.push(handle);
}
for handle in handles.iter_mut() { // Add
unsafe { handle.add(); }
}
let id = sel.wait();
for handle in handles.iter_mut() { // Clean up
unsafe { handle.remove(); }
}
id
};
map.remove(&id).unwrap()
}
/// Heterogeneous selection structure for channels
///
/// This builds a structure of channels that we wish to select over. This is
/// structured in a way such that the channels selected over cannot be
/// interacted with (consumed) as long as the borrowing ChanSelect object
/// exists. This is necessary to ensure memory safety.
///
/// The typealias parameter T is a return type, ie we store a value of some typealias T
/// that is returned in case its associated channels is selected on `wait()`
#[cfg(feature = "chan_select")]
struct ChanSelect<'c, T> {
chans: Vec<(&'c Chan<(), ()>, T)>,
}
#[cfg(feature = "chan_select")]
impl<'c, T> ChanSelect<'c, T> {
pub fn new() -> ChanSelect<'c, T> {
ChanSelect {
chans: Vec::new()
}
}
/// Add a channel whose next step is `Recv`
///
/// Once a channel has been added it cannot be interacted with as long as it
/// is borrowed here (by virtue of borrow checking and lifetimes).
pub fn add_recv_ret<E, P, A: marker::Send>(&mut self,
chan: &'c Chan<E, Recv<A, P>>,
ret: T)
{
self.chans.push((unsafe { transmute(chan) }, ret));
}
pub fn add_offer_ret<E, P, Q>(&mut self,
chan: &'c Chan<E, Offer<P, Q>>,
ret: T)
{
self.chans.push((unsafe { transmute(chan) }, ret));
}
/// Find a Receiver (and hence a Chan) that is ready to receive.
///
/// This method consumes the ChanSelect, freeing up the borrowed Receivers
/// to be consumed.
pub fn wait(self) -> T {
let sel = Select::new();
let mut handles = Vec::with_capacity(self.chans.len());
let mut map = HashMap::new();
for (chan, ret) in self.chans.into_iter() {
let &Chan(_, ref rx, _) = chan;
let h = sel.handle(rx);
let id = h.id();
map.insert(id, ret);
handles.push(h);
}
for handle in handles.iter_mut() {
unsafe { handle.add(); }
}
let id = sel.wait();
for handle in handles.iter_mut() {
unsafe { handle.remove(); }
}
map.remove(&id).unwrap()
}
/// How many channels are there in the structure?
pub fn len(&self) -> usize {
self.chans.len()
}
}
/// Default use of ChanSelect works with usize and returns the index
/// of the selected channel. This is also the implementation used by
/// the `chan_select!` macro.
#[cfg(feature = "chan_select")]
impl<'c> ChanSelect<'c, usize> {
pub fn add_recv<E, P, A: marker::Send>(&mut self,
c: &'c Chan<E, Recv<A, P>>)
{
let index = self.chans.len();
self.add_recv_ret(c, index);
}
pub fn add_offer<E, P, Q>(&mut self,
c: &'c Chan<E, Offer<P, Q>>)
{
let index = self.chans.len();
self.add_offer_ret(c, index);
}
}
*/
//
// SessionsTests.swift
// SessionsTests
//
// Created by Robert Widmann on 2/27/17.
// Copyright © 2017 TypeLift. All rights reserved.
//
import XCTest
@testable import Sessions
import Dispatch
/// Generic wrapper whose parameters stand in for the protocol branches a
/// client does not care about (the `R`, `S` and `T` slots of `AddCli`,
/// `SqrtCli` and `PrimeCli`).
class Generics<R : Op, S : Op, T : Op> {
    class SessionsTests: XCTestCase {
        /// Runs the add, square-root and function-evaluation clients, each
        /// against its own `server` started on a global queue.
        func testSession() {
            do {
                let (c1, c2) : (AnyChannel<(), Rec<Server>>, AnyChannel<(), Rec<AddCli<R>>>) = _session_channel()
                DispatchQueue.global().async(execute: {
                    server(c1)
                })
                add_client(c2)
            }
            do {
                let (c1, c2) : (AnyChannel<(), Rec<Server>>, AnyChannel<(), Rec<SqrtCli<R, S, T>>>) = _session_channel()
                DispatchQueue.global().async(execute: {
                    server(c1)
                })
                sqrt_client(c2)
            }
            do {
                let (c1, c2) : (AnyChannel<(), Rec<Server>>, AnyChannel<(), Rec<PrimeCli<R, S, T>>>) = _session_channel()
                DispatchQueue.global().async(execute: {
                    server(c1)
                })
                fn_client(c2)
            }
            // connect(server, neg_client)
            // _connect(server, sqrt_client)
            // _connect(server, fn_client)
            // The peer endpoint `c1_` is not driven by anything yet: the
            // delegation clients that would use it are still commented out.
            let (c1, c1_) : (AnyChannel<(), Rec<Server>>, AnyChannel<(), Rec<Server>>) = _session_channel()
            DispatchQueue.global().async {
                server(c1)
            }
            // DispatchQueue.global().async {
            // ask_neg(c1_, c2)
            // }
            // DispatchQueue.global().async {
            // get_neg(c2_)
            // }
        }

        /// Connects two functions that immediately close their endpoints.
        func testConnection() {
            func server(c: AnyChannel<(), Eps>) {
                c.close()
            }
            func client(c: AnyChannel<(), Eps>) {
                c.close()
            }
            connect(server, client);
        }

        /// Starts a random number of clients that each open a session, hand
        /// the server endpoint over a shared `Channel` and ask for `n + 42`.
        func testManyClients() {
            typealias Server = Recv<UInt32, Choose<Send<UInt32, Eps>, Eps>>
            // typealias Client = Server.Dual
            typealias Client = Send<UInt32, Offer<Recv<UInt32, Eps>, Eps>>

            /// Answers one client: replies with `n + 42`, or selects the
            /// second branch when the addition overflows.
            func server_handler(_ c: AnyChannel<(), Server>) {
                let (reply, n) = c.recv()
                // Wrapping addition, so an overflow shows up as `r < n`
                // instead of trapping.
                let r = n &+ 42
                let overflow = r < n
                if overflow {
                    return reply.sel2().close()
                } else {
                    return reply.sel1().send(r).close()
                }
            }

            /// Accepts exactly `connections` sessions from `rx`, blocking
            /// until each one arrives, and handles each on a global queue.
            func server(_ rx: Channel<AnyChannel<(), Server>>, expecting connections : Int) {
                var count = 0
                while count < connections {
                    let c = rx.read()
                    DispatchQueue.global().async {
                        server_handler(c)
                    }
                    count += 1
                }
                print("Handled \(count) connections");
            }

            /// Sends a random number and prints the server's answer.
            func client_handler(_ c: AnyChannel<(), Client>) {
                let n = arc4random()
                switch c.send(n).offer() {
                case let .Left(answer):
                    let (rest, n2) = answer.recv()
                    rest.close()
                    // `n2` already is `n + 42`.
                    print("\(n) + 42 = \(n2)");
                case let .Right(rest):
                    rest.close()
                    print("\(n) + 42 is an overflow :(");
                }
            }

            let tx = Channel<AnyChannel<(), Server>>();
            // Between 1 and 32 clients.
            let clientCount = Int(arc4random() % 32) + 1
            for _ in 0..<clientCount {
                let tmp = tx.duplicate();
                DispatchQueue.global().async {
                    let (c1, c2) : (AnyChannel<(), Server>, AnyChannel<(), Client>) = _session_channel();
                    tmp.write(c1)
                    client_handler(c2)
                }
            }
            server(tx, expecting: clientCount);
        }
    }
}
/// The server's protocol: a nested offer between, in order, CLOSE, ADD,
/// NEGATE, SQRT and EVALUATE. Every operation except CLOSE ends in `Var<Z>`,
/// i.e. loops back to the start.
typealias Server =
    Offer<Eps,
    Offer<Recv<Int64, Recv<Int64, Send<Int64, Var<Z>>>>,
    Offer<Recv<Int64, Send<Int64, Var<Z>>>,
    Offer<Recv<Double, Choose<Send<Double, Var<Z>>, Var<Z>>>,
    Recv<(Int64) -> Bool, Recv<Int64, Send<Bool, Var<Z>>>>>>>>

/// Serves requests on `c` until the client selects CLOSE, then returns.
///
/// - Parameter c: The server endpoint of a session following `Rec<Server>`.
func server(_ c: AnyChannel<(), Rec<Server>>) {
    var session = c.enter()
    while true {
        switch session.offer() {
        case let .Left(finished): // CLOSE
            finished.close()
            // Normal end of the session: stop serving.
            return
        case let .Right(afterClose):
            switch afterClose.offer() {
            case let .Left(addRequest): // ADD
                let (next, n) = addRequest.recv()
                let (reply, m) = next.recv()
                session = reply.send(n + m).zero()
            case let .Right(afterAdd):
                switch afterAdd.offer() {
                case let .Left(negateRequest): // NEGATE
                    let (reply, n) = negateRequest.recv()
                    session = reply.send(-n).zero()
                case let .Right(afterNegate):
                    switch afterNegate.offer() {
                    case let .Left(rootRequest): // SQRT
                        let (reply, x) = rootRequest.recv()
                        if x >= 0.0 {
                            session = reply.sel1().send(x.squareRoot()).zero()
                        } else {
                            // No real square root: take the branch that
                            // sends nothing.
                            session = reply.sel2().zero()
                        }
                    case let .Right(evaluateRequest): // EVALUATE
                        let (next, f) = evaluateRequest.recv()
                        let (reply, n) = next.recv()
                        session = reply.send(f(n)).zero()
                    }
                }
            }
        }
    }
}
// `add_client`, `neg_client` and `sqrt_client` are all pretty straightforward
// uses of session types, but they do showcase subtyping, recursion and how to
// work the types in general.
/// Client-side protocol for the ADD operation. `R` stands for the remaining
/// branches, which this client never selects.
typealias AddCli<R : Op> =
    Choose<Eps,
    Choose<Send<Int64, Send<Int64, Recv<Int64, Var<Z>>>>, R>>

/// Selects the second-level first branch, sends 42 and 1, prints the reply,
/// then loops back and selects the first branch to close the session.
func add_client<R : Op>(_ c: AnyChannel<(), Rec<AddCli<R>>>) {
    let session = c.enter()
    let adder = session.sel2().sel1()
    let (rest, sum) = adder.send(42).send(1).recv()
    print("\(sum)");
    rest.zero().sel1().close()
}
/// Client-side protocol for the NEGATE operation. `R` and `S` stand for the
/// branches this client never selects.
typealias NegCli<R : Op, S : Op> =
    Choose<Eps,
    Choose<R,
    Choose<Send<Int64, Recv<Int64, Var<Z>>>,
    S>>>

/// Selects the third branch, sends 42, prints the reply, then loops back and
/// selects the first branch to close the session.
func neg_client<R : Op, S : Op>(c: AnyChannel<(), Rec<NegCli<R, S>>>) {
    let session = c.enter()
    let negator = session.sel2().sel2().sel1()
    let (rest, negated) = negator.send(42).recv()
    print("\(negated)");
    rest.zero().sel1().close()
}
/// Client-side protocol for the SQRT operation. `R`, `S` and `T` stand for
/// the branches this client never selects.
typealias SqrtCli<R : Op, S : Op, T : Op> =
    Choose<Eps,
    Choose<R,
    Choose<S,
    Choose<Send<Double, Offer<Recv<Double, Var<Z>>, Var<Z>>>,
    T>>>>

/// Selects the fourth branch, sends 42.0 and waits for the peer's choice:
/// either a result to print, or the branch that sends nothing. Afterwards it
/// loops back and selects the first branch to close the session.
func sqrt_client<R : Op, S : Op, T : Op>(_ c: AnyChannel<(), Rec<SqrtCli<R, S, T>>>) {
    switch c.enter().sel2().sel2().sel2().sel1().send(42.0).offer() {
    case let .Left(answer):
        let (rest, root) = answer.recv()
        // Print the result once; passing it as a second `print` argument
        // would output it twice.
        print("\(root)");
        rest.zero().sel1().close()
    case let .Right(rest):
        print("Couldn't take square root!");
        rest.zero().sel1().close()
    }
}
// `fn_client` sends a function over the channel
/// Client-side protocol for the EVALUATE operation. `R`, `S` and `T` stand
/// for the branches this client never selects.
typealias PrimeCli<R : Op, S : Op, T : Op> =
    Choose<Eps,
    Choose<R,
    Choose<S,
    Choose<T,
    Send<(Int64) -> Bool, Send<Int64, Recv<Bool, Var<Z>>>>>>>>

/// Selects the last branch, sends an evenness predicate followed by 42,
/// prints the Boolean reply, then loops back and selects the first branch to
/// close the session.
func fn_client<R : Op, S : Op, T : Op>(_ c: AnyChannel<(), Rec<PrimeCli<R, S, T>>>) {
    let isEven : (Int64) -> Bool = { candidate in
        return candidate % 2 == 0
    }
    let evaluator = c.enter().sel2().sel2().sel2().sel2()
    let (rest, verdict) = evaluator.send(isEven).send(42).recv()
    print("\(verdict)");
    rest.zero().sel1().close();
}
// `ask_neg` and `get_neg` use delegation, that is, sending a channel over
// another channel.
// `ask_neg` selects the negation operation and sends an integer, whereafter it
// sends the whole channel to `get_neg`. `get_neg` then receives the negated
// integer and prints it.
//typealias AskNeg<R : Op, S : Op> =
// Choose<Eps,
// Choose<R,
// Choose<Send<Int64, Recv<Int64, Var<Z>>>,
// S>>>;
//
//
//func ask_neg<R : Op, S : Op>(c1: AnyChannel<(), Rec<AskNeg<R, S>>>,
// c2: AnyChannel<(), Send<AnyChannel<(AskNeg<R, S>, ()), Recv<Int64, Var<Z>>>, Eps>>) {
// let c1 = c1.enter().sel2().sel2().sel1().send(42);
// c2.send(c1).close();
//}
//
//func get_neg<R: Op, S : Op>(c1: AnyChannel<(), Recv<AnyChannel<(AskNeg<R, S>, ()), Recv<Int64, Var<Z>>>, Eps>>) {
// let (c1, c2) = c1.recv();
// let (c3, n) = c2.recv();
// print("\(n)");
// c3.zero().sel1().close();
// c1.close();
//}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment