Skip to content

Instantly share code, notes, and snippets.

@jango-blockchained
Last active July 21, 2024 04:24
Show Gist options
  • Save jango-blockchained/ac324b5609a1d803299aeda5a759ba27 to your computer and use it in GitHub Desktop.
Save jango-blockchained/ac324b5609a1d803299aeda5a759ba27 to your computer and use it in GitHub Desktop.
import frappe
import logging
import json
import time
from frappe import _
import ccxt # type: ignore
from frappe.utils.logger import get_logger
from .user import send_to_homeassistant
from tenacity import retry, stop_after_attempt, wait_exponential # type: ignore
from tv_data.tv_data.doctype.datafield.datafield import get_doc_from_user_key
from frappe_telegram.client import send_message
from frappe.utils.password import get_decrypted_password
class HooxAPI:
"""
The Hoox class processes Incoming Request and handles the required actions.
"""
client_ip_address = None
secret_hash = None
data = None
log = None
def __init__(self: object, request_json: str = None):
"""
Initializes the Hoox object. It fetches Hoox settings and the request data.
"""
self.data = request_json or frappe.request.data
self.log = get_logger(__name__ if self.cfg and self.cfg.log_type != "Personal" else self.secret_hash)
self.log.setLevel(logging.getLevelName(self.cfg.log_level if self.cfg else "INFO"))
# Load settings and credentials lazily (on first access)
self._cfg = None
self._exchange_creds = None
self._telegram_creds = None
self._homeassistant_creds = None
if frappe.local.request.method != "POST" or request_json is None:
return frappe.throw("Method not allowed.")
self.get_client_ip()
if not self.is_valid_ip():
return frappe.throw(f"Invalid IP address. IP: {self.client_ip_address}")
self.parse_request_data()
def parse_request_data(self: object):
self.secret_hash = self.json.get("secret_hash")
if not self.secret_hash:
return frappe.throw("Secret Hash is required.")
self.determine_actions()
self.save_incoming_request()
def determine_actions(self: object):
self.action = []
if self.json.get("action") in ["buy", "sell", "close"]:
self.action.append("trade")
if self.json.get("cex"):
self.action.append("trade_cex")
if self.json.get("dex"):
self.action.append("trade_dex")
if self.json.get("homeassistant") and self.json.get("homeassistant").get("entity_id"):
self.action.append("homeassistant")
if self.json.get("telegram") and self.json.get("telegram").get("message"):
self.action.append("telegram")
if self.json.get("datafield") and self.json.get("datafield").get("key") and self.json.get("datafield").get("value"):
self.action.append("datafield")
return
# -----------------------------------------------------------------------------------------
@property
def cfg(self: object):
if self._cfg is None:
self._cfg = frappe.get_single("Hoox Settings")
return self._cfg
@property
def exchange_creds(self: object):
if self._exchange_creds is None:
self._exchange_creds = self.get_exchange_creds()
return self._exchange_creds
@property
def exchange_connection(self: object, market_type: str, verbose: bool = True, rate_limit: bool = True, price_required: bool = False):
if not hasattr(self, "_exchange_connection"):
self._exchange_connection = self.get_exchange_connection(market_type, verbose, rate_limit, price_required)
return self._exchange_connection
@property
def telegram_creds(self: object):
if self._telegram_creds is None:
self._telegram_creds = frappe.get_doc("Telegram User", {"user": self.exchange_creds.user}) if self.exchange_creds else None
return self._telegram_creds if self._telegram_creds.enabled else None
@property
def homeassistant_creds(self: object):
if self._homeassistant_creds is None:
self._homeassistant_creds = frappe.get_doc("Telegram User", {"user": self.exchange_creds.user}) if self.exchange_creds else None
return self._homeassistant_creds if self._homeassistant_creds.enabled else None
@property
def json(self: object):
if not hasattr(self, "_json"):
self._json = json.loads(self.data)
return self._json
# ------------------------------------------------------------------------------------
@frappe.whitelist(allow_guest=True)
def log_execution_time(func):
log = get_logger(__name__ + '.timer')
log.setLevel(logging.INFO)
@frappe.whitelist(allow_guest=True)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
duration = end_time - start_time
msg = f"Execution time of {func.__name__}: {duration} seconds"
log.info(msg)
return result
return wrapper
def retry_on_exception():
cfg = frappe.get_single('Hoox Settings')
if cfg.retry_enabled:
return retry(
stop=stop_after_attempt(cfg.retry_stop_after),
wait=wait_exponential(
multiplier=cfg.retry_backoff, min=cfg.retry_min, max=cfg.retry_max)
)
else:
def no_retry(func):
return func
return no_retry
# -----------------------------------------------------------------------------------------
def get_client_ip(self):
if 'X-Forwarded-For' in frappe.local.request.headers:
self.client_ip_address = frappe.local.request.headers.get('X-Forwarded-For').split(', ')[-1]
else:
self.client_ip_address = frappe.local.request.remote_addr
return
def is_valid_ip(self: object):
return frappe.db.exists("IP Whitelist", {"ip": ["in", [self.client_ip_address, "*"]], "enabled": 1})
# -----------------------------------------------------------------------------------------
def send_msg(self: object, message_text: str):
if not self.cfg.telegram_enabled or not self.telegram_creds:
return
send_message(message_text=message_text, user=self.exchange_creds.user, telegram_user_id=self.telegram_creds.telegram_user_id)
def get_exchange_creds(self: object):
exchange_creds = frappe.get_doc("Exchange Credential", {
"secret_hash": self.secret_hash})
if not exchange_creds:
raise Exception(
f"No exchange credentials found for secret hash {self.secret_hash}")
exchange_creds.api_key = get_decrypted_password(
"Exchange Credential", exchange_creds.name, "api_key", False
)
exchange_creds.api_secret = get_decrypted_password(
"Exchange Credential", exchange_creds.name, "api_secret", False
)
return exchange_creds if exchange_creds.enabled else None
def get_exchange_connection(self: object, market_type: str, verbose: bool = True, rate_limit: bool = True, price_required: bool = False):
"""
Connect via REST to the selected exchange.
"""
return getattr(ccxt, self.exchange_creds.exchange)({
"apiKey": self.exchange_creds.api_key,
"secret": self.exchange_creds.api_secret,
"enableRateLimit": rate_limit,
"verbose": verbose,
"options": {
"defaultType": market_type,
"test": self.exchange_creds.testnet,
"createMarketBuyOrderRequiresPrice": price_required,
"createMarketSellOrderRequiresPrice": price_required
}
})
def save_incoming_request(self: object):
self.incoming_request = frappe.get_doc({
"doctype": "Incoming Request",
"method": "/".join(act for act in self.action),
"params": json.dumps(self.json, indent=4),
"status": "Success",
})
self.incoming_request.insert(ignore_permissions=True)
# -----------------------------------------------------------------------------------------
@log_execution_time()
def process_datafield(self: object):
"""Updates a datafield document based on the request data."""
def is_name(this):
return this.startswith("DATA_")
try:
df = self.json.get("datafield")
if df:
_key = df.get("key")
key = _key.split("_")[2] if _key and is_name(_key) else _key
value = df.get("value", 0)
n = df.get("n", None)
insert = df.get("insert", False)
user = self.exchange_creds.user
if key and value:
doc = get_doc_from_user_key(user, key, insert)
if doc is not None:
doc.handle_new_data(value, n, self.json)
self.log.info(f"Datafield '{key}' updated successfully.")
except frappe.DoesNotExistError as e:
self.log.error(f"Datafield not found: {e}")
except Exception as e:
self.log.error(f"Error processing datafield: {e}")
if self.cfg.throw:
frappe.throw(f"Error processing datafield: {e}")
@log_execution_time()
@retry_on_exception()
def process_trade(self: object):
"""Executes a trade on the exchange based on the request data."""
# Function Definition
ORDER_TYPE_FUNCS = {
"buy": {
"limit": "create_limit_buy_order",
"market": "create_market_buy_order",
},
"sell": {
"limit": "create_limit_sell_order",
"market": "create_market_sell_order",
},
}
def execute_order(action:str, pair:str, price:int|float, quantity:int|float, percent:int|float, order_type:str, market_type:str, leverage:int|float):
"""
Execute an order on an exchange using CCXT.
Returns the order object.
"""
def handle_error(msg):
# frappe.msgprint(msg)
frappe.log_error('Order Execution Error', msg, ref.doctype, ref.name)
logger.error(msg)
return msg
try:
# Get exchange
exchange = self.exchange_connection(market_type)
# exchange.verbose = True
if credentials.testnet:
exchange.set_sandbox_mode(True)
response = {}
# Set leverage
if market_type == "future" and "setLeverage" in exchange.has and leverage is not None and 1 < leverage: # <= exchange.maxLeverage:
exchange.set_leverage(leverage, pair)
# Check action
if action not in ["buy", "sell", "close", None]:
raise ValueError(f"Invalid action: {action}")
# Execute order
if action in ["buy", "sell"]:
order_func_name = ORDER_TYPE_FUNCS[action].get(order_type)
if order_func_name:
order_func = getattr(exchange, order_func_name)
if order_type == "limit":
response["order"] = order_func(pair, quantity, price)
else:
response["order"] = order_func(pair, quantity)
elif action == "close":
all_orders = exchange.fetch_open_orders(pair)
response["order"] = [exchange.cancel_order(order["id"]) for order in all_orders]
logger.debug(response)
return response
except ccxt.RequestTimeout as e:
msg = f"Request timed out: {str(e)}"
handle_error(msg)
except ccxt.AuthenticationError as e:
msg = f"Authentication error: {str(e)}"
handle_error(msg)
except ccxt.ExchangeNotAvailable as e:
msg = f"Exchange not available: {str(e)}"
handle_error(msg)
# except ccxt.ExchangeError as e:
# msg = f"Exchange error: {str(e)}"
# handle_error(msg)
except ccxt.BaseError as e:
msg = f"Base error in CCXT: {str(e)}"
handle_error(msg)
except ValueError as e:
msg = f"Value error: {str(e)}"
handle_error(msg)
except AttributeError as e:
msg = f"Attribute error: {str(e)}"
handle_error(msg)
except Exception as e:
msg = f"An unexpected error occurred: {str(e)}"
handle_error(msg)
def handle_alert(self: object):
"""Executes a trade order and sends notifications."""
action = self.json.get("action")
exchange_id = self.exchange_creds.exchange
pair = self.json.get("pair")
price = self.json.get("price")
quantity = self.json.get("quantity")
percent = self.json.get("percent") or False
order_type = self.json.get("order_type") or "market"
market_order = order_type == "market"
# Create a Trade document
trade = frappe.new_doc("Trade")
trade.exchange = exchange_id
trade.symbol = pair
trade.action = action
trade.order_type = order_type
trade.quantity = quantity
if price:
trade.price = price
if percent:
trade.percent = percent
# Execute the trade order
response = execute_order(trade, market_order=market_order)
# Update the Trade document with response data
trade.update(response)
# Send Telegram notification (if successful)
if response.get("status") == "success" and self.cfg.telegram_enabled:
self.send_msg(f"Trade executed successfully: {action} {pair} (Order ID: {trade.name})")
# Save the Trade document
trade.save(ignore_permissions=True)
self.log.info(f"Trade processed: {trade.name}")
return response
def process_dex_action(self: object):
"""Handles DEX (decentralized exchange) actions."""
required_fields = ["exchange", "action", "contract", "wallet", "secret_hash", "quantity"]
if all(field in self.json for field in required_fields):
if self.json["order_type"] == "limit" and "price" not in self.json:
return frappe.throw("Price field is required for 'limit' order type.")
elif self.exchange_creds.enabled:
response_alert = self.handle_alert()
self.log.info(response_alert)
return response_alert
def process_cex_action(self: object):
"""Handles CEX (centralized exchange) actions."""
required_fields = ["action", "pair", "order_type", "secret_hash", "quantity"]
if all(field in self.json for field in required_fields):
if self.json["order_type"] == "limit" and "price" not in self.json:
return frappe.throw("Price field is required for 'limit' order type.")
elif self.exchange_creds.enabled:
response_alert = self.handle_alert()
self.log.info(response_alert)
return response_alert
response = None
response = process_cex_action() or process_dex_action()
return response
@log_execution_time()
@retry_on_exception()
def process_telegram(self: object):
"""Sends a Telegram notification based on the request data."""
if self.json.get("telegram") and self.json.get("telegram").get("message"):
message_text = self.json["telegram"]["message"]
self.send_msg(message_text)
self.log.info(f"Telegram message sent: {message_text}")
@log_execution_time()
@retry_on_exception()
def process_homeassistant(self: object):
"""Calls a Home Assistant service based on the request data."""
if self.json.get("homeassistant") and self.json.get("homeassistant").get("entity_id"):
entity_id = self.json["homeassistant"]["entity_id"]
service = self.json.get("homeassistant").get("service")
data = self.json.get("homeassistant").get("data")
response = send_to_homeassistant(entity_id, service, data)
self.log.info(f"Home Assistant service called: {entity_id}.{service} - Response: {response}")
def update_success(self: object):
"""Updates the status of the Incoming Request document."""
self.incoming_request.status = "Success"
self.incoming_request.save(ignore_permissions=True)
@frappe.whitelist(allow_guest=True)
def receive_alert(self, request_json=None):
"""Entry point for receiving alerts."""
self.__init__(request_json)
try:
response = {}
if self.exchange_creds and self.exchange_creds.enabled:
for action in self.action:
process_method = getattr(self, f"process_{action}")
response.update(process_method())
self.update_success()
frappe.db.commit()
return frappe.response.update(response)
except Exception as e:
self.log.error(f"Error: {e}")
frappe.msgprint(f"Error: {e}")
# ... (Other existing methods)
if __name__ == "__main__":
# For testing purposes
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment