Created
August 8, 2022 09:27
-
-
Save lockness-Ko/1630d91677e5d3968cc4d282d330626b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
""" | |
Credit to live overflow for most of the boilerplate. I've just added a command interface, auth and some more stuff | |
""" | |
import json | |
from xmlrpc.client import ProtocolError | |
import requests | |
from twisted.python import failure | |
import struct, math, time | |
from twisted.internet import reactor | |
from quarry.types.uuid import UUID | |
from quarry.net.proxy import UpstreamFactory, Upstream, DownstreamFactory, Downstream, Bridge | |
from quarry.net import auth, crypto | |
from twisted.internet import reactor | |
class MyUpstream(Upstream):
    """Proxy-to-server connection that performs the session-server "join"
    request itself (via ``requests``) when the server asks for encryption.
    """

    def packet_login_encryption_request(self, buff):
        """Handle the server's encryption request and authenticate the session.

        Reads the server id, public key and verify token from ``buff``,
        generates a shared secret, and POSTs the resulting digest together
        with the profile's access token and UUID to the Mojang session server.
        ``auth_ok`` is called on HTTP 204; any other status, or a failed HTTP
        request, is reported through ``auth_failed``.

        Raises:
            ProtocolError: if the factory's profile is not an online profile.
        """
        # The module-level ``ProtocolError`` is imported from
        # ``xmlrpc.client``; its constructor requires four arguments, so
        # ``ProtocolError("message")`` would fail with TypeError instead of
        # signalling a protocol problem. Use quarry's own exception here.
        from quarry.net.protocol import ProtocolError

        p_server_id = buff.unpack_string()

        if self.protocol_version <= 5:
            # 1.7.x: arrays are prefixed with a short
            def unpack_array(b):
                return b.read(b.unpack('h'))
        else:
            # 1.8.x: arrays are prefixed with a varint
            def unpack_array(b):
                return b.read(b.unpack_varint(max_bits=16))

        p_public_key = unpack_array(buff)
        p_verify_token = unpack_array(buff)

        if not self.factory.profile.online:
            raise ProtocolError("Can't log into online-mode server while using"
                                " offline profile")

        self.shared_secret = crypto.make_shared_secret()
        self.public_key = crypto.import_public_key(p_public_key)
        self.verify_token = p_verify_token

        # make digest
        digest = crypto.make_digest(
            p_server_id.encode('ascii'),
            self.shared_secret,
            p_public_key)

        # do auth
        url = "https://sessionserver.mojang.com/session/minecraft/join"
        payload = json.dumps({
            "accessToken": self.factory.profile.access_token,
            "selectedProfile": self.factory.profile.uuid.to_hex(False),
            "serverId": digest
        })
        headers = {
            'Content-Type': 'application/json'
        }

        # NOTE: ``requests`` is synchronous, so this call blocks the reactor
        # until the session server answers; the timeout bounds that wait.
        timeout = getattr(self.factory, "auth_timeout", 30)
        try:
            r = requests.post(url, headers=headers, data=payload,
                              timeout=timeout)
        except requests.RequestException:
            # Network errors are authentication failures, not handler crashes.
            # Failure() with no arguments wraps the exception being handled.
            self.auth_failed(failure.Failure())
            return

        if r.status_code == 204:
            self.auth_ok(r.text)
        else:
            self.auth_failed(failure.Failure(
                auth.AuthException('unverified', 'unverified username')))
class MyDownstream(Downstream):
    """Client-to-proxy connection that verifies the connecting player with
    the session server's ``hasJoined`` endpoint (via ``requests``).
    """

    def packet_login_encryption_response(self, buff):
        """Handle the client's encryption response and verify its session.

        Decrypts the shared secret and verify token with the factory keypair,
        enables the cipher, then asks the Mojang session server whether
        ``self.display_name`` has joined with the computed digest.
        ``auth_ok`` receives the decoded JSON on HTTP 200; any other status,
        or a failed HTTP request, is reported through ``auth_failed``.

        Raises:
            ProtocolError: if the packet arrives out of order or the
                decrypted verify token does not match the one stored on
                this connection.
        """
        # The module-level ``ProtocolError`` is imported from
        # ``xmlrpc.client``; its constructor requires four arguments, so
        # ``ProtocolError("message")`` would fail with TypeError instead of
        # signalling a protocol problem. Use quarry's own exception here.
        from quarry.net.protocol import ProtocolError

        if self.login_expecting != 1:
            raise ProtocolError("Out-of-order login")

        if self.protocol_version <= 5:
            # 1.7.x: arrays are prefixed with a short
            def unpack_array(b):
                return b.read(b.unpack('h'))
        else:
            # 1.8.x: arrays are prefixed with a varint
            def unpack_array(b):
                return b.read(b.unpack_varint(max_bits=16))

        p_shared_secret = unpack_array(buff)
        p_verify_token = unpack_array(buff)

        shared_secret = crypto.decrypt_secret(
            self.factory.keypair,
            p_shared_secret)
        verify_token = crypto.decrypt_secret(
            self.factory.keypair,
            p_verify_token)

        self.login_expecting = None

        if verify_token != self.verify_token:
            raise ProtocolError("Verify token incorrect")

        # enable encryption
        self.cipher.enable(shared_secret)
        self.logger.debug("Encryption enabled")

        # make digest
        digest = crypto.make_digest(
            self.server_id.encode('ascii'),
            shared_secret,
            self.factory.public_key)

        # do auth
        remote_host = None
        if self.factory.prevent_proxy_connections:
            remote_host = self.remote_addr.host

        # NOTE: ``requests`` is synchronous, so this call blocks the reactor
        # until the session server answers; the timeout bounds that wait.
        timeout = getattr(self.factory, "auth_timeout", 30)
        try:
            r = requests.get(
                'https://sessionserver.mojang.com/session/minecraft/hasJoined',
                params={'username': self.display_name,
                        'serverId': digest,
                        'ip': remote_host},
                timeout=timeout)
        except requests.RequestException:
            # Network errors are authentication failures, not handler crashes.
            # Failure() with no arguments wraps the exception being handled.
            self.auth_failed(failure.Failure())
            return

        if r.status_code == 200:
            self.auth_ok(r.json())
        else:
            self.auth_failed(failure.Failure(
                auth.AuthException('invalid', 'invalid session')))
class MyUpstreamFactory(UpstreamFactory):
    """Upstream factory that builds ``MyUpstream`` connections."""

    # Value of the factory's connection timeout setting.
    connection_timeout = 10
    # Protocol class instantiated for each upstream connection.
    protocol = MyUpstream
class MyBridge(Bridge):
    """Bridge between the local client and the real server.

    Adds ``;``-prefixed chat commands (``;tp``, ``;fly``, ``;annoy``,
    ``;help``) that are handled by the proxy instead of being forwarded, and
    remembers the most recent position and look reported by the client.
    """

    quiet_mode = False
    upstream_factory_class = MyUpstreamFactory

    # Last (x, y, z, ground) and (yaw, pitch, ground) reported by the client;
    # both stay None until the corresponding packet has been seen.
    prev_pos, prev_look = None, None
    # Names whose chat lines are echoed back. This class-level list is only a
    # default: it is never mutated in place (see ``_cmd_annoy``), so separate
    # bridge instances do not share their entries.
    annoyers = []
    # Whether the ``;fly`` toggle is on.
    fly = False
    # Count of player_position packets handled so far.
    inc = 0

    def write_chat(self, text, direction):
        """Pack ``text`` as a chat_message payload for ``direction``.

        Returns the packed bytes for ``"upstream"`` or ``"downstream"`` and
        ``None`` for any other direction.
        """
        if direction == "upstream":
            return self.buff_type.pack_string(text)
        elif direction == "downstream":
            data = self.buff_type.pack_chat(text)
            # 1.8.x+
            if self.downstream.protocol_version >= 47:
                data += self.buff_type.pack('B', 0)
            # 1.16.x+
            if self.downstream.protocol_version >= 736:
                data += self.buff_type.pack_uuid(UUID(int=0))
            return data
        return None

    def _tell(self, text):
        """Send ``text`` to the local client as a chat message."""
        self.downstream.send_packet(
            "chat_message", self.write_chat(text, "downstream"))

    def _teleport_client(self, x, y, z, yaw, pitch):
        """Send the client a player_position_and_look packet.

        The three trailing bytes (flags, teleport, dismount) are all zero.
        """
        buf = struct.pack('>dddffBBB', x, y, z, yaw, pitch, 0, 0, 0)
        self.downstream.send_packet('player_position_and_look', buf)

    def packet_downstream_chat_message(self, buff):
        """Print server chat, echo lines from annoyed players, then forward."""
        chat_message = self.read_chat(buff, "downstream")
        try:
            if chat_message is not None:
                body = chat_message.split(".text{")[1][:-1]
                # Split once so a message that itself contains ", " is kept
                # whole instead of being cut at its first comma.
                sender, text = body.split(", ", 1)
                print(f"<{sender}> {text}")
                if sender in self.annoyers:
                    self.upstream.send_packet(
                        "chat_message", self.buff_type.pack_string(text))
        except (AttributeError, IndexError, ValueError):
            # Best effort: text that is not in the "...text{sender, message}"
            # form (or that cannot be printed) is simply forwarded untouched.
            pass
        buff.restore()
        self.downstream.send_packet("chat_message", buff.read())

    def read_chat(self, buff, direction):
        """Read the chat text from ``buff`` after saving its position.

        Returns the text for upstream messages, and for downstream messages
        whose position byte is 0 or 1; returns ``None`` otherwise.
        """
        buff.save()
        if direction == "upstream":
            return buff.unpack_string()
        if direction == "downstream":
            p_text = str(buff.unpack_chat())
            p_position = 0
            # 1.8.x+
            if self.upstream.protocol_version >= 47:
                p_position = buff.unpack('B')
            # 1.16.x+
            if self.upstream.protocol_version >= 736:
                buff.unpack_uuid()  # sender UUID; read but not used
            if p_position in (0, 1):
                return p_text
        return None

    def packet_upstream_chat_message(self, buff):
        """Run ``;`` commands locally; forward every other chat line."""
        buff.save()
        chat_message = buff.unpack_string()
        print(f"> {chat_message}")

        if not chat_message.startswith(";"):
            buff.restore()
            self.upstream.send_packet("chat_message", buff.read())
            return

        # Commands are consumed here and never reach the server.
        command, *args = chat_message[1:].split(" ")
        if command == "annoy":
            self._cmd_annoy(args)
        elif command == "tp":
            self._cmd_tp(args)
        elif command == "fly":
            self.fly = not self.fly
        elif command == "help":
            self._tell("lproxy v1.0")
            self._tell(";tp <x> - teleport forward x blocks")
            self._tell(";fly - self explanitory")
            self._tell(";annoy - annoy someone by copying everything they say")

    def _cmd_annoy(self, args):
        """Add the named player to the echo list, or show usage."""
        if not args or not args[0]:
            self._tell("usage: ;annoy <player>")
            return
        # Rebind rather than append so the class-level default list is never
        # mutated and other bridge instances are unaffected.
        self.annoyers = [*self.annoyers, args[0]]

    def _cmd_tp(self, args):
        """Teleport the client ``args[0]`` blocks along its last look direction."""
        try:
            distance = float(args[0])
        except (IndexError, ValueError):
            self._tell("usage: ;tp <distance>")
            return
        if not math.isfinite(distance):
            self._tell("usage: ;tp <distance>")
            return
        if self.prev_pos is None or self.prev_look is None:
            # Nothing to aim from until the client has reported both.
            self._tell("tp: position/look not known yet, move and try again")
            return

        x, y, z, _ground = self.prev_pos
        yaw, pitch, _ground = self.prev_look

        # Unit direction vector from yaw/pitch given in degrees.
        pitch_rad = math.radians(pitch)
        yaw_rad = math.radians(-yaw)
        dir_x = math.sin(yaw_rad) * math.cos(pitch_rad)
        dir_y = -math.sin(pitch_rad)
        dir_z = math.cos(yaw_rad) * math.cos(pitch_rad)

        self._teleport_client(
            x + dir_x * distance,
            y + dir_y * distance,
            z + dir_z * distance,
            yaw, pitch)

    def packet_upstream_player_position(self, buff):
        """Track the client's position; in fly mode, lift it on every other packet."""
        x, y, z, ground = struct.unpack('>dddB', buff.read())
        print(f"[*] player_position {x} / {y} / {z} | {ground}")

        have_state = self.prev_pos is not None and self.prev_look is not None
        if self.fly and self.inc % 2 == 0 and have_state:
            # Replace the reported position with the previous one raised by
            # one block, and move the client there with a level pitch.
            x, y, z, _ground = self.prev_pos
            yaw, _pitch, ground = self.prev_look
            y += 1.0
            self._teleport_client(x, y, z, yaw, 0)
        self.inc += 1

        self.prev_pos = (x, y, z, ground)
        buf = struct.pack('>dddB', x, y, z, ground)
        self.upstream.send_packet('player_position', buf)

    def packet_upstream_player_look(self, buff):
        """Record the client's look direction and forward it unchanged."""
        yaw, pitch, ground = struct.unpack('>ffB', buff.read())
        print(f"[*] player_look {yaw} / {pitch} | {ground}")
        self.prev_look = (yaw, pitch, ground)
        buf = struct.pack('>ffB', yaw, pitch, ground)
        self.upstream.send_packet('player_look', buf)

    def packet_upstream_player_position_and_look(self, buff):
        """Record the client's position and look and forward them unchanged."""
        x, y, z, yaw, pitch, ground = struct.unpack('>dddffB', buff.read())
        print(f"[*] player_position_and_look {x} / {y} / {z}, {yaw}, {pitch} | {ground}")
        self.prev_pos = (x, y, z, ground)
        self.prev_look = (yaw, pitch, ground)
        buf = struct.pack('>dddffB', x, y, z, yaw, pitch, ground)
        self.upstream.send_packet('player_position_and_look', buf)

    def make_profile(self):
        """Build the online profile used for the upstream connection.

        The bearer token is read from the ``MINECRAFT_ACCESS_TOKEN``
        environment variable, falling back to the placeholder below; the
        player's name and UUID are then fetched from the Minecraft services
        profile endpoint.

        Raises:
            auth.AuthException: if the profile endpoint does not answer
                with HTTP 200 (for example because the token is invalid).
        """
        import os

        # follow: https://kqzz.github.io/mc-bearer-token/
        access_token = (os.environ.get("MINECRAFT_ACCESS_TOKEN")
                        or 'YOUR_TOKEN_HERE')
        url = "https://api.minecraftservices.com/minecraft/profile"
        headers = {'Authorization': 'Bearer ' + access_token}
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code != 200:
            # Without this check a rejected token shows up later as an
            # unrelated KeyError on the missing 'id' field.
            raise auth.AuthException(
                'invalid',
                f"profile lookup failed with HTTP {response.status_code}")
        result = response.json()
        return auth.Profile('(skip)', access_token, result['name'],
                            UUID.from_hex(result['id']))
class MyDownstreamFactory(DownstreamFactory):
    """Listening-side factory wiring ``MyDownstream`` and ``MyBridge`` together."""

    # Bridge class created for each accepted client connection.
    bridge_class = MyBridge
    # Protocol class used for the client-facing connection.
    protocol = MyDownstream
    # Values for the factory's player-limit and MOTD settings.
    max_players = 1
    motd = "lproxy"
def main(argv):
    """Parse command-line options, start listening, and run the reactor.

    ``argv`` is the argument list without the program name.
    """
    from argparse import ArgumentParser

    # (flags, keyword settings) for every supported option.
    option_table = (
        (("-a1", "--listen-host1"),
         {"default": "", "help": "address to listen on"}),
        (("-p1", "--listen-port1"),
         {"default": 25566, "type": int, "help": "port to listen on"}),
        (("-b", "--connect-host"),
         {"default": "127.0.0.1", "help": "address to connect to"}),
        (("-q", "--connect-port"),
         {"default": 25565, "type": int, "help": "port to connect to"}),
    )
    cli = ArgumentParser()
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    options = cli.parse_args(argv)

    # Point the proxy at the real server.
    proxy_factory = MyDownstreamFactory()
    proxy_factory.connect_host = options.connect_host
    proxy_factory.connect_port = options.connect_port

    # Accept local clients and hand control to the reactor.
    proxy_factory.listen(options.listen_host1, options.listen_port1)
    reactor.run()
# Script entry point: hand everything after the program name to main().
if "__main__" == __name__:
    import sys
    cli_args = sys.argv[1:]
    main(cli_args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment