Created
August 10, 2012 11:52
-
-
Save remeniuk/3313758 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * The k-means clustering algorithm requires its input to be represented as vectors.
 * In our case, each vector represents a player, and the other users that the player has played with are
 * the vector's axes/features (the weight of a feature is the number of games played together).
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {

  import Dictionary._

  // Cardinality of every vector built by this job. Parsed once while the job is
  // constructed, so a missing or non-numeric "dictionarySize" argument fails fast
  // instead of being re-parsed (and failing) inside the per-tuple map function.
  private val vectorCardinality: Int = args("dictionarySize").toInt

  // Dictionary pipe: each line of the "dictionary" text file becomes a 'userId,
  // and its 'dictionaryIdx is the line's offset divided by 5.
  // NOTE(review): this presumably relies on every dictionary line occupying exactly
  // 5 bytes (terminator included) - confirm against whatever writes the dictionary.
  val dictionary = TextLine(args("dictionary"))
    .read
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
      tuple: (Int, String) =>
        val (offset, line) = tuple
        (line -> offset / 5)
    }
    .project(('userId -> 'dictionaryIdx))

  // Source of raw hands: table "hand", key field 'handId, column family "d", value field 'blob.
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))

  // The sink is declared with Text keys and VectorWritable values, so the key field
  // must be the Text-typed 'playerId produced below, not the String-typed 'player1Id.
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
    'playerId -> 'vector)

  input
    .read
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
      // Builds a flat list of ordered pairs of users that played the same hand together.
      blob: Array[Byte] =>
        val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
        playerList.flatMap {
          playerId =>
            playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
        }
    }
    // Joins the pairs with the dictionary, so that the second member of each pair
    // (the ID of the second player) is accompanied by its index in the dictionary.
    .joinWithSmaller('player2Id -> 'userId, dictionary)
    .groupBy('player1Id -> 'dictionaryIdx) {
      // Counts the number of hands each pair of players has played together ('size).
      group => group.size
    }
    .map(('player1Id, 'dictionaryIdx, 'size) -> ('playerId, 'partialVector)) {
      // Turns one (player, other player's index, hand count) triple into a sparse
      // vector with a single feature set, keyed by the player ID as a Text.
      tuple: (String, Int, Int) =>
        val (playerId, featureIdx, handCount) = tuple
        val partialVector = new NamedVector(
          new SequentialAccessSparseVector(vectorCardinality), playerId)
        partialVector.set(featureIdx, handCount)
        (new Text(playerId), new VectorWritable(partialVector))
    }
    .groupBy('playerId) {
      // Combines the partial vectors of a player into one vector holding the number of
      // hands played with every other player.
      // NOTE(review): `plus` on a NamedVector may return an unnamed vector, so the name
      // may be lost for players with more than one partial vector - confirm if it matters.
      group => group.reduce('partialVector -> 'vector) {
        (left: VectorWritable, right: VectorWritable) =>
          new VectorWritable(left.get.plus(right.get))
      }
    }
    .write(output)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment