Created
October 25, 2011 17:48
-
-
Save RobFreiburger/1313630 to your computer and use it in GitHub Desktop.
Battlefield 3 RCON "Kick Some Pubbies" Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# "Kick Some Pubbies" script for BF3 server admins | |
# Script is a modified version of EA DICE's CommandConsole.py from BFBC2 | |
# Confirmed working on Battlefield 3 servers on launch day, 10/25/11 | |
# Go to the configurable variables section to modify to your needs. | |
# Made to retrieve whitelist from a URL, can be easily changed to local file. See getWhitelist() function. | |
from struct import * | |
import binascii | |
import socket | |
import sys | |
import shlex | |
import string | |
import threading | |
import md5 | |
import os | |
import urllib2 | |
import random | |
import time | |
# Begin code written by EA DICE | |
def EncodeHeader(isFromServer, isResponse, sequence): | |
header = sequence & 0x3fffffff | |
if isFromServer: | |
header += 0x80000000 | |
if isResponse: | |
header += 0x40000000 | |
return pack('<I', header) | |
def DecodeHeader(data): | |
[header] = unpack('<I', data[0 : 4]) | |
return [header & 0x80000000, header & 0x40000000, header & 0x3fffffff] | |
def EncodeInt32(size): | |
return pack('<I', size) | |
def DecodeInt32(data): | |
return unpack('<I', data[0 : 4])[0] | |
def EncodeWords(words): | |
size = 0 | |
encodedWords = '' | |
for word in words: | |
strWord = str(word) | |
encodedWords += EncodeInt32(len(strWord)) | |
encodedWords += strWord | |
encodedWords += '\x00' | |
size += len(strWord) + 5 | |
return size, encodedWords | |
def DecodeWords(size, data): | |
numWords = DecodeInt32(data[0:]) | |
words = [] | |
offset = 0 | |
while offset < size: | |
wordLen = DecodeInt32(data[offset : offset + 4]) | |
word = data[offset + 4 : offset + 4 + wordLen] | |
words.append(word) | |
offset += wordLen + 5 | |
return words | |
def EncodePacket(isFromServer, isResponse, sequence, words): | |
encodedHeader = EncodeHeader(isFromServer, isResponse, sequence) | |
encodedNumWords = EncodeInt32(len(words)) | |
[wordsSize, encodedWords] = EncodeWords(words) | |
encodedSize = EncodeInt32(wordsSize + 12) | |
return encodedHeader + encodedSize + encodedNumWords + encodedWords | |
# Decode a request or response packet | |
# Return format is: | |
# [isFromServer, isResponse, sequence, words] | |
# where | |
# isFromServer = the command in this command/response packet pair originated on the server | |
# isResponse = True if this is a response, False otherwise | |
# sequence = sequence number | |
# words = list of words | |
def DecodePacket(data): | |
[isFromServer, isResponse, sequence] = DecodeHeader(data) | |
wordsSize = DecodeInt32(data[4:8]) - 12 | |
words = DecodeWords(wordsSize, data[12:]) | |
return [isFromServer, isResponse, sequence, words] | |
############################################################################### | |
# Encode a request packet | |
def EncodeClientRequest(words): | |
global clientSequenceNr | |
packet = EncodePacket(False, False, clientSequenceNr, words) | |
clientSequenceNr = (clientSequenceNr + 1) & 0x3fffffff | |
return packet | |
# Encode a response packet | |
def EncodeClientResponse(sequence, words): | |
return EncodePacket(True, True, sequence, words) | |
################################################################################### | |
def containsCompletePacket(data): | |
if len(data) < 8: | |
return False | |
if len(data) < DecodeInt32(data[4:8]): | |
return False | |
return True | |
# Wait until the local receive buffer contains a full packet (appending data from the network socket), | |
# then split receive buffer into first packet and remaining buffer data | |
def receivePacket(socket, receiveBuffer): | |
while not containsCompletePacket(receiveBuffer): | |
receiveBuffer += socket.recv(4096) | |
packetSize = DecodeInt32(receiveBuffer[4:8]) | |
packet = receiveBuffer[0:packetSize] | |
receiveBuffer = receiveBuffer[packetSize:len(receiveBuffer)] | |
return [packet, receiveBuffer] | |
################################################################################### | |
# Display contents of packet in user-friendly format, useful for debugging purposes | |
def printPacket(packet): | |
if (packet[0]): | |
print "IsFromServer, ", | |
else: | |
print "IsFromClient, ", | |
if (packet[1]): | |
print "Response, ", | |
else: | |
print "Request, ", | |
print "Sequence: " + str(packet[2]), | |
if packet[3]: | |
print " Words:", | |
for word in packet[3]: | |
print "\"" + word + "\"", | |
print "" | |
################################################################################### | |
def generatePasswordHash(salt, password): | |
m = md5.new() | |
m.update(salt) | |
m.update(password) | |
return m.digest() | |
# End code written by EA DICE | |
def getWhitelist(whitelistURL): | |
theurl = whitelistURL | |
pagehandle = urllib2.urlopen(theurl) | |
unfilteredWhitelist = pagehandle.read().splitlines() | |
filteredWhitelist = [] | |
for person in unfilteredWhitelist: filteredWhitelist.append(person.lower()) | |
return filteredWhitelist | |
# Begin configurable variables | |
servers = [ {'name':'server1', 'ip':'server1.example.com', 'port':12345, 'password':'notAPassword', 'maxPlayers':64}, | |
{'name':'server2', 'ip':'server2.example.com', 'port':12345, 'password':'badPassword', 'maxPlayers':32} ] | |
pubbiesToKick = 3 | |
whitelist = getWhitelist('http://whitelist.example.com') | |
# End configurable variables | |
for server in servers: | |
clientSequenceNr = 0 | |
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
serverSocket.connect( (server['ip'], server['port']) ) | |
serverSocket.setblocking(1) | |
print 'Connecting to ' + server['name'] | |
# Retrieve this connection's 'salt' (magic value used when encoding password) from server | |
getPasswordSaltRequest = EncodeClientRequest( [ "login.hashed" ] ) | |
serverSocket.send(getPasswordSaltRequest) | |
receiveBuffer = "" | |
[getPasswordSaltResponse, receiveBuffer] = receivePacket(serverSocket, receiveBuffer) | |
[isFromServer, isResponse, sequence, words] = DecodePacket(getPasswordSaltResponse) | |
# if the server doesn't understand "login.hashed" command, abort | |
if words[0] != "OK": | |
sys.exit(3) # error | |
# Given the salt and the password, combine them and compute hash value | |
salt = words[1].decode("hex") | |
passwordHash = generatePasswordHash(salt, server['password']) | |
passwordHashHexString = string.upper(passwordHash.encode("hex")) | |
loginRequest = EncodeClientRequest( [ "login.hashed", passwordHashHexString ] ) | |
serverSocket.send(loginRequest) | |
[loginResponse, receiveBuffer] = receivePacket(serverSocket, receiveBuffer) | |
[isFromServer, isResponse, sequence, words] = DecodePacket(loginResponse) | |
# if the server didn't like our password, abort | |
if words[0] != "OK": | |
sys.exit(3) # error | |
### | |
# Get players currently on server | |
### | |
command = 'admin.listPlayers all' | |
words = shlex.split(command) | |
# Send request to server on command channel | |
request = EncodeClientRequest(words) | |
serverSocket.send(request) | |
# Wait for response from server | |
[packet, receiveBuffer] = receivePacket(serverSocket, receiveBuffer) | |
[isFromServer, isResponse, sequence, words] = DecodePacket(packet) | |
rconResponse = words | |
playersOnServer = [rconResponse[i].lower() for i in range(10, len(rconResponse), 7)] | |
# rcon's vars.maxPlayers doesn't work! YAY! | |
if len(playersOnServer) < (server['maxPlayers'] - pubbiesToKick): | |
print "Server is not full. Server has " + str(len(playersOnServer)) + " players on it." | |
continue | |
# Compare players on server to whitelist | |
# Make list of pubbies | |
whitelist = getWhitelist() | |
pubbies = set(playersOnServer).difference(set(whitelist)) | |
if len(pubbies) == 0: | |
print "Server has no pubbies." | |
continue | |
# randomly select x pubbies to kick | |
if len(pubbies) > pubbiesToKick: | |
pubbies = random.sample(pubbies, pubbiesToKick) | |
command = 'admin.say "' + str(pubbiesToKick) + ' random pubbies will be kicked in 5 seconds. bye!" all' | |
words = shlex.split(command) | |
request = EncodeClientRequest(words) | |
serverSocket.send(request) | |
time.sleep(5) | |
for pubby in pubbies: | |
command = 'admin.kickPlayer "' + pubby + '" "you lose"' | |
words = shlex.split(command) | |
request = EncodeClientRequest(words) | |
serverSocket.send(request) | |
[packet, receiveBuffer] = receivePacket(serverSocket, receiveBuffer) | |
[isFromServer, isResponse, sequence, words] = DecodePacket(packet) | |
if words[0] != "OK": | |
print "Error kicking pubby: " + pubby | |
print words | |
command = 'admin.say "' + pubby + ' won the contest!" all' | |
words = shlex.split(command) | |
request = EncodeClientRequest(words) | |
serverSocket.send(request) | |
[packet, receiveBuffer] = receivePacket(serverSocket, receiveBuffer) | |
[isFromServer, isResponse, sequence, words] = DecodePacket(packet) | |
if words[0] != "OK": | |
print "Error announcing kick: " | |
print words | |
print "Kicked " + str(len(pubbies)) + " pubbies." |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment