@oerpli
Created April 20, 2018 12:18
import java.io.PrintWriter
// import java.util
import org.apache.spark.sql.DataFrame
// import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row
import scala.collection.mutable
// import spark.implicits._
def fileWSL(n: String) = "/mnt/c/linking/git/Linkability/xmr_data/csv/chronologic_rings_" + n + ".csv" // from WSL use this path
def file(n: String) = "c:/linking/git/Linkability/xmr_data/csv/chronologic_rings_" + n + ".csv" // from Windows use this path
// Call Main.main(file[WSL]("<x>")) where x in [1E5,1E6,1E7,full]
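// Expected CSV layout (an assumption, inferred from toDF("inid", "outid") in readFile below):
// a header line "inid,outid" followed by one (input id, referenced output id) pair per row.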
object Main {
  // readFile takes the SparkSession explicitly so the object also compiles outside spark-shell
  def readFile(spark: org.apache.spark.sql.SparkSession, file: String) = {
    val lines = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true") // should recognize that the table is (int,int)
      .load(file)
      .toDF("inid", "outid")
      .rdd
    lines
  }
  def main(file: String): Unit = {
    val spark = org.apache.spark.sql.SparkSession.builder
      .appName("Monero Linkability v2")
      .getOrCreate
    val in_out = this.readFile(spark, file)
    val in_outs = in_out // row: input and its ref'd outputs (CWA)
      .map(row => (row(0), mutable.Set(row(1))))
      .reduceByKey(_.union(_))
      .collectAsMap
      .map(a => (a._1.asInstanceOf[Int], a._2.asInstanceOf[mutable.Set[Int]]))
    val out_ins = in_out // row: output and inputs where it's ref'd (OWA)
      .map(row => (row(1), mutable.Set(row(0))))
      .reduceByKey(_.union(_))
      .collectAsMap
      .map(a => (a._1.asInstanceOf[Int], a._2.asInstanceOf[mutable.Set[Int]]))
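    // Worked example on hypothetical data: with rows (7,3), (7,5), (9,5) the two maps are
    //   in_outs = Map(7 -> Set(3, 5), 9 -> Set(5))   // per inid: the outids it references
    //   out_ins = Map(3 -> Set(7), 5 -> Set(7, 9))   // per outid: the inids referencing it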
    val totalRefs = in_outs.map(_._2.size).foldLeft(0)(_ + _)
    println("Total references: " + totalRefs)
    println("Starting matching algorithm:")
    var numOfIterations = 0
    var Match = in_outs.filter(a => a._2.size == 1).map(a => (a._1, a._2.head))
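    // Seed: every entry of in_outs with exactly one referenced element is trivially
    // matched, since that single reference must be the real one. Each iteration below
    // removes matched references from all other candidate sets; whenever a set drops
    // to a single element, that entry is matched in the following round (chain reaction).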
    while (Match.nonEmpty) {
      numOfIterations += 1
      val matchNext = mutable.Map.empty[Int, Int] // collects entries that are down to a single candidate
      println("Iteration: " + numOfIterations + ", matching " + Match.size + " new inputs")
      for ((in, spent) <- Match) { // each matched input and the output it provably spends
        // Remove `spent` from the candidate sets of all other entries that still reference it
        out_ins(spent) // inputs that reference this output
          .filter(in != _) // all inputs except the one just matched
          .foreach(input => {
            val outs = in_outs(input)
            outs.remove(spent)
            if (outs.size == 1) { // single candidate left -> match it in the next iteration
              matchNext.put(input, outs.head)
            }
            if (outs.isEmpty) { // the last candidate was removed as well; happens in truncated data sets (should not happen otherwise)
              println("WTF: " + (in, input, spent))
            }
          })
      }
      Match = matchNext
    }
    val totalRefsRem = in_outs.map(_._2.size).foldLeft(0)(_ + _)
    println("References remaining: " + totalRefsRem)
    println("References removed: " + (totalRefs - totalRefsRem))
    val tx_realInput = in_outs
      .filter(_._2.size == 1) // only one remaining candidate
      .map(a => (a._1, a._2.head)) // put (tx, real input) in the result set
    val percentage = tx_realInput.size * 1.0 / in_outs.size
    println(percentage) // fraction of inputs whose real spend was identified
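    // Sketch (assumption): persist the matched pairs. PrintWriter is imported above but
    // unused in this gist; the output path below is hypothetical.
    // val out = new PrintWriter("matched_inputs.csv")
    // tx_realInput.foreach { case (tx, in) => out.println(tx + "," + in) }
    // out.close()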
    // } finally {
    //   spark.stop()
    // }
  }
}
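// Example session (a sketch; the file name given to :load is hypothetical):
//   $ spark-shell
//   scala> :load linkability.scala
//   scala> Main.main(file("1E6"))   // or e.g. Main.main(fileWSL("full"))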