Created
August 1, 2016 16:27
-
-
Save icedraco/8236cbd78e764f0ed0938f38152d082e to your computer and use it in GitHub Desktop.
Furcadia / BTA Inventory Check Script (exports `info of multiple characters into a CSV file)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
###--# BTA Inventory Check 1.0b | |
# This script was built to mass-check the inventory of many Furcadia characters | |
# and document the results. | |
# | |
# Author: Artex / IceDragon <[email protected]> | |
# | |
import re | |
import socket | |
import csv | |
# CSV output file
OUTPUT_CSV = "inventory.csv"

# Character list
# format: [(name1, pass1), (name2, pass2), ...]
CHARACTERS = [("BuggeTestAlt%04d" % n, "password") for n in range(1, 31)]

# Furcadia server address as a (host, port) pair
GAMESERVER_ADDR = ('lightbringer.furcadia.com', 6500)

# Adjustment: First line of `info starts with...
FIRST_LINE_PREFIX = "(Player info:"

### Regular expressions:
# All patterns are raw strings so that regex escapes such as \d, \w and \(
# reach the `re` module untouched; in a normal string literal they are
# invalid escape sequences on Python 3.

# (type, text)
RE_BASIC = re.compile(r'^\(([^:]+): (.*)$')

# (groupname, level)
RE_ACCESS = re.compile(r'([^,]+), level (\d+)')

# (name, rest)
RE_EXTRA = re.compile(r'^([a-zA-Z0-9 ]+)[, ].(.*)$')

# (active, inactive, pending_review)
RE_PORTRAITS = re.compile(r'(\d+) active, (\d+) inactive, (\d+) waiting for review')

# (name, active, inactive)
RE_ITEM = re.compile(r'([^\(]+) \((\d+) active, (\d+) inactive\)')

# (name, remaining)
RE_ITEM_STACK = re.compile(r'([a-zA-Z0-9 ]+) \((\d+) remaining\)')

# (name, rest)
RE_EFFECT = re.compile(r'^([a-zA-Z0-9 ]+) (.*)$')

# (gd, sd, cd, usd_with_symbols)
RE_SCALES = re.compile(r'(\d+) GD, (\d+) SD, (\d+) CD \(total (.*) USD\)')

# (exp_date_text)
RE_FURC_TS = re.compile(r"(\d{4}-\d{2}-\d{2}/[0-9:]+/\w{3})")

# (name, email)
RE_PLAYER_INFO = re.compile(r"^([^\(]+) \((.*)\)")
###--# Functionality #--####################################################### | |
class ReportItem(object):
    """Common base for the parsed entries of an `info report.

    Subclasses override name() and category(); the values returned here are
    placeholders.
    """

    def __init__(self):
        """Nothing to set up on the base class."""

    @staticmethod
    def category():
        """Return the short category tag (placeholder "??")."""
        return "??"

    def name(self):
        """Return the display name of this entry (placeholder "???")."""
        return "???"
class Access(ReportItem):
    """An "Access" report line: a group name and a numeric level."""

    def __init__(self, text):
        """Parse *text* with RE_ACCESS.

        Raises IndexError when the text does not match the pattern.
        """
        group, level = RE_ACCESS.findall(text)[0]
        # '|' characters in the group name are shown as spaces
        self.group = group.replace('|', ' ')
        self.level = int(level)

    def __str__(self):
        details = (self.group, self.level)
        return "Access: %s, level %d" % details

    def name(self):
        """Return the group name."""
        return self.group

    @staticmethod
    def category():
        """Return the category tag "ACC"."""
        return "ACC"
class Extra(ReportItem):
    """An "Extra" report line.

    Attributes set by the constructor:
      item_name   -- first group captured by RE_EXTRA
      is_account  -- True when the remainder contains "(account)"
      is_4life    -- True when the remainder contains "#C", "/cache/6" or "life"
      expire_date -- result of get_expire_date() on the remainder (may be None)
    """

    def __init__(self, text):
        """Parse *text* with RE_EXTRA; raises IndexError if it does not match."""
        item_name, rest = RE_EXTRA.findall(text)[0]
        self.item_name = item_name
        self.is_account = "(account)" in rest
        self.is_4life = any(marker in rest for marker in ("#C", "/cache/6", "life"))
        self.expire_date = get_expire_date(rest)

    def __str__(self):
        # Precedence: 4life, then expiry date, then account flag, then "--"
        if self.is_4life:
            detail = "4life"
        elif self.expire_date:
            detail = self.expire_date
        elif self.is_account:
            detail = "account"
        else:
            detail = "--"
        return "Extra: %s (%s)" % (self.item_name, detail)

    def name(self):
        """Return the item name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag "EX"."""
        return "EX"
class Portraits(ReportItem):
    """Portrait counters: active, inactive and waiting for review."""

    def __init__(self, text):
        """Parse the three counters out of *text* with RE_PORTRAITS.

        Raises IndexError when the text does not match the pattern.
        """
        counts = [int(n) for n in RE_PORTRAITS.findall(text)[0]]
        self.active, self.inactive, self.pending = counts

    def __str__(self):
        counts = (self.active, self.inactive, self.pending)
        return "Portraits: %d active, %d inactive, %d waiting for review" % counts

    @staticmethod
    def category():
        """Return the category tag "PORT"."""
        return "PORT"
class RegularPortraits(Portraits):
    """Portrait counters for the non-animated kind (is_animated is False)."""

    def __init__(self, text):
        """Parse *text* like Portraits and mark the entry as not animated."""
        super(RegularPortraits, self).__init__(text)
        self.is_animated = False

    def name(self):
        """Return the fixed display name."""
        return "Regular Portraits"

    @staticmethod
    def category():
        """Return the Portraits tag followed by "/REG"."""
        return "%s/REG" % Portraits.category()
class AnimatedPortraits(Portraits):
    """Portrait counters for the animated kind (is_animated is True)."""

    def __init__(self, text):
        """Parse *text* like Portraits and mark the entry as animated."""
        Portraits.__init__(self, text)
        self.is_animated = True

    def __str__(self):
        return "Animated " + Portraits.__str__(self)

    def name(self):
        """Return the fixed display name."""
        # Spelling fixed: this used to read "Animataed Portraits".
        return "Animated Portraits"

    @staticmethod
    def category():
        """Return the Portraits tag followed by "/ANIM"."""
        return Portraits.category() + "/ANIM"
class Item(ReportItem):
    """An "Item" report line: a name with active and inactive counts."""

    def __init__(self, text):
        """Parse *text* with RE_ITEM; raises IndexError if it does not match."""
        item_name, active, inactive = RE_ITEM.findall(text)[0]
        self.item_name = item_name
        self.active = int(active)
        self.inactive = int(inactive)

    def __str__(self):
        details = (self.item_name, self.active, self.inactive)
        return "Item: %s (%d active, %d inactive)" % details

    def name(self):
        """Return the item name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag "ITEM"."""
        return "ITEM"
class ItemStack(ReportItem):
    """An "Item Stack" report line: a name and a remaining count."""

    def __init__(self, text):
        """Parse *text* with RE_ITEM_STACK; raises IndexError on no match."""
        item_name, remaining = RE_ITEM_STACK.findall(text)[0]
        self.item_name = item_name
        self.remaining = int(remaining)

    def __str__(self):
        details = (self.item_name, self.remaining)
        return "Item Stack: %s (%d remaining)" % details

    def name(self):
        """Return the item name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag "STACK"."""
        return "STACK"
class ItemPlatter(ReportItem):
    """An "Item Platter" report line: a name and a remaining count."""

    def __init__(self, text):
        """Parse *text*; raises IndexError when it does not match.

        Platter lines are parsed with the same pattern as item stacks
        (RE_ITEM_STACK).
        """
        item_name, remaining = RE_ITEM_STACK.findall(text)[0]
        self.item_name = item_name
        self.remaining = int(remaining)

    def __str__(self):
        details = (self.item_name, self.remaining)
        return "Item Platter: %s (%d remaining)" % details

    def name(self):
        """Return the item name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag "PLATTER"."""
        return "PLATTER"
class Effect(ReportItem):
    """An "Effect" report line.

    Attributes set by the constructor:
      item_name   -- first group captured by RE_EFFECT
      is_4life    -- True when the remainder contains "life"
      expire_date -- result of get_expire_date() on the remainder (may be None)
    """

    def __init__(self, text):
        """Parse *text* with RE_EFFECT; raises IndexError if it does not match."""
        t = RE_EFFECT.findall(text)[0]
        self.item_name = t[0]
        self.is_4life = "life" in t[1]
        self.expire_date = get_expire_date(t[1])

    def __str__(self):
        extra = "4life" if self.is_4life else self.expire_date
        # Label fixed: this used to print "Extra:", which made effects
        # indistinguishable from Extra entries in printed output.
        return "Effect: %s (%s)" % (self.item_name, extra)

    def name(self):
        """Return the effect name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag "EFF"."""
        return "EFF"
class ScalesBalance(ReportItem):
    """The "DragonScales Balance" report line: GD, SD, CD and a USD total."""

    def __init__(self, text):
        """Parse *text* with RE_SCALES; raises IndexError if it does not match."""
        gd, sd, cd, usd_text = RE_SCALES.findall(text)[0]
        self.gd = int(gd)
        self.sd = int(sd)
        self.cd = int(cd)
        # The first character of the total is skipped before conversion
        # (the output format below prints it back as "$").
        self.usd = float(usd_text[1:])

    def __str__(self):
        template = "DragonScales Balance: %d GD, %d SD, %d CD (total $%.2f USD)"
        return template % (self.gd, self.sd, self.cd, self.usd)

    def name(self):
        """Return the fixed display name."""
        return "Balance"

    @staticmethod
    def category():
        """Return the category tag "DSC"."""
        return "DSC"
class InventoryGenerator(object):
    """Turns the raw lines of an `info report into an Inventory."""

    # Maps a line type (the text before the first ':') to the class that
    # parses the rest of that line.
    # NOTE: THE KEYS MUST BE IN LOWERCASE!
    ASSOCIATIONS = {
        'access': Access,
        'extra': Extra,
        'portraits': RegularPortraits,
        'animated portraits': AnimatedPortraits,
        'item': Item,
        'item stack': ItemStack,
        'item platter': ItemPlatter,
        'effect': Effect,
        'dragonscales balance': ScalesBalance
    }

    # These types can have more than 1 line to them:
    MULTI_ITEM_TYPES = ['access', 'extra', 'item', 'item stack', 'item platter', 'effect']

    def __init__(self):
        pass

    def create(self, lines):
        """Build an Inventory from report *lines*.

        Each line is matched against RE_BASIC; lines that do not match are
        skipped.  The line type is lower-cased and, when it appears in
        ASSOCIATIONS, its text is replaced by an instance of the associated
        class.  Types listed in MULTI_ITEM_TYPES are collected into lists
        (always present, possibly empty); any other type keeps the value of
        its last occurrence.

        The lines are processed in a single pass with plain loops, so the
        result does not depend on map()/filter() returning lists (they are
        one-shot iterators on Python 3).
        """
        result = {}
        grouped = dict((t, []) for t in self.MULTI_ITEM_TYPES)

        for line in lines:
            matches = RE_BASIC.findall(line)
            if not matches:
                # empty/irrelevant line
                continue

            (itype, text) = matches[0]
            itype = itype.lower()

            factory = self.ASSOCIATIONS.get(itype)
            value = factory(text) if factory is not None else text

            if itype in grouped:
                grouped[itype].append(value)
            else:
                result[itype] = value

        result.update(grouped)
        return Inventory(result)
class Inventory(object):
    """Read-only view over the dictionary of parsed report entries.

    The dictionary is keyed by lower-cased line type; values are either the
    raw text of the line or the objects/lists of objects built for it.
    """

    def __init__(self, inventory_dict):
        self.inventory = inventory_dict

    def name(self):
        """Return the name part of the 'player info' entry."""
        return RE_PLAYER_INFO.findall(self.inventory['player info'])[0][0]

    def email(self):
        """Return the parenthesised part of the 'player info' entry."""
        return RE_PLAYER_INFO.findall(self.inventory['player info'])[0][1]

    def uid(self):
        """Return the 'furcadia uid' entry as an int."""
        return int(self.inventory['furcadia uid'])

    def account_id(self):
        """Return the 'furcadia account id' entry as an int."""
        return int(self.inventory['furcadia account id'])

    def balance(self):
        """Return the 'dragonscales balance' entry."""
        return self.inventory['dragonscales balance']

    def access(self, group=None):
        """Return access information.

        Without *group*, returns a list of single-entry {group: level}
        dictionaries, one per access entry.  With *group*, returns the level
        of that group, matched case-insensitively.

        Raises KeyError when *group* is given but not present.  (Previously
        this path indexed the list with a string and always raised
        TypeError.)
        """
        entries = self.inventory['access']
        if not group:
            return [{a.group: a.level} for a in entries]

        wanted = group.lower()
        for a in entries:
            if a.group.lower() == wanted:
                return a.level
        raise KeyError(group)

    def items(self, item_type):
        """Return the entry stored for *item_type* (case-insensitive), or []."""
        return self.inventory.get(item_type.lower(), [])

    def dict(self):
        """Flatten the inventory into a {field label: value} dictionary."""
        result = {
            'Name': self.name(),
            'Email': self.email(),
            'UID': self.uid(),
            'Account ID': self.account_id(),
            'Balance': self.balance().usd
        }

        # Access
        for access in self.inventory['access']:
            result[get_obj_field(access)] = access.level

        # Portraits
        for t in ['portraits', 'animated portraits']:
            p = self.inventory.get(t)
            if p:
                result[get_field(p.category(), "Active")] = p.active
                result[get_field(p.category(), "Inactive")] = p.inactive
                result[get_field(p.category(), "Pending")] = p.pending

        # Extra
        for item in self.items('extra'):
            extra = "4life" if item.is_4life else item.expire_date
            if not extra:
                if item.is_account:
                    extra = "account"
                else:
                    extra = "+"
            result[get_obj_field(item)] = extra

        # Item
        for item in self.items('item'):
            result[get_obj_field(item)] = "%d/%d actv" % (item.active, item.active + item.inactive)

        # Item Stack
        for item in self.items('item stack'):
            result[get_obj_field(item)] = item.remaining

        # Item Platter
        for item in self.items('item platter'):
            result[get_obj_field(item)] = item.remaining

        # Effect
        for item in self.items('effect'):
            result[get_obj_field(item)] = "4life" if item.is_4life else item.expire_date

        return result
class CsvExporter(object):
    """Writes a list of inventories to a CSV file, one row per field.

    Each row starts with the field label, followed by that field's value for
    every inventory ("--" where an inventory lacks the field).
    """

    def __init__(self):
        self.__static_fields = []

    def set_field_order(self, field_names):
        """Set the fields that are written first, in the given order."""
        self.__static_fields = field_names

    def export(self, filename, inventories):
        """Write *inventories* to *filename*.

        The inventories are materialised into lists up front: both the field
        discovery and every output row need to walk them, which a one-shot
        iterator (e.g. map() on Python 3) cannot provide.
        """
        import sys

        inventories = list(inventories)
        fields = extract_fields(inventories, self.__static_fields)
        inv_dicts = [inv.dict() for inv in inventories]

        # The csv module writes its own line terminators; the file must not
        # translate them again (binary mode on Python 2, newline='' on 3).
        if sys.version_info[0] >= 3:
            csvfile = open(filename, 'w', newline='')
        else:
            csvfile = open(filename, 'wb')

        with csvfile:
            writer = csv.writer(csvfile)
            for f in fields:
                writer.writerow([f] + [d.get(f, "--") for d in inv_dicts])
class FurcadiaInfoClient(object):
    """Logs one character in, requests `info and collects the reply lines.

    The conversation is a small state machine driven by handle():
    READY -> CONNECT -> BANNER -> AUTH -> CONNECTED -> CLOSING -> CLOSED.
    After run() returns, fail() tells whether the report was obtained and
    info() returns the collected lines.
    """

    def __init__(self, name, passwd, log_callback=None):
        """Remember the credentials; *log_callback*, if given, receives
        progress messages as single strings."""
        self.__log = log_callback
        self.__name = name
        self.__passwd = passwd
        self.__socket = None
        self.__is_logging_info = False
        self.__is_running = False
        self.is_fail = False
        self.state = "READY"
        self.info_buffer = []

    def name(self):
        """Return the character name."""
        return self.__name

    def fail(self):
        """Return True when the last run() did not obtain the report."""
        return self.is_fail

    def info(self):
        """Return the list of report lines collected so far."""
        return self.info_buffer

    def log(self, text):
        """Forward *text* to the log callback, if one was supplied."""
        if self.__log:
            self.__log(text)

    def send(self, line):
        """Send *line* plus a newline; return the number of bytes sent
        (0 when not connected)."""
        if not self.__socket:
            return 0
        payload = line + "\n"
        if not isinstance(payload, bytes):
            # Python 3 sockets only accept bytes.
            # NOTE(review): latin-1 is an assumption about the wire
            # encoding -- confirm against the server protocol.
            payload = payload.encode('latin-1')
        # sendall() retries until the whole line is out; send() may stop short
        self.__socket.sendall(payload)
        return len(payload)

    def shutdown(self):
        """Ask the run() loop to stop."""
        self.__is_running = False
        self.state = "CLOSING"

    def handle(self, line):
        """Advance the state machine with one received *line*.

        *line* is False when the socket timed out.
        """
        if line is False:
            self.log("! TIMEOUT - shutting down")
            self.shutdown()
            return

        if self.state == "BANNER":
            if line.startswith("Dragonroar"):
                self.state = "AUTH"
                self.log("> Logging in...")
                self.send("connect %s %s" % (self.__name, self.__passwd))
                # the password is not needed again after it was sent
                self.__passwd = ""

        elif self.state == "AUTH":
            if line.startswith("&&&&&&&&"):
                self.state = "CONNECTED"
                self.log("> Connected: requesting info...")
                self.send("info")

        elif self.state == "CONNECTED":
            if line.startswith("("):
                # determine new state of "is logging info?"
                if self.__is_logging_info:
                    # the reply to our "ping" marks the end of the report
                    if line.startswith("(Pong"):
                        self.__is_logging_info = False
                        self.is_fail = False
                        self.log("> Info obtained: shutting down...")
                        self.send("quit")
                        self.shutdown()
                else:
                    if line.startswith(FIRST_LINE_PREFIX):
                        self.__is_logging_info = True
                        # sent now so that its "(Pong" reply arrives
                        # after the report lines
                        self.send("ping")

                # log the data if we still are
                if self.__is_logging_info:
                    self.info_buffer += [line]

    def run(self):
        """Connect to GAMESERVER_ADDR and process lines until shutdown.

        Socket errors (including a failed connect) are logged and end the
        run with fail() still True; they are not raised to the caller.  The
        socket is always closed before returning.
        """
        self.__is_running = True
        self.is_fail = True
        self.state = "CONNECT"

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                s.connect(GAMESERVER_ADDR)
            except socket.error as err:
                self.log("Connection failed: %s" % (err,))
                self.shutdown()

            if self.__is_running:
                self.state = "BANNER"
                self.__socket = s

            # trailing partial line carried over between recv() calls
            last_line = ''

            while self.__is_running:
                try:
                    data = s.recv(4096)
                    if not data:
                        # recv() returns an empty result once the peer has
                        # closed the connection; without this check the
                        # loop would spin forever
                        self.log("Connection closed by server")
                        self.shutdown()
                        break

                    if not isinstance(data, str):
                        # bytes on Python 3 (see send() about the encoding)
                        data = data.decode('latin-1')

                    lines = (last_line + data).split("\n")
                    # Last is either empty or incomplete!
                    last_line = lines.pop()

                    for line in lines:
                        if not self.__is_running:
                            # lines after a shutdown are discarded
                            break
                        self.handle(line)

                except socket.timeout:
                    self.handle(False)
                except socket.error as err:
                    self.log("Connection closed: %s" % (err,))
                    self.shutdown()
        finally:
            self.__socket = None
            s.close()
            self.state = "CLOSED"
            self.log("- Connection closed")
def extract_fields(inventories, static_fields=None):
    """Return the ordered list of field labels used by *inventories*.

    The result starts with *static_fields* in the order given (whether or
    not any inventory uses them), followed by every other field found in
    the inventories' dict() output, sorted.

    *static_fields* may be any iterable (or None for "no fixed fields");
    it is never modified.
    """
    static = list(static_fields) if static_fields is not None else []

    seen = set()
    for inv in inventories:
        seen.update(inv.dict())

    pinned = set(static)
    return static + sorted(f for f in seen if f not in pinned)
def get_field(category, name):
    """Build a field label of the form "<category>: <name>"."""
    parts = (category, name)
    return "%s: %s" % parts
def get_obj_field(inv_item):
    """Build the field label for a report entry from its category() and name()."""
    category = inv_item.category()
    label = inv_item.name()
    return get_field(category, label)
def get_expire_date(text):
    """Return the first timestamp matching RE_FURC_TS in *text*, or None."""
    found = RE_FURC_TS.search(text)
    if found is None:
        return None
    return found.group(1)
def main():
    """Fetch the inventory of every character in CHARACTERS and export it.

    Characters whose report could not be obtained are reported as failed and
    left out of the CSV.  Returns 0 (used as the process exit status).

    Output uses single-argument print(...) calls so the function behaves the
    same under the Python 2 print statement and the Python 3 print function.
    """
    print("=== BTA Inventory Check 1.0b ============================")
    socket.setdefaulttimeout(5)
    generator = InventoryGenerator()

    def log_callback(text):
        print("  %s" % (text,))

    print(" * Retrieving inventory for %d characters..." % len(CHARACTERS))
    inventories = []
    for (name, passwd) in CHARACTERS:
        print(" > %s" % (name,))
        instance = FurcadiaInfoClient(name, passwd, log_callback)

        try:
            instance.run()  # TODO: Threads!
        except socket.error as err:
            # one unreachable connection must not discard the inventories
            # already collected for the other characters
            print(" X FAILED (%s)" % (err,))
            continue

        if instance.fail():
            print(" X FAILED")
            continue

        inventories.append(generator.create(instance.info()))

    print(" * Exporting data to %s..." % OUTPUT_CSV)
    exporter = CsvExporter()
    exporter.set_field_order(['Name', 'UID', 'Account ID', 'Email', 'Balance'])
    exporter.export(OUTPUT_CSV, inventories)
    print(" > DONE")
    return 0
# Script entry point: run main() and exit with the status it returns.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment