Skip to content

Instantly share code, notes, and snippets.

@kimmobrunfeldt
Created December 6, 2013 16:33
Show Gist options
  • Select an option

  • Save kimmobrunfeldt/7827819 to your computer and use it in GitHub Desktop.

Select an option

Save kimmobrunfeldt/7827819 to your computer and use it in GitHub Desktop.
Watches bitcoin price at Bitstamp and sends alerts.
import json
import time
import urllib2
from decimal import Decimal
import smtplib
# Alert when the highest bid has changed by more than this many percent
# between the oldest and the newest checkpoint kept in the history
# (checkpoints older than TIME_WINDOW seconds are dropped from the history).
TOTAL_CHANGE_THRESHOLD = 10
# Alert when the highest bid has changed by more than this many percent
# between the last 2 checkpoints. Only checked when the total-change alert
# above did not fire in the same iteration.
RECENT_CHANGE_THRESHOLD = 4
# Both values below are in seconds.
# TIME_WINDOW: maximum age of a checkpoint before filter_old() discards it.
TIME_WINDOW = 60 * 20
# WATCH_INTERVAL: how long main() sleeps between two ticker polls.
WATCH_INTERVAL = 30
# When true, main() sends one test alert before it starts polling.
SEND_TEST_MAIL_ON_START = True
# Host name handed to smtplib.SMTP when an alert is mailed.
MAIL_SERVER = 'posti.saunalahti.fi'
# Recipients of the alert mail. The chained assignment also binds the
# module-level name `receivers` to the very same list object.
# NOTE(review): the address looks like a redaction placeholder rather than a
# real mailbox -- confirm and replace before running.
MAIL_RECEIVERS = receivers = ['[email protected]']
# URL fetched by bitstamp_ticker(); the response body is parsed as JSON.
TICKER_URL = 'http://www.bitstamp.net/api/ticker/'
def alert(change, time_window):
    """Email MAIL_RECEIVERS that the bid price moved by `change` percent.

    Args:
        change: Percentage change; rendered with two decimals in both the
            subject and the body of the mail.
        time_window: Number of seconds over which the change was measured.

    Returns:
        None. Delivery is best-effort: SMTP and connection errors are
        printed and swallowed so that the caller keeps running.
    """
    formatted_change = '%.2f %%' % change
    # NOTE(review): the addresses below look like redaction placeholders --
    # confirm and replace with real mailboxes before running.
    sender = '[email protected]'
    # The empty line after "Subject:" is what separates the headers from the
    # body; without it the body text is not a well-formed part of the message.
    message = """From: Bitcoin alert <[email protected]>
To: All <[email protected]>
Subject: Bitcoin course has changed %s!

Total change was %s in %s seconds.
""" % (formatted_change, formatted_change, time_window)

    try:
        smtp = smtplib.SMTP(MAIL_SERVER)
        try:
            smtp.sendmail(sender, MAIL_RECEIVERS, message)
        finally:
            # Always release the connection, even when sendmail() failed.
            try:
                smtp.quit()
            except smtplib.SMTPServerDisconnected:
                # The server already dropped the connection; nothing is
                # left to release.
                pass
    except (smtplib.SMTPException, EnvironmentError) as e:
        # EnvironmentError covers socket-level failures (refused connection,
        # DNS failure, timeout), which are not SMTPException on Python 2.
        print(e)
        print("Error: unable to send email")
    else:
        print("Successfully sent email")
def bitstamp_ticker(timeout=30):
    """Fetch the ticker at TICKER_URL and return its fields as Decimals.

    Args:
        timeout: Seconds to wait on the HTTP request before giving up, so
            that a stalled connection cannot block the caller forever.

    Returns:
        A dict mapping every key of the JSON object in the response to
        Decimal(value).

    Raises:
        Whatever urllib2.urlopen, json.loads or Decimal raise when the
        request fails, the body is not valid JSON, or a value cannot be
        converted to Decimal.
    """
    connection = urllib2.urlopen(TICKER_URL, timeout=timeout)
    try:
        payload = connection.read()
    finally:
        # Release the socket whether or not the read succeeded.
        connection.close()

    fields = json.loads(payload)
    return dict((name, Decimal(value)) for name, value in fields.items())
def total_change_percentage(history):
    """Return the percent change of 'bid' from the first to the last entry.

    With fewer than two entries there is nothing to compare and 0 is
    returned.
    """
    if len(history) >= 2:
        newest_bid = history[-1]['bid']
        oldest_bid = history[0]['bid']
        ratio = newest_bid / oldest_bid
        return (ratio - 1) * 100
    return 0
def total_time_window(history):
    """Return the 'timestamp' difference between the last and first entry.

    With fewer than two entries the window is 0.
    """
    if len(history) >= 2:
        newest_stamp = history[-1]['timestamp']
        oldest_stamp = history[0]['timestamp']
        return newest_stamp - oldest_stamp
    return 0
def recent_change_percentage(history):
    """Return the percent change of 'bid' between the last two entries.

    With fewer than two entries there is nothing to compare and 0 is
    returned.
    """
    if len(history) >= 2:
        latest_bid = history[-1]['bid']
        previous_bid = history[-2]['bid']
        ratio = latest_bid / previous_bid
        return (ratio - 1) * 100
    return 0
def recent_time_window(history):
    """Return the 'timestamp' difference between the last two entries.

    With fewer than two entries the window is 0.
    """
    if len(history) >= 2:
        latest_stamp = history[-1]['timestamp']
        previous_stamp = history[-2]['timestamp']
        return latest_stamp - previous_stamp
    return 0
def filter_old(history, time_window=None):
    """Return the entries of `history` that are still inside the window.

    Args:
        history: Iterable of dicts whose 'timestamp' value is compared
            against time.time().
        time_window: Window length in seconds. Defaults to the module-level
            TIME_WINDOW when omitted.

    Returns:
        A new list, in the original order, holding the entries whose
        timestamp is not older than `time_window` seconds.
    """
    if time_window is None:
        time_window = TIME_WINDOW
    # Compute the cutoff once so that every entry is judged against the same
    # instant instead of calling time.time() per element.
    cutoff = time.time() - time_window
    return [entry for entry in history if entry['timestamp'] >= cutoff]
def main():
    """Poll the ticker forever and mail an alert when the bid moves too far.

    Every WATCH_INTERVAL seconds the current ticker is appended to a history
    that filter_old() trims. An alert is sent when the change across the
    whole history exceeds TOTAL_CHANGE_THRESHOLD percent; otherwise one is
    sent when the change between the last two checkpoints exceeds
    RECENT_CHANGE_THRESHOLD percent. At most one alert is sent per
    iteration. The function never returns.
    """
    if SEND_TEST_MAIL_ON_START:
        alert(100, 100)

    history = []
    while True:
        try:
            ticker = bitstamp_ticker()
        except (EnvironmentError, ValueError, ArithmeticError) as e:
            # A transient network failure, malformed JSON or an unparsable
            # number must not kill the watcher: report it and retry on the
            # next tick.
            print('Error: unable to fetch ticker: %s' % (e,))
            time.sleep(WATCH_INTERVAL)
            continue

        history.append(ticker)
        history = filter_old(history)

        total_change = total_change_percentage(history)
        total_window = total_time_window(history)
        recent_change = recent_change_percentage(history)
        recent_window = recent_time_window(history)

        # NOTE(review): while a threshold stays exceeded, an alert is sent on
        # every iteration -- confirm whether that repetition is wanted.
        if abs(total_change) > TOTAL_CHANGE_THRESHOLD:
            alert(total_change, time_window=total_window)
        elif abs(recent_change) > RECENT_CHANGE_THRESHOLD:
            alert(recent_change, time_window=recent_window)

        print('Currently highest bid price is %s USD' % ticker['bid'])
        print('Total change was %.2f %% in %s seconds' % (total_change,
                                                          total_window))
        print('Recent change was %.2f %% in %s seconds' % (recent_change,
                                                           recent_window))
        time.sleep(WATCH_INTERVAL)
# Start the watcher only when this file is executed as a script, not when it
# is imported as a module.
if "__main__" == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment