Skip to content

Instantly share code, notes, and snippets.

@everpeace
Created March 15, 2012 15:13
Show Gist options
  • Save everpeace/2044725 to your computer and use it in GitHub Desktop.
Save everpeace/2044725 to your computer and use it in GitHub Desktop.
Matrix Multiplication in Scalding.
1 2 3
1 2 3
1 2 3
1 2 3 4
1 2 3 4
1 2 3 4
24.0 18.0 12.0 6.0
24.0 18.0 12.0 6.0
24.0 18.0 12.0 6.0
1. build scalding (https://github.com/twitter/scalding/blob/master/README.md)
2. copy A.txt, B.txt and MatrixMult.scala to some directory.
3. cd the direcotry
4. $scald.rb --local MatrixMult.scala
5. less C.txt
1. build scalding (https://github.com/twitter/scalding/blob/master/README.md)
2. copy A.txt, B.txt and MatrixMult.scala to some directory.
3. cd the direcotry
4. $scald.rb --local MatrixMult.scala
5. less C.txt
import cascading.pipe.Pipe
import com.twitter.scalding._
/**
* Matrix Multiplication in Scalding.
*
* @author Shingo Omura (everpeace _at_ gmail _dot_ com)
*/
class MatrixMult(args: Args) extends Job(args) {
val As = TextLine("A.txt")
val Bs = TextLine("B.txt")
val Cs = TextLine("C.txt")
// A is I x J matrix: each row of A is (i,j,a_{i,j})
val (rowsA,colsA,_A) = decomposeMatrix(As)
val A = _A.rename(('row,'column,'val)->('ia,'ja,'a_ij))
// B is J x K matrix: each row of B is (j,k,b_{j,k})
val (rowsB,colsB,_B) = decomposeMatrix(Bs)
val B = _B.rename(('row,'column,'val)->('jb,'kb,'b_jk))
// join with 'ja == 'jb
// and calculate c_{i,k}= a_{i,j} * b_{j,k}
val C = A.joinWithSmaller( 'ja -> 'jb, B)
.map(('a_ij, 'b_jk) -> 'ab) { ab : (Double, Double) => ab._1 * ab._2 }
.project('ia, 'kb, 'ab)
.groupBy('ia,'kb) { _.sum('ab) }
.rename(('ia,'kb,'ab) -> ('ic,'kc,'cik))
composeMatrix(C.rename(('ic,'kc,'cik)->('row,'column,'val)))
.write(Cs)
// make matrix data
// [0 0 a00]
// ...
// [i j aij]
// ...
def decomposeMatrix(s:TextLine)
= {
// decompose [ai0 ai1 ...] to [i 0 ai0]
// [i 1 ai1]
// ...
val mat = s.read.rename('num -> 'row)
.flatMap('line -> ('column,'val)){ s:String => {
val cols = s.split("\\s")
val index = 0 until cols.length
index.zip(cols)
}}.project(('row,'column,'val))
// calculate the number of rows
val rows = mat.project('row).groupAll{_.max('row)}
.map('row->'row){rowMax:Int => rowMax+1}.rename('row->'rows)
// calculate the number of columns
val cols = mat.project('column).groupAll{_.max('column)}
.map('column->'column){colMax:Int => colMax+1}.rename('column->'cols)
(rows,cols,mat)
}
// format matrix data for output
def composeMatrix(p:Pipe)
= p.groupBy('row){_.sortBy('column).toList[String]('val -> '_rows)}
.map('_rows -> 'rows){row:List[Double] => row.mkString(" ")}
.groupAll{_.sortBy('row)}.project('rows)
}
@pomadchin
Copy link

I'm interested in converting matrix from native for scalding matrix format, to common format; what i mean:

1 1 1
1 3 8
2 1 4
2 2 5
2 3 6
3 2 8
3 3 9

to

1 0 8
4 5 6
0 8 9

as u see, I have some problems with zero lines, which r skipped. Have u got any ideas?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment