Skip to content

Instantly share code, notes, and snippets.

@remeniuk
Created August 10, 2012 11:52
Show Gist options
  • Save remeniuk/3313758 to your computer and use it in GitHub Desktop.
Save remeniuk/3313758 to your computer and use it in GitHub Desktop.
/**
 * The K-means clustering algorithm requires its input to be represented as vectors.
 * In our case each vector represents one player: the other users that player has played with are
 * the vector's axes/features, and the weight of a feature is the number of games played together.
 *
 * Job arguments read by this class:
 *  - "dictionary":     path of the text file mapping user ids to feature indices
 *  - "hbasehost":      host passed to the HBase source the hands are read from
 *  - "output":         path of the sequence file the (player, vector) pairs are written to
 *  - "dictionarySize": cardinality of every generated vector
 *
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {

  import Dictionary._

  // Parsed once at construction instead of once per row inside the map closure below, so a
  // missing or malformed "dictionarySize" argument fails before any row is processed.
  private val vectorCardinality: Int = args("dictionarySize").toInt

  // Dictionary pipe with the fields ('userId, 'dictionaryIdx).
  // NOTE(review): the index is derived as offset / 5, which presumably assumes that every
  // dictionary line occupies exactly 5 bytes — confirm against whatever writes the dictionary.
  val dictionary = TextLine(args("dictionary"))
    .read
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
      tuple: (Int, String) =>
        (tuple._2 -> tuple._1 / 5)
    }
    .project(('userId -> 'dictionaryIdx))

  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))

  // The sink is declared with Text keys, so it is fed from 'playerId (the Text built together
  // with each partial vector) rather than from the String field 'player1Id.
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
    'playerId -> 'vector)

  input
    .read
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
      // Builds a flat list of ordered pairs of users that played the same hand together.
      blob: Array[Byte] =>
        val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
        playerList.flatMap {
          playerId =>
            playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
        }
    }
    // Joins the pairs with the dictionary, so that the second member of each pair (the id of the
    // other player) is replaced with its index in the dictionary.
    .joinWithSmaller('player2Id -> 'userId, dictionary)
    .groupBy('player1Id -> 'dictionaryIdx) {
      // One row per (player, partner index); 'size is the number of hands they played together.
      group => group.size
    }
    .map(('player1Id, 'dictionaryIdx, 'size) -> ('playerId, 'partialVector)) {
      tuple: (String, Int, Int) =>
        val (playerName, partnerIdx, handsTogether) = tuple
        // Turns one (player, partner, count) row into a sparse vector with a single feature set.
        val partialVector = new NamedVector(
          new SequentialAccessSparseVector(vectorCardinality), playerName)
        partialVector.set(partnerIdx, handsTogether)
        (new Text(playerName), new VectorWritable(partialVector))
    }
    // Grouping on the Text key created above: the previous version grouped on the String
    // 'player1Id and left 'playerId unused, so a String reached a sink declared for Text keys.
    .groupBy('playerId) {
      // Sums a player's partial vectors into one vector holding the number of hands played
      // with every other player.
      // NOTE(review): the summed vector may no longer be a NamedVector — confirm whether any
      // downstream consumer relies on the vector name rather than on the Text key.
      group => group.reduce('partialVector -> 'vector) {
        (left: VectorWritable, right: VectorWritable) =>
          new VectorWritable(left.get.plus(right.get))
      }
    }
    .write(output)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment