Created
February 3, 2011 23:19
-
-
Save shellac/810440 to your computer and use it in GitHub Desktop.
This has a little test at the start, btw
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.zip.GZIPInputStream; | |
import java.io.*; | |
import java.util.*; | |
import java.util.concurrent.*; | |
/**
 * Pairwise tag-overlap calculator for a tab-separated file of records.
 *
 * <p>Each input line is {@code pid<TAB>crid<TAB>tag<TAB>tag...}, decoded as UTF-8 and
 * gunzipped first when the file name ends in {@code .gz}. For every pair of records whose
 * {@code pid}s and {@code crid}s both differ and that share at least one tag, one line is
 * written (UTF-8) to the output file:
 * {@code pid1<TAB>pid2<TAB>score<TAB>shared<TAB> 'tag', 'tag'...}, where
 * {@code score = shared / (tags1 + tags2 - shared)} (the Jaccard index when no row repeats
 * a tag).
 *
 * <p>Rows are cut into chunks of {@code chunkSize}; up to {@code threads} chunks run
 * concurrently per batch and a single writer thread serialises all output, so the order
 * of lines within a batch is not deterministic.
 */
public class Calc
{
    /**
     * Entry point: {@code Calc <filename> [<outfile> [<chunksize> [<threads>]]]}.
     * Defaults: outfile {@code <filename>.result}, chunk size 1000, 2 threads.
     *
     * @throws Exception if reading, processing or writing fails
     */
    public static void main(final String... args) throws Exception {
        if (args.length == 0) {
            usage();
        }
        final String file = args[0];
        final String outfile = (args.length > 1) ? args[1] : file + ".result";
        final int chunkSize = (args.length > 2) ? Integer.parseInt(args[2]) : 1000;
        final int numT = (args.length > 3) ? Integer.parseInt(args[3]) : 2;
        if (chunkSize <= 0 || numT <= 0) {
            // A zero or negative step would make the batch loop spin forever.
            System.err.println("Chunk size and thread count must be positive integers.");
            usage();
        }
        System.err.printf("In:\t<%s>\nOut:\t<%s>\nChunk size: %s, Threads: %s\n", file, outfile, chunkSize, numT);

        final List<Data> data = readData(file);
        System.err.printf("Number of lines: %s\n", data.size());

        // Write UTF-8 explicitly: FileWriter would use the platform charset and could
        // mangle non-ASCII tags that were decoded as UTF-8.
        final Writer out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outfile), "UTF-8"));
        try {
            final Output output = new Output(out); // handles output for all other threads
            final Thread outthread = new Thread(output);
            outthread.start();
            try {
                processAll(data, chunkSize, numT, output);
            } finally {
                // Always release the writer thread, otherwise the JVM never exits on failure.
                output.pipe.put(Output.FINISHED);
                outthread.join();
            }
            if (output.error != null) {
                throw output.error;
            }
        } finally {
            out.close();
        }
    }

    /** Prints the command-line synopsis and terminates the JVM with status 1. */
    private static void usage() {
        System.err.println("Usage: Calc <filename> [ <outfile> [ <chunksize> [ <threads> ] ] ]");
        System.exit(1);
    }

    /**
     * Reads every record of {@code file}. Double quotes in every field are escaped as
     * {@code \"}; lines with fewer than two fields (no pid/crid) are reported and skipped.
     *
     * @param file input path; gunzipped when it ends in {@code .gz}
     * @return the parsed records in file order
     * @throws IOException if the file cannot be opened or read
     */
    private static List<Data> readData(final String file) throws IOException {
        final InputStream raw = new FileInputStream(file);
        final BufferedReader reader;
        try {
            final InputStream in = file.endsWith(".gz") ? new GZIPInputStream(raw) : raw;
            reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
        } catch (IOException e) {
            raw.close(); // e.g. a corrupt gzip header: don't leak the file handle
            throw e;
        }
        final List<Data> data = new ArrayList<Data>();
        try {
            String line;
            int lineNo = 0;
            while ((line = reader.readLine()) != null) {
                lineNo++;
                final String[] split = line.split("\t");
                if (split.length < 2) {
                    System.err.printf("Skipping line %s: expected at least pid and crid\n", lineNo);
                    continue;
                }
                for (int i = 0; i < split.length; i++) {
                    split[i] = split[i].replace("\"", "\\\"");
                }
                data.add(new Data(split));
            }
        } finally {
            reader.close();
        }
        return data;
    }

    /**
     * Compares every row with every later row, {@code numT} chunks of {@code chunkSize}
     * rows per batch, waiting for each batch to finish before starting the next.
     *
     * @throws Exception if interrupted, or if any Task fails (wrapped in ExecutionException)
     */
    private static void processAll(final List<Data> data, final int chunkSize, final int numT,
                                   final Output output) throws Exception {
        final int count = data.size();
        // The last row has no later row to pair with, so only rows [0, count - 1) need work.
        final int rows = count - 1;
        final int chunks = (rows <= 0) ? 0 : (rows - 1) / chunkSize + 1;
        final ExecutorService pool = Executors.newFixedThreadPool(numT);
        try {
            int first = 0; // index of the first chunk in the current batch
            while (first < chunks) {
                // Never more than numT chunks per batch (the old formula could produce numT + 1).
                final int batch = Math.min(numT, chunks - first);
                final CountDownLatch finishedLatch = new CountDownLatch(batch);
                final List<Future<?>> futures = new ArrayList<Future<?>>(batch);
                final int batchStart = first * chunkSize; // <= rows - 1, cannot overflow
                for (int t = 0; t < batch; t++) {
                    final int start = (first + t) * chunkSize;
                    // Compute in long so a huge chunk size cannot overflow the end index.
                    final int end = (int) Math.min((long) start + chunkSize, (long) count);
                    System.err.printf("#%s:\t%s\t%s\n", t, start, end);
                    futures.add(pool.submit(new Task(data, start, end, count, finishedLatch, output)));
                }
                finishedLatch.await();
                for (final Future<?> f : futures) {
                    f.get(); // rethrows any exception raised inside a Task
                }
                System.err.printf("-------- %s --------\n", batchStart);
                first += batch;
            }
        } finally {
            pool.shutdownNow();
        }
    }

    /**
     * Returns the elements common to two arrays by a single merge pass.
     *
     * <p>Precondition: both arrays are sorted ascending by {@link String#compareTo}
     * (as {@link Data#tags} is). Each matching pair is reported once, in ascending order.
     */
    final static Collection<String> intersection(final String[] a, final String[] b) {
        int ia = 0;
        int ib = 0;
        final List<String> items = new ArrayList<String>();
        final int al = a.length;
        final int bl = b.length;
        while (ia < al && ib < bl) {
            final int comp = a[ia].compareTo(b[ib]);
            if (comp == 0) {
                items.add(a[ia]);
                ia++;
                ib++;
            } else if (comp < 0) {
                ia++;
            } else {
                ib++;
            }
        }
        return items;
    }

    /** One input record: two identifier fields followed by its tags. */
    static final class Data {
        final String pid;
        final String crid;
        /** Tags (fields 3..n), sorted ascending so {@code intersection} can merge them. */
        final String[] tags;
        /**
         * Cheap prefilter signature: bit {@code (hashCode & 31)} is set for every tag.
         * Rows that share a tag always share that bit, so
         * {@code (a.checker & b.checker) == 0} proves their tag sets are disjoint.
         */
        final int checker;

        /**
         * @param line split fields: {@code [pid, crid, tag...]}
         * @throws IllegalArgumentException if fewer than two fields are given
         */
        public Data(final String[] line) {
            if (line.length < 2) {
                throw new IllegalArgumentException(
                        "expected at least pid and crid, got " + line.length + " field(s)");
            }
            pid = line[0];
            crid = line[1];
            final int taglen = line.length - 2;
            tags = new String[taglen];
            System.arraycopy(line, 2, tags, 0, taglen);
            Arrays.sort(tags);
            int c = 0;
            for (String tag : tags) {
                // One bit per tag: OR-ing raw hash codes failed for tags hashing to 0
                // (e.g. the empty string), letting a shared tag be skipped.
                c |= 1 << (tag.hashCode() & 31);
            }
            checker = c;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("[");
            sb.append(pid);
            sb.append(",");
            sb.append(crid);
            sb.append("] ");
            for (String tag : tags) {
                sb.append(tag).append(" , ");
            }
            return sb.toString();
        }
    }

    /** Single consumer that writes every queued chunk of text to {@code out}. */
    static class Output implements Runnable {
        final BlockingQueue<String> pipe;
        final Writer out;
        /** Sentinel; recognised by identity, so data containing this text never stops the loop. */
        static final String FINISHED = "!!!!FINISHED!!!!!";
        /** First write/flush failure, or null; read by main after joining this thread. */
        volatile IOException error;

        public Output(final Writer out) {
            this.out = out;
            this.pipe = new ArrayBlockingQueue<String>(100);
        }

        public void run() {
            try {
                String next;
                // Identity comparison is deliberate: only the FINISHED constant itself ends the loop.
                while ((next = pipe.take()) != FINISHED) {
                    if (error != null) {
                        continue; // keep draining after a failure so producers never block on put()
                    }
                    try {
                        out.write(next);
                    } catch (IOException e) {
                        error = e;
                        System.err.println(e.getMessage());
                    }
                }
                if (error == null) {
                    try {
                        out.flush();
                    } catch (IOException e) {
                        error = e;
                        System.err.println(e.getMessage());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Compares each row in {@code [start, end)} against every later row and queues the
     * resulting lines (one queue entry per source row) on {@code output}. Always counts
     * down {@code finishedLatch} exactly once, even on failure.
     */
    static final class Task implements Runnable {
        final List<Data> data;
        final int start, end, count;
        final CountDownLatch finishedLatch;
        final Output output;

        public Task(final List<Data> data, final int start, final int end, final int count, CountDownLatch finishedLatch, Output output) {
            this.data = data;
            this.start = start;
            this.end = (end > count) ? count : end;
            this.output = output;
            this.count = count;
            this.finishedLatch = finishedLatch;
        }

        public void run() {
            try {
                for (int i = start; i < end; i++) {
                    final Data d1 = data.get(i);
                    final StringBuilder out = new StringBuilder(10000);
                    for (int j = i + 1; j < count; j++) {
                        final Data d2 = data.get(j);
                        // Pairs sharing either identifier are not compared.
                        if (d1.pid.equals(d2.pid) || d1.crid.equals(d2.crid)) continue;
                        if ((d1.checker & d2.checker) == 0) continue; // provably no shared tag
                        final Collection<String> items = intersection(d1.tags, d2.tags);
                        final int inter = items.size();
                        if (inter == 0) continue;
                        final double val = (double) inter / (double) (d1.tags.length + d2.tags.length - inter);
                        out.append(d1.pid);
                        out.append("\t");
                        out.append(d2.pid);
                        out.append("\t");
                        out.append(Double.toString(val));
                        out.append("\t");
                        out.append(Integer.toString(inter));
                        out.append("\t");
                        boolean first = true;
                        for (String item : items) {
                            if (first) first = false;
                            else out.append(",");
                            out.append(" '");
                            out.append(item);
                            out.append("'");
                        }
                        out.append("\n");
                    }
                    if (out.length() > 0) {
                        output.pipe.put(out.toString());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finishedLatch.countDown();
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment