"""Log FiveM events externally by listening in on the traffic between server and client.

Because ENet's protocol doesn't have any sort of encryption, it's possible to
simply look at the UDP data that is sent over the wire.

Fun :)
"""
import functools
import struct
import time
import traceback
from typing import List

import msgpack
import scapy.all as scapy
# Bit layout of the 16-bit ENet protocol header word; the names mirror the
# constants declared in ENet's protocol.h.
# Bits 12-13: session ID.
ENET_PROTOCOL_HEADER_SESSION_MASK = (3 << 12)
ENET_PROTOCOL_HEADER_SESSION_SHIFT = 12
# Bit 14: compressed flag.
ENET_PROTOCOL_HEADER_FLAG_COMPRESSED = (1 << 14)
# Bit 15: sent-time flag.
ENET_PROTOCOL_HEADER_FLAG_SENT_TIME = (1 << 15)
# Both flag bits combined.
ENET_PROTOCOL_HEADER_FLAG_MASK = (
    ENET_PROTOCOL_HEADER_FLAG_COMPRESSED | ENET_PROTOCOL_HEADER_FLAG_SENT_TIME
)
def _decode_event(message: bytes, length_offset: int):
    """Split one event message into its name and msgpack-decoded arguments.

    Args:
        message: A single message, starting with its 4-byte type hash and
            already truncated to the length announced by the command.
        length_offset: Offset of the little-endian uint16 holding the length
            of the name field.

    Returns:
        A ``(name, arguments)`` tuple.

    Raises:
        struct.error: If *message* is too short to contain the length field.
        UnicodeDecodeError: If the name is not valid UTF-8.
        Exception: Whatever ``msgpack.unpackb`` raises on malformed input.
    """
    (name_length,) = struct.unpack_from("<H", message, length_offset)
    name_start = length_offset + 2
    args_start = name_start + name_length
    # The name field spans name_length bytes, but its last byte is dropped
    # (presumably a NUL terminator -- confirm against the FiveM sources).
    name = message[name_start:args_start - 1].decode('utf-8')
    # strict_map_key=False: event arguments may contain maps with non-string
    # keys, which msgpack rejects by default.
    arguments = msgpack.unpackb(message[args_start:], strict_map_key=False)
    return name, arguments


def print_func(packet: scapy.Packet) -> None:
    """Print every FiveM event found in a sniffed UDP packet.

    Intended as the ``prn`` callback of a scapy sniffer. Packets without a UDP
    layer or without a raw payload are ignored. Decoding problems are reported
    on stdout and never propagate to the sniffer.

    Args:
        packet: The packet captured by scapy.
    """
    if scapy.UDP not in packet or scapy.Raw not in packet:
        return

    # Hash the message names once per packet instead of once per command.
    server_event_type = joaat("msgServerEvent")
    net_event_type = joaat("msgNetEvent")

    # Skip the 4 leading bytes of the datagram.
    # NOTE(review): this assumes a 4-byte ENet protocol header (peer ID plus
    # sent time); datagrams without the sent-time flag would have a shorter
    # header -- confirm against ENet's protocol.h.
    remaining = packet[scapy.Raw].load[4:]
    try:
        # 10 = 4-byte command header + 2-byte data length + 4-byte type hash.
        while len(remaining) >= 10:
            # The low nibble of the first header byte is the command number.
            command = remaining[0] & 0x0F
            remaining = remaining[4:]
            # Only command 6 (ENET_PROTOCOL_COMMAND_SEND_RELIABLE in ENet's
            # protocol.h) is decoded.
            # NOTE(review): other commands are skipped by their 4-byte header
            # only, not by their real size, so the scan may resynchronise on
            # arbitrary bytes -- kept as in the original heuristic.
            if command != 6:
                continue

            (data_length,) = struct.unpack_from(">H", remaining)
            # Bound the message to its announced length so that decoding can
            # never read into the following command.
            message = remaining[2:2 + data_length]
            remaining = remaining[2 + data_length:]

            (message_type,) = struct.unpack_from("<L", message)
            if message_type == server_event_type:
                # Layout: type hash (4), name length (2), name, msgpack args.
                name, unpacked = _decode_event(message, 4)
                print(f"C2S: {name}, {unpacked}")
            elif message_type == net_event_type:
                # Layout: type hash (4), one skipped uint16 (2),
                # name length (2), name, msgpack args.
                name, unpacked = _decode_event(message, 6)
                print(f"S2C: {name}, {unpacked}")
    except Exception:
        # Best effort: report and carry on with the next packet. A bare
        # ``except:`` would also swallow KeyboardInterrupt/SystemExit.
        print(f"Failed decoding packet: {traceback.format_exc()} {remaining.hex()}")
def main() -> None:
    """Sniff UDP traffic on every working interface until interrupted.

    Starts one asynchronous scapy sniffer per interface, each feeding packets
    to ``print_func``, then idles until Ctrl+C is pressed and stops them all.
    """
    sniffers: List[scapy.AsyncSniffer] = []
    for interface in scapy.get_working_ifaces():
        sniffer = scapy.AsyncSniffer(
            filter="udp",
            prn=print_func,
            iface=interface,
            # Packets are handled by the callback; keeping them as well would
            # make memory grow without bound on a long capture.
            store=False,
        )
        sniffer.start()
        sniffers.append(sniffer)

    # this is stupid, but it works: the sniffers run in background threads,
    # so the main thread only has to stay alive until it is interrupted.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping sniffers")
        for sniffer in sniffers:
            sniffer.stop()
# Jenkins one-at-a-time ("joaat") hash; stolen from somewhere, can't remember.
@functools.lru_cache(maxsize=128)
def joaat(keyString: str) -> int:
    """Return the 32-bit Jenkins one-at-a-time hash of *keyString*.

    The string is hashed byte by byte over its UTF-8 encoding, so non-ASCII
    input is supported. Results are memoised because the packet decoder asks
    for the same few names over and over.

    Args:
        keyString: The text to hash. No case folding is applied.

    Returns:
        The hash as an unsigned 32-bit integer.
    """
    mask = 0xFFFFFFFF
    digest = 0
    for byte in keyString.encode("utf-8"):
        # Mix in one byte; masking after each addition emulates uint32
        # wrap-around.
        digest = (digest + byte) & mask
        digest = (digest + (digest << 10)) & mask
        digest ^= digest >> 6
    # Final avalanche.
    digest = (digest + (digest << 3)) & mask
    digest ^= digest >> 11
    digest = (digest + (digest << 15)) & mask
    return digest
# Only start sniffing when executed as a script, not when imported.
if __name__ == '__main__':
    main()