Skip to content

Instantly share code, notes, and snippets.

@rupertlssmith
Last active April 13, 2026 21:56
Show Gist options
  • Select an option

  • Save rupertlssmith/845f40ec6cb3e005538c78e24e8071cf to your computer and use it in GitHub Desktop.

Select an option

Save rupertlssmith/845f40ec6cb3e005538c78e24e8071cf to your computer and use it in GitHub Desktop.
Elm actor model with Kafka-like messaging and typed mailboxes.
module Actor.Core exposing
    ( Process
    , Actor
    , spawn
    , self
    , kill
    )

{-| Process and actor lifecycle API (design sketch).

Nothing in this module is implemented yet: every operation is a
`Debug.todo` stub. The stubs are written so that the crash happens when
the operation is actually used, not when the module is initialised.

-}

import Debug
import Platform.Cmd exposing (Cmd)
import Platform.Sub exposing (Sub)
import Task exposing (Task)



-- PROCESS / ACTOR LIFECYCLE


{-| Opaque handle to a process. The `msg` parameter does not appear in the
constructor; it only tags the handle with a message type.
-}
type Process msg
    = Process


{-| Definition of an actor as a record of three functions:

  - `init` builds the starting model and commands from `flags`.
  - `update` produces the next model and commands from a message.
  - `subscriptions` derives the subscriptions from the current model.

-}
type alias Actor flags model msg =
    { init : flags -> ( model, Cmd msg )
    , update : msg -> model -> ( model, Cmd msg )
    , subscriptions : model -> Sub msg
    }


{-| Takes an `Actor` definition and its `flags`, and yields a task that
produces a `Process` handle.

Stub: not implemented. Arguments are taken explicitly so the `Debug.todo`
crash is raised on application rather than at module initialisation.

-}
spawn : Actor flags model msg -> flags -> Task x (Process msg)
spawn _ _ =
    Debug.todo "spawn"


{-| A task that produces a `Process` handle (presumably the handle of the
calling process - confirm when implementing).

Stub: not implemented. The `Debug.todo` is wrapped in `Task.andThen` so it
is only reached when the task is run.

-}
self : Task x (Process msg)
self =
    Task.succeed ()
        |> Task.andThen (\_ -> Debug.todo "self")


{-| Takes a `Process` handle and yields a task producing `()`.

Stub: not implemented. The argument is taken explicitly so the
`Debug.todo` crash is raised on application rather than at module
initialisation.

-}
kill : Process msg -> Task x ()
kill _ =
    Debug.todo "kill"
module Actor.P2P exposing
    ( Key
    , Subject
    , Selector
    , subject
    , send
    , sendKeyed
    , selector
    , selectMap
    , map
    , filter
    , orElse
    , withTimeout
    , select
    , subscribe
    )

{-| Point-to-point subjects and selectors, normal layer (design sketch).

Nothing in this module is implemented yet: every operation is a
`Debug.todo` stub. Function stubs take their arguments explicitly so the
crash happens on application, not when the module is initialised.

-}

import Debug
import Platform.Sub exposing (Sub)
import Task exposing (Task)
import Time



-- P2P SUBJECTS AND SELECTORS (NORMAL LAYER)


{-| Key attached to a keyed message. -}
type alias Key =
    String


{-| Opaque subject carrying messages of type `a`. -}
type Subject a
    = Subject


{-| Opaque selector; `msg` and `result` are type-level tags only. -}
type Selector msg result
    = Selector


{-| A task that produces a new `Subject`.

Stub: not implemented. The `Debug.todo` is wrapped in `Task.andThen` so it
is only reached when the task is run.

-}
subject : Task x (Subject a)
subject =
    Task.succeed ()
        |> Task.andThen (\_ -> Debug.todo "subject")


{-| Takes a subject and a payload; yields a task producing `()`.

Stub: not implemented.

-}
send : Subject a -> a -> Task x ()
send _ _ =
    Debug.todo "send"


{-| Like `send`, with an additional `Key` argument.

Stub: not implemented.

-}
sendKeyed : Subject a -> Key -> a -> Task x ()
sendKeyed _ _ _ =
    Debug.todo "sendKeyed"


{-| Base selector whose result type equals its message type.

Stub: not implemented. NOTE: this is a plain value, not a function, so the
`Debug.todo` is evaluated as soon as a program that references `selector`
initialises this module.

-}
selector : Selector msg msg
selector =
    Debug.todo "selector"


{-| Takes a subject, a function from its payload to `msg`, and a selector;
returns a selector of the same type.

Stub: not implemented.

-}
selectMap : Subject a -> (a -> msg) -> Selector msg result -> Selector msg result
selectMap _ _ _ =
    Debug.todo "selectMap"


{-| Change the result type of a selector with a function.

Stub: not implemented.

-}
map : (result -> result2) -> Selector msg result -> Selector msg result2
map _ _ =
    Debug.todo "map"


{-| Takes a predicate on results and a selector; returns a selector of the
same type.

Stub: not implemented.

-}
filter : (result -> Bool) -> Selector msg result -> Selector msg result
filter _ _ =
    Debug.todo "filter"


{-| Combine two selectors of the same type into one.

Stub: not implemented.

-}
orElse : Selector msg result -> Selector msg result -> Selector msg result
orElse _ _ =
    Debug.todo "orElse"


{-| Takes a timeout, a fallback result and a selector; returns a selector
of the same type.

The timeout is a `Float` because the `Time` module in Elm 0.19 does not
expose a `Time` type (presumably milliseconds, as with `Process.sleep` -
confirm when implementing).

Stub: not implemented.

-}
withTimeout : Float -> result -> Selector msg result -> Selector msg result
withTimeout _ _ _ =
    Debug.todo "withTimeout"


{-| Turn a selector into a task producing its result.

Stub: not implemented.

-}
select : Selector msg result -> Task x result
select _ =
    Debug.todo "select"


{-| Turn a selector into a subscription, tagging each result with the
given function.

Stub: not implemented.

-}
subscribe : Selector msg result -> (result -> parentMsg) -> Sub parentMsg
subscribe _ _ =
    Debug.todo "subscribe"
module Actor.Advanced.P2P exposing
    ( Key
    , Meta
    , Envelope
    , Subject
    , Selector
    , subject
    , send
    , sendKeyed
    , resendAsPossibleDuplicate
    , sendWithPartitionKey
    , selector
    , selectMap
    , map
    , filter
    , orElse
    , withTimeout
    , select
    , subscribe
    , selectWithMeta
    , subscribeWithMeta
    )

{-| Point-to-point subjects and selectors, advanced layer with message
metadata (design sketch).

Nothing in this module is implemented yet: every operation is a
`Debug.todo` stub. Function stubs take their arguments explicitly so the
crash happens on application, not when the module is initialised.

-}

import Debug
import Platform.Sub exposing (Sub)
import Task exposing (Task)
import Time



-- TYPES


{-| Key attached to a keyed message. -}
type alias Key =
    String


{-| Per-message metadata: an optional key, a possible-duplicate flag and
an optional sequence number.
-}
type alias Meta =
    { key : Maybe Key
    , possibleDuplicate : Bool
    , sequence : Maybe Int
    }


{-| A payload paired with its `Meta`. -}
type alias Envelope a =
    { meta : Meta
    , payload : a
    }


{-| Opaque subject carrying messages of type `a`. -}
type Subject a
    = Subject


{-| Opaque selector; `msg` and `result` are type-level tags only. -}
type Selector msg result
    = Selector



-- SUBJECT / SEND (ADVANCED)


{-| A task that produces a new `Subject`.

Stub: not implemented. The `Debug.todo` is wrapped in `Task.andThen` so it
is only reached when the task is run.

-}
subject : Task x (Subject a)
subject =
    Task.succeed ()
        |> Task.andThen (\_ -> Debug.todo "subject")


{-| Takes a subject and a payload; yields a task producing `()`.

Stub: not implemented.

-}
send : Subject a -> a -> Task x ()
send _ _ =
    Debug.todo "send"


{-| Like `send`, with an additional `Key` argument.

Stub: not implemented.

-}
sendKeyed : Subject a -> Key -> a -> Task x ()
sendKeyed _ _ _ =
    Debug.todo "sendKeyed"


{-| Same argument shape as `sendKeyed` (presumably the message is marked
via `Meta.possibleDuplicate` - confirm when implementing).

Stub: not implemented.

-}
resendAsPossibleDuplicate : Subject a -> Key -> a -> Task x ()
resendAsPossibleDuplicate _ _ _ =
    Debug.todo "resendAsPossibleDuplicate"


{-| Takes a function deriving a `Key` from the payload, a subject and a
payload; yields a task producing `()`.

Stub: not implemented.

-}
sendWithPartitionKey : (a -> Key) -> Subject a -> a -> Task x ()
sendWithPartitionKey _ _ _ =
    Debug.todo "sendWithPartitionKey"



-- SELECTOR COMBINATORS (ADVANCED, SAME SHAPES AS NORMAL)


{-| Base selector whose result type equals its message type.

Stub: not implemented. NOTE: this is a plain value, not a function, so the
`Debug.todo` is evaluated as soon as a program that references `selector`
initialises this module.

-}
selector : Selector msg msg
selector =
    Debug.todo "selector"


{-| Takes a subject, a function from its payload to `msg`, and a selector;
returns a selector of the same type.

Stub: not implemented.

-}
selectMap : Subject a -> (a -> msg) -> Selector msg result -> Selector msg result
selectMap _ _ _ =
    Debug.todo "selectMap"


{-| Change the result type of a selector with a function.

Stub: not implemented.

-}
map : (result -> result2) -> Selector msg result -> Selector msg result2
map _ _ =
    Debug.todo "map"


{-| Takes a predicate on results and a selector; returns a selector of the
same type.

Stub: not implemented.

-}
filter : (result -> Bool) -> Selector msg result -> Selector msg result
filter _ _ =
    Debug.todo "filter"


{-| Combine two selectors of the same type into one.

Stub: not implemented.

-}
orElse : Selector msg result -> Selector msg result -> Selector msg result
orElse _ _ =
    Debug.todo "orElse"


{-| Takes a timeout, a fallback result and a selector; returns a selector
of the same type.

The timeout is a `Float` because the `Time` module in Elm 0.19 does not
expose a `Time` type (presumably milliseconds, as with `Process.sleep` -
confirm when implementing).

Stub: not implemented.

-}
withTimeout : Float -> result -> Selector msg result -> Selector msg result
withTimeout _ _ _ =
    Debug.todo "withTimeout"


{-| Turn a selector into a task producing its result.

Stub: not implemented.

-}
select : Selector msg result -> Task x result
select _ =
    Debug.todo "select"


{-| Turn a selector into a subscription, tagging each result with the
given function.

Stub: not implemented.

-}
subscribe : Selector msg result -> (result -> parentMsg) -> Sub parentMsg
subscribe _ _ =
    Debug.todo "subscribe"



-- META-AWARE SELECTION / SUBSCRIPTION


{-| Like `select`, for a selector whose message type is a `( Meta, msg )`
pair.

Stub: not implemented.

-}
selectWithMeta : Selector ( Meta, msg ) result -> Task x result
selectWithMeta _ =
    Debug.todo "selectWithMeta"


{-| Like `subscribe`, for a selector whose message type is a
`( Meta, msg )` pair.

Stub: not implemented.

-}
subscribeWithMeta : Selector ( Meta, msg ) result -> (result -> parentMsg) -> Sub parentMsg
subscribeWithMeta _ _ =
    Debug.todo "subscribeWithMeta"
module Actor.PubSub exposing
    ( Key
    , Topic
    , topic
    , publish
    , publishKeyed
    , subscribe
    )

{-| Publish/subscribe topics, normal layer (design sketch).

Nothing in this module is implemented yet: every operation is a
`Debug.todo` stub. The stubs are written so that the crash happens when
the operation is actually used, not when the module is initialised.

-}

import Debug
import Platform.Sub exposing (Sub)
import Task exposing (Task)



-- PUB/SUB TOPICS (NORMAL LAYER)


{-| Key attached to a keyed message. -}
type alias Key =
    String


{-| Opaque topic carrying messages of type `a`. -}
type Topic a
    = Topic


{-| A task that produces a new `Topic`.

Stub: not implemented. The `Debug.todo` is wrapped in `Task.andThen` so it
is only reached when the task is run.

-}
topic : Task x (Topic a)
topic =
    Task.succeed ()
        |> Task.andThen (\_ -> Debug.todo "topic")


{-| Takes a topic and a payload; yields a task producing `()`.

Stub: not implemented.

-}
publish : Topic a -> a -> Task x ()
publish _ _ =
    Debug.todo "publish"


{-| Like `publish`, with an additional `Key` argument.

Stub: not implemented.

-}
publishKeyed : Topic a -> Key -> a -> Task x ()
publishKeyed _ _ _ =
    Debug.todo "publishKeyed"


{-| Turn a topic into a subscription, tagging each payload with the given
function.

Stub: not implemented.

-}
subscribe : Topic a -> (a -> parentMsg) -> Sub parentMsg
subscribe _ _ =
    Debug.todo "subscribe"
module Actor.Topic exposing
    ( Key
    , Group
    , Partition
    , Offset
    , Topic
    , Consumer
    , topic
    , publish
    , publishKeyed
    , publishWithPartitionKey
    , subscribeGroup
    , consumer
    , seekToBeginning
    , seekToEnd
    , seekToOffset
    )

{-| Durable, partitioned, Kafka-like topics, normal layer (design sketch).

Nothing in this module is implemented yet: every operation is a
`Debug.todo` stub. Function stubs take their arguments explicitly so the
crash happens on application, not when the module is initialised.

`consumer` is exposed because it is the only function here that returns a
`Consumer`, which the `seekTo...` functions require.

-}

import Debug
import Platform.Sub exposing (Sub)
import Task exposing (Task)



-- DURABLE, PARTITIONED TOPICS (KAFKA-LIKE, NORMAL LAYER)


{-| Key attached to a keyed message. -}
type alias Key =
    String


{-| Name of a consumer group. -}
type alias Group =
    String


{-| Partition identifier. -}
type alias Partition =
    Int


{-| Offset within a partition. -}
type alias Offset =
    Int


{-| A durable, partitioned log of messages of type `a`. -}
type Topic a
    = Topic


{-| A consumer handle representing membership in a consumer group for a
topic.
-}
type Consumer a
    = Consumer



-- TOPIC CREATION & PUBLISHING


{-| Create a new durable topic with the specified number of partitions.

The implementation decides where and how the log is stored (in-memory,
on disk, etc.). The important contracts are:

  - Messages are appended to a partition's log.
  - Offsets are monotonic per partition.

Stub: not implemented.

-}
topic : { partitions : Int } -> Task x (Topic a)
topic _ =
    Debug.todo "topic"


{-| Publish a message to the topic.

Partition selection is implementation-defined (e.g. round-robin) if
no key is provided. Returns when the message has been appended.

Stub: not implemented.

-}
publish : Topic a -> a -> Task x ()
publish _ _ =
    Debug.todo "publish"


{-| Publish a keyed message to the topic.

Partition is chosen as `hash(key) mod numPartitions`, and ordering
is guaranteed per key within a given partition.

Stub: not implemented.

-}
publishKeyed : Topic a -> Key -> a -> Task x ()
publishKeyed _ _ _ =
    Debug.todo "publishKeyed"


{-| Publish using a function that derives a key from the payload.

Convenience wrapper for `publishKeyed`:

    publishWithPartitionKey f topic payload
        == publishKeyed topic (f payload) payload

Stub: not implemented.

-}
publishWithPartitionKey : (a -> Key) -> Topic a -> a -> Task x ()
publishWithPartitionKey _ _ _ =
    Debug.todo "publishWithPartitionKey"



-- CONSUMPTION (CONSUMER GROUPS)


{-| Subscribe a consumer group to a topic.

All consumers with the same `Group` name form a group. Partitions
are distributed across group members; each partition is consumed by
exactly one group member at a time.

For each message consumed by this group member, `toMsg` (the third
argument) is called to produce the program's `parentMsg`.

Stub: not implemented.

-}
subscribeGroup : Topic a -> Group -> (a -> parentMsg) -> Sub parentMsg
subscribeGroup _ _ _ =
    Debug.todo "subscribeGroup"


{-| Create a handle for a consumer in a group.

This can be used to control offsets (seek/replay). The returned
Consumer is tied to a specific topic and group.

Stub: not implemented.

-}
consumer : Topic a -> Group -> Task x (Consumer a)
consumer _ _ =
    Debug.todo "consumer"



-- OFFSET CONTROL (REPLAY / SEEK)


{-| Seek this consumer's position to the beginning of all assigned
partitions.

Subsequent messages delivered via `subscribeGroup` for this consumer
group member will start from the earliest available offsets.

Stub: not implemented.

-}
seekToBeginning : Consumer a -> Task x ()
seekToBeginning _ =
    Debug.todo "seekToBeginning"


{-| Seek this consumer's position to the end of all assigned partitions.

Subsequent messages delivered via `subscribeGroup` for this consumer
group member will only include new messages published after the seek.

Stub: not implemented.

-}
seekToEnd : Consumer a -> Task x ()
seekToEnd _ =
    Debug.todo "seekToEnd"


{-| Seek this consumer's position to a specific offset in a specific
partition.

Semantics:

  - If the consumer does not own the partition (in the current rebalance),
    the implementation may fail or defer the seek until it owns it.
  - Offsets are topic/partition-specific; the implementation enforces that.

Stub: not implemented.

-}
seekToOffset : Consumer a -> Partition -> Offset -> Task x ()
seekToOffset _ _ _ =
    Debug.todo "seekToOffset"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment