Created
August 10, 2012 11:00
-
-
Save remeniuk/3313403 to your computer and use it in GitHub Desktop.
Poker Collusion Detector
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Hadoop configuration shared by the steps below; both the Java and the Writable
// serializers are registered
val conf = new Configuration
conf.set("io.serializations",
  "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization")

// the path, where the vectors will be stored to
val vectorsPath = new Path("job/vectors")
// enumeration of all users involved in a selected subset of hand history records
val dictionaryPath = new Path("job/dictionary")
// text file with the dictionary size
val dictionarySizePath = new Path("job/dictionary-size")
// indexed dictionary (every user ID in the dictionary is mapped to an index, from 0)
val indexedDictionaryPath = new Path("job/indexed-dictionary")

println("Building dictionary...")
// extracts IDs of all the users participating in the selected subset of hand history records
Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs",
  "--hbasehost", "localhost", "--output", dictionaryPath.toString))
// adds an index to the dictionary
Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", indexedDictionaryPath.toString))
// calculates the dictionary size, and stores it to the FS
Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", dictionarySizePath.toString))

// reads the dictionary size back from the first line of the "part-00000" file written by
// the Size job
val fs = FileSystem.get(dictionaryPath.toUri, conf)
val dictionarySize = {
  val reader = new BufferedReader(
    new InputStreamReader(fs.open(new Path(dictionarySizePath, "part-00000"))))
  // the reader (and the underlying FS stream) is closed even when reading or parsing fails
  try reader.readLine().toInt
  finally reader.close()
}

println("Vectorizing...")
// builds vectors (player -> other players in the game)
// IDs of other players (in the vectors) are replaced with indices, taken from the dictionary
Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs",
  "--dictionary", dictionaryPath.toString, "--hbasehost", "localhost",
  "--output", vectorsPath.toString, "--dictionarySize", dictionarySize.toString))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// directory that receives the clustering results
val outputClustersPath = new Path("job/output-clusters")
// file that receives the textual dump of the clustering results
val dumpPath = "job/dump"

// upper bound on the number of K-means iterations
val kMeansMaxIterations = 20
// convergence threshold handed to K-means
val kMeansConvergenceDelta = 0.01

println("Running K-means...")
// runs the K-means algorithm with up to 20 iterations, to find clusters of colluding players
// (collusion is assumed on the basis of the number of hands a player has played together
// with other player[s])
KMeansDriver.run(
  conf,
  vectorsPath,
  inputClustersPath,
  outputClustersPath,
  new CosineDistanceMeasure(),
  kMeansConvergenceDelta,
  kMeansMaxIterations,
  true,
  0,
  false)

println("Printing results...")
// dumps the clusters to a text file
// NOTE(review): the last argument of finalClusterPath is presumably the same iteration cap
// that was given to K-means; confirm against the helper's definition
val clusterizationResult = finalClusterPath(conf, outputClustersPath, kMeansMaxIterations)
val clusteredPoints = new Path(outputClustersPath, "clusteredPoints")
val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints)
clusterDumper.setNumTopFeatures(10)
clusterDumper.setOutputFile(dumpPath)
// the indexed dictionary is used as the term dictionary of the dump
val termDictionaryFile = new Path(indexedDictionaryPath, "part-00000").toString
clusterDumper.setTermDictionary(termDictionaryFile, "sequencefile")
clusterDumper.printClusters(null)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// randomly selected clusters that will be passed as an input to K-means
val inputClustersPath = new Path("job/input-clusters")
// NOTE(review): the seeds are built with a Euclidean measure, while the K-means run itself is
// given a CosineDistanceMeasure - confirm that the mismatch is intended
val distanceMeasure = new EuclideanDistanceMeasure()
// number of initial random clusters/centroids
val initialClusterCount = 30

println("Making random seeds...")
// builds the initial random clusters/centroids out of the vectors
RandomSeedGenerator.buildRandom(
  conf, vectorsPath, inputClustersPath, initialClusterCount, distanceMeasure)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// extracts the user ID from a player's hand history record, wrapped into a Hadoop Text
val userId = (playerHistory: PlayerHandHistory) => {
  val playerInfo = playerHistory.getPokerPlayer.getPlayerInfo
  new Text(playerInfo.getUserId.toString)
}
// Builds the basic dictionary (an enumeration, in fact) of all the players that participated
// in the selected subset of hand history records
class Builder(args: Args) extends Job(args) {

  // input tap is an HTable with hand history entries:
  // hand history id -> hand history record, serialized with ProtoBuf
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))

  // output tap - plain text file with player IDs
  val output = TextLine(args("output"))

  input
    .read
    .flatMap('blob -> 'player) { serializedHand: Array[Byte] =>
      // every hand history record contains the list of players that participated in the
      // hand; at this stage their IDs are simply extracted and added to the flat list
      val hand = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(serializedHand)
      hand.getPlayerList.map(userId)
    }
    .unique('player)  // removes duplicate user IDs
    .project('player) // leaves only the 'player column in the tuple
    .write(output)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Maps every user ID of the textual dictionary to an index and stores the pairs as a
// Text -> IntWritable sequence file
class Indexer(args: Args) extends Job(args) {

  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
    'userId -> 'idx)

  TextLine(args("input")).read
    .map(('offset -> 'line) -> ('userId -> 'idx)) {
      // dictionary lines are read together with the 'offset field that the TextLine source
      // provides out of the box; the author observed these values to be multiplied by 5,
      // hence the division
      // NOTE(review): 'offset looks like the byte offset of the line rather than a line number,
      // in which case dividing by 5 only gives consecutive indices while every dictionary line
      // occupies exactly 5 bytes - confirm
      entry: (Int, String) =>
        val (offset, line) = entry
        (new Text(line), new IntWritable(offset / 5))
    }
    .project(('userId -> 'idx)) // only the userId -> index tuple is passed to the output
    .write(output)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Counts the lines of the input text file and writes the count to the output text file
class Size(args: Args) extends Job(args) {

  TextLine(args("input"))
    .read
    .groupAll { allLines => allLines.size } // a single group over every line, counted
    .project('size)                         // only the count is kept
    .write(TextLine(args("output")))
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// extracts the user ID from a player's hand history record, wrapped into a Hadoop Text
val userId = (playerHistory: PlayerHandHistory) => {
  val playerInfo = playerHistory.getPokerPlayer.getPlayerInfo
  new Text(playerInfo.getUserId.toString)
}
// Builds the basic dictionary (an enumeration, in fact) of all the players that participated
// in the selected subset of hand history records
class Builder(args: Args) extends Job(args) {

  // input tap is an HTable with hand history entries:
  // hand history id -> hand history record, serialized with ProtoBuf
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))

  // output tap - plain text file with player IDs
  val output = TextLine(args("output"))

  input
    .read
    .flatMap('blob -> 'player) { serializedHand: Array[Byte] =>
      // every hand history record contains the list of players that participated in the
      // hand; at this stage their IDs are simply extracted and added to the flat list
      val hand = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(serializedHand)
      hand.getPlayerList.map(userId)
    }
    .unique('player)  // removes duplicate user IDs
    .project('player) // leaves only the 'player column in the tuple
    .write(output)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Maps every user ID of the textual dictionary to an index and stores the pairs as a
// Text -> IntWritable sequence file
class Indexer(args: Args) extends Job(args) {

  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
    'userId -> 'idx)

  TextLine(args("input")).read
    .map(('offset -> 'line) -> ('userId -> 'idx)) {
      // dictionary lines are read together with the 'offset field that the TextLine source
      // provides out of the box; the author observed these values to be multiplied by 5,
      // hence the division
      // NOTE(review): 'offset looks like the byte offset of the line rather than a line number,
      // in which case dividing by 5 only gives consecutive indices while every dictionary line
      // occupies exactly 5 bytes - confirm
      entry: (Int, String) =>
        val (offset, line) = entry
        (new Text(line), new IntWritable(offset / 5))
    }
    .project(('userId -> 'idx)) // only the userId -> index tuple is passed to the output
    .write(output)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Counts the lines of the input text file and writes the count to the output text file
class Size(args: Args) extends Job(args) {

  TextLine(args("input"))
    .read
    .groupAll { allLines => allLines.size } // a single group over every line, counted
    .project('size)                         // only the count is kept
    .write(TextLine(args("output")))
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * K-means clustering algorithm requires the input to be represented as vectors.
 * In our case, the vector itself represents the player, where the other users the player has
 * played with are the vector axes/features (the weight of a feature is the number of hands
 * played together).
 *
 * Arguments read by the job: `dictionary` (textual dictionary of user IDs), `hbasehost`,
 * `output` (sequence file of player ID -> vector) and `dictionarySize` (cardinality of the
 * vectors).
 *
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {

  import Dictionary._

  // Cardinality of every produced vector. Parsed once, when the job is constructed, instead of
  // once per tuple inside the closure below; a malformed value therefore fails right away.
  private val dictionarySize = args("dictionarySize").toInt

  // initializes the dictionary pipe: user ID -> index in the dictionary
  val dictionary = TextLine(args("dictionary"))
    .read
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
      // NOTE(review): the index is derived as 'offset / 5, the same formula Dictionary.Indexer
      // uses; the two have to stay in sync for the indices to match the indexed dictionary
      entry: (Int, String) =>
        val (offset, line) = entry
        (line, offset / 5)
    }
    .project(('userId -> 'dictionaryIdx))

  // input tap is the HTable with the hand history records
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))

  // output tap - sequence file with player ID -> vector entries
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
    'player1Id -> 'vector)

  input
    .read
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
      // builds a flat list of pairs of users that played together
      blob: Array[Byte] =>
        val playerList =
          HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
        playerList.flatMap {
          playerId =>
            playerList
              .filterNot(_ == playerId)
              .map(otherPlayerId => (playerId -> otherPlayerId.toString))
        }
    }
    // joins the list of pairs of users that played together with the dictionary, so that the
    // second member of the tuple (ID of the second player) is replaced with the index in the
    // dictionary
    .joinWithSmaller('player2Id -> 'userId, dictionary)
    .groupBy('player1Id -> 'dictionaryIdx) {
      // groups pairs of players that played together, counting the number of hands
      group => group.size
    }
    .map(('player1Id, 'dictionaryIdx, 'size) -> ('playerId, 'partialVector)) {
      // turns a (player, other player's index, number of hands) tuple into a vector with
      // one feature
      tuple: (String, Int, Int) =>
        val (playerId, otherPlayerIdx, handsTogether) = tuple
        val partialVector = new NamedVector(
          new SequentialAccessSparseVector(dictionarySize), playerId)
        partialVector.set(otherPlayerIdx, handsTogether)
        (new Text(playerId), new VectorWritable(partialVector))
    }
    .groupBy('player1Id) {
      // combines partial vectors into one vector that represents the number of hands
      // played with the other players
      group => group.reduce('partialVector -> 'vector) {
        (left: VectorWritable, right: VectorWritable) =>
          new VectorWritable(left.get.plus(right.get))
      }
    }
    .write(output)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment