Skip to content

Instantly share code, notes, and snippets.

@remeniuk
Created August 10, 2012 11:00
Show Gist options
  • Save remeniuk/3313403 to your computer and use it in GitHub Desktop.
Save remeniuk/3313403 to your computer and use it in GitHub Desktop.
Poker Collusion Detector
// Driver script for the poker collusion detector: builds a player dictionary,
// vectorizes hand histories, runs K-means, and dumps the resulting clusters.
// NOTE(review): in the original snippet `inputClustersPath` was used before it
// was declared and the random seeds were generated AFTER K-means had already
// run; the steps below are reordered so every value is declared before use and
// the pipeline runs in its logical order. The set of statements is unchanged.

// Hadoop configuration: register both Java and Writable serializations so the
// jobs can ship plain serializable Java objects as well as Writables.
val conf = new Configuration
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
  + "org.apache.hadoop.io.serializer.WritableSerialization")

// the path, where the vectors will be stored to
val vectorsPath = new Path("job/vectors")
// enumeration of all users involved in a selected subset of hand history records
val dictionaryPath = new Path("job/dictionary")
// text file with the dictionary size
val dictionarySizePath = new Path("job/dictionary-size")
// indexed dictionary (every user ID in the dictionary is mapped to an index, from 0)
val indexedDictionaryPath = new Path("job/indexed-dictionary")
// randomly selected clusters that will be passed as an input to K-means
val inputClustersPath = new Path("job/input-clusters")
// clusterization results
val outputClustersPath = new Path("job/output-clusters")
// textual dump of clusterization results
val dumpPath = "job/dump"

println("Building dictionary...")
// extracts IDs of all the users, participating in selected subset of hand history records
Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs",
  "--hbasehost", "localhost", "--output", dictionaryPath.toString))
// adds index to the dictionary
Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", indexedDictionaryPath.toString))
// calculates dictionary size, and stores it to the FS
Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", dictionarySizePath.toString))

// reads dictionary size back from the single reducer output file
val fs = FileSystem.get(dictionaryPath.toUri, conf)
val dictionarySize = new BufferedReader(
  new InputStreamReader(
    fs.open(new Path(dictionarySizePath, "part-00000"))
  )).readLine().toInt

println("Vectorizing...")
// builds vectors (player -> other players in the game)
// IDs of other players (in the vectors) are replaced with indices, taken from dictionary
Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs",
  "--dictionary", dictionaryPath.toString, "--hbasehost", "localhost",
  "--output", vectorsPath.toString, "--dictionarySize", dictionarySize.toString))

println("Making random seeds...")
// build 30 initial random clusters/centroids as the K-means starting point
val distanceMeasure = new EuclideanDistanceMeasure
RandomSeedGenerator.buildRandom(conf, vectorsPath, inputClustersPath, 30, distanceMeasure)

println("Running K-means...")
// runs K-means algorithm with up to 20 iterations, to find clusters of colluding players
// (assumption of collusion is made on the basis of the number of hands played together
// with other player[s])
KMeansDriver.run(conf, vectorsPath, inputClustersPath, outputClustersPath,
  new CosineDistanceMeasure(), 0.01, 20, true, 0, false)

println("Printing results...")
// dumps clusters to a text file, resolving feature indices back to user IDs
// through the indexed dictionary
val clusterizationResult = finalClusterPath(conf, outputClustersPath, 20)
val clusteredPoints = new Path(outputClustersPath, "clusteredPoints")
val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints)
clusterDumper.setNumTopFeatures(10)
clusterDumper.setOutputFile(dumpPath)
clusterDumper.setTermDictionary(new Path(indexedDictionaryPath, "part-00000").toString,
  "sequencefile")
clusterDumper.printClusters(null)
// Pulls the numeric user ID out of a single player's hand-history entry
// and wraps it as a Hadoop Text value.
val userId = (history: PlayerHandHistory) => {
  val id = history.getPokerPlayer.getPlayerInfo.getUserId
  new Text(id.toString)
}
// Builds a basic dictionary (an enumeration, in fact) of all the players that
// participated in the selected subset of hand-history records: one distinct
// user ID per output line.
class Builder(args: Args) extends Job(args) {
  // input tap: HBase table "hand" (family "d", qualifier 'blob) holding
  // ProtoBuf-serialized hand-history records keyed by hand ID
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  // output tap: plain text file with player IDs
  val output = TextLine(args("output"))

  input
    .read
    .flatMap('blob -> 'player) { blob: Array[Byte] =>
      // a hand involves several players: emit the ID of every participant
      val hand = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob)
      hand.getPlayerList.map(userId)
    }
    .unique('player)  // drop repeated user IDs
    .project('player) // keep only the 'player field of the tuple
    .write(output)
}
// Assigns a numeric index to every user ID in the plain-text dictionary and
// writes (userId -> index) pairs as a Hadoop SequenceFile (Text -> IntWritable).
class Indexer(args: Args) extends Job(args) {
// output tap: sequence file keyed by user ID with its dictionary index as value
val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
'userId -> 'idx)
TextLine(args("input")).read
.map(('offset -> 'line) -> ('userId -> 'idx)) {
// TextLine exposes an 'offset field per line out of the box. In the author's
// data each offset came out as 5x the line number, so it is divided by 5 to
// recover a 0-based line index.
// NOTE(review): this presumably works only while every dictionary line has a
// fixed 5-byte width ('offset looks like a byte offset, not a line counter) —
// confirm against the dictionary file format before reusing.
tuple: (Int, String) => (new Text(tuple._2.toString) -> new IntWritable((tuple._1 / 5)))
}
.project(('userId -> 'idx)) // only userId -> index tuple is passed to the output
.write(output)
}
// Counts the lines of the dictionary file (i.e. the number of distinct
// players) and writes that single number to a text file.
class Size(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    .groupAll { group => group.size } // collapse everything into one group and count it
    .project('size)                   // keep only the count
    .write(TextLine(args("output")))
}
// Pulls the numeric user ID out of a single player's hand-history entry
// and wraps it as a Hadoop Text value.
val userId = (history: PlayerHandHistory) => {
  val id = history.getPokerPlayer.getPlayerInfo.getUserId
  new Text(id.toString)
}
// Builds a basic dictionary (an enumeration, in fact) of all the players that
// participated in the selected subset of hand-history records: one distinct
// user ID per output line.
class Builder(args: Args) extends Job(args) {
  // input tap: HBase table "hand" (family "d", qualifier 'blob) holding
  // ProtoBuf-serialized hand-history records keyed by hand ID
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  // output tap: plain text file with player IDs
  val output = TextLine(args("output"))

  input
    .read
    .flatMap('blob -> 'player) { blob: Array[Byte] =>
      // a hand involves several players: emit the ID of every participant
      val hand = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob)
      hand.getPlayerList.map(userId)
    }
    .unique('player)  // drop repeated user IDs
    .project('player) // keep only the 'player field of the tuple
    .write(output)
}
// Assigns a numeric index to every user ID in the plain-text dictionary and
// writes (userId -> index) pairs as a Hadoop SequenceFile (Text -> IntWritable).
class Indexer(args: Args) extends Job(args) {
// output tap: sequence file keyed by user ID with its dictionary index as value
val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
'userId -> 'idx)
TextLine(args("input")).read
.map(('offset -> 'line) -> ('userId -> 'idx)) {
// TextLine exposes an 'offset field per line out of the box. In the author's
// data each offset came out as 5x the line number, so it is divided by 5 to
// recover a 0-based line index.
// NOTE(review): this presumably works only while every dictionary line has a
// fixed 5-byte width ('offset looks like a byte offset, not a line counter) —
// confirm against the dictionary file format before reusing.
tuple: (Int, String) => (new Text(tuple._2.toString) -> new IntWritable((tuple._1 / 5)))
}
.project(('userId -> 'idx)) // only userId -> index tuple is passed to the output
.write(output)
}
// Counts the lines of the dictionary file (i.e. the number of distinct
// players) and writes that single number to a text file.
class Size(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    .groupAll { group => group.size } // collapse everything into one group and count it
    .project('size)                   // keep only the count
    .write(TextLine(args("output")))
}
/**
 * K-means clustering requires the input to be represented as vectors.
 * In our case one vector represents one player: the other users the player has
 * played with are the vector axes/features, and the weight of each feature is
 * the number of games played together with that user.
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {
import Dictionary._
// dictionary pipe: (userId -> dictionaryIdx) read back from the text dictionary.
// NOTE(review): 'offset is divided by 5 to recover the line index, mirroring
// Dictionary.Indexer — this presumes fixed-width dictionary lines; confirm
// against the dictionary file format.
val dictionary = TextLine(args("dictionary"))
.read
.map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
tuple: (Int, String) =>
(tuple._2 -> tuple._1 / 5)
}
.project(('userId -> 'dictionaryIdx))
// input tap: HBase table of ProtoBuf-serialized hand-history records
val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
// output tap: player ID -> sparse vector of co-play counts, as a sequence file
val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable],
'player1Id -> 'vector)
input
.read
.flatMap('blob -> ('player1Id -> 'player2Id)) {
// builds a flat list of pairs of users that played together in the same hand
blob: Array[Byte] =>
val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
playerList.flatMap {
playerId =>
// pair the player with every OTHER participant of the same hand
playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
}
}
.joinWithSmaller('player2Id -> 'userId, dictionary) // joins the pairs of users that played together with
// the dictionary, so that the second member of the tuple (ID of the second player) is replaced with
// its index in the dictionary
.groupBy('player1Id -> 'dictionaryIdx) {
group => group.size // groups pairs of players that played together, counting the number of hands
}
.map(('player1Id, 'dictionaryIdx, 'size) ->('playerId, 'partialVector)) {
tuple: (String, Int, Int) =>
// turns one (player, other-player index, hand count) tuple into a named sparse
// vector with a single non-zero feature at the other player's dictionary index
val partialVector = new NamedVector(
new SequentialAccessSparseVector(args("dictionarySize").toInt), tuple._1)
partialVector.set(tuple._2, tuple._3)
(new Text(tuple._1), new VectorWritable(partialVector))
}
.groupBy('player1Id) {
// sums the partial vectors into one vector per player that holds the number of
// hands played with every other player
group => group.reduce('partialVector -> 'vector) {
(left: VectorWritable, right: VectorWritable) =>
new VectorWritable(left.get.plus(right.get))
}
}
.write(output)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment