import java.io.PrintWriter
// import java.util
import org.apache.spark.sql.DataFrame
// import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import scala.collection.mutable
// import spark.implicits._

def fileWSL(n: String) = "/mnt/c/linking/git/Linkability/xmr_data/csv/chronologic_rings_" + n + ".csv" // from WSL use this path
def file(n: String) = "c:/linking/git/Linkability/xmr_data/csv/chronologic_rings_" + n + ".csv" // from Windows use this path

// Call Main.main(file("<x>")) or Main.main(fileWSL("<x>")) where x is one of 1E5, 1E6, 1E7, full
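
// Overview of the algorithm below (CWA/OWA presumably stand for closed-/open-world assumption):
//  - in_outs: each input (ring) -> the set of outputs it references; exactly one of
//    those outputs is the one really spent.
//  - out_ins: the inverse map, each output -> the set of inputs that reference it.
//  - Whenever an input has only a single candidate output left, that output must be
//    its real spend, so it is removed from every other input's candidate set. This
//    may shrink further sets to size one; the step repeats until no new matches appear.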
object Main {
  def readFile(spark: SparkSession, file: String) = {
    val lines = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true") // should recognize that both columns are integers
      .load(file)
      .toDF("inid", "outid") // rename columns: input id, referenced output id
      .rdd
    lines
  }
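
  // Expected file shape (an assumption inferred from the read options above): a CSV
  // with a header row and two integer columns, one (input id, referenced output id)
  // pair per line, e.g.
  //   <input_col>,<output_col>
  //   1,17
  //   1,23
  //   2,17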
  def main(file: String): Unit = {
    val spark = SparkSession.builder
      .appName("Monero Linkability v2")
      .getOrCreate
    val in_out = this.readFile(spark, file)

    // in_outs: input -> set of outputs it references (CWA)
    val in_outs = in_out
      .map(row => (row(0), mutable.Set(row(1))))
      .reduceByKey(_.union(_))
      .collectAsMap
      .map(a => (a._1.asInstanceOf[Int], a._2.asInstanceOf[mutable.Set[Int]]))

    // out_ins: output -> set of inputs in which it is referenced (OWA)
    val out_ins = in_out
      .map(row => (row(1), mutable.Set(row(0))))
      .reduceByKey(_.union(_))
      .collectAsMap
      .map(a => (a._1.asInstanceOf[Int], a._2.asInstanceOf[mutable.Set[Int]]))

    val totalRefs = in_outs.map(_._2.size).foldLeft(0)(_ + _)
    println("Total references: " + totalRefs)
    println("Starting matching algorithm:")
    var numOfIterations = 0
    // Inputs that already reference only a single output are matched directly.
    var toMatch = in_outs.filter(a => a._2.size == 1).map(a => (a._1, a._2.head))
    while (toMatch.nonEmpty) {
      numOfIterations += 1
      val matchNext = mutable.Map.empty[Int, Int] // inputs that drop to a single candidate during this pass
      println("Iteration: " + numOfIterations + ", matching " + toMatch.size + " new inputs")
      for ((in, spent) <- toMatch) { // inputs with exactly one candidate output
        // The matched output cannot be the real spend of any other input,
        // so remove it from the candidate sets of all other inputs that reference it.
        out_ins(spent)       // set of inputs that reference the matched output
          .filter(in != _)   // all of them except `in` itself
          .foreach(input => {
            val outs = in_outs(input)
            outs.remove(spent)
            if (outs.size == 1) { // only one candidate left: match it in the next iteration
              matchNext.put(input, outs.head)
            }
            if (outs.size == 0) { // the last candidate was removed; happens with truncated data sets (should not happen on full data)
              println("WTF: " + (in, input, spent))
            }
          })
      }
      toMatch = matchNext
    }
    val totalRefsRem = in_outs.map(_._2.size).foldLeft(0)(_ + _)
    println("References remaining: " + totalRefsRem)
    println("References removed: " + (totalRefs - totalRefsRem))

    val tx_realInput = in_outs
      .filter(_._2.size == 1)      // only one candidate output remains
      .map(a => (a._1, a._2.head)) // (input, the output it actually spends)
    // fraction of inputs whose real spend could be deduced
    val percentage = tx_realInput.size * 1.0 / in_outs.size
    println(percentage)
    // } finally {
    //   spark.stop()
    // }
  }
}
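
// Example usage (a sketch; assumes the code above is pasted into a spark-shell via
// :paste and that the CSV files exist at the paths produced by file/fileWSL):
//   Main.main(file("1E5"))     // Windows path
//   Main.main(fileWSL("full")) // WSL path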