Created
August 6, 2018 18:24
-
-
Save hkolbeck/a4379fc56630a9caa43e2321b1101bcc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.urbanairship.goro.task;

import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class NumServer { | |
private static final Logger log = LogManager.getLogger(NumServer.class); | |
private static final String FILE_NAME = "numbers.log"; | |
private static final String LINE_TERMINATOR = String.format("%n"); | |
private static final String EXIT_CMD = "terminate"; | |
private static final int MAX_CONN = 5; | |
private static final int LINE_LEN = 9; | |
private static final int LISTEN_PORT = 4000; | |
private static final int WRITE_QUEUE_SIZE = 10_000; | |
private static final Duration AWAIT_TIME = Duration.ofMillis(1000); | |
private static final Duration LOG_INTERVAL = Duration.ofSeconds(10); | |
private final ExecutorService connectionHandlers = | |
new ThreadPoolExecutor(MAX_CONN, 5, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); | |
private final ScheduledExecutorService logger = Executors.newSingleThreadScheduledExecutor(); | |
private final ExecutorService writer = Executors.newSingleThreadExecutor(); | |
private final AtomicBoolean running = new AtomicBoolean(true); | |
public static void main(String[] args) { | |
final NumServer numServer = new NumServer(); | |
final boolean cleanExit = numServer.runServer(); | |
if (cleanExit) { | |
System.exit(0); | |
} else { | |
System.exit(1); | |
} | |
} | |
public boolean runServer() { | |
final BufferedWriter output; | |
try { | |
output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(FILE_NAME, false))); | |
} catch (Exception e) { | |
log.error(String.format("Couldn't open file '%s' for writing, exiting", FILE_NAME), e); | |
return false; | |
} | |
final ArrayBlockingQueue<ParsedInt> queue = new ArrayBlockingQueue<>(WRITE_QUEUE_SIZE); | |
final NumberWriter numberWriter = new NumberWriter(queue, output); | |
writer.execute(numberWriter); | |
try (final ServerSocket serverSocket = new ServerSocket()) { | |
serverSocket.setSoTimeout((int) AWAIT_TIME.toMillis()); | |
serverSocket.bind(new InetSocketAddress(LISTEN_PORT)); | |
logger.scheduleAtFixedRate(numberWriter::issueReport, LOG_INTERVAL.toMillis(), LOG_INTERVAL.toMillis(), TimeUnit.MILLISECONDS); | |
while (running.get()) { | |
final Socket conn; | |
try { | |
conn = serverSocket.accept(); | |
} catch (SocketTimeoutException ste) { | |
continue; | |
} | |
try { | |
connectionHandlers.execute(new ConnectionHandler(queue, conn)); | |
} catch (RejectedExecutionException ree) { | |
IOUtils.closeQuietly(conn); | |
} | |
} | |
return true; | |
} catch (IOException e) { | |
running.set(false); | |
log.error("Exception in server accept loop, shutting down", e); | |
return false; | |
} finally { | |
writer.shutdownNow(); | |
logger.shutdownNow(); | |
connectionHandlers.shutdownNow(); | |
} | |
} | |
private class NumberWriter implements Runnable { | |
private final BitSet seen = new BitSet(); | |
private final AtomicLong dupSinceLastReport = new AtomicLong(0); | |
private final AtomicLong totalUniq = new AtomicLong(0); | |
private final AtomicLong uniqAtLastReport = new AtomicLong(0); | |
private final BlockingQueue<ParsedInt> inputs; | |
private final BufferedWriter uniques; | |
private NumberWriter(BlockingQueue<ParsedInt> inputs, BufferedWriter uniques) { | |
this.inputs = inputs; | |
this.uniques = uniques; | |
} | |
@Override | |
public void run() { | |
try { | |
final ArrayList<ParsedInt> batch = Lists.newArrayListWithCapacity(100); | |
while (running.get()) { | |
final int drained = inputs.drainTo(batch, 100); | |
if (drained > 0) { | |
for (ParsedInt candidate : batch) { | |
if (seen.get(candidate.parsed)) { | |
dupSinceLastReport.incrementAndGet(); | |
} else { | |
totalUniq.incrementAndGet(); | |
seen.set(candidate.parsed); | |
uniques.write(candidate.raw); | |
uniques.write(LINE_TERMINATOR); | |
} | |
} | |
batch.clear(); | |
} | |
} | |
} catch (Exception e) { | |
// e could be an interrupt, but we're exiting so just swallow it | |
log.error("Fatal error in writer!", e); | |
running.set(false); | |
} finally { | |
IOUtils.closeQuietly(uniques); | |
} | |
} | |
private void issueReport() { | |
final long dupsInInterval = dupSinceLastReport.getAndSet(0); | |
final long currentUniq = totalUniq.get(); | |
final long uniqInInterval = currentUniq - uniqAtLastReport.getAndSet(currentUniq); | |
final double readPerSecond = (dupsInInterval + uniqInInterval) / (double) LOG_INTERVAL.getSeconds(); | |
log.info(String.format("Since last report: %d unique, %d duplicates. Lifetime uniques: %d. %f/s", | |
uniqInInterval, dupsInInterval, currentUniq, readPerSecond)); | |
} | |
} | |
private class ConnectionHandler implements Runnable { | |
private final BlockingQueue<ParsedInt> numbers; | |
private final Socket conn; | |
private ConnectionHandler(BlockingQueue<ParsedInt> numbers, Socket conn) { | |
this.numbers = numbers; | |
this.conn = conn; | |
} | |
@Override | |
public void run() { | |
try { | |
conn.setSoTimeout((int) AWAIT_TIME.toMillis()); | |
final BufferedReader connInput = new BufferedReader(new InputStreamReader(conn.getInputStream())); | |
while (running.get()) { | |
final String line = connInput.readLine(); | |
// Any unicode weirdness not caught here should cause a parse failure | |
if (line == null || line.length() != LINE_LEN) { | |
return; | |
} | |
final Integer parsed = Ints.tryParse(line, 10); | |
if (parsed != null) { | |
final ParsedInt parsedInt = new ParsedInt(line, parsed); | |
boolean queued = false; | |
while (!queued && running.get()) { | |
queued = numbers.offer(parsedInt, AWAIT_TIME.toMillis(), TimeUnit.MILLISECONDS); | |
} | |
} else { | |
if (EXIT_CMD.equals(line)) { | |
running.set(false); | |
} | |
return; | |
} | |
} | |
} catch (SocketTimeoutException e) { | |
log.info("Read from socket timed out, assuming client died"); | |
} catch (Exception e) { | |
log.error("Error in connection handler, killing connection", e); | |
} finally { | |
IOUtils.closeQuietly(conn); | |
} | |
} | |
} | |
private static class ParsedInt { | |
private final String raw; | |
private final int parsed; | |
private ParsedInt(String raw, int parsed) { | |
this.raw = raw; | |
this.parsed = parsed; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment