Skip to content

Instantly share code, notes, and snippets.

@dasch
Last active January 29, 2017 03:30
Show Gist options
  • Save dasch/f198c76b3420c4fc5e961399e957dd2b to your computer and use it in GitHub Desktop.
Save dasch/f198c76b3420c4fc5e961399e957dd2b to your computer and use it in GitHub Desktop.
module Kafka exposing (..)
import Stream exposing (Source, Sink)
{-| Build a `Source` from a single `String` argument.

The result is a `Source String String`. `Stream.readFrom` turns a `Source k v`
into a `Stream k v`, so the two `String`s are the stream's key and value types.

NOTE(review): the argument is presumably a Kafka topic name (the callers in
`SearchClickThroughs` pass "searches" and "clicks") -- confirm. Only the
signature is visible here; no implementation is shown.

-}
source : String -> Source String String
{-| Build a `Sink` from a single `String` argument.

The result is a `Sink String String`. `Stream.writeTo` accepts a `Sink k v`
together with a `Stream k v`, so the two `String`s are the key and value types
of the stream being written.

NOTE(review): the argument is presumably a Kafka topic name (the caller in
`SearchClickThroughs` passes "search-clickthroughs") -- confirm. Only the
signature is visible here; no implementation is shown.

-}
sink : String -> Sink String String
module SearchClickThroughs exposing (main)
import Kafka
import SearchClickThroughs.Decode exposing (decodeSearchEvent, decodeClickEvent)
import SearchClickThroughs.Encode exposing (encodeClickThrough)
import SessionWindow
import Stream
import Time exposing (Time)
{-| Identifier of a search, represented as a plain `String`.

It is the key type of both the `searchEvents` and `clickEvents` streams.

-}
type alias SearchId = String
{-| A decoded search event.

  - `searchId`: identifier of the search; `searchEvents` keys its stream by
    this field. `SearchId` is an alias for `String`, so the record shape is
    unchanged.
  - `query`: the query string carried by the event.
  - `time`: handed to `Stream.assignTimestampsWith` as the event's timestamp.

-}
type alias SearchEvent =
    { searchId : SearchId
    , query : String
    , time : Time
    }
{-| A decoded click event.

  - `searchId`: identifier of the search the click refers to; `clickEvents`
    keys its stream by this field. `SearchId` is an alias for `String`, so the
    record shape is unchanged.
  - `resultId`: identifier of the clicked result.
  - `time`: handed to `Stream.assignTimestampsWith` as the event's timestamp.

-}
type alias ClickEvent =
    { searchId : SearchId
    , resultId : String
    , time : Time
    }
{-| The output record: one search together with the clicks attributed to it.

  - `searchId`: copied from the search event. `SearchId` is an alias for
    `String`, so the record shape is unchanged.
  - `query`: copied from the search event.
  - `clicks`: one `Click` per click event paired with the search.

-}
type alias SearchClickThrough =
    { searchId : SearchId
    , query : String
    , clicks : List Click
    }
{-| A click as stored inside a `SearchClickThrough`.

It carries the same `resultId` and `time` as a `ClickEvent` but no `searchId`,
since the enclosing `SearchClickThrough` already holds it.

-}
type alias Click =
    { resultId : String
    , time : Time
    }
{-| Stream of search events keyed by search id.

Steps, in order:

1.  read records from the "searches" Kafka source (keys and values are `String`),
2.  apply `Stream.removeDuplicateKeys` -- at this point the keys are still the
    ones delivered by the source, not yet the `searchId`,
3.  map every value through `decodeSearchEvent`,
4.  re-key the stream by each event's `searchId`,
5.  take timestamps from each event's `time` field.

The type is written as `Stream.Stream` because this module imports `Stream`
without exposing the `Stream` type.

-}
searchEvents : Stream.Stream SearchId SearchEvent
searchEvents =
    Stream.readFrom (Kafka.source "searches")
        |> Stream.removeDuplicateKeys
        |> Stream.map decodeSearchEvent
        |> Stream.keyBy .searchId
        |> Stream.assignTimestampsWith .time
{-| Stream of click events, grouped into a list per search id.

Steps, in order:

1.  read records from the "clicks" Kafka source (keys and values are `String`),
2.  apply `Stream.removeDuplicateKeys` -- at this point the keys are still the
    ones delivered by the source, not yet the `searchId`,
3.  map every value through `decodeClickEvent`,
4.  take timestamps from each event's `time` field,
5.  re-key the stream by each event's `searchId`,
6.  apply `Stream.groupByKey`, which according to the annotation below yields a
    `List ClickEvent` per key.

NOTE(review): `groupByKey` is not among the `Stream` signatures listed in this
file -- confirm it exists with type
`Stream k v -> Stream k (List v)`.

The type is written as `Stream.Stream` because this module imports `Stream`
without exposing the `Stream` type.

-}
clickEvents : Stream.Stream SearchId (List ClickEvent)
clickEvents =
    Stream.readFrom (Kafka.source "clicks")
        |> Stream.removeDuplicateKeys
        |> Stream.map decodeClickEvent
        |> Stream.assignTimestampsWith .time
        |> Stream.keyBy .searchId
        |> Stream.groupByKey
{-| Session window configured with a gap duration of ten minutes.

`main` passes this value to `Stream.withWindow`. No type annotation is given
because the module that defines `Window` is not visible from here.

`minute` is referenced as `Time.minute` so that it resolves through the `Time`
import rather than relying on an unqualified name.

-}
window =
    SessionWindow.window
        |> SessionWindow.withGapDuration (10 * Time.minute)
{-| Combine a search event and its click events into a `SearchClickThrough`.

`searchId` and `query` are copied from the search event. Every `ClickEvent` is
converted to a `Click` (keeping `resultId` and `time`), in the same order as the
input list; an empty list of click events gives an empty `clicks` list.

-}
toClickThrough : ( SearchEvent, List ClickEvent ) -> SearchClickThrough
toClickThrough ( searchEvent, clicksForSearch ) =
    let
        -- Keeps only the fields a `Click` has; the `searchId` already lives on
        -- the enclosing `SearchClickThrough`.
        toClick : ClickEvent -> Click
        toClick clickEvent =
            { resultId = clickEvent.resultId
            , time = clickEvent.time
            }
    in
        { searchId = searchEvent.searchId
        , query = searchEvent.query
        , clicks = List.map toClick clicksForSearch
        }
{-| The whole pipeline: join searches with their clicks and publish the result.

Steps, in order:

1.  `Stream.join` pairs `searchEvents` with `clickEvents` on their shared
    `SearchId` key, giving values of type `( SearchEvent, List ClickEvent )`,
2.  the joined stream is given the session `window`,
3.  each pair is turned into a `SearchClickThrough` by `toClickThrough`,
4.  each `SearchClickThrough` is passed through `encodeClickThrough`,
5.  the stream is written to the "search-clickthroughs" Kafka sink with
    `Stream.writeTo`, the sink-writing function the `Stream` module declares.

-}
main =
    Stream.join searchEvents clickEvents
        |> Stream.withWindow window
        |> Stream.map toClickThrough
        |> Stream.map encodeClickThrough
        |> Stream.writeTo (Kafka.sink "search-clickthroughs")
module Stream exposing (..)
{-| Transform the values of a stream with the given function.

The key type `k` is unchanged; the value type goes from `v` to `v2`.
(`v2` replaces a primed type variable, which is not a valid Elm identifier.)

-}
map : (v -> v2) -> Stream k v -> Stream k v2
{-| Transform the keys of a stream with the given function.

The value type `v` is unchanged; the key type goes from `k` to `k2`.
(`k2` replaces a primed type variable, which is not a valid Elm identifier.)

-}
mapKey : (k -> k2) -> Stream k v -> Stream k2 v
{-| Transform keys and values together.

The function receives a `( key, value )` pair and returns the new pair, so both
the key type and the value type may change.
(`k2`/`v2` replace primed type variables, which are not valid Elm identifiers.)

-}
mapWithKey : (( k, v ) -> ( k2, v2 )) -> Stream k v -> Stream k2 v2
{-| Re-key a stream using a key derived from each value.

The function maps a value to its new key; the value type `v` is unchanged and
the previous key type `k` is replaced by `k2`.
(`k2` replaces a primed type variable, which is not a valid Elm identifier.)

-}
keyBy : (v -> k2) -> Stream k v -> Stream k2 v
{-| Combine values of a stream with a binary function on `v`.

Key and value types are unchanged.

NOTE(review): presumably values are combined per key -- the signature alone
does not establish this; confirm against the implementation.

-}
reduce : (v -> v -> v) -> Stream k v -> Stream k v
{-| Join two streams that share the key type `k`.

The resulting stream keeps that key type and carries a pair holding one value
from each input stream.
(`v2` replaces a primed type variable, which is not a valid Elm identifier.)

NOTE(review): how values are matched (for example within a window) is not
visible from the signature -- confirm against the implementation.

-}
join : Stream k v -> Stream k v2 -> Stream k ( v, v2 )
{-| Assign timestamps to a stream using a function from value to `Time`.

Key and value types are unchanged.

-}
assignTimestampsWith : (v -> Time) -> Stream k v -> Stream k v
{-| Attach a `Window` to a stream; key and value types are unchanged.

NOTE(review): the windowing semantics are not visible from this signature.

-}
withWindow : Window -> Stream k v -> Stream k v
{-| Return a stream with the same key and value types.

NOTE(review): presumably drops records whose key duplicates an earlier one --
the exact rule (scope, which record is kept) is not visible here; confirm.

-}
removeDuplicateKeys : Stream k v -> Stream k v
{-| Return a stream with the same key and value types.

NOTE(review): presumably drops records whose value duplicates an earlier one --
the exact rule (scope, which record is kept) is not visible here; confirm.

-}
removeDuplicateValues : Stream k v -> Stream k v
{-| Turn a `Source k v` into a `Stream k v` with the same key and value types.
-}
readFrom : Source k v -> Stream k v
{-| Connect a stream to a `Sink` with matching key and value types.

The result is a `Pipeline`. With the sink as the first argument, the function
can end a `|>` chain: `stream |> writeTo sink`.

-}
writeTo : Sink k v -> Stream k v -> Pipeline
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment