Last active
June 29, 2023 20:22
-
-
Save wagyourtail/dc0e703ca5de4db42f3782a21837fa7f to your computer and use it in GitHub Desktop.
MCServerScraper.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# MIT License | |
# | |
# Copyright (c) 2022 Wagyourtail | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
import argparse | |
import os | |
import threading | |
import time | |
from mcstatus import JavaServer | |
lock = threading.Lock() | |
def compareIp(ip1, ip2): | |
for i in range(4): | |
if ip1[i] < ip2[i]: | |
return -1 | |
elif ip1[i] > ip2[i]: | |
return 1 | |
return 0 | |
class CIDR: | |
def __init__(self, cidrString): | |
cidr = cidrString.split('/') | |
cidr[0] = list(map(int, cidr[0].split('.'))) | |
self.ip = cidr[0] | |
self.mask = int(cidr[1] if len(cidr) > 1 else 32) | |
assert all(0 <= i <= 255 for i in self.ip) and 0 <= self.mask <= 32 | |
self.count = 2 ** (32 - self.mask) | |
def __str__(self): | |
return ".".join(map(str, self.ip)) + "/" + str(self.mask) | |
def min(self): | |
return self.ip | |
def max(self): | |
ip = self.ip.copy() | |
if self.mask == 32: return ip | |
ip[self.mask // 8] += 2 ** (8 - self.mask % 8) | |
return ip | |
def __contains__(self, item): | |
if isinstance(item, str): | |
item = CIDR(item) | |
if self.mask > item.mask: return False | |
# compare min | |
itemMin = item.min() | |
selfMin = self.min() | |
for i in range(4): | |
if itemMin[i] < selfMin[i]: return False | |
if itemMin[i] > selfMin[i]: break | |
# compare max | |
itemMax = item.max() | |
selfMax = self.max() | |
for i in range(4): | |
if itemMax[i] > selfMax[i]: return False | |
if itemMax[i] < selfMax[i]: break | |
return True | |
def overlaps(self, other): | |
if isinstance(other, str): | |
other = CIDR(other) | |
itemMin = other.min() | |
selfMin = self.min() | |
itemMax = other.max() | |
selfMax = self.max() | |
if compareIp(itemMin, selfMax) != 1: | |
return compareIp(itemMax, selfMin) != -1 | |
elif compareIp(itemMax, selfMin) != -1: | |
return compareIp(itemMin, selfMax) != 1 | |
return False | |
def split(self): | |
if self.mask == 32: return [self] | |
second = self.ip.copy() | |
second[self.mask // 8] += 2 ** (7 - self.mask % 8) | |
mask = self.mask + 1 | |
return [CIDR(".".join(map(str, self.ip)) + "/" + str(mask)), CIDR(".".join(map(str, second)) + "/" + str(mask))] | |
def __iter__(self): | |
current = self.ip.copy() | |
yield ".".join(map(str, current)) | |
for _ in range(1, self.count): | |
current[3] += 1 | |
for i in range(3, 0, -1): | |
if current[i] >= 256: | |
current[i - 1] += 1 | |
current[i] = 0 | |
if current[0] > 255: raise Exception() | |
yield ".".join(map(str, current)) | |
def __eq__(self, other): | |
if isinstance(other, CIDR): | |
return self.ip == other.ip and self.mask == other.mask | |
return False | |
class ScanThread(threading.Thread): | |
def __init__(self, tid, cidr, blacklist, simulate): | |
super().__init__() | |
self.tid = tid | |
self.cidr = cidr | |
self.blacklist = list(blacklist) | |
self.simulate = simulate | |
self.progress = 0 | |
def run(self) -> None: | |
prevBl = None | |
expected = 0 | |
startIp = None | |
if self.simulate: | |
print(f"Thread {self.tid}: started") | |
print(f"Thread {self.tid}: scanning {self.cidr} with blacklist [{', '.join(map(str, self.blacklist))}]") | |
for ip in self.cidr: | |
self.progress += 1 | |
for cidr in self.blacklist: | |
if ip in cidr: | |
if prevBl != cidr: | |
prevBl = cidr | |
if self.simulate: | |
print(f"Thread {self.tid}: skipping {cidr}") | |
startIp = None | |
expected += cidr.count | |
break | |
else: | |
expected += 1 | |
if expected != self.progress: | |
raise Exception(f"Thread {self.tid}: expected {expected} from skipping {prevBl} got {self.progress}") | |
if self.simulate: | |
if startIp is None: | |
startIp = ip | |
print(f"Thread {self.tid}: starting scan at {startIp}") | |
else: | |
for p in port: | |
# noinspection PyBroadException | |
try: | |
server = JavaServer(ip, p) | |
status = server.status() | |
except: | |
continue | |
else: | |
with lock: | |
with open(output, "a") as f: | |
desc = status.description.replace('\n', '\\n') | |
f.write(f"{ip}:{p},{status.players.online}/{status.players.max},\"{desc}\",\"{status.version.name}\",{status.version.protocol},\"{status.favicon}\"\n") | |
print(f"Thread {self.tid}: finished") | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--cidr", "-c", type=str, help="scan all in cidr(s) (comma separated)") | |
parser.add_argument("--cidrFile", "-f", type=str, help="scan all in cidrs") | |
parser.add_argument("--blacklist", "-b", type=str, help="blacklist file") | |
parser.add_argument("--parallelism", "-p", type=int, default=4, help="2^n = number of threads (per cidr)") | |
parser.add_argument("--output", "-o", type=str, default="output.txt", help="output file") | |
parser.add_argument("--port", "-P", type=str, default="25565", help="port(s) to scan (comma separated) dont do too many, each pings each server up to 3 times") | |
parser.add_argument("--simulate", "-s", action="store_true", help="simulate scan") | |
args = parser.parse_args() | |
cidr = args.cidr | |
cidrFile = args.cidrFile | |
cidrs = [] | |
if cidr is not None: | |
cidrs += cidr.split(',') | |
if cidrFile is not None: | |
with open(cidrFile, "r") as f: | |
cidrs += filter(lambda x: not x.strip().startswith("#") and not x.strip() == "", f.readlines()) | |
cidrs = [CIDR(cidr.strip()) for cidr in cidrs] | |
blacklistFile = args.blacklist | |
blacklist = [] | |
if blacklistFile is not None: | |
with open(blacklistFile, "r") as f: | |
blacklist += filter(lambda x: not x.strip().startswith("#") and not x.strip() == "", f.readlines()) | |
blacklist = [CIDR(cidr.strip()) for cidr in blacklist] | |
parallelism = args.parallelism | |
assert parallelism > 0 and parallelism <= 32, "Invalid parallelism" | |
output = args.output | |
with open(output, "w") as f: | |
f.write("ip:port,playerCounts,motd,version,protocol,icon\n") | |
port = args.port | |
port = map(lambda x: int(x.strip()), port.split(',')) | |
for p in port: | |
assert p > 0 and p <= 65535, f"Invalid port {p}" | |
count = sum([cidr.count for cidr in cidrs]) | |
for _ in range(parallelism): | |
newCidrs = [] | |
for cidr in cidrs: | |
newCidrs.extend(cidr.split()) | |
cidrs = newCidrs | |
simulate = args.simulate or False | |
if simulate: | |
print(f"Simulate ", end="") | |
confirm = input(f"Scan {count} IPs in {len(cidrs)} Threads [y/n]: ") | |
if confirm.lower() != "y": | |
exit(0) | |
threads = [] | |
for i, cidr in enumerate(cidrs): | |
threads.append(ScanThread(i, cidr, filter(lambda x: x.overlaps(cidr), blacklist), simulate)) | |
for thread in threads: | |
if simulate: | |
thread.run() | |
else: | |
thread.start() | |
# progress bar | |
done = False | |
while not done: | |
done = any([thread.is_alive() for thread in threads]) | |
progress = sum([thread.progress for thread in threads]) | |
try: | |
width = os.get_terminal_size().columns - 2 | |
except: | |
width = 80 | |
ratio = f"{progress}/{count}" | |
width -= len(ratio) + 2 | |
ratio_num = progress / count | |
ratio_bar = int(ratio_num * width) | |
print(f"[{'=' * max(0, ratio_bar - 1)}{'>' * (ratio_bar and 1 or 0)}{' ' * (width - ratio_bar)}] {ratio}", end='\r') | |
time.sleep(5) | |
print("Done") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment