Last active
August 29, 2015 14:15
-
-
Save psykzz/1e59541ffe97101e777c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import json | |
import datetime | |
from slackclient import SlackClient | |
class SlackBot(object):
    """Simplified bot responder for Slack.

    Text triggers are registered with :meth:`add_response` and periodic
    callbacks with :meth:`add_timer`; :meth:`process` then runs the
    real-time-messaging loop, posting every reply to ``default_channel``.
    """

    def __init__(self, api_key, bot_name='Bat Bot',
                 default_channel='#illuminati'):
        """Create the bot and its Slack client.

        Args:
            api_key: Slack API token handed to ``SlackClient``.
            bot_name: Username the bot posts under; incoming messages with
                this username are ignored by :meth:`process`.
            default_channel: Channel or private group (leading ``#``
                optional) that timer output and responses are posted to.
        """
        super(SlackBot, self).__init__()
        self._api_key = api_key
        self.client = SlackClient(self._api_key)
        self.bot_name = bot_name
        self.default_channel = default_channel
        self.responses = {}        # trigger prefix -> list of responses
        self.time_responses = {}   # tick interval -> list of callables
        self._cache = {}
        self.run_timer = 0         # number of loop iterations so far
        self._last_run = 0

    def format_user(self, event):
        """Attach ``user_details`` to *event* when it carries a user id.

        The event dict is modified in place and also returned.
        """
        if event.get('user'):
            event['user_details'] = self.get_user_info(event.get('user'), True)
        return event

    def add_response(self, text, response):
        """Register *response* for messages that start with *text*.

        *response* is either a message to send as-is or a callable that
        receives the event dict and returns a message (or ``None`` to stay
        silent).
        """
        self.responses.setdefault(text, []).append(response)

    def add_timer(self, func, timer=1):
        """Register *func* to be called every *timer* loop iterations.

        *func* takes no arguments; a non-``None`` return value is posted to
        ``default_channel``.
        """
        self.time_responses.setdefault(timer, []).append(func)

    # TODO: add a way to remove a response

    def cache_get(self, key, default=None):
        """Return the cached value for *key*, or *default* when absent."""
        return self._cache.get(key, default)

    def cache_set(self, key, value):
        """Store *value* under *key*, overwriting any previous value."""
        self._cache[key] = value

    def _api_json(self, method):
        """Call a Slack Web API *method* and return the decoded payload.

        The original code always ran ``json.loads`` on the result; a dict is
        passed through untouched in case the client already decoded it.
        """
        raw = self.client.api_call(method)
        if isinstance(raw, dict):
            return raw
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        return json.loads(raw)

    def get_bot_info(self):
        """Return (and cache) the ids and names reported by ``auth.test``."""
        if not self.cache_get('self'):
            data = self._api_json("auth.test")
            self.cache_set('self', {
                'user_id': data['user_id'],
                'user_name': data['user'],
                'team_id': data['team_id'],
                'team_name': data['team'],
            })
        return self.cache_get('self')

    def get_user_info(self, user, by_id=False):
        """Look up a member from ``users.list``.

        Args:
            user: Member name, or member id when *by_id* is true.
            by_id: Index the member list by ``id`` instead of ``name``.

        Returns:
            The member dict, or ``None`` when no member matches.
        """
        cache_key, field = ('users_byid', 'id') if by_id else ('users', 'name')
        if not self.cache_get(cache_key):
            data = self._api_json("users.list")
            self.cache_set(
                cache_key,
                {member[field]: member for member in data['members']},
            )
        return self.cache_get(cache_key).get(user)

    def get_channel_info(self, channel):
        """Return the ``channels.list`` entry named *channel*, or ``None``."""
        if not self.cache_get('channels'):
            data = self._api_json("channels.list")
            self.cache_set(
                'channels',
                {chan['name']: chan for chan in data['channels']},
            )
        return self.cache_get('channels').get(channel)

    def get_group_info(self, group):
        """Return the ``groups.list`` entry named *group*, or ``None``."""
        if not self.cache_get('groups'):
            data = self._api_json("groups.list")
            self.cache_set(
                'groups',
                {grp['name']: grp for grp in data['groups']},
            )
        return self.cache_get('groups').get(group)

    def send_message(self, channel, message):
        """Post *message* to a channel or private group by name.

        Args:
            channel: Channel or group name; one leading ``#`` is stripped.
            message: Text to post.

        Returns:
            Whatever the client's ``chat.postMessage`` call returns.

        Raises:
            Exception: If the name matches neither a channel nor a group.
        """
        # startswith() also copes with an empty name, unlike channel[0].
        if channel.startswith('#'):
            channel = channel[1:]
        # Fall back to private groups when no public channel matches.
        channel_info = (self.get_channel_info(channel)
                        or self.get_group_info(channel))
        if not channel_info:
            raise Exception('Invalid channel or group (%s)' % (channel))
        return self.client.api_call("chat.postMessage", **{
            'channel': channel_info['id'],
            'username': self.bot_name,
            'text': message
        })

    def _run_timers(self):
        """Call every timer whose interval divides the current tick count."""
        # time_responses maps interval -> [callables]; iterating the dict
        # directly (as the loop originally did) only yields the intervals.
        for interval, funcs in self.time_responses.items():
            if self.run_timer % interval != 0:
                continue
            for func in funcs:
                message = func()
                if message is not None:
                    self.send_message(self.default_channel, message)

    def _respond(self, event, text):
        """Send every registered response whose trigger prefixes *text*."""
        for trigger, responses in self.responses.items():
            if not text.startswith(trigger):
                continue
            for response in responses:
                if not response:
                    continue
                message = response(event) if callable(response) else response
                if message is not None:
                    self.send_message(self.default_channel, message)

    def process(self):
        """Connect to the RTM API and run the bot loop.

        Each iteration bumps the tick counter, runs due timers, handles the
        events read from the client and then sleeps for one second. Returns
        ``None`` when the connection cannot be established or when the kill
        phrase is received.
        """
        if not self.client.rtm_connect():
            return
        while True:
            self.run_timer += 1
            self._run_timers()

            for event in self.client.rtm_read():
                print(event)
                # .get(): not every event is guaranteed to carry a type.
                if event.get('type') != u'message':
                    continue
                # Accept a boolean as well as the string form that was
                # originally checked. NOTE(review): confirm which one the
                # RTM API actually sends.
                if event.get('hidden') in (True, u'true'):
                    continue
                if event.get('username') == self.bot_name:
                    continue

                # Get extra information
                event = self.format_user(event)
                text = event.get('text') or ''
                username = event.get('username') or ''

                # Kill switch: lets one specific user stop the bot.
                if 'kill yourself' in text and username.lower() == 'psykzz':
                    return

                self._respond(event, text)

            time.sleep(1)  # Lets be nice
#### | |
import datetime | |
import requests | |
import xmltodict | |
# Slack API token (placeholder value).
api_key = "xoxp-xxxxx"
cli = SlackBot(api_key)

# In-memory store of parsed EVE API documents, read and written via eve_cache().
_eve_cache = {}
def eve_to_date(date_string):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` timestamp into a naive datetime.

    Raises:
        ValueError: If *date_string* does not match that format.
    """
    return datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S')
def eve_cache(key, value=None):
    """Read or write an entry of the module-level ``_eve_cache`` dict.

    With *value* omitted (or ``None``) this is a lookup that returns the
    stored entry, or ``None`` when *key* is unknown. Otherwise *value* is
    stored under *key* and returned.
    """
    # No ``global`` statement needed: the dict is mutated, never rebound.
    if value is None:
        return _eve_cache.get(key)
    _eve_cache[key] = value
    return value
def test_func(event): | |
return str(event) | |
def get_weather(event):
    """Answer a ``weather in <location>`` message via OpenWeatherMap.

    Args:
        event: Message event dict whose ``text`` contains ``weather in``.

    Returns:
        ``"<description> in <location>"``, or ``"Can't find <location>"``
        when the API reports a 404 or the reply holds no weather entry.
    """
    text = event.get('text')
    # Everything after the trigger phrase, minus the separating whitespace.
    location = text[text.index('weather in') + len('weather in'):].strip()

    # Passing the query through ``params`` lets requests URL-encode it,
    # instead of splicing raw user text into the URL.
    result = requests.get(
        'http://api.openweathermap.org/data/2.5/weather',
        params={'q': location},
    ).json()

    # Compare as strings so both ``"404"`` and ``404`` are recognised.
    conditions = result.get('weather')
    if str(result.get('cod')) == '404' or not conditions:
        return "Can't find %s" % (location)
    return "%s in %s" % (conditions[0].get('description'), location)
def get_character_info(event):
    """Answer a ``!charinfo <name>`` message with a public character summary.

    Resolves the name through ``eve/CharacterID.xml.aspx`` and then loads
    ``eve/CharacterInfo.xml.aspx``. Both documents are kept in ``eve_cache``
    and reused until their ``cachedUntil`` timestamp has passed.

    Args:
        event: Message event dict whose ``text`` contains ``!charinfo``.

    Returns:
        A Slack-formatted multi-line summary, or a "not found" message when
        the API reports character id 0.
    """
    def fetch(cache_key, url, params):
        """Return ``(parsed_document, served_from_cache)`` for *url*."""
        document = eve_cache(cache_key)
        if document:
            expires = eve_to_date(document['eveapi']['cachedUntil'])
            if expires >= datetime.datetime.utcnow():
                return document, True
        print('Downloading...')
        # ``params`` puts the arguments in the query string of the GET.
        document = xmltodict.parse(requests.get(url, params=params).text)
        eve_cache(cache_key, document)
        return document, False

    text = event.get('text')
    player_name = text[text.index('!charinfo') + len('!charinfo'):].strip()
    cache_key_id = 'char_name_id_%s' % (player_name)
    cache_key_sheet = 'char_sheet_%s' % (player_name)

    # Get the char id
    id_doc, id_cached = fetch(
        cache_key_id,
        "https://api.eveonline.com/eve/CharacterID.xml.aspx",
        {'names': player_name},
    )
    character_id = int(
        id_doc['eveapi']['result']['rowset']['row']['@characterID'])
    if not character_id:
        return "Character (%s): *not found*" % (player_name)

    # Get the public sheet
    sheet_doc, sheet_cached = fetch(
        cache_key_sheet,
        "https://api.eveonline.com/eve/CharacterInfo.xml.aspx",
        {'characterID': character_id},
    )
    character_info = sheet_doc['eveapi']['result']

    # Only flagged as cached when neither document had to be downloaded.
    cached = id_cached and sheet_cached

    # TODO: build this from the rowset rows (@corporationName, @startDate).
    employment = ['Work in progress']

    return "\n".join([
        "*=== Character Info%s ===*" % (' _(Cached)_' if cached else ''),
        "*ID*: %s" % (character_info['characterID']),
        "*Name*: %s" % (character_info['characterName']),
        "*Corporation*: %s" % (character_info['corporation']),
        "*Security Status*: %s" % (
            format(float(character_info['securityStatus']), '.2f')),
        "*Employment History*: %s" % (", ".join(employment)),
    ])
def get_corp_wallet(event):
    """Answer a ``!wallet`` message with the corporation wallet balances.

    Division names come from ``corp/CorporationSheet.xml.aspx`` and balances
    from ``corp/AccountBalance.xml.aspx``. Both documents are kept in
    ``eve_cache`` and reused until their ``cachedUntil`` timestamp has
    passed.

    Args:
        event: Message event dict (unused; required by the responder
            interface).

    Returns:
        A Slack-formatted listing with one line per wallet and a total.
    """
    api_data = {'KeyID': 'xxx', 'vCode': 'xxxx'}

    def fetch(cache_key, url):
        """Return ``(parsed_document, served_from_cache)`` for *url*."""
        document = eve_cache(cache_key)
        if document:
            expires = eve_to_date(document['eveapi']['cachedUntil'])
            if expires >= datetime.datetime.utcnow():
                return document, True
        print('Downloading...')
        # ``params`` puts the credentials in the query string of the GET.
        document = xmltodict.parse(requests.get(url, params=api_data).text)
        eve_cache(cache_key, document)
        return document, False

    def as_list(node):
        """Normalise an xmltodict node that may be a single dict or a list."""
        if node is None:
            return []
        return node if isinstance(node, list) else [node]

    # Get the corp sheet: map accountKey -> wallet division description.
    sheet, sheet_cached = fetch(
        'corp_sheet',
        "https://api.eveonline.com/corp/CorporationSheet.xml.aspx",
    )
    wallets = {}
    for rowset in as_list(sheet['eveapi']['result']['rowset']):
        # The 'divisions' rowset is skipped; the remaining rows are read
        # for accountKey/description pairs.
        if rowset['@name'] == 'divisions':
            continue
        for item in as_list(rowset['row']):
            wallets[item['@accountKey']] = item['@description']

    # Get the wallet details
    balances, balances_cached = fetch(
        'corp_account_balance',
        "https://api.eveonline.com/corp/AccountBalance.xml.aspx",
    )
    # Only flagged as cached when neither document had to be downloaded.
    cached = sheet_cached and balances_cached

    lines = ["*=== Wallets%s ===*" % (' _(Cached)_' if cached else '')]
    balance_total = 0.0
    for row in as_list(balances['eveapi']['result']['rowset']['row']):
        balance = float(row['@balance'])
        balance_total += balance
        account_key = row['@accountKey']
        # Fall back to the raw key when the sheet has no name for it.
        lines.append("*%s*: %s ISK " % (
            wallets.get(account_key, account_key),
            format(balance, ',.2f'),
        ))
    lines.append("*- Total*: %s ISK" % (format(balance_total, ',.2f')))
    return "\n".join(lines)
# Command, response / function
cli.add_response('hi', 'hello')
cli.add_response('debug_output', test_func)
cli.add_response('weather in', get_weather)
cli.add_response('!wallet', get_corp_wallet)
cli.add_response('!charinfo', get_character_info)

# function / interval in loop iterations (process() sleeps one second per
# iteration, so this is roughly seconds)
# NOTE(review): twitch_get_online is not defined in the visible part of this
# file - confirm where it comes from before running.
cli.add_timer(twitch_get_online, 15)

# process() blocks while the bot is running and returns None when it stops,
# so it is called directly rather than iterated over.
cli.process()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment