Skip to content

Instantly share code, notes, and snippets.

@psykzz
Last active August 29, 2015 14:15
Show Gist options
  • Save psykzz/1e59541ffe97101e777c to your computer and use it in GitHub Desktop.
Save psykzz/1e59541ffe97101e777c to your computer and use it in GitHub Desktop.
import time
import json
import datetime
from slackclient import SlackClient
class SlackBot(object):
    """Simplified bot responder for Slack.

    Responses are registered against a text prefix with ``add_response`` and
    periodic callbacks with ``add_timer``; ``process`` then connects to the
    RTM stream and dispatches incoming message events until told to stop.

    Attributes:
        client: the ``SlackClient`` built from the API key.
        bot_name: username the bot posts under (also used to skip its own
            messages).
        default_channel: channel every response and timer message is sent to.
        responses: maps a text prefix to the list of responses for it.
        time_responses: maps a tick interval to the list of timer callbacks.
        run_timer: number of loop iterations ``process`` has started.
    """

    def __init__(self, api_key, bot_name='Bat Bot',
                 default_channel='#illuminati'):
        """Create the bot.

        Args:
            api_key: Slack API token handed to ``SlackClient``.
            bot_name: username used when posting messages.
            default_channel: channel (or private group) name that responses
                and timer messages are posted to.
        """
        super(SlackBot, self).__init__()
        self._api_key = api_key
        self.client = SlackClient(self._api_key)
        self.bot_name = bot_name
        self.default_channel = default_channel
        self.responses = {}
        self.time_responses = {}
        self._cache = {}
        self.run_timer = 0
        self._last_run = 0

    def format_user(self, event):
        """Attach ``user_details`` to *event* when it carries a user id.

        The event dict is modified in place and also returned.
        """
        if event.get('user'):
            event['user_details'] = self.get_user_info(event.get('user'), True)
        return event

    def add_response(self, text, response):
        """Register *response* for messages that start with *text*.

        *response* is either a value to send as-is or a callable that takes
        the event and returns the message (or ``None`` to stay silent).
        """
        self.responses.setdefault(text, []).append(response)

    def add_timer(self, func, timer=1):
        """Register *func* to be called every *timer* loop iterations."""
        self.time_responses.setdefault(timer, []).append(func)

    # TODO: add a way to remove a registered response.

    def cache_get(self, key, default=None):
        """Return the cached value for *key*, or *default* when absent."""
        return self._cache.get(key, default)

    def cache_set(self, key, value):
        """Store *value* under *key*, replacing any previous value."""
        self._cache[key] = value

    def _cached_listing(self, cache_key, api_method, collection, index_field):
        """Return a dict of API entries keyed by *index_field*, fetched once.

        The Slack method *api_method* is called only while nothing truthy is
        cached under *cache_key*; its JSON reply is indexed and cached.
        """
        listing = self.cache_get(cache_key)
        if not listing:
            payload = json.loads(self.client.api_call(api_method))
            listing = dict(
                (entry[index_field], entry) for entry in payload[collection]
            )
            self.cache_set(cache_key, listing)
        return listing

    def get_bot_info(self):
        """Return the user/team ids and names from ``auth.test`` (cached)."""
        if not self.cache_get('self'):
            _data = json.loads(self.client.api_call("auth.test"))
            self.cache_set('self', {
                'user_id': _data['user_id'],
                'user_name': _data['user'],
                'team_id': _data['team_id'],
                'team_name': _data['team'],
            })
        return self.cache_get('self')

    def get_user_info(self, user, by_id=False):
        """Look up a user by name, or by id when *by_id* is true.

        Returns the member dict from ``users.list`` or ``None`` if unknown.
        """
        if by_id:
            cache_key, index_field = 'users_byid', 'id'
        else:
            cache_key, index_field = 'users', 'name'
        listing = self._cached_listing(
            cache_key, "users.list", 'members', index_field)
        return listing.get(user)

    def get_channel_info(self, channel):
        """Return the channel dict for the channel *name*, or ``None``."""
        listing = self._cached_listing(
            'channels', "channels.list", 'channels', 'name')
        return listing.get(channel)

    def get_group_info(self, group):
        """Return the private-group dict for the group *name*, or ``None``."""
        listing = self._cached_listing(
            'groups', "groups.list", 'groups', 'name')
        return listing.get(group)

    def send_message(self, channel, message):
        """Post *message* to the named channel or private group.

        Args:
            channel: channel or group name, with or without a leading ``#``.
            message: text to post.

        Returns:
            Whatever ``chat.postMessage`` returns from the client.

        Raises:
            Exception: if the name matches neither a channel nor a group.
        """
        # startswith() instead of channel[0] so an empty name reaches the
        # "Invalid channel" error below rather than raising IndexError.
        if channel.startswith('#'):
            channel = channel[1:]
        channel_info = self.get_channel_info(channel)
        if not channel_info:
            # Not a public channel; it may be a private group.
            channel_info = self.get_group_info(channel)
        if not channel_info:
            raise Exception('Invalid channel or group (%s)' % (channel))
        return self.client.api_call(
            "chat.postMessage",
            channel=channel_info['id'],
            username=self.bot_name,
            text=message,
        )

    def _due_timers(self):
        """Return the timer callbacks whose interval divides the current tick."""
        due = []
        # time_responses maps interval -> [callbacks]; a zero interval is
        # skipped because it can never divide the tick counter.
        for interval, funcs in self.time_responses.items():
            if interval and self.run_timer % interval == 0:
                due.extend(funcs)
        return due

    def _run_timers(self):
        """Call every due timer and post its non-None result."""
        for func in self._due_timers():
            message = func()
            if message is not None:
                self.send_message(self.default_channel, message)

    def _handle_event(self, event):
        """Dispatch one RTM event; return True when the bot should stop."""
        if event.get('type') != 'message':
            return False
        # NOTE(review): the original compared against the string u'true';
        # a JSON boolean is accepted as well - confirm which form Slack sends.
        if event.get('hidden') in (True, 'true'):
            return False
        if event.get('username') == self.bot_name:
            return False

        # Get extra information
        event = self.format_user(event)
        # Either field may be absent on a message event.
        text = event.get('text') or ''
        username = event.get('username') or ''

        # Owner-only shutdown command.
        if 'kill yourself' in text and username.lower() == 'psykzz':
            return True

        for prefix, responses in self.responses.items():
            # Only react when the message starts with the registered text.
            if not text.startswith(prefix):
                continue
            for response in responses:
                if not response:
                    continue
                message = response(event) if callable(response) else response
                if message is not None:
                    self.send_message(self.default_channel, message)
        return False

    def process(self):
        """Connect to the RTM stream and serve events until shut down.

        Returns ``None`` when the connection cannot be opened or when the
        shutdown command is received; otherwise loops forever, running due
        timers and then handling every event read, once per iteration.
        """
        if not self.client.rtm_connect():
            return
        while True:
            self.run_timer += 1
            self._run_timers()
            for event in self.client.rtm_read():
                print(event)
                if self._handle_event(event):
                    return
            time.sleep(1)  # Lets be nice
####
import datetime
import requests
import xmltodict
# Slack API token passed to the bot (the value here looks like a redacted
# placeholder - supply a real token before running).
api_key = "xoxp-xxxxx"
# Module-level bot instance; the add_response/add_timer registrations at the
# bottom of the file are made on it.
cli = SlackBot(api_key)
# Backing dict for eve_cache(): cache key -> stored value.
_eve_cache = {}
def eve_to_date(date_string):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a naive datetime.

    Raises ValueError (from strptime) when the string does not match.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    parsed = datetime.datetime.strptime(date_string, timestamp_format)
    return parsed
def eve_cache(key, value=None):
    """Read or write one entry of the module-level ``_eve_cache`` dict.

    With *value* left as ``None`` this is a lookup and returns the stored
    entry (``None`` when the key is absent). With any other *value* the
    entry is stored under *key* and the stored value is returned.
    """
    # The dict is only mutated, never rebound, so no ``global`` is needed.
    if value is not None:
        _eve_cache[key] = value
        return _eve_cache[key]
    return _eve_cache.get(key)
def test_func(event):
return str(event)
def get_weather(event):
    """Responder for 'weather in <place>': return a one-line weather summary.

    Args:
        event: message event dict; its 'text' must contain 'weather in'.

    Returns:
        "<description> in <place>", or "Can't find <place>" when the API
        reports 404 or the reply carries no weather entries.

    Raises:
        ValueError: if the text does not contain 'weather in'.
    """
    text = event.get('text')
    marker = 'weather in'
    # Everything after the marker is the place name; strip the separating
    # space so it does not leak into the query or the reply text.
    location = text[text.index(marker) + len(marker):].strip()
    # Pass the place through ``params`` so requests URL-encodes it instead of
    # splicing raw user text into the query string.
    result = requests.get(
        'http://api.openweathermap.org/data/2.5/weather',
        params={'q': location},
    ).json()
    # Compare as a string so both "404" and 404 are recognised.
    if str(result.get('cod')) == '404':
        return "Can't find %s" % (location)
    conditions = result.get('weather')
    if not conditions:
        # No weather entries in the reply (for example an API error other
        # than 404); report it as not found instead of raising TypeError.
        return "Can't find %s" % (location)
    return "%s in %s" % (conditions[0].get('description'), location)
def get_character_info(event):
    """Responder for '!charinfo <name>': return a formatted character summary.

    Resolves the name to a character id, then fetches the public character
    sheet. Both API documents are kept in ``eve_cache`` and reused until the
    ``cachedUntil`` time inside the document has passed.

    Args:
        event: message event dict; its 'text' must contain '!charinfo'.

    Returns:
        A multi-line Slack-formatted string, or a "not found" line when the
        name is empty or resolves to character id 0.
    """
    text = event.get('text')
    command = '!charinfo'
    # Take everything after the command word and trim it, so extra or
    # missing spaces after the command do not corrupt the name.
    player_name = text[text.index(command) + len(command):].strip()
    if not player_name:
        return "Character (%s): *not found*" % (player_name)

    cache_key_id = 'char_name_id_%s' % (player_name)
    cache_key_sheet = 'char_sheet_%s' % (player_name)
    cached = True

    # Get the char id
    # eve/CharacterID.xml.aspx
    result = eve_cache(cache_key_id)
    if result:
        cache_date = eve_to_date(result['eveapi']['cachedUntil'])
    # cache_date is only read when result is truthy (short-circuit).
    if not result or cache_date < datetime.datetime.utcnow():
        print('Downloading...')
        # NOTE(review): ``data=`` puts the fields in the request body of a
        # GET; confirm the API reads them there rather than from the query.
        result = requests.get(
            "https://api.eveonline.com/eve/CharacterID.xml.aspx",
            data={'names': player_name}).text
        result = xmltodict.parse(result)
        eve_cache(cache_key_id, result)
        cached = False

    character_id = int(
        result['eveapi']['result']['rowset']['row']['@characterID'])
    if not character_id:
        return "Character (%s): *not found*" % (player_name)

    # get the public sheet
    # eve/CharacterInfo.xml.aspx
    result = eve_cache(cache_key_sheet)
    if result:
        cache_date = eve_to_date(result['eveapi']['cachedUntil'])
    if not result or cache_date < datetime.datetime.utcnow():
        print('Downloading...')
        result = requests.get(
            "https://api.eveonline.com/eve/CharacterInfo.xml.aspx",
            data={'characterID': character_id}).text
        result = xmltodict.parse(result)
        eve_cache(cache_key_sheet, result)
        cached = False

    character_info = result['eveapi']['result']
    # TODO: build this from the rows of character_info['rowset']
    # ('@corporationName' / '@startDate'); until then a placeholder is shown.
    employment = ['Work in progress']

    lines = [
        "*=== Character Info%s ===*" % (' _(Cached)_' if cached else ''),
        "*ID*: %s" % (character_info['characterID']),
        "*Name*: %s" % (character_info['characterName']),
        "*Corporation*: %s" % (character_info['corporation']),
        "*Security Status*: %s" % (
            format(float(character_info['securityStatus']), '.2f')),
        "*Employment History*: %s" % (", ".join(employment)),
    ]
    return "\n".join(lines)
def get_corp_wallet(event):
    """Responder for '!wallet': return the corporation wallet balances.

    Fetches the corporation sheet (for wallet division names) and the account
    balances. Both API documents are kept in ``eve_cache`` and reused until
    the ``cachedUntil`` time inside the document has passed.

    Args:
        event: message event dict (not used).

    Returns:
        A multi-line Slack-formatted string with one line per wallet and a
        total line.
    """
    def as_rows(node):
        # xmltodict yields a dict for a single child element and a list for
        # several; always hand back a list so callers can iterate rows.
        if node is None:
            return []
        return node if isinstance(node, list) else [node]

    # API key id / verification code (the values here look like redacted
    # placeholders).
    api_data = {'KeyID': 'xxx', 'vCode': 'xxxx'}
    cached = True

    # Get the corp sheet
    # /corp/CorporationSheet.xml.aspx
    wallets = {}
    result = eve_cache('corp_sheet')
    if result:
        cache_date = eve_to_date(result['eveapi']['cachedUntil'])
    # cache_date is only read when result is truthy (short-circuit).
    if not result or cache_date < datetime.datetime.utcnow():
        print('Downloading...')
        result = requests.get(
            "https://api.eveonline.com/corp/CorporationSheet.xml.aspx",
            data=api_data).text
        result = xmltodict.parse(result)
        eve_cache('corp_sheet', result)
        cached = False

    for rowset in as_rows(result['eveapi']['result']['rowset']):
        # The 'divisions' rowset is skipped; rows of every other rowset are
        # read as wallet divisions (account key -> description).
        if rowset['@name'] == 'divisions':
            continue
        for item in as_rows(rowset.get('row')):
            wallets[item['@accountKey']] = item['@description']

    # Get the wallet details
    # /corp/AccountBalance.xml.aspx
    result = eve_cache('corp_account_balance')
    if result:
        cache_date = eve_to_date(result['eveapi']['cachedUntil'])
    if not result or cache_date < datetime.datetime.utcnow():
        print('Downloading...')
        result = requests.get(
            "https://api.eveonline.com/corp/AccountBalance.xml.aspx",
            data=api_data).text
        result = xmltodict.parse(result)
        eve_cache('corp_account_balance', result)
        cached = False

    output = "*=== Wallets%s ===*\n" % (' _(Cached)_' if cached else '')
    balance_total = float(0.00)
    for row in as_rows(result['eveapi']['result']['rowset'].get('row')):
        balance = float(row['@balance'])
        balance_total += balance
        account_key = row['@accountKey']
        # Fall back to the raw account key when the corp sheet has no
        # description for it, instead of raising KeyError.
        label = wallets.get(account_key, account_key)
        output += "*%s*: %s ISK \n" % (label, format(balance, ',.2f'))
    output += "*- Total*: %s ISK" % (format(balance_total, ',.2f'))
    return output
# Command, response / function
# Each entry fires when a message starts with the given text; the second
# argument is either a fixed reply or a callable that receives the event.
cli.add_response('hi', 'hello')
cli.add_response('debug_output', test_func)
cli.add_response('weather in', get_weather)
cli.add_response('!wallet', get_corp_wallet)
cli.add_response('!charinfo', get_character_info)
# function / interval in seconds
# NOTE(review): twitch_get_online is not defined in the visible part of this
# file; this line raises NameError unless it is defined or imported elsewhere.
cli.add_timer(twitch_get_online, 15)
# process() runs the bot loop itself and returns None when it stops, so it is
# called directly; iterating its result raised TypeError on shutdown.
cli.process()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment