-- Source: GitHub gist kputnam/007b04fefa49e2670000 by @kputnam,
-- last active August 29, 2015.
module Window
( Window
-- * Monoidal windows
, empty
, update
, shift
-- * Non-monoidal windows
, replicate
, modify
, push
-- * Examples
, ksRainfall
, moRainfall
, moCurrent
, pFoldHomomorphism
, ksRainfall0
, ksRainfall1
, ksRainfall2
, ksRainfall3
, ksTotal
, pExtendHomomorphism
, Max(..)
, maxRainfall
, Union(..)
, userLogins0
, userLogins1
, isRecent
, countUsers
, countUsersPerFrame
, pNondisjointSum
) where
import Prelude hiding (foldr, replicate)
import Data.Monoid
import Data.Foldable
import Data.Traversable
import Control.Comonad
import Control.Applicative hiding (empty)
import qualified Data.Set as S
-- | A sliding window holding one or more frames, newest first.
--
-- The head is the current (active) frame and the list holds the older
-- frames. 'shift' and 'push' advance the window in one direction, while
-- 'update' and 'modify' change only the current frame.
--
-- Through the 'Comonad' instance, 'extract' reads the current frame and
-- 'extend' computes a /cumulative/ sliding window over another window.
--
-- The 'Foldable' instance gives the running total of the whole window:
-- use 'fold' when the frames are monoidal and 'foldMap' otherwise. It also
-- provides 'toList'.
--
-- When the frames are applicative or monadic actions (for example 'Maybe',
-- 'Either' or 'IO'), the 'Traversable' instance sequences their effects.
data Window a = Window a [a]
  deriving (Eq, Read, Show)
-- | Frame-wise combination of two windows.
--
-- Corresponding frames are combined with '<>'. When the windows have
-- different sizes the result is truncated to the shorter one, exactly like
-- 'zipWith'.
instance Semigroup m => Semigroup (Window m) where
  Window a as <> Window b bs = Window (a <> b) (zipWith (<>) as bs)

-- | The identity is an unbounded window of 'mempty' frames (the same as
-- @pure mempty@).
--
-- A single-frame identity does not work with a truncating '<>': combining
-- it with a longer window would cut that window down to one frame, so
-- e.g. @mconcat [w1, w2]@ would keep only the current frame. Note that
-- 'mempty' itself is infinite, so folding or showing it does not terminate.
instance Monoid m => Monoid (Window m) where
  mempty = Window mempty (repeat mempty)
  mappend = (<>)
-- | Apply a function to every frame, leaving the window's size unchanged.
instance Functor Window where
  fmap g (Window current older) = Window (g current) (map g older)
-- | Zip-style applicative: frames are paired up position by position.
instance Applicative Window where
  -- An unbounded window repeating one value, so combining it with another
  -- window never truncates that window.
  pure v = Window v (repeat v)
  -- Apply each function frame to the argument frame at the same position;
  -- the result is as long as the shorter of the two windows.
  Window g gs <*> Window v vs = Window (g v) (zipWith id gs vs)

-- TODO: What sensible monad instances exist?
-- | Each position in the window can see its own frame and every older one.
instance Comonad Window where
  -- The current (newest) frame.
  extract (Window current _) = current
  -- Apply @f@ to the sub-window starting at every frame (each suffix of
  -- the window); with @f = fold@ this yields a window of running totals.
  extend f whole@(Window _ older) = Window (f whole) (suffixesOf older)
    where
      suffixesOf [] = []
      suffixesOf (x : xs) = f (Window x xs) : suffixesOf xs
-- | Folds visit the current frame first, then the older frames in order.
instance Foldable Window where
  -- Combine every frame into a single monoidal value.
  foldMap f (Window a as) = f a `mappend` foldMap f as
  -- Right fold over the frames; the result need not be monoidal.
  foldr step z (Window a as) = step a (foldr step z as)
-- | Map each frame to an action, run the actions from the newest frame to
-- the oldest, and collect the results into a window of the same size.
--
-- The current frame is visited exactly once: the tail traversal covers
-- only the older frames. Traversing @a : as@ there would run the current
-- frame's effect twice and add an extra frame to the result, breaking the
-- identity law @traverse Identity = Identity@.
instance Traversable Window where
  traverse f (Window a as) = Window <$> f a <*> traverse f as
-- Monoidal windows
-------------------------------------------------------------------------------
-- | Build a window of @size@ frames, each one 'mempty'.
-- Sizes below 1 still produce a single-frame window.
empty :: Monoid m => Int -> Window m
empty size = Window mempty (take (size - 1) (repeat mempty))
-- | Combine a new value into the current frame, leaving older frames alone.
-- The new value goes on the left: @new `mappend` current@.
update :: Monoid m => m -> Window m -> Window m
update new (Window current older) = Window (mappend new current) older
-- | Slide the window forward by one frame.
--
-- A fresh 'mempty' frame becomes the current frame. The oldest frame is
-- expired and returned alongside the new window, so the window keeps its
-- size. On a single-frame window the only frame is expired and the result
-- holds just the fresh 'mempty' frame, rather than failing on @init []@.
shift :: Monoid m => Window m -> (Window m, m)
shift (Window m ms) = (Window mempty kept, expired)
  where
    (kept, expired) = splitLast m ms
    -- Split the non-empty list @x : xs@ into all but its last element and
    -- that last element, in a single pass and without partial functions.
    splitLast x [] = ([], x)
    splitLast x (y : ys) =
      let (rest, final) = splitLast y ys
       in (x : rest, final)
-- Non-monoidal windows
-------------------------------------------------------------------------------
-- | Build a window of @size@ frames that all hold the same value.
-- Sizes below 1 still produce a single-frame window.
replicate :: Int -> a -> Window a
replicate size a = Window a (take (size - 1) (repeat a))
-- | Apply a function to the current frame only; older frames are kept.
modify :: (a -> a) -> Window a -> Window a
modify f w = case w of
  Window current older -> Window (f current) older
-- | Slide the window forward by one frame, making the given value the new
-- current frame.
--
-- The oldest frame is expired and returned alongside the new window, so
-- the window keeps its size. On a single-frame window the only frame is
-- expired and replaced by the new value, rather than failing on @init []@.
push :: Window a -> a -> (Window a, a)
push (Window a as) a' = (Window a' kept, expired)
  where
    (kept, expired) = splitLast a as
    -- Split the non-empty list @x : xs@ into all but its last element and
    -- that last element, in a single pass and without partial functions.
    splitLast x [] = ([], x)
    splitLast x (y : ys) =
      let (rest, final) = splitLast y ys
       in (x : rest, final)
-- Examples
-------------------------------------------------------------------------------
-- | Incremental MO rainfall measurements (e.g. cm per minute).
--
-- Wrapping each frame in 'Sum' means several writes to the current frame
-- accumulate by addition, and two windows can be merged frame by frame.
moRainfall :: Window (Sum Double)
moRainfall = Window (Sum 3.9) (map Sum [3.7, 3.1, 4.0, 3.3, 4.1, 4.4])
-- | The rainfall in the current frame (the delta since the last
-- measurement).
moCurrent :: Sum Double
moCurrent = extract moRainfall
-- => Sum {getSum = 3.9}
-- | Incremental KS rainfall measurements (e.g. cm per minute), one 'Sum'
-- per frame, current frame first.
ksRainfall :: Window (Sum Double)
ksRainfall = fmap Sum (Window 3.9 [0.7, 0.1, 0.0, 0.3, 0.1, 0.4])
-- | Total KS rainfall over the whole window.
ksTotal :: Sum Double
ksTotal = foldMap id ksRainfall
-- => Sum {getSum = 5.5}
-- | Folding distributes over merging: folding two merged windows gives the
-- same total as folding each window and combining the totals.
--
-- This lets a data set be partitioned across nodes. To get the running
-- total of the whole data set, it doesn't matter whether each node computes
-- its own running total and those totals are merged, or the partitions are
-- merged first and totalled as a whole. The first option is usually more
-- efficient, because a running total is much smaller than a partition and
-- needs less communication.
--
-- The property assumes windows of equal size (merging truncates) and a
-- commutative monoid. With 'Sum' 'Double' frames it can still report
-- 'False' purely because of floating-point rounding.
pFoldHomomorphism :: (Eq m, Monoid m) => Window m -> Window m -> Bool
pFoldHomomorphism a b = mergedThenFolded == foldedThenMerged
  where
    mergedThenFolded = fold (a `mappend` b)
    foldedThenMerged = fold a `mappend` fold b
-- >> pFoldHomomorphism ksRainfall moRainfall
-- | Cumulative rainfall. If the oldest frame of 'ksRainfall' is time 0 and
-- the current frame is now, how much rain in total has fallen by each
-- frame?
ksRainfall0 :: Window (Sum Double)
ksRainfall0 = extend fold ksRainfall
-- >> getSum `fmap` ksRainfall0
-- => Window 5.5 [1.6,0.9,0.8,0.8,0.5,0.4]
-- | Cumulative windows distribute over merging. Combining two windows frame
-- by frame and then computing the cumulative sliding window gives the same
-- result as computing a cumulative window for each and merging those.
--
-- With 'Sum' 'Double' frames this check can report 'False' purely because
-- of floating-point rounding.
pExtendHomomorphism :: (Eq m, Monoid m) => Window m -> Window m -> Bool
pExtendHomomorphism a b =
  cumulative (a `mappend` b) == cumulative a `mappend` cumulative b
  where
    cumulative w = extend fold w
-- >> pExtendHomomorphism ksRainfall moRainfall
-- | Accumulate into the current frame: another 10.1 cm of rain fell during
-- the current time interval.
ksRainfall1 :: Window (Sum Double)
ksRainfall1 = update (Sum 10.1) ksRainfall
-- >> getSum `fmap` ksRainfall1
-- => Window 14.0 [0.7,0.1,0.0,0.3,0.1,0.4]
-- | Replace the current frame instead of accumulating into it.
ksRainfall2 :: Window (Sum Double)
ksRainfall2 = modify (const (Sum 10.1)) ksRainfall
-- >> getSum `fmap` ksRainfall2
-- => Window 10.1 [0.7,0.1,0.0,0.3,0.1,0.4]
-- | Move the window forward, which creates an empty current frame and
-- expires the oldest one, then record 0.1 cm in the new current frame.
--
-- Conceivably a window could live in an @MVar@ shared by several threads:
-- a background thread slides it forward at some frequency while worker
-- threads update the current frame.
ksRainfall3 :: Window (Sum Double)
ksRainfall3 = update (Sum 0.1) . fst . shift $ ksRainfall
-- >> getSum `fmap` ksRainfall3
-- => Window 0.1 [3.9,0.7,0.1,0.0,0.3,0.1]
-- | A possibly-absent value combined by taking the maximum; 'Nothing'
-- stands for "no value seen yet".
newtype Max a = Max {getMax :: Maybe a}
  deriving (Eq, Show, Read, Ord)
-- | Keep the larger of two values; an empty 'Max' is ignored.
instance Ord a => Semigroup (Max a) where
  Max Nothing <> b = b
  a <> Max Nothing = a
  Max (Just x) <> Max (Just y) = Max (Just (max x y))

-- | The identity is the empty 'Max', holding no value.
instance Ord a => Monoid (Max a) where
  mempty = Max Nothing
  mappend = (<>)
-- | Peak rainfall in each time frame across both cities.
maxRainfall :: Window (Max Double)
maxRainfall = ksMax `mappend` moMax
  where
    toMax = Max . Just . getSum
    ksMax = toMax `fmap` ksRainfall
    moMax = toMax `fmap` moRainfall
-- >> getMax `fmap` maxRainfall
-- => Window (Just 3.9) [Just 3.7,Just 3.1,Just 4.0,Just 3.3,Just 4.1,Just 4.4]
-- What was the peak rainfall rate for the duration of the window?
-- >> fold maxRainfall
-- => Max {getMax = Just 4.4}
-- What was the peak rainfall up to each point in time?
-- >> getMax `fmap` extend fold maxRainfall
-- => Window (Just 4.4) [Just 4.4,Just 4.4,Just 4.4,Just 4.4,Just 4.4,Just 4.4]
-- | A set whose monoid is set union.
newtype Union a = Union {getUnion :: S.Set a}
  deriving (Eq, Read, Show)
-- | Merge two sets by union.
instance Ord a => Semigroup (Union a) where
  Union a <> Union b = Union (a `S.union` b)

-- | The identity is the empty set.
instance Ord a => Monoid (Union a) where
  mempty = Union S.empty
  mappend = (<>)
-- | Which users authenticated during each time frame, newest frame first.
-- Users are single characters, so @"abc"@ stands for @['a','b','c']@.
userLogins0 :: Window (Union Char)
userLogins0 = fmap toUnion (Window "nm" ["jkl", "ih", "efg"])
  where
    toUnion users = Union (S.fromList users)
-- | The next time interval: the window slides forward and users @'a'@,
-- @'b'@ and @'c'@ log in during the new current frame.
userLogins1 :: Window (Union Char)
userLogins1 = update newLogins advanced
  where
    (advanced, _expired) = shift userLogins0
    newLogins = Union (S.fromList "abc")
-- | Did the user log in during any frame of the window?
isRecent :: Window (Union Char) -> Char -> Bool
isRecent us u = S.member u everyone
  where
    everyone = getUnion (fold us)
-- >> isRecent userLogins0 'a'
-- => False
-- | How many distinct users logged in anywhere in the window?
countUsers :: Window (Union Char) -> Int
countUsers us = S.size (getUnion (fold us))
-- >> countUsers userLogins0
-- => 10
-- | How many users logged in during each frame?
countUsersPerFrame :: Window (Union Char) -> Window Int
countUsersPerFrame us = fmap (\(Union users) -> S.size users) us
-- >> countUsersPerFrame userLogins0
-- => Window 2 [3,2,3]
-- | Does the number of distinct users equal the sum of the per-frame
-- counts?
--
-- The whole is not always the sum of its parts. Compare a window whose
-- frames share no users with one where some users log in during several
-- frames (e.g. @(Union . S.fromList) `fmap` Window "a" ["ab", "ma", "a"]@).
pNondisjointSum :: Window (Union Char) -> Bool
pNondisjointSum us = countUsers us == sum (countUsersPerFrame us)
-- (End of gist.)