Last active
August 29, 2015 14:07
-
-
Save carl297r/25ca308c27b6ed5eeba7 to your computer and use it in GitHub Desktop.
Transform a Map[K, Stream[R]] into a Stream[Map[K, R]], then operate over the new combined Stream to produce a Stream of sum products.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Dummy row data for a single commodity. It only has two fields, price and holding; real data will have more, such as a date. | |
| // The date is left out because it isn't needed to calculate the sum product, but it would not be hard to include. | |
| final case class Row(price: Double, holding: Double) | |
| // Generates an unbounded stream of dummy rows beginning at `start`: every following row raises the price by 1 and the holding by 2. | |
| def RowStream(start: Row): Stream[Row] = | |
|   Stream.iterate(start)(prev => Row(prev.price + 1, prev.holding + 2)) | |
| // Here are my dummy streams for NG, CL & AG; they differ only in the starting Row(price, holding) | |
| val ng = RowStream(Row(1,1)) | |
| val cl = RowStream(Row(100,10)) | |
| val ag = RowStream(Row(30,5)) | |
| // That was just setting up some dummy data | |
| // Let's start with an example on a List[Stream[Row]], which is a bit simpler than a Map[Key, Stream[Row]]. | |
| /** Converts a List[Stream[Row]] into a single Stream[List[Row]]. | |
|   * | |
|   * Element n of the result holds element n of every input stream, in the order the streams appear in `ss`. | |
|   * The result ends as soon as any input stream is exhausted, so finite input streams do not fail with a | |
|   * NoSuchElementException from `head`. An empty `ss` yields an unbounded stream of empty lists. | |
|   * | |
|   * @param ss the streams to combine | |
|   * @return a lazily built stream with one List[Row] per position | |
|   */ | |
| def mmerge(ss: List[Stream[Row]]): Stream[List[Row]] = | |
|   if (ss.exists(_.isEmpty)) Stream.empty | |
|   else { | |
|     // foldRight conses the heads back together in the same order as `ss`. | |
|     val heads = ss.foldRight(List.empty[Row])((rows, acc) => rows.head :: acc) | |
|     // The right-hand side of #:: is only evaluated on demand, so the recursion stays lazy. | |
|     heads #:: mmerge(ss.map(_.tail)) | |
|   } | |
| /** Same conversion as mmerge (List[Stream[Row]] to Stream[List[Row]]), written with foldLeft. | |
|   * | |
|   * The input list is reversed first so that prepending each head while folding from the left | |
|   * produces the heads in the original order of `ss`. The result ends as soon as any input stream | |
|   * is exhausted instead of throwing from `head`. | |
|   * | |
|   * @param ss the streams to combine | |
|   * @return a lazily built stream with one List[Row] per position | |
|   */ | |
| def mmergeL(ss: List[Stream[Row]]): Stream[List[Row]] = | |
|   if (ss.exists(_.isEmpty)) Stream.empty | |
|   else { | |
|     val heads = ss.reverse.foldLeft(List.empty[Row])((acc, rows) => rows.head :: acc) | |
|     // Recurse into mmergeL itself so the whole stream is built by the foldLeft variant. | |
|     heads #:: mmergeL(ss.map(_.tail)) | |
|   } | |
| (mmerge(List(ng, cl, ag)) take 10) == (mmergeL(List(ng, cl, ag)) take 10) | |
| // So let's use the function on a List of our Row streams | |
| val mm = mmerge(List(ng, cl, ag)) | |
| // This stream still only has the head evaluated (the tail of a Stream is computed lazily) | |
| println(mm) | |
| // Print the first 3 items, which forces them to be evaluated | |
| mm take 3 foreach println | |
| // Now the stream has 3 items evaluated | |
| println(mm) | |
| // Now we can produce a Stream of the sum product of price and holding. | |
| // `sum` is used rather than `reduce(_ + _)` so that a row list with no entries gives 0.0 instead of throwing. | |
| val sumProdStream: Stream[Double] = | |
|   mmerge(List(ng, cl, ag)).map(rows => rows.map(r => r.price * r.holding).sum) | |
| // This stream still only has the head evaluated | |
| println(sumProdStream) | |
| // Print the first 10 items, which forces them to be evaluated | |
| sumProdStream take 10 foreach println | |
| // Now the stream has 10 items evaluated | |
| println(sumProdStream) | |
| // Btw, you can filter the stream to get the values > 10000. You would do this on date. | |
| println(sumProdStream filter (_ > 10000)) | |
| // Now the values up to the first one > 10000 have been evaluated but nothing beyond | |
| println(sumProdStream) | |
| // | |
| // But you need to do this on a Map[Key, Stream[Row]], so key the same three streams by commodity name | |
| type Key = String | |
| val coms: Map[Key, Stream[Row]] = Map("NG" -> ng, "CL" -> cl, "AG" -> ag) | |
| /** Takes the Map[Key, Stream[Row]] and turns it into a Stream[Map[Key, Row]]. | |
|   * | |
|   * Element n of the result maps every key of `ms` to element n of that key's stream. | |
|   * The result ends as soon as any of the streams is exhausted, so finite streams do not fail with a | |
|   * NoSuchElementException from `head`. An empty `ms` yields an unbounded stream of empty maps. | |
|   * | |
|   * @param ms the per-key streams to combine | |
|   * @return a lazily built stream with one Map[Key, Row] per position | |
|   */ | |
| def mapStreamTrans(ms: Map[Key, Stream[Row]]): Stream[Map[Key, Row]] = | |
|   if (ms.values.exists(_.isEmpty)) Stream.empty | |
|   else { | |
|     // Collect the current head of every stream under its key. | |
|     val heads = ms.foldRight(Map.empty[Key, Row]) { case ((key, rows), acc) => acc + (key -> rows.head) } | |
|     // The right-hand side of #:: is only evaluated on demand, so the recursion stays lazy. | |
|     heads #:: mapStreamTrans(ms.map { case (key, rows) => key -> rows.tail }) | |
|   } | |
| /** Same conversion as mapStreamTrans (Map[Key, Stream[Row]] to Stream[Map[Key, Row]]), written with foldLeft. | |
|   * | |
|   * The result ends as soon as any of the streams is exhausted instead of throwing from `head`. | |
|   * | |
|   * @param ms the per-key streams to combine | |
|   * @return a lazily built stream with one Map[Key, Row] per position | |
|   */ | |
| def mapStreamTransL(ms: Map[Key, Stream[Row]]): Stream[Map[Key, Row]] = | |
|   if (ms.values.exists(_.isEmpty)) Stream.empty | |
|   else { | |
|     val heads = ms.foldLeft(Map.empty[Key, Row]) { case (acc, (key, rows)) => acc + (key -> rows.head) } | |
|     // Recurse into mapStreamTransL itself so the whole stream is built by the foldLeft variant. | |
|     heads #:: mapStreamTransL(ms.map { case (key, rows) => key -> rows.tail }) | |
|   } | |
| (mapStreamTrans(coms) take 10) == (mapStreamTransL(coms) take 10) | |
| // Run it on the Map we created | |
| val mst = mapStreamTrans(coms) | |
| // Only the first entry in the stream is calculated. It has a map from each Key to that day's Row data. | |
| println(mst) | |
| // Again we can take 3, which forces them to be evaluated | |
| mst take 3 foreach println | |
| // Now 3 have been evaluated | |
| println(mst) | |
| // Let's create a stream of sum products on the Stream[Map[Key, Row]] => Stream[Double]. | |
| // `sum` is used rather than `reduce(_ + _)` so that a map with no entries gives 0.0 instead of throwing. | |
| val sumProds: Stream[Double] = | |
|   mapStreamTrans(coms).map(byKey => byKey.values.map(r => r.price * r.holding).sum) | |
| // Same values as the list case. | |
| println(sumProds) | |
| sumProds take 10 foreach println | |
| println(sumProds) | |
| // Instead of doing the sum product in 1 step you could do the product first | |
| // and produce a Stream[Map[Key, Double]]. Then sum to a Stream[Double] | |
| // You could also easily drag some other data along, like the date, so you don't just end up with a Stream[Double] | |
| // but a Stream[some case class] | |
| // We can also combine the mapStreamTrans and the calc of sumProduct into 1 operation. | |
| // Probably need to parameterize the mapStreamTrans: | |
| /** Turns a Map[K, Stream[R]] into a Stream[Map[K, R]] for any key type K and row type R. | |
|   * | |
|   * Element n of the result maps every key of `ms` to element n of that key's stream. | |
|   * The result ends as soon as any of the streams is exhausted, so finite streams do not fail with a | |
|   * NoSuchElementException from `head`. An empty `ms` yields an unbounded stream of empty maps. | |
|   * | |
|   * @tparam K the key type | |
|   * @tparam R the element type of the streams | |
|   * @param ms the per-key streams to combine | |
|   * @return a lazily built stream with one Map[K, R] per position | |
|   */ | |
| def transMapStream[K, R](ms: Map[K, Stream[R]]): Stream[Map[K, R]] = | |
|   if (ms.values.exists(_.isEmpty)) Stream.empty | |
|   else { | |
|     // Collect the current head of every stream under its key. | |
|     val heads = ms.foldRight(Map.empty[K, R]) { case ((key, rows), acc) => acc + (key -> rows.head) } | |
|     // The right-hand side of #:: is only evaluated on demand, so the recursion stays lazy. | |
|     heads #:: transMapStream(ms.map { case (key, rows) => key -> rows.tail }) | |
|   } | |
| // Using `for` may make the sum product easier to understand. | |
| /** Maps every Map[Key, Row] of the stream to the sum over its rows of price * holding. | |
|   * | |
|   * Only the rows are used, so the inner `for` iterates over the map's values rather than its (key, row) pairs. | |
|   * `sum` is used rather than `reduce(_ + _)` so that a map with no entries gives 0.0 instead of throwing. | |
|   * | |
|   * @param mkr stream of per-key rows, one map per position | |
|   * @return a stream with one sum product per element of `mkr` | |
|   */ | |
| def sumProduct(mkr: Stream[Map[Key, Row]]): Stream[Double] = | |
|   for { | |
|     kr <- mkr | |
|     products = for (row <- kr.values) yield row.price * row.holding | |
|   } yield products.sum | |
| sumProduct(transMapStream(coms)) take 20 foreach println | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment