Created
November 23, 2018 21:56
-
-
Save NoraCodes/59e2285c5713c696694c765b16df9043 to your computer and use it in GitHub Desktop.
Python script to listen for and dump info on Dropbox sync sessions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import socketserver | |
import json | |
import datetime | |
import sys | |
import getopt | |
known_hosts = {} | |
QUIET = False | |
def usage(): | |
print("dropsniff.py [-l address] [-p port] [-c filename] [-q]") | |
print("Listen for Dropbox LAN sync announcements.") | |
print("-l address Listen on a specific address (default 0.0.0.0)") | |
print("-p port Listen on a specific port (default 17500)") | |
print("-c filename Dump state to a CSV when killed") | |
print("-q Quiet; don't print to the console at all") | |
class DropboxAnnounceData: | |
def __init__(self, displayname, namespaces, port, host_int, version): | |
self.displayname = displayname | |
self.namespaces = namespaces | |
self.port = port | |
self.host_int = host_int | |
self.version = version | |
def validate_data(data): | |
try: | |
d = json.loads(data) | |
except json.JSONDecodeError as e: | |
return None | |
return DropboxAnnounceData( | |
d['displayname'], | |
d['namespaces'], | |
d['port'], | |
d['host_int'], | |
d['version']) | |
class DropsniffHandler(socketserver.DatagramRequestHandler): | |
def handle(self): | |
data = self.request[0].strip() | |
socket = self.request[1] | |
d = validate_data(data) | |
if d is None: | |
return | |
# If the host has never been seen before, insert it. | |
if known_hosts.get(self.client_address[0]) is None: | |
known_hosts[self.client_address[0]] = (d, 1) | |
# If it has been seen, update the counter and the data. | |
else: | |
(old_d, times_seen) = known_hosts[self.client_address[0]] | |
known_hosts[self.client_address[0]] = (d, times_seen + 1) | |
if QUIET: | |
return | |
# Print out the result of the | |
print("\nAt {} we know:".format(datetime.datetime.now())) | |
for ip in known_hosts: | |
(dbox, times_seen) = known_hosts[ip] | |
print("{} running v{} on {} (nickname '{}') seen {} time(s)".format( | |
ip, | |
dbox.version, | |
dbox.port, | |
dbox.displayname, | |
times_seen)) | |
#print("{} wrote:".format(self.client_address[0])) | |
#print(data) | |
if __name__ == "__main__": | |
HOST, PORT = "0.0.0.0", 17500 | |
CSV = None | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "l:p:c:q") | |
except getopt.GetoptError as e: | |
print(e) | |
usage() | |
sys.exit(2) | |
for opt, arg in opts: | |
print("opt {} arg {}".format(opt, arg)) | |
if opt == '-l': | |
HOST = arg | |
elif opt == '-p': | |
PORT = arg | |
elif opt == '-c': | |
CSV = arg | |
elif opt == '-q': | |
QUIET = True | |
else: | |
assert False, "unhandled option" | |
try: | |
with socketserver.UDPServer((HOST, PORT), DropsniffHandler) as server: | |
server.serve_forever() | |
except KeyboardInterrupt as e: | |
if CSV is not None: | |
print("Writing {}".format(CSV)) | |
with open(CSV, "w") as f: | |
f.write("ip\tdisplayname\tnamespaces\tport\thost_int\tversion\tseen\n") | |
for ip in known_hosts: | |
(dbox, times_seen) = known_hosts[ip] | |
f.write("{}\t{}\t{}\t{}\t{}\t{}\t{}\n".format( | |
ip, | |
dbox.displayname, | |
dbox.namespaces, | |
dbox.port, | |
dbox.host_int, | |
dbox.version, | |
times_seen)) | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment