Last active
January 29, 2017 03:30
-
-
Save dasch/f198c76b3420c4fc5e961399e957dd2b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Kafka exposing (..) | |
import Stream exposing (Source, Sink) | |
-- Build a `Source` whose key and value type parameters are both `String`
-- from a single `String` argument.
-- NOTE(review): the argument is presumably a Kafka topic name (callers pass
-- literals such as "searches" and "clicks") — confirm against the implementation.
source : String -> Source String String | |
-- Build a `Sink` whose key and value type parameters are both `String`
-- from a single `String` argument.
-- NOTE(review): the argument is presumably a Kafka topic name (the caller
-- passes "search-clickthroughs") — confirm against the implementation.
sink : String -> Sink String String |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module SearchClickThroughs exposing (main) | |
import Kafka
import SearchClickThroughs.Decode exposing (decodeSearchEvent, decodeClickEvent)
import SearchClickThroughs.Encode exposing (encodeClickThrough)
import SessionWindow
import Stream
import Time exposing (Time, minute)
{-| Identifier of a search, represented as a `String`. It is the key type of
the `searchEvents` and `clickEvents` streams.
-}
type alias SearchId = String | |
{-| A search event as carried by the `searchEvents` stream.

  - `searchId` — the search's identifier; `searchEvents` is keyed by this field.
  - `query` — the query string of the search.
  - `time` — the value `searchEvents` uses to assign stream timestamps.

-}
type alias SearchEvent =
    { searchId : SearchId
    , query : String
    , time : Time
    }
{-| A click event as carried by the `clickEvents` stream before grouping.

  - `searchId` — identifier of the search the click belongs to; `clickEvents`
    is keyed by this field.
  - `resultId` — identifier of the clicked result.
  - `time` — the value `clickEvents` uses to assign stream timestamps.

-}
type alias ClickEvent =
    { searchId : SearchId
    , resultId : String
    , time : Time
    }
{-| The joined output record: one search together with its clicks.

  - `searchId` — identifier copied from the search event.
  - `query` — query string copied from the search event.
  - `clicks` — one `Click` per click event paired with the search.

-}
type alias SearchClickThrough =
    { searchId : SearchId
    , query : String
    , clicks : List Click
    }
{-| A single click as stored in `SearchClickThrough.clicks`: the `resultId`
and `time` of a `ClickEvent`, without its `searchId`.
-}
type alias Click = | |
{ resultId : String | |
, time : Time | |
} | |
{-| Stream of search events read from the "searches" Kafka source.

The raw stream has duplicate keys removed, each value is decoded with
`decodeSearchEvent`, the stream is re-keyed by the event's `searchId`, and
timestamps are taken from the event's `time` field.

The type is written `Stream.Stream` because this module imports `Stream`
without an `exposing` clause, so the bare name `Stream` is not in scope.

-}
searchEvents : Stream.Stream SearchId SearchEvent
searchEvents =
    Stream.readFrom (Kafka.source "searches")
        |> Stream.removeDuplicateKeys
        |> Stream.map decodeSearchEvent
        |> Stream.keyBy .searchId
        |> Stream.assignTimestampsWith .time
{-| Stream of click events read from the "clicks" Kafka source, grouped so
that each `SearchId` maps to a list of its click events.

The raw stream has duplicate keys removed, each value is decoded with
`decodeClickEvent`, timestamps are taken from the event's `time` field, the
stream is re-keyed by the event's `searchId`, and finally grouped by key.

The type is written `Stream.Stream` because this module imports `Stream`
without an `exposing` clause, so the bare name `Stream` is not in scope.

-}
clickEvents : Stream.Stream SearchId (List ClickEvent)
clickEvents =
    Stream.readFrom (Kafka.source "clicks")
        |> Stream.removeDuplicateKeys
        |> Stream.map decodeClickEvent
        |> Stream.assignTimestampsWith .time
        |> Stream.keyBy .searchId
        |> Stream.groupByKey
{-| Session window configured with a gap duration of `10 * minute`; passed to
`Stream.withWindow` in `main`.
-}
window =
    SessionWindow.withGapDuration (10 * minute) SessionWindow.window
{-| Combine a search event with its click events into a `SearchClickThrough`.

The `searchId` and `query` are copied from the search event; every click
event is reduced to a `Click` (its `resultId` and `time`). An empty click
list yields a click-through with `clicks = []`.

-}
toClickThrough : ( SearchEvent, List ClickEvent ) -> SearchClickThrough
toClickThrough ( searchEvent, clicks ) =
    let
        -- Keep only the fields of a click event that `Click` stores.
        toClick : ClickEvent -> Click
        toClick clickEvent =
            { resultId = clickEvent.resultId
            , time = clickEvent.time
            }
    in
        { searchId = searchEvent.searchId
        , query = searchEvent.query

        -- The helper is `toClick`; the previous `toclick` spelling referred
        -- to a name that does not exist.
        , clicks = List.map toClick clicks
        }
{-| The pipeline: join `searchEvents` with `clickEvents`, apply the session
`window`, convert each joined pair with `toClickThrough`, encode it with
`encodeClickThrough`, and write the result to the "search-clickthroughs"
Kafka sink.

The final step uses `Stream.writeTo`, which is the sink-writing function the
`Stream` module declares (`writeTo : Sink k v -> Stream k v -> Pipeline`).

-}
main =
    Stream.join searchEvents clickEvents
        |> Stream.withWindow window
        |> Stream.map toClickThrough
        |> Stream.map encodeClickThrough
        |> Stream.writeTo (Kafka.sink "search-clickthroughs")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Stream exposing (..) | |
-- Stream API signatures. Only the type signatures are listed here; no
-- implementations are visible. Type variables use `k2` / `v2` rather than
-- primed names because Elm 0.18 no longer accepts primes in identifiers.


-- Apply a function to the value type; the key type is unchanged.
map : (v -> v2) -> Stream k v -> Stream k v2


-- Apply a function to the key type; the value type is unchanged.
mapKey : (k -> k2) -> Stream k v -> Stream k2 v


-- Apply a function to (key, value) pairs, changing both types at once.
mapWithKey : (( k, v ) -> ( k2, v2 )) -> Stream k v -> Stream k2 v2


-- Replace the key with one computed from the value.
keyBy : (v -> k2) -> Stream k v -> Stream k2 v


-- Takes a binary function on values and returns a stream of the same type.
-- NOTE(review): presumably folds together values that share a key — confirm.
reduce : (v -> v -> v) -> Stream k v -> Stream k v


-- Combine two streams with the same key type into a stream of value pairs.
join : Stream k v -> Stream k v2 -> Stream k ( v, v2 )


-- Takes a function from a value to a `Time`; the stream type is unchanged.
assignTimestampsWith : (v -> Time) -> Stream k v -> Stream k v


-- Takes a `Window`; the stream type is unchanged.
withWindow : Window -> Stream k v -> Stream k v


-- Collect values into a list per key. Required by
-- `SearchClickThroughs.clickEvents`, which ends its pipeline with
-- `Stream.groupByKey` to obtain `Stream SearchId (List ClickEvent)`.
groupByKey : Stream k v -> Stream k (List v)


-- Stream type is unchanged.
-- NOTE(review): exact de-duplication semantics are not visible here.
removeDuplicateKeys : Stream k v -> Stream k v


-- Stream type is unchanged.
-- NOTE(review): exact de-duplication semantics are not visible here.
removeDuplicateValues : Stream k v -> Stream k v


-- Turn a `Source` into a stream with the same key and value types.
readFrom : Source k v -> Stream k v


-- Connect a stream to a `Sink` with matching key and value types, yielding a
-- `Pipeline`.
writeTo : Sink k v -> Stream k v -> Pipeline
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment