Picos — Interoperable effects based concurrency
[Figure (draw.io diagram): "Picos ecosystem". Boxes for Async IOs, Synchronization primitives, Concurrent DSs, Parallel algorithms, Structuring models, STMs, and Resource mgmt point with dashed arrows to a central Picos box; Schedulers points to Picos from below.]
---
title: Picos &mdash; Interoperable effects based concurrency
author: Vesa Karvonen, Tarides, [email protected]
documentclass: extarticle
fontsize: 8pt
classoption: twocolumn
bibliography: references.bib
abstract: |
  Picos is an ongoing project to define an interface between effects based schedulers and concurrent abstractions. Perhaps an enlightening analogy is to say that Picos is the POSIX of effects based schedulers. Picos is designed to enable an ecosystem of interoperable elements of effects based cooperative concurrent programming models such as schedulers, mechanisms for structuring concurrency, communication and synchronization primitives, and integrations with asynchronous I/O systems.
---

1 Introduction

OCaml 5 added support both for parallelism[^1][^2] and for programming with effects[^3], which allows, among other things, the implementation of direct-style cooperative concurrency. Indeed, multiple effects based concurrent programming libraries have already been developed, including Affect[^4], Domainslib[^5], Eio[^6], Fuseau[^7], Miou[^8], Moonpool[^9], and Riot[^10], among others. All of these libraries are mutually incompatible. A program written to use Domainslib's work-stealing scheduler cannot just directly use the asynchronous I/O APIs provided by Eio, for example. Instead of neatly avoiding the problem of having functions of two different colors[^11], we now have functions of eight different colors and counting!

Unfortunately there is no pot of gold at the end of this rainbow. The engineering effort to develop a concurrent programming library is small compared to the effort to build an ecosystem of all sorts of libraries and frameworks for application programming. Consider Cohttp[^12], a popular library "for creating HTTP daemons". It currently has multiple backends, including backends for Async[^13], Lwt[^14], and Eio. Should Cohttp provide separate backends for all effects based schedulers? Should every library that needs some concurrency support be separately and explicitly designed to allow multiple backends? Indeed, due to the non-trivial effort involved, many OCaml libraries today are written on top of a single concurrent programming library, namely Lwt or Async, which has led to a significant duplication of effort[^15].

We propose an interface, named Picos[^16], to decouple effects based schedulers and other elements of concurrent programming models, as depicted in figure \ref{fig:ecosystem}.

![Picos ecosystem\label{fig:ecosystem}](…){width=270px}

This enables an open ecosystem of interoperable and interchangeable elements for both application and middleware developers to mix and match as desired.

In this abstract, we will

  • discuss the technical, economic, and social problems that the mutual incompatibility of existing effects based schedulers implies.

  • present key ideas behind Picos and its current technical design and implementation.

  • describe some of the sample elements provided with Picos.

  • mention some current and future work on Picos.

2 Status quo

OCaml 5 introduced effects, which can be used to implement cooperative concurrency, and domains, which can be used to run threads in parallel.

Unfortunately, cooperative schedulers implemented using OCaml 5's effects are not automatically compatible with each other, because they end up handling different effects. In practice, at the time of writing this, essentially all of the existing effects based schedulers for OCaml 5 handle different effects and are therefore incompatible. For example, an attempt to await a Domainslib promise inside an Eio fiber will result in an Unhandled (Wait (_, _)) exception[^17], indicating that the Wait effect of Domainslib was not handled by Eio.

Nearly all of the existing multithreaded schedulers also want to spawn domains by themselves. However, there are both strict and practical limits to how many domains can and should be running. Using only the domain API[^18], one cannot, for example, practically write a function that hides the fact that it internally uses domains to compute something in parallel: a use of that function risks going over the limits, either causing an exception or slowing down the system, because other parts of the application may already have spawned enough domains to reach the limits.
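To make the problem concrete, here is a minimal sketch using only the standard Domain API; parallel_sum_of_squares is a hypothetical helper, not from any of the cited libraries:

(* A function that hides its internal use of domains.  Every call
   spawns an extra domain, so if several independent libraries do the
   same, the total number of domains can exceed limits such as
   Domain.recommended_domain_count (). *)
let parallel_sum_of_squares a b =
  let d = Domain.spawn (fun () -> a * a) in
  let y = b * b in
  Domain.join d + y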

In other words, code written for a specific scheduler is naturally incompatible with other schedulers. Furthermore, existing schedulers assume they are the only scheduler and want to take over the management of domains and the whole runtime, so to speak. Even if one could run multiple schedulers in a single application, communication between code running on different schedulers would be limited to mechanisms that work across schedulers.

This sort of fundamental technical incompatibility typically leads to duplication of effort. In practice, many of the existing schedulers have introduced their own low level asynchronous IO integrations and APIs. The existing schedulers also provide their own synchronization and communication abstractions, their own models for managing concurrency, and their own versions of other basic elements of concurrent programming. And that is just what the schedulers provide directly. In practice one also wants higher level libraries for application programming.

Imagine that you would like to introduce a new concurrent programming model and even have the resources to implement a basic scheduler. Should others use your scheduler in a new project? They should think twice before doing so. The established schedulers have a major advantage, as the cost of rewriting or adapting all the higher level libraries for a new scheduler is non-trivial. Do you have the marketing acumen to compete against existing schedulers? Will you be able to maintain the scheduler for years to come?

In practice, resources are limited and people and organizations have to choose which schedulers to support. This easily leads to an effective community split, like the one between the monadic Async and Lwt schedulers. It seems plausible that we will see many more effects based schedulers. Can the OCaml community afford all of them?

3 The architecture of Picos

Picos[^19] addresses the incompatibility of effects based schedulers at a fundamental level by introducing an interface that decouples schedulers from the other concurrent abstractions that need services from a scheduler.

The core abstractions of Picos are

  • Trigger — the ability to await a signal,
  • Computation — a cancelable computation, and
  • Fiber — an independent thread of execution,

that are implemented partially by the Picos interface in terms of effects, which are described in section 3.6.

The partial implementation of the abstractions and the effects define a contract between schedulers and other concurrent abstractions. By handling the Picos effects according to the contract, a scheduler becomes Picos compatible, which allows any abstraction written against the Picos interface, i.e. Implemented in Picos, to be used with the scheduler.

3.1 Understanding cancelation

A central goal of Picos is to provide a collection of building blocks for parallelism-safe cancelation that allows the implementation both of blocking abstractions and of abstractions for structuring fibers for cancelation or for managing the propagation and scope of cancelation.

While cancelation, which is essentially a kind of asynchronous exception or signal, is not necessarily recommended as a general control mechanism, the ability to cancel fibers in case of errors is crucial for the implementation of practical concurrent programming models.

Consider the following characteristic example:

Mutex.protect mutex begin fun () -> (* lock *)
  while true do
    Condition.wait condition mutex (* unlock..lock *)
  done
end (* unlock *)

Assume that a fiber executing the above code might be canceled, at any point, by another fiber running in parallel. This could be necessary, for example, due to an error that requires the application to be shut down. How could that be done while ensuring both safety and liveness?

  • For safety, cancelation should not leave the program in an invalid state or cause the program to leak memory. In this case, Condition.wait must exit with the mutex locked, even in case of cancelation, and, as Mutex.protect exits, the ownership of the mutex must be transferred to the next fiber, if any, waiting in queue for the mutex. No references to unused objects may be left in the mutex or the condition variable.

  • For liveness, cancelation should ensure that the fiber will eventually continue after cancelation. In this case, cancelation could be triggered during the Mutex.lock operation inside Mutex.protect or the Condition.wait operation, when the fiber might be in a suspended state, and cancelation should then allow the fiber to continue.

The abstractions Trigger, Computation, and Fiber work together to support cancelation. Briefly, a fiber corresponds to an independent thread of execution, and every fiber is associated with a computation at all times. When a fiber creates a trigger in order to await a signal, it asks the scheduler to suspend the fiber on the trigger. Assuming the fiber has not forbidden the propagation of cancelation, which is required, for example, in the implementation of Condition.wait to lock the mutex upon exit, the scheduler must also attach the trigger to the computation associated with the fiber. If the computation is then canceled before the trigger is otherwise signaled, the trigger will be signaled by the cancelation of the computation, and the fiber will be resumed by the scheduler as canceled.

This cancelable suspension protocol and its partial implementation designed around the first-order Trigger.Await effect creates a clear separation between schedulers and user code running in fibers and is designed to handle the possibility of a trigger being signaled or a computation being canceled at any point during the suspension of a fiber. Schedulers are given maximal freedom to decide which fiber to resume next. As an example, a scheduler could give priority to canceled fibers — going as far as moving a fiber already in the ready queue of the scheduler to the front of the queue at the point of cancelation — based on the assumption that user code promptly cancels external requests and frees critical resources.

3.2 Trigger

A trigger provides the ability to await a signal and is perhaps the best established and least controversial element of the Picos interface.

Here is an extract from the signature of the Trigger module:

type t
val create : unit -> t
val await : t -> Exn_bt.t option
val signal : t -> unit
val on_signal : (* for schedulers *)

The idea is that a fiber may create a trigger, insert it into some shared data structure, and then call await to ask the scheduler to suspend the fiber until something signals the trigger. The Exn_bt.t type represents an exception with a backtrace. When await returns an exception with a backtrace it means that the fiber has been canceled.

As an example, let's consider the implementation of an Ivar, i.e. an incremental or single-assignment variable:

type 'a t
val create : unit -> 'a t
val try_fill : 'a t -> 'a -> bool
val read : 'a t -> 'a

An Ivar is created as empty and can be filled with a value once. An attempt to read an Ivar blocks until the Ivar is filled.
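For instance, assuming the above signature is packaged as a module Ivar, one can write the following; read does not block here, because the Ivar is filled first:

let () =
  let iv = Ivar.create () in
  assert (Ivar.try_fill iv 42);
  (* A second fill fails, as an Ivar can be filled only once. *)
  assert (not (Ivar.try_fill iv 43));
  assert (Ivar.read iv = 42)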

Using Trigger and Atomic, we can represent an Ivar as follows:

type 'a state =
  | Filled of 'a
  | Empty of Trigger.t list

type 'a t = 'a state Atomic.t

The try_fill operation is then fairly straightforward to implement:

let rec try_fill t value =
  match Atomic.get t with
  | Filled _ -> false
  | Empty triggers as before ->
    let after = Filled value in
    if Atomic.compare_and_set t before after then
      begin
        List.iter Trigger.signal triggers; (* ! *)
        true
      end
    else
      try_fill t value

The interesting detail above is that after successfully filling an Ivar, the triggers are signaled. This allows the await inside the read operation to return:

let rec read t =
  match Atomic.get t with
  | Filled value -> value
  | Empty triggers as before ->
    let trigger = Trigger.create () in
    let after = Empty (trigger :: triggers) in
    if Atomic.compare_and_set t before after then
      match Trigger.await trigger with
      | None -> read t
      | Some exn_bt ->
        cleanup t trigger; (* ! *)
        Exn_bt.raise exn_bt
    else
      read t

An important detail above is that when await returns an exception with a backtrace, meaning that the fiber has been canceled, the cleanup operation (which is omitted above) is called to remove the trigger from the Ivar to avoid potentially accumulating an unbounded number of triggers in an empty Ivar.
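Although omitted above, cleanup is straightforward. Here is a minimal sketch, assuming physical equality suffices to identify the trigger to remove:

let rec cleanup t trigger =
  match Atomic.get t with
  | Filled _ -> ()
  | Empty triggers as before ->
    (* Remove the canceled fiber's trigger and retry on contention. *)
    let after =
      Empty (List.filter (fun t' -> t' != trigger) triggers)
    in
    if not (Atomic.compare_and_set t before after) then
      cleanup t trigger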

As simple as it is, the design of Trigger is far from arbitrary:

  • First of all, Trigger has single-assignment semantics. After being signaled, a trigger takes a constant amount of space and does not point to any other heap object. This makes it easier to reason about the behavior and can also help to avoid leaks or optimize data structures containing triggers, because it is safe to hold onto a bounded number of signaled triggers.

  • The Trigger abstraction is essentially first-order, which provides a clear separation between a scheduler and programs, or fibers, running on a scheduler. The await operation performs the Await effect, which passes the trigger to the scheduler. The scheduler then attaches its own callback to the trigger using on_signal. This way a scheduler does not call arbitrary user specified code in the Await effect handler.

  • Separating the creation of a trigger from the await operation allows one to easily insert a trigger into any number of places, and allows the trigger to potentially be signaled concurrently before the Await effect is performed, in which case the effect can be skipped entirely.

  • No value is propagated with a trigger. This makes triggers simpler and makes it less likely for one to e.g. accidentally drop such a value. In many cases, like with the Ivar, there is already a data structure through which values can be propagated.

  • The signal operation gives no indication of whether a fiber will then be resumed as canceled or not. This gives maximal flexibility for the scheduler and also makes it clear that cancelation must be handled based on the return value of await.

3.3 Computation

A Computation basically holds the status of some sort of computation, i.e. running, returned, or canceled, and allows anyone with access to the computation to attach triggers to it that will be signaled in case the computation stops running.

Here is an extract from the signature of the Computation module:

type 'a t

val create : unit -> 'a t

val try_attach : 'a t -> Trigger.t -> bool
val detach : 'a t -> Trigger.t -> unit

val try_return : 'a t -> 'a -> bool
val try_cancel : 'a t -> Exn_bt.t -> bool

val check : 'a t -> unit
val await : 'a t -> 'a

A Computation directly provides a superset of the functionality of the Ivar we sketched in the previous section:

type 'a t = 'a Computation.t
let create : unit -> 'a t = Computation.create
let try_fill : 'a t -> 'a -> bool =
  Computation.try_return
let read : 'a t -> 'a = Computation.await

However, what really makes the Computation useful is the ability to momentarily attach triggers to it. A Computation essentially implements a specialized lock-free bag of triggers, which allows one to implement dynamic completion propagation networks.
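As an illustration of such networks, here is a minimal sketch, not part of Picos itself, of awaiting whichever of two computations completes first; handling of the awaiting fiber's own cancelation is omitted for brevity:

let await_either a b =
  let trigger = Trigger.create () in
  (* try_attach returns false if the computation already completed. *)
  let need_a = Computation.try_attach a trigger in
  let need_b = Computation.try_attach b trigger in
  if not (need_a && need_b) then Trigger.signal trigger;
  Trigger.await trigger |> ignore;
  (* Detach so that signaled triggers are not left behind. *)
  if need_a then Computation.detach a trigger;
  if need_b then Computation.detach b trigger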

The Computation abstraction is also designed with both simplicity and flexibility in mind:

  • Similarly to Trigger, Computation has single-assignment semantics, which makes it easier to reason about.

  • Unlike a typical cancelation context of a structured concurrency model, Computation is unopinionated in that it does not impose a specific hierarchical structure.

  • Anyone may ask to be notified when a Computation is completed by attaching triggers to it and anyone may complete a Computation. This makes Computation an omnidirectional communication primitive.

Interestingly, and unintentionally, it turns out that Computation is almost expressive enough to implement the event abstraction of Concurrent ML[^20]. The same features that make Computation suitable for implementing more or less arbitrary dynamic completion propagation networks make it suitable for implementing Concurrent ML style abstractions. The only thing missing is the ability to complete two computations atomically. At the time of writing this, there is a draft PR that implements an obstruction-free transactional API for completing any number of computations atomically and aims to provide a full implementation of Concurrent ML style events and channels.

3.4 Fiber

A fiber corresponds to an independent thread of execution. Technically, an effects based scheduler creates a fiber, effectively giving it an identity, as it runs some function under its handler. The Fiber abstraction provides a way to share a proxy identity, and a bit of state, between a scheduler and other concurrent abstractions.

Here is an extract from the signature of the Fiber module:

type t

val current : unit -> t

val create : forbid:bool -> 'a Computation.t -> t
val spawn : t -> (t -> unit) -> unit

val get_computation : t -> Computation.packed
val set_computation : t -> Computation.packed -> unit

val has_forbidden : t -> bool
val exchange : t -> forbid:bool -> bool

module FLS : sig (* ... *) end

Fibers are where all of the low level bits and pieces of Picos come together, which makes it difficult to give both meaningful and concise examples. Nevertheless, let's implement a slightly simplistic structured concurrency mechanism:

type t (* represents a scope *)
val run : (t -> unit) -> unit
val fork : t -> (unit -> unit) -> unit

The idea here is that run creates a "scope" and waits until all of the fibers forked into the scope have finished. In case any fiber raises an unhandled exception, or the main fiber that created the scope is canceled, all of the fibers are canceled and an exception is raised. To keep things slightly simpler, only the first exception is kept.

A scope can be represented by a simple record type:

type t = {
  count : int Atomic.t;
  inner : unit Computation.t;
  ended : Trigger.t;
}

The idea is that after a fiber is finished, we decrement the count and if it becomes zero, we finish the computation and signal the main fiber that the scope has ended:

let decr t =
  let n = Atomic.fetch_and_add t.count (-1) in
  if n = 1 then begin
    Computation.finish t.inner;
    Trigger.signal t.ended
  end

When forking a fiber, we increment the count unless it already was zero, in which case we raise an error:

let rec incr t =
  let n = Atomic.get t.count in
  if n = 0 then invalid_arg "ended";
  if not (Atomic.compare_and_set t.count n (n + 1))
  then incr t

The fork operation is now relatively straightforward to implement:

let fork t action =
  incr t;
  try
    let main _ =
      match action () with
      | () -> decr t
      | exception exn ->
          let exn_bt = Exn_bt.get exn in
          Computation.cancel t.inner exn_bt;
          decr t
    in
    let fiber =
      Fiber.create ~forbid:false t.inner
    in
    Fiber.spawn fiber main
  with canceled_exn ->
    decr t;
    raise canceled_exn

The above fork first increments the count and then tries to spawn a fiber. The Picos interface specifies that when Fiber.spawn returns normally, the action, main, must be called by the scheduler. This allows us to ensure that the increment is always matched with a decrement.

Setting up a scope is the most complex operation:

let run body =
  let count = Atomic.make 1 in
  let inner = Computation.create () in
  let ended = Trigger.create () in
  let t = { count; inner; ended } in
  let fiber = Fiber.current () in
  let (Packed outer) =
    Fiber.get_computation fiber
  in
  let canceler =
    Computation.attach_canceler
      ~from:outer
      ~into:t.inner
  in
  match
    Fiber.set_computation fiber (Packed t.inner);
    body t
  with
  | () -> join t outer canceler fiber
  | exception exn ->
      let exn_bt = Exn_bt.get exn in
      Computation.cancel t.inner exn_bt;
      join t outer canceler fiber;
      Exn_bt.raise exn_bt

The Computation.attach_canceler operation attaches a special trigger to propagate cancelation from one computation into another. After the body exits, join

let join t outer canceler fiber =
  decr t;
  Fiber.set_computation fiber (Packed outer);
  let forbid = Fiber.exchange fiber ~forbid:true in
  Trigger.await t.ended |> ignore;
  Fiber.set fiber ~forbid;
  Computation.detach outer canceler;
  Computation.check t.inner;
  Fiber.check fiber

is called to wait for the scoped fibers and restore the state of the main fiber. An important detail is that propagation of cancelation is forbidden by setting the forbid flag to true before the call of Trigger.await. This is necessary to ensure that join does not exit, due to the fiber being canceled, before all of the child fibers have actually finished. Finally, join checks the inner computation and the fiber, which means that an exception will be raised in case either was canceled.

The design of Fiber includes several key features:

  • The low level design allows one to both avoid unnecessary overheads, such as allocating a Computation.t for every fiber, when implementing simple abstractions and also to implement more complex behaviors that might prove difficult given e.g. a higher level design with a built-in notion of hierarchy.

  • As a Fiber.t stores the forbid flag and the Computation.t associated with the fiber, one need not pass those as arguments through the program. This allows various concurrent abstractions to be given traditional interfaces, which would otherwise need to be complicated.

  • Effects are relatively expensive. The cost of performing effects can be amortized by obtaining the Fiber.t once and then manipulating it multiple times.

  • A Fiber.t also provides an identity for the fiber. It has so far proven to be sufficient for most purposes. Fiber local storage, which we do not cover here, can be used to implement, for example, a unique integer id for fibers.

3.5 Assumptions

Now, consider the Ivar abstraction presented earlier as an example of the use of the Trigger abstraction. That Ivar implementation, as well as the Computation based implementation, works exactly as desired inside the scope abstraction presented in the previous section. In particular, a blocked Ivar.read can be canceled, either when another fiber in a scope raises an unhandled exception or when the main fiber of the scope is canceled, which allows the fiber to continue by raising an exception after cleaning up. In fact, Picos comes with a number of libraries that all would work quite nicely with the examples presented here.

For example, a library provides an operation to run a block with a timeout on the current fiber. One could use it with Ivar.read to implement a read operation with a timeout:

let read_in ~seconds ivar =
  Control.terminate_after ~seconds @@ fun () ->
  Ivar.read ivar

This interoperability is not accidental. For example, the scope abstraction basically assumes that one does not use Fiber.set_computation in an arbitrary unscoped manner inside the scoped fibers. Indeed, an idea behind the Picos interface is that it is not supposed to be used by applications at all; most higher level libraries should be built on top of libraries that do not directly expose elements of the Picos interface.

Perhaps more interestingly, there are obviously limits to what can be achieved in an "interoperable" manner. Imagine an operation like

val at_exit : (unit -> unit) -> unit

that would allow one to run an action just before a fiber exits. One could, of course, use a custom spawn function that supports such cleanup, but then at_exit could only be used on fibers spawned through that particular spawn function. This and related use cases are important enough that the Picos interface is being extended to support finalization of resources stored in the fiber local storage, FLS, provided by Picos.
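For illustration, here is a minimal sketch of such a custom spawn; spawn_with_at_exit and its at_exit parameter are hypothetical names, not part of Picos:

let spawn_with_at_exit fiber main =
  Fiber.spawn fiber @@ fun fiber ->
    (* Actions registered through at_exit run when the fiber exits,
       whether it returns normally or raises. *)
    let cleanups = ref [] in
    let at_exit action = cleanups := action :: !cleanups in
    match main fiber ~at_exit with
    | () -> List.iter (fun action -> action ()) !cleanups
    | exception exn ->
      List.iter (fun action -> action ()) !cleanups;
      raise exn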

3.6 The effects

As mentioned previously, the Picos interface is implemented partially in terms of five effects:

type _ Effect.t +=
  | Await : Trigger.t -> Exn_bt.t option Effect.t
  | Cancel_after : {
      seconds : float;
      exn_bt : Exn_bt.t;
      computation : 'a Computation.t;
    }
      -> unit Effect.t
  | Current : t Effect.t
  | Yield : unit Effect.t
  | Spawn : {
      fiber : Fiber.t;
      main : (Fiber.t -> unit);
    }
      -> unit Effect.t

A scheduler must handle those effects as specified in the Picos documentation.
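On the library side, the corresponding operations essentially reduce to performing these effects. As a simplified sketch (the actual implementations in Picos include details omitted here):

let current () = Effect.perform Current
let yield () = Effect.perform Yield
let await trigger = Effect.perform (Await trigger)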

The Picos interface does not, in particular, dictate which ready fibers a scheduler must run next and on which domains. Picos also does not require that a fiber should stay on the domain on which it was spawned. Abstractions implemented against the Picos interface should not assume any particular scheduling.

Picos actually comes with a randomized multithreaded scheduler that, after handling any of the effects, picks the next ready fiber randomly. It has proven to be useful for testing that abstractions implemented in Picos do not make invalid scheduling assumptions.

When a concurrent abstraction requires a particular scheduling, it should primarily be achieved through the use of synchronization abstractions like when programming with traditional threads. Application programs may, of course, pick specific schedulers.

4 Status and results

We have an experimental design and implementation of the core Picos interface as illustrated in the previous section. We have also created several Picos compatible sample schedulers. A scheduler, in this context, just multiplexes fibers to run on one or more system level threads. We have also created some sample higher-level scheduler agnostic libraries Implemented in Picos. These libraries include a library for resource management, a library for structured concurrency, a library of synchronization primitives, and an asynchronous I/O library. The synchronization library and the I/O library intentionally mimic libraries that come with the OCaml distribution. All of the libraries work with all of the schedulers and all of these elements are interoperable and entirely opt-in.

What is worth explicitly noting is that all of these schedulers and libraries are small, independent, and highly modular pieces of code. They all crucially depend on the core Picos interface library and are decoupled from each other through it. A basic single threaded scheduler implementation requires only about 100 lines of code (LOC). A more complex parallel scheduler might require a couple of hundred LOC. The scheduler agnostic libraries are similarly small[^16].

Here is an example of a concurrent echo server using the scheduler agnostic libraries provided as samples:

let run_server server_fd =
  Unix.listen server_fd 8;
  Bundle.join_after begin fun bundle ->
    while true do
      let^ client_fd = finally Unix.close @@ fun () ->
        Unix.accept ~cloexec:true server_fd |> fst
      in
      Bundle.fork bundle begin fun () ->
        let@ client_fd = move client_fd in
        Unix.set_nonblock client_fd;
        let bs = Bytes.create 100 in
        let n =
          Unix.read client_fd bs 0 (Bytes.length bs)
        in
        Unix.write client_fd bs 0 n |> ignore
      end
    done
  end

The Unix module[^21] is provided by the I/O library. The operations on file descriptors in that module, such as accept, read, and write, use the Picos interface to suspend fibers, allowing other fibers to run while waiting for I/O. The Bundle module comes from the structured concurrency library. A call of join_after returns only after all the fibers forked into the bundle have terminated. If the main fiber of the bundle is canceled, or any fiber within the bundle raises an unhandled exception, all the fibers within the bundle will be canceled and an exception will be raised on the main fiber of the bundle. The let^, finally, let@, and move operations come from the resource management library and allow dealing with resources in a leak-free manner. The responsibility to close the client_fd socket is moved from the main server fiber to the fiber forked to handle that client.

We should emphasize that the above is just an example. The Picos interface should be both expressive and efficient enough to support practical implementations of many different kinds of concurrent programming models. Also, as described previously, the Picos interface does not, for example, internally implement structured concurrency. However, the abstractions provided by Picos are designed to allow structured and unstructured concurrency to be Implemented in Picos as libraries that will then work with any Picos compatible scheduler and with other concurrent abstractions.

Finally, an interesting demonstration that Picos really fundamentally is an interface is a prototype Picos compatible direct style interface to Lwt. The implementation uses shallow effect handlers and defers all scheduling decisions to Lwt. Running a program with the scheduler returns an Lwt promise.

5 Future work

As mentioned previously, Picos is still an ongoing project and the design is considered experimental. We hope that Picos soon matures to serve the needs of both the commercial users of OCaml and the community at large.

Previous sections already mentioned a couple of updates currently in development, such as the support for finalizing resources stored in FLS and the development of Concurrent ML style abstractions. We also have ongoing work to formalize aspects of the Picos interface.

One potential change we will be investigating is whether the Computation abstraction should be simplified to only support cancelation.

Some operations, such as Fiber.current, which retrieves the current fiber proxy identity, do not strictly need to be effects. Performing an effect is relatively expensive, and we will likely design a mechanism to store a reference to the current fiber in some sort of local storage, which could significantly improve the performance of certain abstractions, such as checked mutexes, that need to access the current fiber.

We also plan to develop a minimalist library for spawning threads over domains, much like Moonpool[^9], in a cooperative manner for schedulers and other libraries.

We also plan to make Domainslib[^5] Picos compatible, which will require developing a more efficient non-effects based interface for spawning fibers, and to investigate making Eio[^6] Picos compatible.

We also plan to design and implement asynchronous IO libraries for Picos using various system call interfaces for asynchronous IO, such as io_uring.

Structure of the talk

We will first talk about the technical, economic, and social problems that the mutual incompatibility of existing effects based schedulers implies.

We will then discuss the design and technical implementation of the Picos interface, taking note of several key insights behind the design and how they relate to previous work on schedulers and attempts at interoperability, such as a previously presented higher-order Suspend effect proposal[^22].

We will also look at some of the samples provided with Picos to understand how one can create Picos compatible schedulers and concurrent abstractions Implemented in Picos.

We will also examine the performance implications of the Picos interface and how OCaml could be extended to allow better performance for cooperative concurrency.

We will conclude with a vision of how an interface like Picos could fundamentally change the concept of "schedulers" in OCaml, foster future innovation on concurrency, and allow the community to cooperatively build an ecosystem of interoperable concurrent libraries.

Acknowledgments

The design of Picos has been directly influenced or supported, or made possible indirectly, by discussions with or through the work of several people. I would like to thank[^23] Arthur Wendling, Carine Morel, Christiano Haesbaert, David Allsopp, Deepali Ande, Hannes Mehnert, Jan Midtgaard, Jon Ludlam, KC Sivaramakrishnan, Leandro Ostera, Leo White, Mark Elvers, Romain Calascibetta, Sadiq Jaffer, Simon Cruanes, Stephen Dolan, Sudha Parimala, Thomas Gazagnaire, Thomas Leonard, Vincent Balat, Yaron Minsky and apologize to those I forgot to mention here.

References

[^1]: [@retrofitting-parallelism]
[^2]: [@bounding-data-races]
[^3]: [@retrofitting-effects]
[^4]: [@affect]
[^5]: [@domainslib]
[^6]: [@eio]
[^7]: [@fuseau]
[^8]: [@miou]
[^9]: [@moonpool]
[^10]: [@riot]
[^11]: [@what-color]
[^12]: [@cohttp]
[^13]: [@async]
[^14]: [@lwt]
[^15]: [@abandoning-async]
[^16]: "Picos" was initially short for "pico scheduler framework" based on the vision that it would allow composing concurrent software from tiny modules.
[^17]: [@unhandled-wait]
[^18]: [@domain-api]
[^19]: [@picos]
[^20]: [@concurrent-programming-in-ml]
[^21]: Which provides a signature compatible with the Unix module that comes with the OCaml distribution.
[^22]: [@composing-schedulers]
[^23]: Ask me why!

@misc{abandoning-async,
title = {Abandoning Async},
howpublished = {\url{http://rgrinberg.com/posts/abandoning-async/}},
year = 2014
}
@misc{affect,
title = {Affect — Composable concurrency primitives with OCaml effects handlers},
howpublished = {\url{https://github.com/dbuenzli/affect}},
year = 2022
}
@misc{async,
title = {Async — Jane Street Capital's asynchronous execution library},
howpublished = {\url{https://github.com/janestreet/async}},
year = 2013
}
@misc{cohttp,
title = {Cohttp — an OCaml library for HTTP clients and servers},
howpublished = {\url{https://github.com/mirage/ocaml-cohttp}},
year = 2009
}
@misc{what-color,
title = {What Color is Your Function?},
howpublished = {\url{https://journal.stuffwithstuff.com/2015/02/01/what-color-is-your-function/}},
year = 2015
}
@misc{domainslib,
title = {Domainslib — Nested-parallel programming library},
howpublished = {\url{https://github.com/ocaml-multicore/domainslib}},
year = 2019
}
@misc{eio,
title = {Eio — Effects-Based Parallel IO for OCaml},
howpublished = {\url{https://github.com/ocaml-multicore/eio}},
year = 2021
}
@misc{fuseau,
title = {Fuseau — Lightweight fiber library for OCaml 5},
howpublished = {\url{https://github.com/c-cube/fuseau}},
year = 2023
}
@misc{lwt,
title = {Lwt — OCaml promises and concurrent IO},
howpublished = {\url{https://github.com/ocsigen/lwt}},
year = 2008
}
@misc{miou,
title = {Miou — A simple scheduler for OCaml 5},
howpublished = {\url{https://github.com/robur-coop/miou}},
year = 2023
}
@misc{moonpool,
title = {Moonpool — Commodity thread pools and concurrency primitives for OCaml 5},
howpublished = {\url{https://github.com/c-cube/moonpool}},
year = 2023
}
@misc{picos,
title = {Picos — Interoperable effects based concurrency},
howpublished = {\url{https://github.com/ocaml-multicore/picos/}},
year = 2024
}
@misc{riot,
title = {Riot — An actor-model multi-core scheduler for OCaml 5},
howpublished = {\url{https://github.com/riot-ml/riot}},
year = 2023
}
@unpublished{composing-schedulers,
title = {Composing Schedulers Using Effect Handlers},
author = {Ande, Deepali and Sivaramakrishnan, KC},
year = {2022},
note = {OCaml Users and Developers Workshop 2022},
URL = {https://kcsrk.info/papers/compose_ocaml22.pdf},
}
@misc{unhandled-wait,
title = {Interaction between eio and domainslib, unhandled exceptions?},
year = 2023,
howpublished = {\url{https://discuss.ocaml.org/t/interaction-between-eio-and-domainslib-unhandled-exceptions/11971}}
}
@misc{domain-api,
title = {Module Domain},
howpublished = {\url{https://ocaml.org/manual/5.2/api/Domain.html}},
year = 2023,
}
@book{concurrent-programming-in-ml,
author = {Reppy, John H.},
title = {Concurrent programming in ML},
publisher = {Cambridge University Press},
year = {1999},
address = {Cambridge [England]}
}
@inproceedings{retrofitting-effects,
author = {Sivaramakrishnan, KC and Dolan, Stephen and White, Leo and Kelly, Tom and Jaffer, Sadiq and Madhavapeddy, Anil},
title = {Retrofitting effect handlers onto OCaml},
year = {2021},
isbn = {9781450383912},
publisher = {Association for Computing Machinery},
address = {New York, NY, USA},
url = {https://doi.org/10.1145/3453483.3454039},
doi = {10.1145/3453483.3454039},
abstract = {Effect handlers have been gathering momentum as a mechanism for modular programming with user-defined effects. Effect handlers allow for non-local control flow mechanisms such as generators, async/await, lightweight threads and coroutines to be composably expressed. We present a design and evaluate a full-fledged efficient implementation of effect handlers for OCaml, an industrial-strength multi-paradigm programming language. Our implementation strives to maintain the backwards compatibility and performance profile of existing OCaml code. Retrofitting effect handlers onto OCaml is challenging since OCaml does not currently have any non-local control flow mechanisms other than exceptions. Our implementation of effect handlers for OCaml: (i)&nbsp;imposes a mean 1\% overhead on a comprehensive macro benchmark suite that does not use effect handlers; (ii)&nbsp;remains compatible with program analysis tools that inspect the stack; and (iii)&nbsp;is efficient for new code that makes use of effect handlers.},
booktitle = {Proceedings of the 42nd ACM SIGPLAN International Conference on Programming Language Design and Implementation},
pages = {206–221},
numpages = {16},
keywords = {Fibers, Effect handlers, Continuations, Backwards compatibility, Backtraces},
location = {Virtual, Canada},
series = {PLDI 2021}
}
@article{retrofitting-parallelism,
author = {Sivaramakrishnan, KC and Dolan, Stephen and White, Leo and Jaffer, Sadiq and Kelly, Tom and Sahoo, Anmol and Parimala, Sudha and Dhiman, Atul and Madhavapeddy, Anil},
title = {Retrofitting parallelism onto OCaml},
year = {2020},
issue_date = {August 2020},
publisher = {Association for Computing Machinery},
address = {New York, NY, USA},
volume = {4},
number = {ICFP},
url = {https://doi.org/10.1145/3408995},
doi = {10.1145/3408995},
abstract = {OCaml is an industrial-strength, multi-paradigm programming language, widely used in industry and academia. OCaml is also one of the few modern managed system programming languages to lack support for shared memory parallel programming. This paper describes the design, a full-fledged implementation and evaluation of a mostly-concurrent garbage collector (GC) for the multicore extension of the OCaml programming language. Given that we propose to add parallelism to a widely used programming language with millions of lines of existing code, we face the challenge of maintaining backwards compatibility--not just in terms of the language features but also the performance of single-threaded code running with the new GC. To this end, the paper presents a series of novel techniques and demonstrates that the new GC strikes a balance between performance and feature backwards compatibility for sequential programs and scales admirably on modern multicore processors.},
journal = {Proc. ACM Program. Lang.},
month = {aug},
articleno = {113},
numpages = {30},
keywords = {backwards compatibility, concurrent garbage collection}
}
@inproceedings{bounding-data-races,
author = {Dolan, Stephen and Sivaramakrishnan, KC and Madhavapeddy, Anil},
title = {Bounding data races in space and time},
year = {2018},
isbn = {9781450356985},
publisher = {Association for Computing Machinery},
address = {New York, NY, USA},
url = {https://doi.org/10.1145/3192366.3192421},
doi = {10.1145/3192366.3192421},
abstract = {We propose a new semantics for shared-memory parallel programs that gives strong guarantees even in the presence of data races. Our local data race freedom property guarantees that all data-race-free portions of programs exhibit sequential semantics. We provide a straightforward operational semantics and an equivalent axiomatic model, and evaluate an implementation for the OCaml programming language. Our evaluation demonstrates that it is possible to balance a comprehensible memory model with a reasonable (no overhead on x86, ~0.6\% on ARM) sequential performance trade-off in a mainstream programming language.},
booktitle = {Proceedings of the 39th ACM SIGPLAN Conference on Programming Language Design and Implementation},
pages = {242–255},
numpages = {14},
keywords = {weak memory models, operational semantics},
location = {Philadelphia, PA, USA},
series = {PLDI 2018}
}
---
title: Picos &mdash; Interoperable effects based concurrency for OCaml
theme: black
revealOptions:
  transition: fade
  width: 1280
  height: 800
---

Picos — Interoperable effects based concurrency for OCaml

Vesa Karvonen

Tarides
(Multicore Applications Team)


The problem

What color is your function?*

  • a → b
  • a → b m

You can't just call an async function from a sync function and obtain the result synchronously.

OCaml 5 🍻 — Let's introduce effects!


What are Effects? (1/2)

"Resumable exceptions" 👋

type exn += Hello

match
  raise Hello
with
| result -> String.capitalize_ascii result
| exception Hello -> "hello"
"hello"

What are effects? (2/2)

With effects

type _ Effect.t += Hello : string Effect.t

match
  Effect.perform Hello
with
| result -> String.capitalize_ascii result
| effect Hello, k -> Effect.Deep.continue k "hello"
"Hello"

A toy scheduler with effects? (1/4)

The interface:

val fork : (unit -> unit) -> unit

val yield : unit -> unit

(usually you also want an await — we'll get to that later)


A toy scheduler with effects? (2/4)

let main () =
  fork begin fun () ->
    for i=1 to 4 do
      Printf.printf "%d%!" (i * 2 - 1); (* odd *)
      yield ();
    done
  end;

  for i=1 to 4 do
    Printf.printf "%d%!" (i * 2); (* even *)
    yield ();
  done
12345678 or 21436587

(assuming FIFO scheduling)


A toy scheduler with effects? (3/4)

type _ Effect.t += Fork : (unit -> unit) -> unit Effect.t

let fork fiber = Effect.perform (Fork fiber)

type _ Effect.t += Yield : unit Effect.t

let yield () = Effect.perform Yield

A toy scheduler with effects? (4/4)

let run main =
  let ready = Queue.create () in
  let enqueue k = Queue.push k ready in
  let dequeue () =
    match Queue.take_opt ready with
    | Some k -> Effect.Deep.continue k ()
    | None -> () in
  let rec run fiber =
    match fiber () with
    | effect Yield, k -> enqueue k; dequeue ()
    | effect (Fork fiber), k -> enqueue k; run fiber
    | () -> dequeue () in
  run main
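With the handlers above, Fork runs the forked fiber first, so running the main from slide 2/4 prints 12345678:

let () = run main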

Effects recap

Effects allow you to capture the continuation and implement e.g. a cooperative concurrency model relatively easily, avoiding the problem of functions of two different colors.


How did it go in OCaml?

Affect, Domainslib, Eio, Fuseau, Miou, Moonpool, Riot, ...

All mutually incompatible!

All using different effects.

So, you can't just call Eio's IO APIs from a Domainslib fiber, for example.

Because a Domainslib effect handler does not handle Eio's effects.

Functions of 8 different colors and counting! 🌈


Issues

  • Duplication of functionality
  • Prohibitive cost to innovate
  • Competition over resources
  • Community split

Recap

Code written against a specific scheduler...

...cannot run on other schedulers.

Solutions?

A level of indirection, perhaps? 🤔


Picos

Interface between schedulers and concurrent abstractions


  Libraries & Applications

------ Sync and Comm \
----------- Async IO |
---------- Conn pool |
------ Concurrent DS +-->  Picos  <-- Schedulers
- Structuring models |
----- Parallel algos |
---------------- STM /

  "Implemented in Picos"         "Picos compatible"

In other words

Picos aims to be an interface that is just rich enough that one can write most concurrent software without depending on a specific scheduler.

Duplication? Cost? Competition? Split?


The architecture of Picos


Understanding cancelation

Mutex.protect mutex begin fun () -> (* lock *)
  while true do
    Condition.wait condition mutex (* unlock..lock *)
  done
end (* unlock *)

Liveness?

Safety?


Trio of concepts working together

Design largely driven by need to support cancelable await with e.g. mutexes and condition variables, and structured concurrency

Trigger — to await

Computation — to publish, observe, and control

Fiber — for autonomy

These are not for application programming.


Trigger

type t

val create : unit -> t (* initial *)

val await : t -> Exn_bt.t option (* awaiting *)

val signal : t -> unit (* signaled *)

Ivar using Trigger (1/3)
type 'a t
val create : unit -> 'a t
val try_fill : 'a t -> 'a -> bool
val read : 'a t -> 'a
type 'a state =
  | Filled of 'a
  | Empty of Trigger.t list
type 'a t = 'a state Atomic.t

Ivar using Trigger (2/3)
let rec try_fill t value =
  match Atomic.get t with
  | Filled _ -> false
  | Empty triggers as before ->
    let after = Filled value in
    if Atomic.compare_and_set t before after then begin
      List.iter Trigger.signal triggers;
      true
    end
    else
      try_fill t value

Ivar using Trigger (3/3)
let rec read t =
  match Atomic.get t with
  | Filled value -> value
  | Empty triggers as before ->
    let trigger = Trigger.create () in
    let after = Empty (trigger :: triggers) in
    if Atomic.compare_and_set t before after then
      match Trigger.await trigger with
      | None -> read t (* not canceled *)
      | Some exn_bt -> (* canceled *)
        cleanup t trigger; (* omitted *)
        Exn_bt.raise exn_bt
    else
      read t

Ivar Recap

Owner

  1. create trigger
  2. insert to data structure
  3. await
  4. determine result and cleanup

Other

  1. update data structure
  2. signal triggers

Design of Trigger
  • Single assignment, constant space final state
    • Simple and non-leaky
  • First order
    • Strong separation for scheduler
  • Separates allocation from await
    • Attach trigger to any number of places
    • May be signaled concurrently
    • Await performed last — fast path to skip effect
  • Does not propagate value
    • Simpler and safer (i.e. no value dropped)
  • Does not tell signaler whether target will be canceled
    • Maximal flexibility for scheduler

Computation

type 'a t

val create : unit -> 'a t (* running *)

val try_attach : 'a t -> Trigger.t -> bool
val detach : 'a t -> Trigger.t -> unit

val try_return : 'a t -> 'a -> bool (* completed *)
val try_cancel : 'a t -> Exn_bt.t -> bool (* completed *)

(* completion signals attached triggers *)

val check : 'a t -> unit (* may raise *)
val await : 'a t -> 'a (* or suspend or raise *)

Essence of Computation?

Cancelation context or token?

Promise? (Ivar)

type 'a t = 'a Computation.t
let create : unit -> 'a t = Computation.create
let try_fill : 'a t -> 'a -> bool = Computation.try_return
let read : 'a t -> 'a = Computation.await

Observability: Attach triggers momentarily.

Bidirectionality: Return or Cancel (also from outside).


Design of Computation
  • Single assignment, supporting resource cleanup
    • Simple and non-leaky
  • Unopinionated
    • Non-hierarchical
    • Not just for structured concurrency
  • Observability
    • Propagate in desired manner
  • Bidirectionality
    • Cancelation

Accidentally (almost) expressive enough to implement CML.


Fiber

val yield : unit -> unit
val spawn : forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit
type t

val current : unit -> t

val get_computation : t -> Computation.packed (* Packed : 'a t -> packed *)
val set_computation : t -> Computation.packed -> unit

val has_forbidden : t -> bool
val exchange : t -> forbid:bool -> bool

module FLS : sig (* ... *) end

Purpose: Share state with scheduler. Identity.


Design of Fiber
  • Avoid need to explicitly pass along a Computation
    • Convenience & Modularity (e.g. Mutex)
  • Effects are (relatively) expensive
    • Need ability to share some state with scheduler
      • forbid flag
      • associated computation
    • After you obtain the fiber record, accesses are fast
  • Identity of fiber record seems sufficient
  • FLS for extensions
    • faster & easier than separate hash tables on id

Cancelation protocol

Every fiber has a computation. When awaiting a trigger to be signaled, the trigger can be (momentarily) attached to a computation. If the computation is canceled, the trigger will be signaled.


Picos is implemented through Effects

  • Trigger.Await
  • Computation.Cancel_after (*)
  • Fiber.Current
  • Fiber.Yield
  • Fiber.Spawn

These are to be handled by a scheduler.


How does it work out then?


Samples


Performance

  • Ported a couple of benchmarks from Eio: Yield, Mutex

  • Caveat: Picos is different

    • Picos does not require structured concurrency
    • Picos supports parallelism-safe cancelation
    • Picos does not require IO to be done in the scheduler loop

Result: Picos_fifos seems to be competitive with (faster than) Eio

Point: It seems the interface allows efficient concurrency.


Future of Schedulers


Status quo

"Schedulers" are DEEP

Eio, Riot, Miou, ...

...have their own implementations of everything:

  • Synchronization: Mutex, Condition, ...
  • Async IO
  • Data structures
  • Structured concurrency

Thousands of LOC!


With Picos?

  • Scheduler: 100-500 LOC

    • It is just a data structure and a loop!
  • IO as a library

    • 500 LOC (IO event loop) + 500 LOC (IO wrappers)
    • Works on any Picos compatible scheduler
  • Mutex, Condition, Lazy, ..., Event: 100 LOC modules

  • Structured concurrency: 300 LOC

Infinite Diversity in Infinite Combinations 🖖


Future? (1/2)

  • "Schedulers" could become shallow "curated" layers

    • Want actors? Use Riot
    • Want capabilities? Use Eio
    • Want work-stealing? Use Moonpool or Domainslib
    • Want more traditional concurrency? Use Fuseau or Miou
  • Or maybe we will use different terminology

    • Scheduler is just a loop and a collection!
    • "Concurrent programming model"
  • The depth (e.g. IO libraries) could be largely shared


Future? (2/2)

  • A single app could use libraries internally using different models

  • Collaboration instead of competition!

  • How to get everyone on board?

  • How should we organize development?

  • Interesting times...

Much bigger change than what might be obvious


Next steps for Picos

  • Alignment
  • Finish the API
  • Real IO libraries (uring, poll, iocp, ...)
  • Domainslib
  • Saturn, Kcas
  • Semantics

Questions?

title: Picos — Interoperable effects based concurrency for OCaml
theme: black
revealOptions:
  transition: fade
  width: 1280
  height: 720

Picos — Interoperable effects based concurrency for OCaml

Vesa Karvonen

Tarides
(Multicore Applications Team)


RvB

What color is your function?*

  • a → b
  • a → b m

You can't just call an async function from a sync function and obtain the result synchronously.

OCaml 5 🍻 — Let's introduce effects!

notes: 2015, Bob Nystrom


How did it go in OCaml?

Affect, Domainslib, Eio, Fuseau, Miou, Moonpool, Riot, ...

All mutually incompatible!

All using different effects.

So, you can't just call Eio's IO APIs from a Domainslib fiber, for example.

Because a Domainslib effect handler does not handle Eio's effects.

Functions of 8 different colors and counting! 🌈


Issues

  • Duplication of functionality
  • Prohibitive cost to innovate
  • Competition over resources
  • Community split

notes: cohttp, anything asynchronous


Picos

Interface between schedulers and concurrent abstractions

"POSIX of effects based schedulers"

notes: How could we avoid these issues?


Decoupling through the Picos interface

notes: dependencies, implements vs depends on


What is a "scheduler"? (1/2)

Analogy: Dispenser ADT (stack, queue, bag, ...)

val push : 'a dispenser -> 'a -> unit
val pop_opt : 'a dispenser -> 'a option

"Trivializes" the data structure

val pop_at_most_n : 'a special -> int -> 'a list

Not used by clients using the Dispenser ADT
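
To make the analogy concrete, here is a sketch in OCaml (the module names are illustrative, and a create is added to keep it self-contained). Several Stdlib structures satisfy the same interface, and client code cannot tell which one it got:

module type Dispenser = sig
  type 'a t
  val create : unit -> 'a t
  val push : 'a t -> 'a -> unit
  val pop_opt : 'a t -> 'a option
end

(* FIFO dispenser backed by Stdlib.Queue. *)
module Fifo : Dispenser = struct
  type 'a t = 'a Queue.t
  let create = Queue.create
  let push t x = Queue.add x t
  let pop_opt = Queue.take_opt
end

(* LIFO dispenser backed by Stdlib.Stack. *)
module Lifo : Dispenser = struct
  type 'a t = 'a Stack.t
  let create = Stack.create
  let push t x = Stack.push x t
  let pop_opt = Stack.pop_opt
end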


What is a "scheduler"? (2/2)

  • Cooperative fiber multiplexer?
  • Sync and comm primitives?
  • Async IO integration?
  • Actors? Structured concurrency?
  • ...

Picos: multiplexer, timeouts, and enough to implement the rest as libs

"Trivializes" the scheduler, i.e. moves functionality outside of it


Motivating problems (1/2)

Cancelable synchronization

Mutex.protect mutex begin fun () -> (* lock *)
  while true do
    Condition.wait condition mutex (* unlock..lock *)
  done
end (* unlock *)

(Safety and Liveness?)

notes: running in a fiber, canceled at any point, mutex must be re-acquired


Motivating problems (2/2)

Structured concurrency

(* ... allocate resources ... *)
Fun.protect ~finally begin fun () ->

  Flock.join_after begin fun () ->
    (* ... *)
    Flock.fork begin fun () ->
      (* ... may use resources ... *)
    end;
    (* ... *)
  end;
  (* ... resources no longer used ... *)
end

(Safety and Liveness?)

notes: RESOURCES, again running in a fiber, canceled at any point, must not call finally too early


The architecture of Picos


Trio of abstractions

Trigger — await, signal

Computation — attach/detach trigger, complete, timeout

Fiber — identity, forbid/permit, set/get computation, FLS

(These are not for application programming. Should not appear in APIs.)

notes: TRIGGER for suspending a fiber to await. COMPUTATION for communicating the status of a computation. FIBER for sharing a bit of state with the scheduler.


Trigger

(This and the following are extracts from the full signatures.)

type t

val create : unit -> t (* initial *)

val signal : t -> unit (* signaled *)

val await : t -> (exn * raw_backtrace) option (* awaiting *)

(single-assignment, MPSC, first-order, fast path, non-leaky, awaiter handles cancelation)
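
The fast path deserves a sketch. Because await is performed last, an already-signaled trigger never needs to perform an effect. A simplified two-state illustration (the real trigger also has an awaiting state, and its await returns an option as in the extract above):

type state = Initial | Signaled
type t = state Atomic.t

type _ Effect.t += Await : t -> unit Effect.t

let create () : t = Atomic.make Initial

let signal (t : t) =
  (* The real signal also resumes a recorded awaiter. *)
  Atomic.set t Signaled

let await (t : t) =
  (* Fast path: skip the (relatively expensive) effect entirely
     when the trigger has already been signaled. *)
  if Atomic.get t <> Signaled then Effect.perform (Await t)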


Computation

type 'a t

val create : unit -> 'a t (* running *)

val try_return : 'a t -> 'a -> bool (* completed *)
val try_cancel : 'a t -> exn -> raw_backtrace -> bool (* completed *)

val check : 'a t -> unit (* may raise *)
val await : 'a t -> 'a (* or suspend or raise *)

val try_attach : 'a t -> Trigger.t -> bool (* signal on completion *)
val detach : 'a t -> Trigger.t -> unit

val cancel_after : 'a t -> seconds:float -> exn -> raw_backtrace -> unit

(single-assignment, MPMC, non-hierarchical, bracket, bidirectional observability, CML)

notes: Computation just holds the state of whether something running or completed either with a return value or with an exception.
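
A hypothetical usage sketch of cancel_after (the Timeout exception and the helper are illustrative, not part of Picos): race a computation against a timeout. If the computation returns first, the later cancelation is a no-op, because a computation completes at most once.

exception Timeout

let await_with_timeout ~seconds computation =
  Computation.cancel_after computation ~seconds
    Timeout (Printexc.get_callstack 0);
  Computation.await computation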


Fiber

val yield : unit -> unit
type t

val current : unit -> t

val create : forbid:bool -> 'a Computation.t -> t
val spawn : t -> (t -> unit) -> unit

val get_computation : t -> Computation.packed (* Packed : 'a t -> packed *)
val set_computation : t -> Computation.packed -> unit

val has_forbidden : t -> bool
val exchange : t -> forbid:bool -> bool

module FLS : sig (* ... *) end

(identity, SO, share state with scheduler, context)
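
A sketch of what the forbid flag is for (forbid_during is a hypothetical helper, and this assumes exchange returns the previous flag, as its name suggests): run a section with cancelation forbidden and restore the previous mode afterwards. This is the kind of tool a Mutex implementation needs when it must re-acquire the lock on the cancelation path.

let forbid_during fiber body =
  let before = Fiber.exchange fiber ~forbid:true in
  Fun.protect body
    ~finally:(fun () -> ignore (Fiber.exchange fiber ~forbid:before))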


Cancelation protocol

Every fiber is associated with a computation.

When awaiting a trigger to be signaled, the scheduler attaches the trigger to the computation associated with the fiber.

Cancelation of the computation signals the trigger.

notes: That is how an await can be canceled through the cancelation of a computation.
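
This protocol is what makes the motivating Condition.wait implementable. A sketch of its shape (not the actual Picos_std_sync code; enqueue_waiter is a hypothetical helper): even when the await is canceled, the mutex is re-acquired before the cancelation exception propagates, so the surrounding Mutex.protect never unlocks a mutex it does not hold.

let wait condition mutex =
  let trigger = Trigger.create () in
  enqueue_waiter condition trigger; (* hypothetical: register as a waiter *)
  Mutex.unlock mutex;
  let canceled = Trigger.await trigger in
  (* Re-acquire unconditionally; this step itself must not be canceled. *)
  Mutex.lock mutex;
  match canceled with
  | None -> ()
  | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt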


Picos is implemented through Effects

To be handled by a scheduler:

  • Trigger.Await
  • Computation.Cancel_after
  • Fiber.Current
  • Fiber.Yield
  • Fiber.Spawn

(The Picos interface performs these effects. Scheduler is free to decide which ready fiber to run next.)
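
As a toy illustration of what "handled by a scheduler" means (this is not the source of any Picos_mux_* scheduler; only a Yield-like effect is handled and all names are illustrative), a scheduler really is just a data structure of ready work plus a loop around an OCaml 5 effect handler:

open Effect.Deep

type _ Effect.t += Yield : unit Effect.t

let run main =
  (* The scheduler's data structure: a FIFO queue of ready work. *)
  let ready : (unit -> unit) Queue.t = Queue.create () in
  let rec next () =
    match Queue.take_opt ready with Some work -> work () | None -> ()
  in
  match_with main ()
    { retc = (fun () -> next ());
      exnc = raise;
      effc = (fun (type a) (e : a Effect.t) ->
        match e with
        | Yield ->
          Some (fun (k : (a, _) continuation) ->
            (* Requeue the yielder and run the next ready work item. *)
            Queue.add (fun () -> continue k ()) ready;
            next ())
        | _ -> None) }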


And that is all

(You implement all the concurrent abstractions through those.)

notes: Of course, contracts, operations for schedulers, helpers. Picos is all about cancelation, i.e. setting up contexts or scopes and reacting to cancelation.


Results

(i.e. What can you actually implement?)


Samples

Could you already write some applications with these?

notes: Simon Cruanes


A toy program (1/2)

open Picos_std_structured (* Flock *)
open Picos_std_sync (* Mutex *)

let main () =
  let m = Mutex.create () in
  Flock.join_after begin fun () ->
    for _ = 1 to 4 do
      Flock.fork @@ fun () ->
        Unix.sleepf 1.0; (* block thread to simulate work *)
        let dom = (Domain.self () :> int) in
        Mutex.protect m @@ fun () ->
          Printf.printf "%d %!" dom
    done
  end

notes: Forks four fibers. Simulates work. Prints the domain id. Uses mutex to show it exists.


A toy program (2/2)

# Picos_mux_fifo.run main ;;
0 0 0 0 - : unit = ()
# Picos_mux_multififo.run_on ~n_domains:4 main ;;
3 2 0 1 - : unit = ()
# Picos_mux_random.run_on ~n_domains:2 main ;;
0 4 4 0 - : unit = ()
# Picos_lwt_unix.run main ;;
0 0 0 0 - : unit = ()

(Picos is not a scheduler!)


HTTP server (1/4)

open Cohttp (* Body *)
open Picos_io (* Unix *)
open Picos_io_cohttp (* Server, Client *)
open Picos_std_finally (* let@, finally *)
open Picos_std_structured (* Flock *)

(Whoa! Is this Haskell?)


HTTP server (2/4)

let server server_socket () =
  let callback _conn _req req_body =
    let res_body =
      Printf.sprintf "Hello, %s from %d!"
        (Body.to_string req_body)
        (Domain.self () :> int)
    in
    Server.respond_string ~status:`OK ~body:res_body ()
  in
  Server.run (Server.make ~callback ()) server_socket

let client server_uri name () =
  let _resp, res_body =
    Client.post ~body:(`String name) server_uri
  in
  Printf.printf "%s\n%!" (Body.to_string res_body)

notes: Client sends name in body. Server responds with a greeting. Should be familiar to Cohttp users.


HTTP server (3/4)

let main () =
  let@ server_socket = finally Unix.close @@ fun () ->
    Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0
  in
  Unix.set_nonblock server_socket;
  Unix.bind server_socket Unix.(ADDR_INET (inet_addr_loopback, 8000));
  Unix.listen server_socket 100;
  Flock.join_after ~on_return:`Terminate @@ fun () ->
    Flock.fork (server server_socket);
    Flock.join_after ~on_return:`Wait @@ fun () ->
      let server_uri = Uri.of_string "http://127.0.0.1:8000" in
      for i=1 to 10 do
        Flock.fork @@ client server_uri (Printf.sprintf "Picos %d" i)
      done

notes: No leaks. Setup. Terminate server. Fork server. Wait for clients. Fork clients.


HTTP server (4/4)

# Picos_mux_random.run_on ~n_domains:4 main ;;
Hello, Picos 3 from 0!
Hello, Picos 9 from 1!
Hello, Picos 2 from 1!
Hello, Picos 8 from 1!
Hello, Picos 1 from 2!
Hello, Picos 7 from 0!
Hello, Picos 5 from 3!
Hello, Picos 4 from 0!
Hello, Picos 6 from 3!
Hello, Picos 10 from 3!
- : unit = ()

(Again, we could run this with all the sample schedulers (including Lwt).)


Insert your sample here!

notes: Picos interface vs Samples. You could implement your own! Everything is interchangeable and interoperable.


Performance?


First

  • Picos does not require structured concurrency
  • Picos supports parallelism-safe cancelation
  • Picos does not require IO to be done in the scheduler loop
  • Attention has been paid to performance

A data point on memory use

Eio currently seems to take about 504 bytes per fiber.

With similar features, Picos takes between 160 and 312 bytes per fiber using the Picos_mux_fifo scheduler.

160 bytes for a non-promise fiber in a Flock and 312 bytes for a promise in a Bundle. (A promise allows an individual fiber to be canceled.)

I believe roughly 32 bytes of that is due to an unnecessary closure allocation in the Stdlib. In other words, it could be reduced to 128 bytes per fiber.

notes: All the abstractions (Trigger, Computation, Fiber) are designed to be light-weight.


Benchmarks

"Ported" couple of benchmarks from Eio using Picos_mux_fifo.

Yield (lower is better):

Eio   190ns - 950ns
Picos 130ns - 330ns

Semaphore (fibers, lower is better):

Eio   1800ns - 7400ns
Picos  310ns - 1200ns

(Apples and Oranges.)

notes: Number of fibers 1 to 10_000 in Yield. Four fibers acquiring, yielding, releasing. The number of resources from 4 (fastest) to 2 (slowest).


tiny_httpd experiment (c-cube)

i7-12700 (16 or 18 domains)
$ wrk -c 300 -t 4 -d 20 http://127.0.0.1:8082/hello/world
Requests/sec: 514297.71

M3 Max (6 domains)
$ wrk -c 300 -t 4 -d 20 http://127.0.0.1:8082/hello/world
Requests/sec: 168557.74

i7-1165G7 (8 domains)
$ wrk -c 300 -t 4 -d 20 http://127.0.0.1:8082/hello/world
Requests/sec: 189782.59

(This is not terrible. There is definitely room to improve!)

notes: Simon Cruanes


Bottom line

Picos allows efficient concurrency.


Future of Schedulers


Status quo

"Schedulers" are DEEP

Eio, Riot, Miou, ...

...have their own implementations of everything:

  • Synchronization: Mutex, Condition, ...
  • Async IO
  • Data structures
  • Structured concurrency

Thousands of LOC!

notes: And let's not forget all the libraries outside of the scheduler!


With Picos?

  • Scheduler: 100-500 LOC

    • It is just a data structure and a loop!
  • IO as a library

    • 500 LOC (IO event loop) + 500 LOC (IO wrappers)
    • Works on any Picos compatible scheduler
  • Mutex, Condition, Lazy, ..., Event: 100 LOC modules

  • Structured concurrency: 300 LOC

Infinite Diversity in Infinite Combinations 🖖

notes: These are rough numbers!


Future? (1/2)

  • Maybe we will use different terminology

    • Scheduler is just a loop and a collection!
    • Write your own pthreads implementation? No
    • "Concurrent programming model"
  • Models could become shallow "curated" layers

    • Want actors? Use ...
    • Want capabilities? Use ...
    • Want more traditional concurrency? Use ...
  • Multiplexers (formerly "schedulers"), basic libs, and IO integrations could be largely shared

notes: Model = how you structure fibers, how you synchronize and communicate, and so on.


Future? (2/2)

  • A single app could use libraries internally using different models

  • A single app could also run multiple schedulers

  • Collaboration instead of competition!

Much bigger change than what might be obvious


Next steps for Picos

  • Computation vs CancellationToken
  • TLS based Fiber.current and fiber spawning
  • Missing functionality (thread pool over domains, fork-join)
  • Work-stealing scheduler
  • Better IO integrations (epoll, uring, kqueue, iocp, ...)
  • Saturn, Kcas
  • Semantics

Questions?

More is more!

title: Picos — Interoperable effects based concurrency
theme: black
revealOptions:
  transition: fade
  width: 1280
  height: 800

Picos — Interoperable effects based concurrency

Vesa Karvonen


The problem

What color is your function?*

  • a -> b
  • a -> b m

You can't just call an async function from a sync function and obtain the result synchronously.

OCaml 5: Let's use effects!


How did it go?

Affect, Domainslib, Eio, Fuseau, Miou, Moonpool, Riot, ...

All mutually incompatible!

You can't just call Eio's IO APIs from a Domainslib fiber.

Functions of 8 different colors and counting! 🌈


Issues

  • Duplication of functionality
  • Prohibitive cost to innovate
  • Competition over resources
  • Community split

Recap

Code written against a specific scheduler...

...cannot run on other schedulers.

Solutions?

A level of indirection, perhaps? 🤔


Picos

Interface between schedulers and concurrent abstractions


  Libraries & Applications

------ Sync and Comm \
----------- Async IO |
---------- Conn pool |
------ Concurrent DS +-->  Picos  <-- Schedulers
- Structuring models |
----- Parallel algos |
---------------- STM /

  "Implemented in Picos"         "Picos compatible"

In other words

Picos aims to be an interface just rich enough that one can write most concurrent software without depending on a specific scheduler.

Duplication? Cost? Competition? Split?


The architecture of Picos


Understanding cancelation

Mutex.protect mutex begin fun () -> (* lock *)
  while true do
    Condition.wait condition mutex (* unlock..lock *)
  done
end (* unlock *)

Liveness?

Safety?


Trio of concepts working together

Design largely driven by the need to support cancelable await (e.g. with mutexes and condition variables) and structured concurrency

Trigger — to await

Computation — to publish, observe, and control

Fiber — for autonomy

These are not for application programming.


Trigger

type t

val create : unit -> t (* initial *)

val await : t -> Exn_bt.t option (* awaiting *)

val signal : t -> unit (* signaled *)

Ivar using Trigger (1/3)

type 'a t
val create : unit -> 'a t
val try_fill : 'a t -> 'a -> bool
val read : 'a t -> 'a

type 'a state =
  | Filled of 'a
  | Empty of Trigger.t list
type 'a t = 'a state Atomic.t

Ivar using Trigger (2/3)

let rec try_fill t value =
  match Atomic.get t with
  | Filled _ -> false
  | Empty triggers as before ->
    let after = Filled value in
    if Atomic.compare_and_set t before after then begin
      List.iter Trigger.signal triggers;
      true
    end
    else
      try_fill t value

Ivar using Trigger (3/3)

let rec read t =
  match Atomic.get t with
  | Filled value -> value
  | Empty triggers as before ->
    let trigger = Trigger.create () in
    let after = Empty (trigger :: triggers) in
    if Atomic.compare_and_set t before after then
      match Trigger.await trigger with
      | None -> read t (* not canceled *)
      | Some exn_bt -> (* canceled *)
        cleanup t trigger; (* omitted *)
        Exn_bt.raise exn_bt
    else
      read t
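
A sketch of the cleanup helper omitted above (in real code it would be defined before read): on cancelation the reader removes its trigger, so the Ivar does not retain triggers of canceled readers.

let rec cleanup t trigger =
  match Atomic.get t with
  | Filled _ -> () (* filled concurrently; nothing to remove *)
  | Empty triggers as before ->
    let after = Empty (List.filter (fun t' -> t' != trigger) triggers) in
    if not (Atomic.compare_and_set t before after) then cleanup t trigger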

Ivar Recap

Owner

  1. create trigger
  2. insert to data structure
  3. await
  4. determine result and cleanup

Other

  1. update data structure
  2. signal triggers

Design of Trigger
  • Single assignment, constant space final state
    • Simple and non-leaky
  • First order (Arthur!)
    • Strong separation for scheduler
  • Separates allocation from await
    • Attach trigger to any number of places
    • May be signaled concurrently
    • Await performed last — fast path to skip effect
  • Does not propagate value
    • Simpler and safer (i.e. no value dropped)
  • Does not tell signaler whether target will be canceled
    • Maximal flexibility for scheduler

Computation

type 'a t

val create : unit -> 'a t (* running *)

val try_attach : 'a t -> Trigger.t -> bool
val detach : 'a t -> Trigger.t -> unit

val try_return : 'a t -> 'a -> bool (* completed *)
val try_cancel : 'a t -> Exn_bt.t -> bool (* completed *)

(* completion signals attached triggers *)

val check : 'a t -> unit (* may raise *)
val await : 'a t -> 'a (* or suspend or raise *)

Essence of Computation?

Cancelation context?

Promise? (Ivar)

type 'a t = 'a Computation.t
let create : unit -> 'a t = Computation.create
let try_fill : 'a t -> 'a -> bool = Computation.try_return
let read : 'a t -> 'a = Computation.await

Observability: Attach triggers momentarily.

Bidirectionality: Return or Cancel (also from outside).


Design of Computation
  • Single assignment, with resource cleanup
    • Simple and non-leaky
  • Unopinionated
    • Non-hierarchical
    • Not just for structured concurrency
  • Observability
    • Propagate in desired manner
  • Bidirectionality
    • Cancelation

Accidentally (almost) expressive enough to implement CML.


Fiber

val yield : unit -> unit
val spawn : forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit
type t

val current : unit -> t

val get_computation : t -> Computation.packed (* Packed : 'a t -> packed *)
val set_computation : t -> Computation.packed -> unit

val has_forbidden : t -> bool
val exchange : t -> forbid:bool -> bool

module FLS : sig (* ... *) end

Purpose: Share state with scheduler. Identity.


Design of Fiber
  • Effects are (relatively) expensive
  • Need ability to share some state with scheduler
    • forbid flag
    • associated computation
  • After you obtain the fiber record, accesses are fast
  • Identity of fiber record seems sufficient
  • FLS for extensions
    • faster than separate hash tables on id
    • and easier to manage

Cancelation protocol

Every fiber has a computation. When awaiting a trigger to be signaled, the trigger can be (momentarily) attached to a computation. If the computation is canceled, the trigger will be signaled.


Implemented through Effects

  • Trigger.Await
  • Computation.Cancel_after
  • Fiber.Current
  • Fiber.Yield
  • Fiber.Spawn

These are to be handled by a scheduler.


What is a "Scheduler"?

Data structure for ready fibers

Has a loop to process ready fibers

val run : ?forbid:bool -> (unit -> 'a) -> 'a

100 to 500 LOC


Samples


Performance

  • Ported a couple of benchmarks from Eio: Yield, Mutex

  • Caveat: Picos is different

    • Picos does not require structured concurrency
    • Picos supports parallelism-safe cancelation
    • Picos does not require IO to be done in the scheduler loop

Result: Picos_fifos seems to be competitive with (faster than) Eio

Point: It seems the interface allows efficient concurrency.


Future of Schedulers


Status quo

"Schedulers" are DEEP

Eio, Riot, Miou, ...

...have their own implementations of everything:

  • Synchronization: Mutex, Condition, ...
  • Async IO
  • Data structures
  • Structured concurrency

Thousands of LOC!


With Picos?

  • Scheduler: 100-500 LOC

    • It is just a data structure and a loop!
  • IO as a library

    • 500 LOC (IO event loop) + 500 LOC (IO wrappers)
    • Works on any Picos compatible scheduler
  • Mutex, Condition, Lazy, ..., Event: 100 LOC modules

  • Structured concurrency: 300 LOC

Infinite Diversity in Infinite Combinations 🖖


Future? (1/2)

  • "Schedulers" could become shallow(!) "curated" layers

    • Want actors? Use Riot
    • Want capabilities? Use Eio
    • Want work-stealing? Use Moonpool or Domainslib
    • Want more traditional concurrency? Use Fuseau or Miou
  • Or maybe we will use some other name

    • Scheduler is just a loop and a collection!
    • Concurrent programming model
  • The depth (e.g. IO libraries) could be largely shared


Future? (2/2)

  • A single app could use libraries internally using different models

  • Collaboration instead of competition!

  • How to get everyone on board?

  • How should we organize development?

  • Interesting times...

Much bigger change than what might be obvious


Next steps for Picos

  • Alignment
  • Real IO libraries (uring, poll, iocp, ...)
  • More libraries
  • Domainslib
  • Saturn, Kcas

Questions?
