Skip to content

Instantly share code, notes, and snippets.

@icedraco
Created August 1, 2016 16:27
Show Gist options
  • Save icedraco/8236cbd78e764f0ed0938f38152d082e to your computer and use it in GitHub Desktop.
Save icedraco/8236cbd78e764f0ed0938f38152d082e to your computer and use it in GitHub Desktop.
Furcadia / BTA Inventory Check Script (exports `info of multiple characters into a CSV file)
###--# BTA Inventory Check 1.0b
# This script was built to mass-check the inventory of many Furcadia characters
# and document the results.
#
# Author: Artex / IceDragon <[email protected]>
#
import re
import socket
import csv
# CSV output file
OUTPUT_CSV = "inventory.csv"

# Character list
# format: [(name1, pass1), (name2, pass2), ...]
CHARACTERS = [("BuggeTestAlt%04d" % n, "password") for n in range(1, 31)]

# Furcadia server address
GAMESERVER_ADDR = ('lightbringer.furcadia.com', 6500)

# Adjustment: First line of `info starts with...
FIRST_LINE_PREFIX = "(Player info:"

### Regular expressions:
# All patterns are raw strings so that regex escapes (\d, \w, \() reach the
# regex engine verbatim instead of depending on how Python treats
# unrecognised string escapes. The compiled patterns are unchanged.

# (type, text)
RE_BASIC = re.compile(r'^\(([^:]+): (.*)$')

# (groupname, level)
RE_ACCESS = re.compile(r'([^,]+), level (\d+)')

# (name, rest)
RE_EXTRA = re.compile(r'^([a-zA-Z0-9 ]+)[, ].(.*)$')

# (active, inactive, pending_review)
RE_PORTRAITS = re.compile(r'(\d+) active, (\d+) inactive, (\d+) waiting for review')

# (name, active, inactive)
RE_ITEM = re.compile(r'([^\(]+) \((\d+) active, (\d+) inactive\)')

# (name, remaining)
RE_ITEM_STACK = re.compile(r'([a-zA-Z0-9 ]+) \((\d+) remaining\)')

# (name, rest)
RE_EFFECT = re.compile(r'^([a-zA-Z0-9 ]+) (.*)$')

# (gd, sd, cd, usd_with_symbols)
RE_SCALES = re.compile(r'(\d+) GD, (\d+) SD, (\d+) CD \(total (.*) USD\)')

# (exp_date_text)
RE_FURC_TS = re.compile(r"(\d{4}-\d{2}-\d{2}/[0-9:]+/\w{3})")

# (name, email)
RE_PLAYER_INFO = re.compile(r"^([^\(]+) \((.*)\)")
###--# Functionality #--#######################################################
class ReportItem(object):
    """Common base of the report item types defined in this file.

    Subclasses override name() and category(); the values returned here are
    placeholders.
    """

    def __init__(self):
        """Nothing to set up at this level."""

    def name(self):
        """Return the item's display name; the base class has none."""
        placeholder = "???"
        return placeholder

    @staticmethod
    def category():
        """Return the short category tag; the base class has none."""
        placeholder = "??"
        return placeholder
class Access(ReportItem):
    """An access entry: a group name and a numeric level."""

    def __init__(self, text):
        """Parse *text* with RE_ACCESS.

        Raises IndexError when the text does not match the pattern.
        """
        raw_group, raw_level = RE_ACCESS.findall(text)[0]
        # Pipes in the group name are shown as spaces.
        self.group = raw_group.replace('|', ' ')
        self.level = int(raw_level)

    def __str__(self):
        fields = (self.group, self.level)
        return "Access: %s, level %d" % fields

    def name(self):
        """Return the group name."""
        return self.group

    @staticmethod
    def category():
        """Return the category tag used for access entries."""
        return "ACC"
class Extra(ReportItem):
    """An "extra" entry: a name plus account / lifetime / expiry details."""

    def __init__(self, text):
        """Parse *text* with RE_EXTRA.

        Raises IndexError when the text does not match the pattern.
        """
        parsed_name, details = RE_EXTRA.findall(text)[0]
        self.item_name = parsed_name
        self.is_account = "(account)" in details
        # Any one of these markers in the details flags the extra as "4life".
        lifetime_markers = ("#C", "/cache/6", "life")
        self.is_4life = any(marker in details for marker in lifetime_markers)
        self.expire_date = get_expire_date(details)

    def __str__(self):
        if self.is_4life:
            extra = "4life"
        else:
            extra = self.expire_date
        if not extra:
            # No lifetime flag and no expiry date: fall back to the account
            # marker, or to a dash placeholder.
            extra = "account" if self.is_account else "--"
        return "Extra: %s (%s)" % (self.item_name, extra)

    def name(self):
        """Return the extra's name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag used for extras."""
        return "EX"
class Portraits(ReportItem):
    """Portrait counters: active, inactive and waiting for review."""

    def __init__(self, text):
        """Parse the three counters from *text* with RE_PORTRAITS.

        Raises IndexError when the text does not match the pattern.
        """
        counters = [int(value) for value in RE_PORTRAITS.findall(text)[0]]
        self.active, self.inactive, self.pending = counters

    def __str__(self):
        counters = (self.active, self.inactive, self.pending)
        return "Portraits: %d active, %d inactive, %d waiting for review" % counters

    @staticmethod
    def category():
        """Return the category tag shared by all portrait kinds."""
        return "PORT"
class RegularPortraits(Portraits):
    """Portrait counters for regular (non-animated) portraits."""

    def __init__(self, text):
        """Parse the counters via Portraits and mark them as not animated."""
        super(RegularPortraits, self).__init__(text)
        self.is_animated = False

    def name(self):
        """Return the fixed display name of this entry."""
        return "Regular Portraits"

    @staticmethod
    def category():
        """Return the portrait category tag with the "/REG" suffix."""
        base_tag = Portraits.category()
        return base_tag + "/REG"
class AnimatedPortraits(Portraits):
    """Portrait counters for animated portraits."""

    def __init__(self, text):
        """Parse the counters via Portraits and mark them as animated."""
        Portraits.__init__(self, text)
        self.is_animated = True

    def __str__(self):
        return "Animated " + Portraits.__str__(self)

    def name(self):
        """Return the fixed display name of this entry."""
        # Fixed spelling: this previously returned "Animataed Portraits".
        return "Animated Portraits"

    @staticmethod
    def category():
        """Return the portrait category tag with the "/ANIM" suffix."""
        return Portraits.category() + "/ANIM"
class Item(ReportItem):
    """An item entry: a name with active and inactive counts."""

    def __init__(self, text):
        """Parse *text* with RE_ITEM.

        Raises IndexError when the text does not match the pattern.
        """
        parsed_name, active_count, inactive_count = RE_ITEM.findall(text)[0]
        self.item_name = parsed_name
        self.active = int(active_count)
        self.inactive = int(inactive_count)

    def __str__(self):
        fields = (self.item_name, self.active, self.inactive)
        return "Item: %s (%d active, %d inactive)" % fields

    def name(self):
        """Return the item's name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag used for items."""
        return "ITEM"
class ItemStack(ReportItem):
    """An item stack entry: a name with a remaining count."""

    def __init__(self, text):
        """Parse *text* with RE_ITEM_STACK.

        Raises IndexError when the text does not match the pattern.
        """
        stack_name, left = RE_ITEM_STACK.findall(text)[0]
        self.item_name = stack_name
        self.remaining = int(left)

    def __str__(self):
        fields = (self.item_name, self.remaining)
        return "Item Stack: %s (%d remaining)" % fields

    def name(self):
        """Return the stack's item name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag used for item stacks."""
        return "STACK"
class ItemPlatter(ReportItem):
    """An item platter entry: a name with a remaining count.

    Uses the same pattern as item stacks (RE_ITEM_STACK).
    """

    def __init__(self, text):
        """Parse *text* with RE_ITEM_STACK.

        Raises IndexError when the text does not match the pattern.
        """
        platter_name, left = RE_ITEM_STACK.findall(text)[0]
        self.item_name = platter_name
        self.remaining = int(left)

    def __str__(self):
        fields = (self.item_name, self.remaining)
        return "Item Platter: %s (%d remaining)" % fields

    def name(self):
        """Return the platter's item name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag used for item platters."""
        return "PLATTER"
class Effect(ReportItem):
    """An effect entry: a name plus a lifetime flag or an expiry date."""

    def __init__(self, text):
        """Parse *text* with RE_EFFECT.

        Raises IndexError when the text does not match the pattern.
        """
        t = RE_EFFECT.findall(text)[0]
        self.item_name = t[0]
        self.is_4life = "life" in t[1]
        self.expire_date = get_expire_date(t[1])

    def __str__(self):
        extra = "4life" if self.is_4life else self.expire_date
        # Label fixed: this used to print "Extra:", copied from the Extra
        # class, which made effects indistinguishable from extras in output.
        return "Effect: %s (%s)" % (self.item_name, extra)

    def name(self):
        """Return the effect's name."""
        return self.item_name

    @staticmethod
    def category():
        """Return the category tag used for effects."""
        return "EFF"
class ScalesBalance(ReportItem):
    """DragonScales balance: GD / SD / CD counts and the USD total."""

    def __init__(self, text):
        """Parse *text* with RE_SCALES.

        Raises IndexError when the text does not match the pattern.
        """
        gd_text, sd_text, cd_text, usd_text = RE_SCALES.findall(text)[0]
        self.gd = int(gd_text)
        self.sd = int(sd_text)
        self.cd = int(cd_text)
        # The first character of the USD text is dropped before conversion
        # (presumably the currency symbol).
        self.usd = float(usd_text[1:])

    def __str__(self):
        template = "DragonScales Balance: %d GD, %d SD, %d CD (total $%.2f USD)"
        return template % (self.gd, self.sd, self.cd, self.usd)

    def name(self):
        """Return the fixed display name of this entry."""
        return "Balance"

    @staticmethod
    def category():
        """Return the category tag used for the balance."""
        return "DSC"
class InventoryGenerator(object):
    """Builds an Inventory from the raw lines captured for one character."""

    # Maps a line type to the class that parses its text.
    # NOTE: THE KEYS MUST BE IN LOWERCASE!
    ASSOCIATIONS = {
        'access': Access,
        'extra': Extra,
        'portraits': RegularPortraits,
        'animated portraits': AnimatedPortraits,
        'item': Item,
        'item stack': ItemStack,
        'item platter': ItemPlatter,
        'effect': Effect,
        'dragonscales balance': ScalesBalance
    }

    # These types can have more than 1 line to them:
    MULTI_ITEM_TYPES = ['access', 'extra', 'item', 'item stack', 'item platter', 'effect']

    def __init__(self):
        pass

    def create(self, lines):
        """Parse *lines* and return an Inventory.

        Each line is matched against RE_BASIC to split it into a type and a
        text; lines that do not match are ignored. The type is lowercased and,
        when it has an entry in ASSOCIATIONS, the text is replaced by an
        instance of the associated class. Other types keep their raw text.

        In the resulting dictionary every type listed in MULTI_ITEM_TYPES maps
        to a list (possibly empty) of its entries in input order; every other
        type maps to a single value, the last one seen.
        """
        # Stage 1: Input deconstruction -> list of (type, value) pairs.
        items = []
        for line in lines:
            matches = RE_BASIC.findall(line)
            if not matches:
                continue  # empty / irrelevant line
            itype, text = matches[0]
            itype = itype.lower()
            if itype in self.ASSOCIATIONS:
                text = self.ASSOCIATIONS[itype](text)
            items.append((itype, text))

        # Stage 2: Collapsing. One linear pass replaces the former
        # "filter items not in multi_items" scan, which was quadratic and
        # relied on map()/filter() returning lists.
        result = {}
        for itype in self.MULTI_ITEM_TYPES:
            result[itype] = []
        for itype, value in items:
            if itype in self.MULTI_ITEM_TYPES:
                result[itype].append(value)
            else:
                result[itype] = value
        return Inventory(result)
class Inventory(object):
    """Accessors over the per-character dictionary of parsed report lines.

    The dictionary is keyed by lowercased line type. Multi-line types map to
    lists of parsed objects; other types map to a parsed object or raw text.
    """

    def __init__(self, inventory_dict):
        self.inventory = inventory_dict

    def _player_info(self):
        """Return (name, email) parsed from the 'player info' text."""
        return RE_PLAYER_INFO.findall(self.inventory['player info'])[0]

    def name(self):
        """Return the character name from the 'player info' line."""
        return self._player_info()[0]

    def email(self):
        """Return the e-mail address from the 'player info' line."""
        return self._player_info()[1]

    def uid(self):
        """Return the 'furcadia uid' value as an int."""
        return int(self.inventory['furcadia uid'])

    def account_id(self):
        """Return the 'furcadia account id' value as an int."""
        return int(self.inventory['furcadia account id'])

    def balance(self):
        """Return the parsed 'dragonscales balance' entry."""
        return self.inventory['dragonscales balance']

    def access(self, group=None):
        """Return access information.

        Without *group*, return a list of one-entry dictionaries
        ``{group_name: level}``, one per access entry. With *group*, return
        the level of that group, compared case-insensitively.

        Raises KeyError when *group* is given but no such entry exists.
        """
        entries = self.items('access')
        if not group:
            return [{a.group: a.level} for a in entries]
        # The previous code indexed the list above with a string, which
        # raised TypeError for every group lookup.
        wanted = group.lower()
        for entry in entries:
            if entry.group.lower() == wanted:
                return entry.level
        raise KeyError(group)

    def items(self, item_type):
        """Return the entries stored under *item_type* (case-insensitive).

        Returns an empty list when the type is not present.
        """
        return self.inventory.get(item_type.lower(), [])

    def dict(self):
        """Flatten the inventory into a {column label: value} dictionary."""
        result = {
            'Name': self.name(),
            'Email': self.email(),
            'UID': self.uid(),
            'Account ID': self.account_id(),
            'Balance': self.balance().usd
        }

        # Access (read through items() like every other category, so a
        # missing key yields no columns instead of a KeyError)
        for access in self.items('access'):
            result[get_obj_field(access)] = access.level

        # Portraits
        for t in ['portraits', 'animated portraits']:
            p = self.inventory.get(t)
            if p:
                result[get_field(p.category(), "Active")] = p.active
                result[get_field(p.category(), "Inactive")] = p.inactive
                result[get_field(p.category(), "Pending")] = p.pending

        # Extra
        for item in self.items('extra'):
            extra = "4life" if item.is_4life else item.expire_date
            if not extra:
                extra = "account" if item.is_account else "+"
            result[get_obj_field(item)] = extra

        # Item
        for item in self.items('item'):
            result[get_obj_field(item)] = "%d/%d actv" % (item.active, item.active + item.inactive)

        # Item Stack
        for item in self.items('item stack'):
            result[get_obj_field(item)] = item.remaining

        # Item Platter
        for item in self.items('item platter'):
            result[get_obj_field(item)] = item.remaining

        # Effect
        for item in self.items('effect'):
            result[get_obj_field(item)] = "4life" if item.is_4life else item.expire_date

        return result
class CsvExporter(object):
    """Writes a set of inventories to a CSV file, one row per field."""

    def __init__(self):
        self.__static_fields = []

    def set_field_order(self, field_names):
        """Set the fields that are written first, in the given order."""
        # Copy, so later changes to the caller's sequence do not leak in and
        # so any sequence type (not just a list) is accepted.
        self.__static_fields = list(field_names)

    def export(self, filename, inventories):
        """Write *inventories* to *filename*.

        Each row starts with a field name followed by that field's value for
        every inventory; "--" is written where an inventory lacks the field.
        """
        import sys

        # Materialise once: the inventories are walked by extract_fields()
        # and again below, and the per-inventory dictionaries are reused for
        # every row.
        inventories = list(inventories)
        fields = extract_fields(inventories, self.__static_fields)
        inv_dicts = [inv.dict() for inv in inventories]

        # The csv module wants a binary-mode file on Python 2 and a text file
        # opened with newline='' on Python 3; otherwise some platforms emit
        # an extra blank line after each row.
        if sys.version_info[0] >= 3:
            csvfile = open(filename, 'w', newline='')
        else:
            csvfile = open(filename, 'wb')

        with csvfile:
            writer = csv.writer(csvfile)
            for f in fields:
                writer.writerow([f] + [d.get(f, "--") for d in inv_dicts])
class FurcadiaInfoClient(object):
    """Logs one character in, sends "info" and captures the reply lines.

    The client is a line-driven state machine. ``state`` moves through
    READY -> CONNECT -> BANNER -> AUTH -> CONNECTED -> CLOSING -> CLOSED.
    ``is_fail`` is set when run() starts and cleared only once the complete
    info output has been received.
    """

    def __init__(self, name, passwd, log_callback=None):
        """Store the credentials and an optional one-argument log callback."""
        self.__log = log_callback
        self.__name = name
        self.__passwd = passwd
        self.__socket = None
        self.__is_logging_info = False
        # Initialised here so shutdown() is well-defined before run().
        self.__is_running = False
        self.is_fail = False
        self.state = "READY"
        self.info_buffer = []

    def name(self):
        """Return the character name."""
        return self.__name

    def fail(self):
        """Return True unless the info output was fully received."""
        return self.is_fail

    def info(self):
        """Return the list of captured info lines."""
        return self.info_buffer

    def log(self, text):
        """Pass *text* to the log callback, if one was given."""
        if self.__log:
            self.__log(text)

    def send(self, line):
        """Send *line* plus a newline; return 0 when not connected."""
        if not self.__socket:
            return 0
        payload = line + "\n"
        # On Python 2 str is already bytes, so this is a no-op there; on
        # Python 3 the text has to be encoded before it can be sent.
        if not isinstance(payload, bytes):
            payload = payload.encode('latin-1')
        return self.__socket.send(payload)

    def shutdown(self):
        """Ask the receive loop in run() to stop."""
        self.__is_running = False
        self.state = "CLOSING"

    def handle(self, line):
        """Advance the state machine with one received line.

        *line* is ``False`` to signal a receive timeout.
        """
        if line is False:
            self.log("! TIMEOUT - shutting down")
            self.shutdown()
            return

        if self.state == "BANNER":
            if line.startswith("Dragonroar"):
                self.state = "AUTH"
                self.log("> Logging in...")
                self.send("connect %s %s" % (self.__name, self.__passwd))
                # The password is not needed again; do not keep it around.
                self.__passwd = ""

        elif self.state == "AUTH":
            if line.startswith("&&&&&&&&"):
                self.state = "CONNECTED"
                self.log("> Connected: requesting info...")
                self.send("info")

        elif self.state == "CONNECTED":
            if line.startswith("("):
                # determine new state of "is logging info?"
                if self.__is_logging_info:
                    # The "(Pong" reply to the ping sent below marks the end
                    # of the info output.
                    if line.startswith("(Pong"):
                        self.__is_logging_info = False
                        self.is_fail = False
                        self.log("> Info obtained: shutting down...")
                        self.send("quit")
                        self.shutdown()
                else:
                    if line.startswith(FIRST_LINE_PREFIX):
                        self.__is_logging_info = True
                        self.send("ping")

                # log the data if we still are
                if self.__is_logging_info:
                    self.info_buffer.append(line)

    def run(self):
        """Connect to GAMESERVER_ADDR and process lines until shut down.

        Errors raised while connecting propagate to the caller. The socket is
        closed in every case.
        """
        self.__is_running = True
        self.is_fail = True
        self.state = "CONNECT"

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(GAMESERVER_ADDR)
            self.state = "BANNER"
            self.__socket = s

            pending = ''  # trailing partial line carried over between reads
            while self.__is_running:
                try:
                    data = s.recv(4096)
                except socket.timeout:
                    self.handle(False)
                    continue
                except socket.error as err:
                    # %-formatting: concatenating str and exception raises.
                    self.log("Connection closed: %s" % (err,))
                    self.shutdown()
                    break

                if not data:
                    # Empty read means the peer closed the connection;
                    # without this check the loop would spin forever.
                    self.log("! Connection closed by server")
                    self.shutdown()
                    break

                if not isinstance(data, str):
                    data = data.decode('latin-1')  # Python 3: bytes -> text

                lines = (pending + data).split("\n")
                # Last is either empty or incomplete!
                pending = lines.pop()
                for line in lines:
                    # Stop as soon as a handler requested shutdown; lines
                    # received after that point are not processed.
                    if not self.__is_running:
                        break
                    self.handle(line)
        finally:
            self.__socket = None
            s.close()
            self.state = "CLOSED"
            self.log("- Connection closed")
def extract_fields(inventories, static_fields=None):
    """Return the ordered list of field names for a set of inventories.

    The result starts with *static_fields* in the given order (whether or not
    any inventory has them), followed by every other key found in the
    inventories' dict() output, sorted.

    *static_fields* may be any sequence; None (the default) means no static
    fields. The argument is never modified.
    """
    # None instead of a shared mutable default; copy so that any sequence
    # type can be concatenated below.
    static_fields = [] if static_fields is None else list(static_fields)

    discovered = set()
    for inv in inventories:
        discovered.update(inv.dict())

    remaining = sorted(discovered.difference(static_fields))
    return static_fields + remaining
def get_field(category, name):
    """Build a column label of the form "<category>: <name>"."""
    parts = (category, name)
    return "%s: %s" % parts
def get_obj_field(inv_item):
    """Build the column label for a report item from its category and name."""
    category_tag = inv_item.category()
    display_name = inv_item.name()
    return get_field(category_tag, display_name)
def get_expire_date(text):
    """Return the first RE_FURC_TS timestamp found in *text*, or None."""
    timestamps = RE_FURC_TS.findall(text)
    if timestamps:
        return timestamps[0]
    return None
def main():
    """Fetch the inventory of every character and export them to CSV.

    Returns 0, which the script entry point uses as the exit status.
    """
    # Single-argument print(...) calls produce the same output under the
    # Python 2 print statement and also parse on Python 3.
    print("=== BTA Inventory Check 1.0b ============================")
    print("")

    socket.setdefaulttimeout(5)
    generator = InventoryGenerator()

    def log_callback(text):
        print("  %s" % (text,))

    print(" * Retrieving inventory for %d characters..." % len(CHARACTERS))
    inventories = []
    for name, passwd in CHARACTERS:
        print(" > %s" % (name,))
        instance = FurcadiaInfoClient(name, passwd, log_callback)
        try:
            instance.run()  # TODO: Threads!
        except socket.error as err:
            # One unreachable connection must not abort the whole batch.
            log_callback("! Connection error: %s" % (err,))
            succeeded = False
        else:
            succeeded = not instance.fail()

        if succeeded:
            inventories.append(generator.create(instance.info()))
        else:
            print(" X FAILED")

    print("")
    print(" * Exporting data to %s..." % OUTPUT_CSV)
    exporter = CsvExporter()
    exporter.set_field_order(['Name', 'UID', 'Account ID', 'Email', 'Balance'])
    exporter.export(OUTPUT_CSV, inventories)

    print("")
    print(" > DONE")
    return 0
# Run main() only when executed as a script; its return value is handed to
# SystemExit and so becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment