Skip to content

Instantly share code, notes, and snippets.

@carl297r
Last active August 29, 2015 14:07
Show Gist options
  • Select an option

  • Save carl297r/25ca308c27b6ed5eeba7 to your computer and use it in GitHub Desktop.

Select an option

Save carl297r/25ca308c27b6ed5eeba7 to your computer and use it in GitHub Desktop.
Transform Map[K, Stream[R]] into Stream[Map[K,R]]. Then operate over the new combined Stream to produce a Stream of sum products.
// Here is my dummy row data for a single commodity. It only has 2 things, price and holding, but yours will have more, like date.
// I left out date because it wasn't needed to calculate the sum product, but it is not hard to include.
/** Dummy data record for a single commodity.
  *
  * @param price   the commodity's price
  * @param holding the amount of the commodity held
  */
case class Row(
  price: Double,
  holding: Double
)
// This function generates my stream of dummy row prices and holdings
/** Builds an endless stream of rows that begins with `start`.
  *
  * Every following row has its price raised by 1 and its holding raised by 2 relative to the
  * row before it. Only the first row exists up front; later rows are created on demand.
  *
  * @param start the first row of the stream
  * @return a lazy, never-ending stream of rows
  */
def RowStream(start: Row): Stream[Row] = {
  // Declared as a def so the next row is only built when the tail is actually forced.
  def successor: Row = start.copy(price = start.price + 1, holding = start.holding + 2)
  Stream.cons(start, RowStream(successor))
}
// Here are my streams for NG, CL & AG
// One endless dummy stream per commodity, each seeded with its own starting price and holding.
val ng: Stream[Row] = RowStream(Row(price = 1, holding = 1))
val cl: Stream[Row] = RowStream(Row(price = 100, holding = 10))
val ag: Stream[Row] = RowStream(Row(price = 30, holding = 5))
// That was just setting up some dummy data
// Let's start with an example on a List[Stream[Row]] which is a bit simpler than a Map[Key, Stream[Row]]
// Here is a function that takes the List[Stream[Row]] and converts it to a single Stream[List[Row]]
/** Zips a list of row streams into one stream of row lists.
  *
  * Element `i` of the result holds element `i` of every input stream, in the same order as
  * the streams appear in `ss`. Only the first element is built eagerly; the rest of the result
  * is built lazily as it is traversed.
  *
  * The result ends as soon as any input stream runs out of rows, instead of failing with a
  * `NoSuchElementException` from `head`. An empty `ss` yields an endless stream of empty lists.
  *
  * @param ss the streams to combine
  * @return a lazy stream of the position-aligned rows of all inputs
  */
def mmerge(ss: List[Stream[Row]]): Stream[List[Row]] = {
  if (ss.exists(_.isEmpty)) {
    // At least one input is exhausted, so no complete list of rows can be formed any more.
    Stream.empty
  } else {
    val heads = ss.foldRight(List.empty[Row])((sr, acc) => sr.head :: acc)
    // `#::` takes its right-hand side by name, so the recursive call runs only on demand.
    heads #:: mmerge(ss.map(_.tail))
  }
}
/** `foldLeft` flavour of [[mmerge]]: zips a list of row streams into one stream of row lists.
  *
  * Each element is assembled by folding left over the reversed input list, which gives the
  * rows in the same order as the streams appear in `ss`. The recursion stays inside this
  * function, so every element of the result (not just the first) is produced by the
  * `foldLeft` implementation.
  *
  * The result ends as soon as any input stream runs out of rows, instead of failing with a
  * `NoSuchElementException` from `head`. An empty `ss` yields an endless stream of empty lists.
  *
  * @param ss the streams to combine
  * @return a lazy stream of the position-aligned rows of all inputs
  */
def mmergeL(ss: List[Stream[Row]]): Stream[List[Row]] = {
  if (ss.exists(_.isEmpty)) {
    // At least one input is exhausted, so no complete list of rows can be formed any more.
    Stream.empty
  } else {
    // Prepending while walking the reversed list restores the original stream order.
    val heads = ss.reverse.foldLeft(List.empty[Row])((acc, sr) => sr.head :: acc)
    // `#::` takes its right-hand side by name, so the recursive call runs only on demand.
    heads #:: mmergeL(ss.map(_.tail))
  }
}
// Both implementations should agree on the first 10 merged elements
mmerge(List(ng, cl, ag)).take(10) == mmergeL(List(ng, cl, ag)).take(10)
// Now apply the function to a List of our Streams of Row
val mm: Stream[List[Row]] = mmerge(List(ng, cl, ag))
// So far only the head of this stream has been evaluated
println(mm)
// Print the first 3 items
mm.take(3).foreach(println)
// Printing those forced the stream, so 3 items are now evaluated
println(mm)
// Now we can produce a Stream of the sum product of price and holding
val sumProdStream: Stream[Double] =
  mmerge(List(ng, cl, ag)).map { rows =>
    // price * holding per commodity, then added up across commodities
    rows.map(row => row.price * row.holding).reduce(_ + _)
  }
// So far only the head of this stream has been evaluated
println(sumProdStream)
// Print the first 10 items
sumProdStream.take(10).foreach(println)
// Printing those forced the stream, so 10 items are now evaluated
println(sumProdStream)
// Btw, you can filter the stream to get the values > 10000. You would do this on date.
println(sumProdStream.filter(_ > 10000))
// Now the values up to the first one above 10000 have been evaluated, but nothing beyond
println(sumProdStream)
//
// But you need to do this on a Map[Key, Stream[Row]]
// Each commodity stream is looked up by a short string name
type Key = String
val coms: Map[Key, Stream[Row]] = Map(
  ("NG", ng),
  ("CL", cl),
  ("AG", ag)
)
// Here is the function that takes the Map[Key, Stream[Row]] and turns it into a Stream[Map[Key, Row]]
/** Turns a map of row streams into a stream of row maps.
  *
  * Element `i` of the result maps every key of `ms` to element `i` of that key's stream.
  * Only the first element is built eagerly; the rest of the result is built lazily as it is
  * traversed.
  *
  * The result ends as soon as any of the streams runs out of rows, instead of failing with a
  * `NoSuchElementException` from `head`. An empty `ms` yields an endless stream of empty maps.
  *
  * @param ms the streams to combine, by key
  * @return a lazy stream of maps from each key to its row at that position
  */
def mapStreamTrans(ms: Map[Key, Stream[Row]]): Stream[Map[Key, Row]] = {
  if (ms.values.exists(_.isEmpty)) {
    // At least one stream is exhausted, so no complete map of rows can be formed any more.
    Stream.empty
  } else {
    val heads = ms.foldRight(Map.empty[Key, Row]) {
      case ((key, rows), acc) => acc + (key -> rows.head)
    }
    // `#::` takes its right-hand side by name, so the recursive call runs only on demand.
    heads #:: mapStreamTrans(ms.map { case (key, rows) => key -> rows.tail })
  }
}
/** `foldLeft` flavour of [[mapStreamTrans]]: turns a map of row streams into a stream of row maps.
  *
  * Element `i` of the result maps every key of `ms` to element `i` of that key's stream.
  * The recursion stays inside this function, so every element of the result (not just the
  * first) is produced by the `foldLeft` implementation.
  *
  * The result ends as soon as any of the streams runs out of rows, instead of failing with a
  * `NoSuchElementException` from `head`. An empty `ms` yields an endless stream of empty maps.
  *
  * @param ms the streams to combine, by key
  * @return a lazy stream of maps from each key to its row at that position
  */
def mapStreamTransL(ms: Map[Key, Stream[Row]]): Stream[Map[Key, Row]] = {
  if (ms.values.exists(_.isEmpty)) {
    // At least one stream is exhausted, so no complete map of rows can be formed any more.
    Stream.empty
  } else {
    val heads = ms.foldLeft(Map.empty[Key, Row]) {
      case (acc, (key, rows)) => acc + (key -> rows.head)
    }
    // `#::` takes its right-hand side by name, so the recursive call runs only on demand.
    heads #:: mapStreamTransL(ms.map { case (key, rows) => key -> rows.tail })
  }
}
// Both implementations should agree on the first 10 elements
mapStreamTrans(coms).take(10) == mapStreamTransL(coms).take(10)
// Run it on the Map we created
val mst: Stream[Map[Key, Row]] = mapStreamTrans(coms)
// Only the first entry in the stream is calculated. It is a map from each Key to that day's Row data.
println(mst)
// Again we can take 3
mst.take(3).foreach(println)
// Printing those forced the stream, so 3 entries are now evaluated
println(mst)
// Let's create a stream of sum products on the Stream[Map[Key, Row]] => Stream[Double]
val sumProds: Stream[Double] =
  mapStreamTrans(coms).map { rowsByKey =>
    // price * holding per commodity, then added up across commodities
    rowsByKey.map { case (_, row) => row.price * row.holding }.reduce(_ + _)
  }
// These are the same values as in the List case.
println(sumProds)
sumProds.take(10).foreach(println)
println(sumProds)
// Instead of doing the sum product in 1 step you could do the product first
// and produce a Stream[Map, Double]. Then sum to a Stream[Double]
// You could also easily drag some other data along, like the date so you don't just end up with a Stream[Double]
// but a Stream[some case class]
// We can also combine the mapStreamTrans and the calc of sumProduct into 1 operation
// Probably need to parameterize the mapStreamTrans
/** Turns a map of streams into a stream of maps.
  *
  * Element `i` of the result maps every key of `ms` to element `i` of that key's stream.
  * Only the first element is built eagerly; the rest of the result is built lazily as it is
  * traversed.
  *
  * The result ends as soon as any of the streams runs out of elements, instead of failing
  * with a `NoSuchElementException` from `head`. An empty `ms` yields an endless stream of
  * empty maps.
  *
  * @tparam K the key type
  * @tparam R the element type of the streams
  * @param ms the streams to combine, by key
  * @return a lazy stream of maps from each key to its element at that position
  */
def transMapStream[K, R](ms: Map[K, Stream[R]]): Stream[Map[K, R]] = {
  if (ms.values.exists(_.isEmpty)) {
    // At least one stream is exhausted, so no complete map can be formed any more.
    Stream.empty
  } else {
    val heads = ms.foldRight(Map.empty[K, R]) {
      case ((key, elems), acc) => acc + (key -> elems.head)
    }
    // `#::` takes its right-hand side by name, so the recursive call runs only on demand.
    heads #:: transMapStream(ms.map { case (key, elems) => key -> elems.tail })
  }
}
// Using a for-comprehension may make the sum product easier to understand
/** Computes, for every map in the stream, the sum over all entries of `price * holding`.
  *
  * The result is produced lazily, one value per input map. A map with no entries contributes
  * `0.0` rather than failing, since `reduce` is undefined on an empty collection.
  *
  * @param mkr stream of maps from key to that key's row
  * @return stream of sum products, position-aligned with `mkr`
  */
def sumProduct(mkr: Stream[Map[Key, Row]]): Stream[Double] =
  for {
    kr <- mkr
    // The key plays no part in the calculation; only the row's values are multiplied.
    products = for ((_, row) <- kr) yield row.price * row.holding
  } yield products.reduceOption(_ + _).getOrElse(0.0)
// Print the first 20 sum products computed from the generic map-of-streams transform
sumProduct(transMapStream(coms)).take(20).foreach(println)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment