Skip to content

Instantly share code, notes, and snippets.

@wagyourtail
Last active June 29, 2023 20:22
Show Gist options
  • Save wagyourtail/dc0e703ca5de4db42f3782a21837fa7f to your computer and use it in GitHub Desktop.
Save wagyourtail/dc0e703ca5de4db42f3782a21837fa7f to your computer and use it in GitHub Desktop.
MCServerScraper.py
# MIT License
#
# Copyright (c) 2022 Wagyourtail
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import os
import threading
import time
from mcstatus import JavaServer
# Shared by all ScanThread instances: held while a thread appends a result
# line to the output file.
lock = threading.Lock()
def compareIp(ip1, ip2):
    """Three-way compare two IPv4 addresses given as sequences of four octets.

    Octets are compared left to right; the first differing octet decides.

    Returns:
        -1 if ip1 sorts before ip2, 1 if it sorts after, 0 if all four
        octets are equal.
    """
    for position in range(4):
        left, right = ip1[position], ip2[position]
        if left != right:
            return -1 if left < right else 1
    return 0
class CIDR:
    """An IPv4 address block written in CIDR notation, e.g. ``10.0.0.0/8``.

    A string without a ``/mask`` suffix is treated as a single address (/32).

    Attributes:
        ip: the four octets exactly as written in the input string.
        mask: prefix length, 0-32.
        count: number of addresses in the block, ``2 ** (32 - mask)``.

    The block covers the ``count`` consecutive addresses starting at ``ip``.
    All range arithmetic is done on 32-bit integers so that bounds are
    inclusive and correctly carried across octets.
    """

    def __init__(self, cidrString):
        """Parse ``a.b.c.d`` or ``a.b.c.d/mask``.

        Raises:
            ValueError: if an octet or the mask is not an integer, there are
                not exactly four octets, an octet is outside 0-255, or the
                mask is outside 0-32.
        """
        parts = cidrString.split('/')
        self.ip = list(map(int, parts[0].split('.')))
        self.mask = int(parts[1]) if len(parts) > 1 else 32
        # Explicit raises rather than assert: asserts vanish under ``python -O``.
        if len(self.ip) != 4 or not all(0 <= octet <= 255 for octet in self.ip):
            raise ValueError(f"Invalid IPv4 address in {cidrString!r}")
        if not 0 <= self.mask <= 32:
            raise ValueError(f"Invalid mask in {cidrString!r}")
        self.count = 2 ** (32 - self.mask)

    @staticmethod
    def _toInt(octets):
        """Fold four octets into one integer (first octet most significant)."""
        value = 0
        for octet in octets:
            value = value * 256 + octet
        return value

    @staticmethod
    def _fromInt(value):
        """Split an integer back into a list of four octets.

        The first octet is deliberately not masked, so a value past
        255.255.255.255 shows up as a first octet greater than 255.
        """
        return [value >> 24, (value >> 16) & 255, (value >> 8) & 255, value & 255]

    def _first(self):
        """Integer form of the first address of the block."""
        return self._toInt(self.ip)

    def _last(self):
        """Integer form of the last address of the block (inclusive)."""
        return self._first() + self.count - 1

    def __str__(self):
        return ".".join(map(str, self.ip)) + "/" + str(self.mask)

    def min(self):
        """Return the first address of the block as a list of four octets.

        This is the list stored on the instance, not a copy.
        """
        return self.ip

    def max(self):
        """Return the last address of the block (inclusive) as a new list."""
        return self._fromInt(self._last())

    def __contains__(self, item):
        """Return True if ``item`` (a CIDR or a CIDR/address string) lies entirely inside this block."""
        if isinstance(item, str):
            item = CIDR(item)
        # A block with a shorter prefix is larger and can never fit inside.
        if self.mask > item.mask:
            return False
        return self._first() <= item._first() and item._last() <= self._last()

    def overlaps(self, other):
        """Return True if this block and ``other`` (a CIDR or string) share at least one address."""
        if isinstance(other, str):
            other = CIDR(other)
        return other._first() <= self._last() and self._first() <= other._last()

    def split(self):
        """Split the block into its two halves, each one prefix bit longer.

        A /32 cannot be split and is returned as ``[self]``.
        """
        if self.mask == 32:
            return [self]
        mask = self.mask + 1
        first = self._first()
        starts = (first, first + self.count // 2)
        return [CIDR(".".join(map(str, self._fromInt(start))) + "/" + str(mask)) for start in starts]

    def __iter__(self):
        """Yield every address of the block, in order, as a dotted-quad string.

        Raises:
            OverflowError: if the block runs past 255.255.255.255.
        """
        first = self._first()
        for value in range(first, first + self.count):
            if value > 0xFFFFFFFF:
                raise OverflowError(f"{self} runs past 255.255.255.255")
            yield ".".join(map(str, self._fromInt(value)))

    def __eq__(self, other):
        if isinstance(other, CIDR):
            return self.ip == other.ip and self.mask == other.mask
        return False
class ScanThread(threading.Thread):
    """Worker that walks one CIDR block and pings every non-blacklisted address.

    Args:
        tid: identifier used only in log messages.
        cidr: iterable yielding the addresses to scan as strings.
        blacklist: iterable of blocks to skip; each must support
            ``ip in block``, ``!=`` and expose a ``count`` attribute.
            It is copied into a list at construction time.
        simulate: when true, nothing is pinged; the thread only prints which
            ranges it would skip and where scanning would (re)start.

    Attributes:
        progress: number of addresses visited so far (skipped ones included).

    In non-simulate mode the thread reads the module-level names ``port``
    (iterable of ports), ``output`` (result file path) and ``lock``.
    """

    def __init__(self, tid, cidr, blacklist, simulate):
        super().__init__()
        self.tid = tid
        self.cidr = cidr
        self.blacklist = list(blacklist)
        self.simulate = simulate
        self.progress = 0

    def run(self) -> None:
        """Visit every address of ``self.cidr``, skipping blacklisted ones.

        Raises:
            Exception: if the number of addresses visited disagrees with the
                number expected from the blacklist blocks entered so far.
        """
        prevBl = None
        expected = 0
        startIp = None
        if self.simulate:
            print(f"Thread {self.tid}: started")
            print(f"Thread {self.tid}: scanning {self.cidr} with blacklist [{', '.join(map(str, self.blacklist))}]")
        for ip in self.cidr:
            self.progress += 1
            for cidr in self.blacklist:
                if ip in cidr:
                    # On entering a new blacklisted block, account for all of
                    # its addresses up front; the rest of them are then
                    # skipped without touching ``expected`` again.
                    if prevBl != cidr:
                        prevBl = cidr
                        if self.simulate:
                            print(f"Thread {self.tid}: skipping {cidr}")
                        startIp = None
                        expected += cidr.count
                    break
            else:
                # Address is not blacklisted.
                expected += 1
                # Sanity check on the skip accounting above.
                # NOTE(review): looks like nested or duplicated blacklist
                # entries would trip this check - confirm.
                if expected != self.progress:
                    raise Exception(f"Thread {self.tid}: expected {expected} from skipping {prevBl} got {self.progress}")
                if self.simulate:
                    # Log only the first address of each contiguous scanned run.
                    if startIp is None:
                        startIp = ip
                        print(f"Thread {self.tid}: starting scan at {startIp}")
                else:
                    self._probe(ip)
        print(f"Thread {self.tid}: finished")

    def _probe(self, ip):
        """Ping ``ip`` on every configured port and append each reply to the output file."""
        for p in port:
            try:
                server = JavaServer(ip, p)
                status = server.status()
            except Exception:
                # Best effort: an unreachable or non-Minecraft host is simply
                # skipped. Narrowed from a bare ``except`` so SystemExit and
                # KeyboardInterrupt are no longer swallowed.
                continue
            # One thread at a time appends to the shared results file.
            with lock:
                with open(output, "a") as f:
                    desc = status.description.replace('\n', '\\n')
                    f.write(f"{ip}:{p},{status.players.online}/{status.players.max},\"{desc}\",\"{status.version.name}\",{status.version.protocol},\"{status.favicon}\"\n")
if __name__ == '__main__':
    # Command-line entry point: parse options, split the requested CIDRs into
    # per-thread chunks, start one ScanThread per chunk and draw a progress bar
    # until every thread has finished.
    parser = argparse.ArgumentParser()
    parser.add_argument("--cidr", "-c", type=str, help="scan all in cidr(s) (comma separated)")
    parser.add_argument("--cidrFile", "-f", type=str, help="scan all in cidrs")
    parser.add_argument("--blacklist", "-b", type=str, help="blacklist file")
    parser.add_argument("--parallelism", "-p", type=int, default=4, help="2^n = number of threads (per cidr)")
    parser.add_argument("--output", "-o", type=str, default="output.txt", help="output file")
    parser.add_argument("--port", "-P", type=str, default="25565", help="port(s) to scan (comma separated) dont do too many, each pings each server up to 3 times")
    parser.add_argument("--simulate", "-s", action="store_true", help="simulate scan")
    args = parser.parse_args()

    def loadCidrLines(path):
        """Return the stripped lines of *path*, dropping blanks and ``#`` comments."""
        with open(path, "r") as f:
            stripped = [line.strip() for line in f]
        return [line for line in stripped if line and not line.startswith("#")]

    # Targets: --cidr entries first, then the entries of --cidrFile.
    cidrStrings = []
    if args.cidr is not None:
        cidrStrings += args.cidr.split(',')
    if args.cidrFile is not None:
        cidrStrings += loadCidrLines(args.cidrFile)
    cidrs = [CIDR(entry.strip()) for entry in cidrStrings]
    if not cidrs:
        # Without this the progress bar below would divide by zero.
        parser.error("nothing to scan: pass --cidr and/or --cidrFile")

    blacklist = []
    if args.blacklist is not None:
        blacklist = [CIDR(entry) for entry in loadCidrLines(args.blacklist)]

    parallelism = args.parallelism
    if not 0 < parallelism <= 32:
        parser.error("Invalid parallelism")

    # ``port`` and ``output`` are read as module globals by ScanThread.
    # ``port`` must be a list: every thread iterates it once per address, and
    # a one-shot ``map`` iterator would already be exhausted by the validation
    # loop below.
    try:
        port = [int(x.strip()) for x in args.port.split(',')]
    except ValueError:
        parser.error(f"Invalid port list {args.port}")
    for p in port:
        if not 0 < p <= 65535:
            parser.error(f"Invalid port {p}")
    output = args.output

    count = sum(cidr.count for cidr in cidrs)

    # Halve every block ``parallelism`` times: up to 2^n chunks per input CIDR.
    for _ in range(parallelism):
        newCidrs = []
        for cidr in cidrs:
            newCidrs.extend(cidr.split())
        cidrs = newCidrs

    simulate = args.simulate
    if simulate:
        print("Simulate ", end="")
    confirm = input(f"Scan {count} IPs in {len(cidrs)} Threads [y/n]: ")
    if confirm.strip().lower() != "y":
        raise SystemExit(0)

    # Only truncate the output file once the user has confirmed the scan.
    with open(output, "w") as f:
        f.write("ip:port,playerCounts,motd,version,protocol,icon\n")

    threads = []
    for i, cidr in enumerate(cidrs):
        # Each thread only needs the blacklist entries that touch its chunk.
        relevant = [entry for entry in blacklist if entry.overlaps(cidr)]
        threads.append(ScanThread(i, cidr, relevant, simulate))
    for thread in threads:
        if simulate:
            # Run inline so the simulation log is printed in thread order.
            thread.run()
        else:
            thread.start()

    # progress bar
    while True:
        # Sample liveness before progress so the final frame shows the final count.
        alive = any(thread.is_alive() for thread in threads)
        progress = sum(thread.progress for thread in threads)
        try:
            width = os.get_terminal_size().columns - 2
        except (OSError, ValueError):
            # stdout is not attached to a terminal
            width = 80
        ratio = f"{progress}/{count}"
        width = max(0, width - (len(ratio) + 2))
        ratio_bar = int(progress / count * width)
        bar = '=' * max(0, ratio_bar - 1) + ('>' if ratio_bar else '') + ' ' * (width - ratio_bar)
        print(f"[{bar}] {ratio}", end='\r')
        if not alive:
            break
        time.sleep(5)
    # Move past the carriage-return-terminated progress line.
    print()
    print("Done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment