|
fs = require 'fs' |
|
net = require 'net' |
|
path = require 'path' |
|
async = require 'async' |
|
crypto = require 'crypto' |
|
readline = require 'readline' |
|
msgpack = require 'msgpack' |
|
secure = require './secure' |
|
{BigInteger} = require 'bigdecimal' |
|
|
|
# Network state, shared by every function in this file.
peers = {}             # 'host:port' -> {conn, writeCipher, writeMACKey}
pendingHookups = {}    # tracker -> time (ms) at which we sent that hookup
pendingDownloads = {}  # tracker -> callback waiting for the requested chunk
chunks = {}            # tag -> stored chunk record
peersLength = 0        # number of secured peers currently registered
chunksLength = 0       # number of chunks stored
serverPort = 0         # set once the server is listening

# Server information: random key used by the drainer's routing ciphers.
routingKey = secure.randomHex()

console.log 'Hearsay File Sharer v0.0.1'
console.log 'Starting up...'
|
|
|
# Start the server. Handles inter-cluster requests. (Insecure)
#
# Every inbound connection sends a single space-separated command:
#   'hookup <port> <tracker>' - the sender listens on <port> and wants peers.
#   'mate <port> <tracker>'   - the sender answers a hookup that carried
#                               <tracker>; accepted only if we issued it.
server = net.createServer (conn) ->
    # So there's never an 'unhandled' error.
    conn.on 'error', ->

    conn.once 'data', (data) ->
        data = data.toString().split ' '
        tracker = data[2]

        switch data[0]
            when 'hookup'
                host = conn.remoteAddress
                conn.end()

                # 1. Check that given port is valid.
                # 2. Decide if we want this connection for ourselves.
                # 3. Make sure we don't know this peer.
                # 4. Make sure this isn't one of ours.
                port = parseInt data[1], 10
                address = host + ':' + port
                # The port comes from the network: a non-numeric string would
                # be taken as a pipe path by net.connect and an out-of-range
                # number makes it throw, so both are rejected here.
                valid = tracker? and 0 < port < 65536
                want = secure.want peersLength
                know = peers[address]?
                ours = pendingHookups[tracker]?

                if valid and want and not know and not ours
                    # Connect and befriend. A separate variable is used so the
                    # inbound `conn` is not overwritten.
                    friend = net.connect port, host, ->
                        friend.write 'mate ' + serverPort + ' ' + tracker

                    friend.once 'data', (res) ->
                        if res.toString() is 'ok' then peer friend, port, true

                    friend.on 'error', ->

                else if valid and not ours
                    # Pass on. (Explicitly parenthesised: the bare
                    # `not isNaN port and not ours` evaluated
                    # `isNaN(port and not ours)` and forwarded our own hookups.)
                    drain.push cmd: 'hookup', address: address, tracker: tracker

            when 'mate'
                # If ours, acknowledge. If not, throw away.
                if pendingHookups[tracker]?
                    delete pendingHookups[tracker]
                    conn.write 'ok'
                    peer conn, data[1], false
                else
                    conn.end 'no'
|
|
|
# Listen without naming a port, record the port the server ended up on,
# announce it, and only then start the command line interface.
server.listen ->
    bound = server.address()
    serverPort = bound.port

    console.log 'Server port:', serverPort
    frontend()
|
|
|
|
|
# Peers

# Queue worker: encrypts, authenticates and sends one task to one peer.
#
# task - message object (cmd plus payload). An optional `cb` property is
#        removed before sending and invoked with the write result.
# done - queue callback; it is ALWAYS called exactly once, otherwise the
#        single-concurrency queue below would stall forever.
#
# 'req'/'res' tasks carrying a `from` address are routed deterministically:
# the position of `from` in one ordering of the peers selects the peer at the
# same position in a second ordering. Everything else goes to a random peer.
drainer = (task, done) ->
    cb = task.cb
    delete task.cb

    # Drop the task, reporting the reason to both callbacks.
    fail = (message) ->
        err = new Error message
        cb? err
        done err

    if peersLength is 0 then return fail 'No peers connected; task dropped.'

    if (task.cmd is 'req' or task.cmd is 'res') and task.from?
        # Hash tracker
        cipher = crypto.createCipher 'aes-256-ctr', routingKey
        cipher.end task.tracker, 'hex'
        tracker = cipher.read().toString 'hex'

        # Generate lists of peers to reference. The keyed hash of each
        # address is computed once instead of on every comparison.
        first = Object.keys(peers).sort()
        rank = {}
        for candidate in first
            cipher = crypto.createCipher 'aes-256-ctr', routingKey
            cipher.write tracker, 'hex'
            cipher.end candidate
            rank[candidate] = cipher.read().toString 'hex'

        # A comparator must return a negative, zero or positive number;
        # returning the boolean `a > b` leaves the order undefined.
        second = first.slice().sort (a, b) ->
            if rank[a] < rank[b] then -1 else if rank[a] > rank[b] then 1 else 0

        if task.cmd is 'res' then [first, second] = [second, first]

        # Choose next.
        n = first.indexOf task.from
        delete task.from
        addr = second[n]
    else
        # Randomly choose peer.
        [n, m] = [secure.random(peersLength), 0]
        for addr of peers
            if n is m then break else m++

    # `task.from` may name a peer that has since disconnected (index -1).
    target = peers[addr]
    if not target? then return fail 'No route to a connected peer; task dropped.'

    # Encrypt.
    target.writeCipher.write msgpack.pack task
    ct = target.writeCipher.read().toString 'base64'

    # MAC.
    mac = crypto.createHmac 'sha256', target.writeMACKey
    mac.end ct, 'base64'
    tag = mac.read().toString 'base64'

    target.conn.write ct + tag + '\n', 'utf8', (err) ->
        cb? err
        done err

# Outgoing messages are sent strictly one at a time.
drain = async.queue drainer, 1
|
|
|
# Turns a raw connection into a secured peer and handles its messages.
#
# conn - socket to the remote node.
# port - the remote node's server port; with the socket's remote address it
#        forms the key under which the peer is stored.
# init - forwarded to secure.remote (true on the side that connected out).
peer = (conn, port, init) ->
    # Secure the connection.
    secure.remote conn, init
    address = conn.remoteAddress + ':' + port

    conn.on 'secure', (writeCipher, writeMKey) ->
        peersLength++
        peers[address] =
            conn: conn
            writeCipher: writeCipher
            writeMACKey: writeMKey

        # Unregister exactly once. 'close' is watched as well as 'end' because
        # a connection torn down by an error never emits 'end'.
        gone = false
        forget = ->
            return if gone
            gone = true
            peersLength--
            delete peers[address]

        conn.on 'end', forget
        conn.on 'close', forget

    conn.on 'line', (data) ->
        return unless data?

        if data.cmd is 'hookup' and data.address? and data.tracker?
            want = secure.want peersLength
            know = peers[data.address]?
            ours = pendingHookups[data.tracker]?

            # Split on the LAST colon so IPv6 hosts survive, and validate the
            # port because it comes from the network. Dedicated variables keep
            # this handler from overwriting the enclosing `conn` and `port`.
            target = String data.address
            cut = target.lastIndexOf ':'
            friendHost = target.slice 0, cut
            friendPort = parseInt target.slice(cut + 1), 10
            valid = cut > 0 and 0 < friendPort < 65536

            if valid and want and not know and not ours
                # Connect and befriend.
                friend = net.connect friendPort, friendHost, ->
                    friend.write 'mate ' + serverPort + ' ' + data.tracker

                friend.once 'data', (res) ->
                    if res.toString() is 'ok' then peer friend, friendPort, true

                friend.on 'error', ->

            else if valid and not ours and not secure.drop()
                drain.push data

        else if data.cmd is 'distr' and data.tag? and data.chunk?
            know = chunks[data.tag]?

            # NOTE(review): secure.want is called without a count here but
            # with peersLength for hookups - confirm that is intended.
            if secure.want() and not know
                chunks[data.tag] = data: data.chunk, own: false
                chunksLength++

            if not secure.drop() then drain.push data

        else if data.cmd is 'req' and data.tracker? and data.tag?
            if chunks[data.tag]?
                # We hold the chunk: answer instead of forwarding.
                drain.push
                    cmd: 'res'
                    tracker: data.tracker
                    chunk: chunks[data.tag].data

            else if not secure.drop()
                drain.push data

        else if data.cmd is 'res' and data.tracker? and data.chunk?
            if pendingDownloads[data.tracker]?
                # A response to one of our own requests.
                pendingDownloads[data.tracker](data.chunk)
            else if not secure.drop()
                drain.push data
|
|
|
# General maintenance. Runs every five seconds and:
#   1. forgets hookups we sent at least five seconds ago,
#   2. randomly discards chunks that are not our own,
#   3. while we have between one and four peers, asks a random peer to
#      spread a hookup on our behalf.
maintenance = ->
    # Clean up old hookups.
    now = Date.now()
    for tracker, sentAt of pendingHookups
        if now - sentAt >= 5000 then delete pendingHookups[tracker]

    # Clean out unwanted chunks, keeping the chunk counter in step.
    for tag, record of chunks
        if record.own is false and secure.drop()
            delete chunks[tag]
            chunksLength--

    # Play matchmaker.
    if peersLength is 0 or peersLength >= 5 then return
    addrs = Object.keys peers
    # Fall back to the last peer if the random index is out of range.
    addr = addrs[secure.random peersLength] ? addrs[addrs.length - 1]
    if not addr? then return

    # Split on the LAST colon so IPv6 hosts are not cut apart.
    cut = addr.lastIndexOf ':'
    host = addr.slice 0, cut
    port = addr.slice cut + 1

    conn = net.connect port, host, ->
        hookupTracker = secure.randomHex()
        pendingHookups[hookupTracker] = Date.now()
        conn.end 'hookup ' + serverPort + ' ' + hookupTracker

    # Best effort: an unreachable peer is simply skipped.
    conn.on 'error', ->

setInterval maintenance, 5000
|
|
|
# Periodically pushes every stored chunk into the network as a 'distr' task.
# Chunks are sent one after another with a random pause (up to 50 seconds)
# after each; the whole cycle restarts ten seconds after it completes, or
# ten seconds later when there are no peers to send to.
publish = ->
    reschedule = -> setTimeout publish, 10000
    return reschedule() if peersLength is 0

    # Publish chunks.
    entries = []
    for tag, chunk of chunks
        entries.push tag: tag, chunk: chunk

    announce = (entry, next) ->
        task =
            cmd: 'distr'
            tag: entry.tag
            chunk: entry.chunk.data
            cb: -> setTimeout next, Math.random() * 50000
        drain.push task

    async.mapSeries entries, announce, reschedule

publish()
|
|
|
# Command Line Interface
#
# Creates the readline prompt and dispatches the commands typed by the user:
#   (blank), 'chunks', 'peers', 'enter <host> <port>', 'upload', 'download'.
frontend = ->
    # Completion service: completes the typed line as a filesystem path.
    # Returns [candidates, line] as readline expects.
    completer = (line) ->
        line = path.normalize line
        [dir, stem] = [path.dirname(line), path.basename(line)]

        # A parent that is missing, unreadable or not a directory simply
        # yields no completions instead of throwing inside readline.
        try
            completions = if fs.existsSync(dir) then fs.readdirSync(dir) else []
        catch err
            completions = []

        hits = completions.filter (name) -> name.indexOf(stem) is 0

        if hits.length is 1
            # Unique match: offer the full path, ending in a separator when
            # the match is a directory.
            full = dir
            full += path.sep unless dir[dir.length - 1] is path.sep
            full += hits[0]

            try
                isDir = fs.statSync(full).isDirectory()
            catch err
                isDir = false

            if isDir then full += path.sep
            hits[0] = full

        rec = if hits.length > 0 then hits else completions
        return [rec, line]

    # Create interface.
    rl = readline.createInterface {
        input: process.stdin
        output: process.stdout
        completer: completer
    }

    # Closes every given file descriptor (close errors are ignored) and
    # hands control back to the user.
    finish = (fds...) ->
        for fd in fds when fd?
            fs.close fd, ->
        rl.prompt true

    # Asks for an input and an output file and opens them. Calls
    # ready(inFd, outFd) on success; on failure logs the error, closes
    # whatever was opened and re-prompts.
    openFiles = (ready) ->
        questions = [
            {ask: 'Input file? ', mode: 'r'}
            {ask: 'Output file? ', mode: 'w'}
        ]

        openFile = (task, done) ->
            rl.question task.ask, (file) ->
                fs.open file, task.mode, done

        async.mapSeries questions, openFile, (err, fds) ->
            if err?
                console.log err
                opened = fds or []
                finish opened...
            else
                ready fds[0], fds[1]

    # Parses and handles lines entered by the user.
    rl.on 'line', (line) ->
        words = line.trim().split ' '

        switch words[0]
            when "" then console.log ""

            when "chunks"
                console.log tag for tag of chunks

            when "peers"
                console.log addr for addr of peers
                if peersLength is 0 then console.log 'No peers.'

            when "enter"
                if words.length isnt 3 then return rl.prompt true

                # net.connect throws on an out-of-range port and treats a
                # non-numeric string as a pipe path, so validate first.
                port = parseInt words[2], 10
                if not (0 < port < 65536) then return rl.prompt true

                conn = net.connect port, words[1], ->
                    tracker = secure.randomHex()
                    pendingHookups[tracker] = Date.now()
                    conn.end 'hookup ' + serverPort + ' ' + tracker

                report = (error) ->
                    if error? then console.log error
                    rl.prompt true

                conn.on 'error', report
                conn.on 'end', report

            when "upload"
                if words.length isnt 1 then return rl.prompt true

                # Reads the input file in 256 KiB pieces. Each piece is
                # encrypted, stored under the hash of its ciphertext, and
                # that hash (tag) is appended to the output file.
                readChunk = (inFd, outFd, cipher) ->
                    buff = new Buffer 262144
                    fs.read inFd, buff, 0, 262144, null, (err, n, buff) ->
                        # Error or end of file: release both files and stop.
                        if err? or n is 0
                            if err? then console.log err
                            return finish inFd, outFd

                        # Encrypt chunk.
                        cipher.write buff.slice(0, n)
                        chunk = cipher.read()

                        # Hash encrypted chunk.
                        h = crypto.createHash 'sha256'
                        h.end chunk
                        chunkTag = h.read().toString 'base64'

                        # Publish. Only count tags that are new, and use the
                        # same `own` flag the rest of the file reads and writes.
                        if not chunks[chunkTag]? then chunksLength++
                        chunks[chunkTag] = data: chunk.toJSON(), own: true

                        tb = new Buffer chunkTag
                        fs.write outFd, tb, 0, tb.length, null, (err) ->
                            if err?
                                console.log err
                                return finish inFd, outFd
                            readChunk inFd, outFd, cipher

                # Get files needed, open them, and prepare for uploading.
                # The output file starts with the key, followed by the tags.
                openFiles (inFd, outFd) ->
                    key = secure.randomBase64()
                    kb = new Buffer key
                    fs.write outFd, kb, 0, kb.length, null, (err) ->
                        if err?
                            console.log err
                            return finish inFd, outFd
                        c = crypto.createCipher 'aes-256-ctr', key
                        readChunk inFd, outFd, c

                # The prompt returns once the upload finishes.
                return

            when "download"
                if words.length isnt 1 then return rl.prompt true

                # Reads the next 44-byte record (key or tag) from the input
                # file; calls cb with it, or with nothing at end of file or
                # on a read error.
                nextLine = (fd, cb) ->
                    buff = new Buffer 44
                    fs.read fd, buff, 0, 44, null, (err, n, buff) ->
                        if not err? and n is 44
                            cb buff.toString()
                        else
                            cb()

                # Fires five seconds after a request; a no-op if the request
                # has been answered in the meantime.
                timeout = (tracker) ->
                    if not pendingDownloads[tracker]? then return
                    pendingDownloads[tracker]('timeout')

                fetchChunk = (inFd, outFd, cipher, tag) ->
                    # No more tags: the download is done, release both files.
                    if not tag? then return finish inFd, outFd

                    # Choose a random tracker, put it as pending, and send a
                    # request with it.
                    tracker = secure.randomHex()
                    pendingDownloads[tracker] = (chunk) ->
                        if chunk is 'timeout'
                            # Timed out waiting for this chunk.
                            # Add limiters here later.
                            delete pendingDownloads[tracker]
                            fetchChunk inFd, outFd, cipher, tag
                        else
                            chunk = new Buffer chunk

                            # Hash encrypted chunk to check integrity.
                            h = crypto.createHash 'sha256'
                            h.end chunk
                            candidateTag = h.read().toString 'base64'

                            # Wrong data: keep waiting for a valid response.
                            if tag isnt candidateTag then return
                            delete pendingDownloads[tracker]

                            # Decrypt chunk.
                            cipher.write chunk
                            chunk = cipher.read()

                            # Output, and get next chunk.
                            fs.write outFd, chunk, 0, chunk.length, null, (err) ->
                                if err?
                                    console.log err
                                    return finish inFd, outFd
                                nextLine inFd, (following) ->
                                    fetchChunk inFd, outFd, cipher, following

                    drain.push cmd: 'req', tracker: tracker, tag: tag
                    setTimeout timeout, 5000, tracker

                # Get files needed, open them, and prepare for downloading.
                # The first record of the input file is the key; every
                # following record is a chunk tag.
                openFiles (inFd, outFd) ->
                    nextLine inFd, (key) ->
                        # An input file too short to hold a key cannot be used.
                        if not key? then return finish inFd, outFd
                        c = crypto.createCipher 'aes-256-ctr', key
                        nextLine inFd, (tag) ->
                            fetchChunk inFd, outFd, c, tag

                # The prompt returns once the download finishes.
                return

            else console.log 'Command not known.'

        rl.prompt true

    rl.on 'close', ->
        console.log '\nGoodbye'
        process.exit 0

    rl.prompt true