Skip to content

Instantly share code, notes, and snippets.

@shellac
Created February 3, 2011 23:19
Show Gist options
  • Save shellac/810440 to your computer and use it in GitHub Desktop.
Save shellac/810440 to your computer and use it in GitHub Desktop.
This has a little test at the start, btw
import java.util.zip.GZIPInputStream;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * Command-line tool that reads tab-separated records and writes, for every pair
 * of records sharing at least one tag, a similarity score to a result file.
 *
 * <p>Each input line is {@code field0 <TAB> field1 <TAB> tag...}. Two records are
 * only compared when both their first and their second fields differ. The score is
 * {@code shared / (tagsA + tagsB - shared)}, i.e. the Jaccard index when a record
 * does not repeat a tag.
 *
 * <p>Work is split into chunks of rows that are processed by a fixed pool of worker
 * threads; a single writer thread ({@link Output}) serialises all file output.
 */
public class Calc
{
    /** Character encoding used for both the input file and the result file. */
    private static final String ENCODING = "UTF-8";

    /**
     * Entry point.
     *
     * @param args {@code <filename> [<outfile> [<chunksize> [<threads>]]]}; the output
     *             defaults to {@code <filename>.result}, the chunk size to 1000 rows
     *             and the thread count to 2. Input ending in {@code .gz} is gunzipped.
     * @throws Exception on I/O failure, malformed numeric arguments or interruption
     */
    public static void main(final String... args) throws Exception {
        if (args.length == 0) {
            System.err.println("Usage: Calc <filename> [ <outfile> [ <chunksize> [ <threads> ] ] ]");
            System.exit(1);
        }
        final String file = args[0];
        final String outfile = (args.length > 1) ? args[1] : file + ".result";
        final int chunkSize = (args.length > 2) ? Integer.parseInt(args[2]) : 1000;
        final int numT = (args.length > 3) ? Integer.parseInt(args[3]) : 2;
        if (chunkSize < 1 || numT < 1) {
            // A zero step would make the scheduling loop spin forever.
            System.err.println("Chunk size and thread count must both be at least 1");
            System.exit(1);
        }
        System.err.printf("In:\t<%s>\nOut:\t<%s>\nChunk size: %s, Threads: %s\n", file, outfile, chunkSize, numT);

        final List<Data> data = readData(file);
        System.err.printf("Number of lines: %s\n", data.size());

        // Written as UTF-8 so the output encoding matches how the input was decoded.
        final Writer out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outfile), ENCODING));
        try {
            compareAll(data, chunkSize, numT, out);
        } finally {
            out.close();
        }
    }

    /** Loads every usable record from {@code file}, skipping lines with fewer than two fields. */
    private static List<Data> readData(final String file) throws IOException {
        InputStream in = new FileInputStream(file);
        try {
            if (file.endsWith(".gz")) {
                in = new GZIPInputStream(in);
            }
            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, ENCODING));
            final List<Data> data = new ArrayList<Data>();
            String line;
            int lineNo = 0;
            while ((line = reader.readLine()) != null) {
                lineNo++;
                final String[] split = line.split("\t");
                if (split.length < 2) {
                    System.err.printf("Skipping line %s: fewer than two fields\n", lineNo);
                    continue;
                }
                for (int i = 0; i < split.length; i++) {
                    split[i] = split[i].replace("\"", "\\\"");
                }
                data.add(new Data(split));
            }
            return data;
        } finally {
            // Closes the outermost stream that was successfully opened.
            in.close();
        }
    }

    /**
     * Schedules the row comparisons in batches of at most {@code numT} chunks and waits
     * for each batch before starting the next; returns once all output is flushed.
     */
    private static void compareAll(final List<Data> data, final int chunkSize, final int numT, final Writer out)
            throws InterruptedException, IOException {
        final int count = data.size();
        final int lastRow = count - 1; // the final record has no later record to pair with
        final long stepSize = (long) numT * chunkSize;
        final Output output = new Output(out); // handles output for all other threads
        final Thread outthread = new Thread(output);
        outthread.start();
        final ExecutorService workers = Executors.newFixedThreadPool(numT);
        try {
            for (long i = 0; i < lastRow; i += stepSize) {
                final long remaining = lastRow - i;
                // Ceiling division, capped at numT: never more chunks than rows left or threads.
                final int aNumT = (int) Math.min(numT, (remaining + chunkSize - 1) / chunkSize);
                final CountDownLatch finishedLatch = new CountDownLatch(aNumT);
                for (int t = 0; t < aNumT; t++) {
                    final int start = (int) (i + (long) t * chunkSize);
                    final int end = (int) Math.min(start + (long) chunkSize, count);
                    System.err.printf("#%s:\t%s\t%s\n", t, start, end);
                    workers.execute(new Task(data, start, end, count, finishedLatch, output));
                }
                finishedLatch.await();
                System.err.printf("-------- %s --------\n", i);
            }
        } finally {
            // On the normal path every task has already finished; on failure this
            // interrupts workers so none stays blocked on the output queue.
            workers.shutdownNow();
            output.pipe.put(Output.FINISHED);
            outthread.join();
        }
        final IOException failure = output.failure;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Returns the elements common to two arrays that are each sorted in natural
     * {@link String#compareTo} order, using a single merge-style pass.
     *
     * @param a first sorted array
     * @param b second sorted array
     * @return the common elements in sorted order; empty when there are none
     */
    final static Collection<String> intersection(final String[] a, final String[] b) {
        final List<String> items = new ArrayList<String>();
        int ia = 0;
        int ib = 0;
        while (ia < a.length && ib < b.length) {
            final int comp = a[ia].compareTo(b[ib]);
            if (comp == 0) {
                items.add(a[ia]);
                ia++;
                ib++;
            } else if (comp < 0) {
                ia++;
            } else {
                ib++;
            }
        }
        return items;
    }

    /** One input record: two leading fields plus its sorted tags. */
    static final class Data {
        /** First field of the line. */
        final String pid;
        /** Second field of the line. */
        final String crid;
        /** All remaining fields, sorted so {@link Calc#intersection} can merge them. */
        final String[] tags;
        /**
         * 32-bit membership mask: every tag sets the bit selected by the low five bits
         * of its hash code. Records sharing a tag therefore always share a bit, so a
         * zero AND of two masks proves that the tag sets are disjoint.
         */
        final int checker;

        /**
         * @param line the split fields of one input line; needs at least two entries
         * @throws IllegalArgumentException if {@code line} has fewer than two entries
         */
        public Data(final String[] line) {
            if (line.length < 2) {
                throw new IllegalArgumentException("Expected at least 2 fields but got " + line.length);
            }
            pid = line[0];
            crid = line[1];
            tags = Arrays.copyOfRange(line, 2, line.length);
            Arrays.sort(tags);
            int c = 0;
            for (String tag : tags) {
                // One bit per tag; OR-ing raw hash codes would lose tags whose hash is 0.
                c |= 1 << (tag.hashCode() & 31);
            }
            checker = c;
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder();
            sb.append("[").append(pid).append(",").append(crid).append("] ");
            for (String tag : tags) {
                sb.append(tag).append(" , ");
            }
            return sb.toString();
        }
    }

    /**
     * Writer thread body: drains {@link #pipe} into {@link #out} until the
     * {@link #FINISHED} sentinel arrives, then flushes.
     */
    static class Output implements Runnable {
        final BlockingQueue<String> pipe;
        final Writer out;
        /** Sentinel that tells the writer thread to flush and stop. */
        static final String FINISHED = "!!!!FINISHED!!!!!";
        /** First write or flush error, if any; later text is discarded once set. */
        private volatile IOException failure;

        public Output(final Writer out) {
            this.out = out;
            this.pipe = new ArrayBlockingQueue<String>(100);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    final String next = pipe.take();
                    // Identity comparison is deliberate: only the sentinel object itself
                    // stops the writer, never payload text that merely looks like it.
                    if (next == FINISHED) {
                        break;
                    }
                    if (failure == null) {
                        try {
                            out.write(next);
                        } catch (IOException e) {
                            // Keep draining so producers never block on a full queue.
                            failure = e;
                            System.err.println(e);
                        }
                    }
                }
                if (failure == null) {
                    try {
                        out.flush();
                    } catch (IOException e) {
                        failure = e;
                        System.err.println(e);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Compares every row in {@code [start, end)} against all later rows and queues the
     * resulting text, one block per row, on the shared {@link Output}.
     */
    static final class Task implements Runnable {
        final List<Data> data;
        final int start, end, count;
        final CountDownLatch finishedLatch;
        final Output output;

        /**
         * @param data          all records
         * @param start         first row to process (inclusive)
         * @param end           last row to process (exclusive); capped at {@code count}
         * @param count         number of records to compare against
         * @param finishedLatch counted down exactly once when this task ends
         * @param output        destination for the generated text
         */
        public Task(final List<Data> data, final int start, final int end, final int count, CountDownLatch finishedLatch, Output output) {
            this.data = data;
            this.start = start;
            this.end = (end > count) ? count : end;
            this.output = output;
            this.count = count;
            this.finishedLatch = finishedLatch;
        }

        @Override
        public void run() {
            try {
                final StringBuilder out = new StringBuilder(10000);
                for (int i = start; i < end; i++) {
                    out.setLength(0);
                    appendRow(i, out);
                    if (out.length() > 0) {
                        output.pipe.put(out.toString());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always release the waiting scheduler, even if this task failed.
                finishedLatch.countDown();
            }
        }

        /** Appends one output line for every later row that shares a tag with row {@code i}. */
        private void appendRow(final int i, final StringBuilder out) {
            final Data d1 = data.get(i);
            for (int j = i + 1; j < count; j++) {
                final Data d2 = data.get(j);
                if (d1.pid.equals(d2.pid) || d1.crid.equals(d2.crid)) {
                    continue;
                }
                if ((d1.checker & d2.checker) == 0) {
                    continue; // cheap pre-filter: no common mask bit means no common tag
                }
                final Collection<String> items = intersection(d1.tags, d2.tags);
                final int inter = items.size();
                if (inter == 0) {
                    continue;
                }
                final double val = (double) inter / (double) (d1.tags.length + d2.tags.length - inter);
                out.append(d1.pid).append("\t");
                out.append(d2.pid).append("\t");
                out.append(Double.toString(val)).append("\t");
                out.append(Integer.toString(inter)).append("\t");
                boolean first = true;
                for (String item : items) {
                    if (first) {
                        first = false;
                    } else {
                        out.append(",");
                    }
                    out.append(" '").append(item).append("'");
                }
                out.append("\n");
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment