Skip to content

Instantly share code, notes, and snippets.

@alxfordy
Last active September 14, 2021 10:07
Show Gist options
  • Save alxfordy/c41e2dc15e41a09a8ed995093179e59a to your computer and use it in GitHub Desktop.
Save alxfordy/c41e2dc15e41a09a8ed995093179e59a to your computer and use it in GitHub Desktop.
Coinbase Bot
import time
import os
import logging
import json
import cbpro
class CoinbaseBot():
    """Dollar-cost-averaging bot that buys a fixed fiat amount of a product.

    All exchange access goes through ``self.coinbase_client``, a
    ``cbpro.AuthenticatedClient`` built from the supplied credentials.
    """

    def __init__(self, amount, crypto_token, api_key, secret_key, passphrase):
        """Initialisation of the Bot with the DCA options.

        Args:
            amount: Fiat amount to spend per purchase (passed as ``funds``
                to the market order and compared numerically to the balance).
            crypto_token: Product id to buy, e.g. ``"ADA-GBP"``.
            api_key: API key handed to the authenticated client.
            secret_key: API secret handed to the authenticated client.
            passphrase: API passphrase handed to the authenticated client.
        """
        self._logger = logging.getLogger("Coinbase-DCA-Bot")
        logging.basicConfig(
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            level=logging.INFO,
        )
        self.amount = amount
        self.crypto_token = crypto_token
        self._api_key = api_key
        self._secret_key = secret_key
        self._passphrase = passphrase
        self.coinbase_client = cbpro.AuthenticatedClient(
            self._api_key, self._secret_key, self._passphrase
        )
        # The profile id is read from the first account the API returns.
        self.profile_id = self.get_accounts()[0].get("profile_id")

    def get_accounts(self):
        """Return whatever the client's ``get_accounts`` call returns."""
        return self.coinbase_client.get_accounts()

    def get_account_holdings(self):
        """Print and return the holds reported for ``self.profile_id``.

        Returns:
            A list with every entry produced by ``get_account_holds``.
        """
        # NOTE(review): this passes the profile id; confirm the endpoint does
        # not expect an account id instead.
        print(self.profile_id)
        # The client hands back a generator here; materialise it once so the
        # entries can be both printed and returned.
        results = list(self.coinbase_client.get_account_holds(self.profile_id))
        for hold in results:
            print(hold)
        return results

    def get_account_holdings_workaround(self, ticker=None):
        """Return the accounts, optionally filtered by currency.

        Args:
            ticker: Currency code to keep (e.g. ``"GBP"``). When falsy, all
                accounts are returned unfiltered.
        """
        accounts = self.get_accounts()
        if not ticker:
            return accounts
        return [item for item in accounts if item.get("currency") == ticker]

    def get_ticker_price(self, ticker=None):
        """Return the product ticker for ``ticker`` (default: the bot's product)."""
        return self.coinbase_client.get_product_ticker(
            product_id=ticker or self.crypto_token
        )

    def _got_funds(self, asset=None):
        """Return True when ``self.amount`` does not exceed the cached balance.

        The balance is looked up (and cached on ``self.balance``) only when it
        has not been set yet; ``asset`` defaults to ``"GBP"`` for that lookup.
        Returns False when no account exists for ``asset``.
        """
        asset = asset or "GBP"
        if not hasattr(self, 'balance'):
            holdings = self.get_account_holdings_workaround(asset)
            if not holdings:
                # No account for this currency means nothing to spend.
                return False
            self.balance = holdings[0].get("balance")
        return float(self.amount) <= float(self.balance)

    def create_buy_order(self):
        """Place a market buy for ``self.amount`` of ``self.crypto_token``.

        Returns:
            The raw response from ``place_market_order`` so callers can tell
            whether the order was accepted.
        """
        result = self.coinbase_client.place_market_order(
            product_id=self.crypto_token,
            side='buy',
            funds=self.amount,
        )
        if "message" in result:
            # Something went wrong if there's a 'message' field in response
            print(result)
        return result

    def run(self, **kwargs):
        """Check the fiat balance and, if sufficient, place the buy order.

        Keyword Args:
            asset: Fiat currency code to check; defaults to ``"GBP"``.

        Returns:
            True when the order was placed without an error response, False
            when the balance check or the order failed.
        """
        asset = kwargs.get("asset") or "GBP"
        balance = self.get_account_holdings_workaround(asset)
        if not balance:
            # Guard the empty case so balance[0] below cannot raise IndexError.
            print(f"Issues with checking FIAT Balance - No {asset} account returned")
            return False
        if len(balance) > 1:
            print("Issues with checking FIAT Balance - More than one returned")
            return False
        fiat_balance = balance[0]
        self.balance = fiat_balance.get('balance')
        print(f"You currently have {fiat_balance.get('available')} {fiat_balance.get('currency')}")
        if not self._got_funds(asset):
            print("You haven't got enough funds")
            return False
        ticker_price_details = self.get_ticker_price()
        print(f"Buying {self.crypto_token} at {ticker_price_details.get('price')}")
        result = self.create_buy_order()
        if not result or "message" in result:
            # create_buy_order already printed the error payload; do not
            # claim the purchase completed.
            print(f"Purchase Of {self.crypto_token} failed")
            return False
        print(f"Completed Purchase Of {self.crypto_token}, {asset} amount {self.amount}")
        return True
def lambda_handler(event, context):
    """Lambda entry point: run one DCA purchase and report the outcome.

    Credentials are read from the ``cb_api_key``, ``cb_secret_key`` and
    ``cb_passphrase`` environment variables; a missing variable raises
    ``KeyError``. ``event`` and ``context`` are not used.

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``. The status is
        500 when the bot explicitly reports failure, otherwise 200.
    """
    cb_api_key = os.environ['cb_api_key']
    cb_secret_key = os.environ['cb_secret_key']
    cb_passphrase = os.environ['cb_passphrase']
    cb_bot = CoinbaseBot("10.00", "ADA-GBP", cb_api_key, cb_secret_key, cb_passphrase)
    # run() returns False when a balance check fails; compare with `is False`
    # so any other return value is still treated as success.
    if cb_bot.run() is False:
        return {
            'statusCode': 500,
            'body': json.dumps('Purchase Not Completed')
        }
    return {
        'statusCode': 200,
        'body': json.dumps('Done Making Money')
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment