Last active
July 21, 2024 04:24
-
-
Save jango-blockchained/ac324b5609a1d803299aeda5a759ba27 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import json
import logging
import time

import ccxt  # type: ignore
import frappe
from frappe import _
from frappe.utils.logger import get_logger
from frappe.utils.password import get_decrypted_password
from frappe_telegram.client import send_message
from tenacity import retry, stop_after_attempt, wait_exponential  # type: ignore
from tv_data.tv_data.doctype.datafield.datafield import get_doc_from_user_key

from .user import send_to_homeassistant
class HooxAPI:
    """
    Process an incoming Hoox webhook request and run the actions it asks for.

    The request body is a JSON object. It must carry a ``secret_hash`` that
    identifies an "Exchange Credential" document; the remaining keys decide
    which ``process_*`` handlers run (trade, datafield, telegram,
    homeassistant).
    """

    # Maps (action, order type) to the name of the ccxt method placing the order.
    ORDER_TYPE_FUNCS = {
        "buy": {
            "limit": "create_limit_buy_order",
            "market": "create_market_buy_order",
        },
        "sell": {
            "limit": "create_limit_sell_order",
            "market": "create_market_sell_order",
        },
    }
    TRADE_ACTIONS = ("buy", "sell", "close")
    CEX_REQUIRED_FIELDS = ("action", "pair", "order_type", "secret_hash", "quantity")
    DEX_REQUIRED_FIELDS = ("exchange", "action", "contract", "wallet", "secret_hash", "quantity")

    client_ip_address = None
    secret_hash = None
    data = None
    log = None

    # -----------------------------------------------------------------------------------------
    # Decorators. They are plain functions living in the class namespace so that
    # they can be applied to the methods defined further down.

    def log_execution_time(func=None):
        """
        Decorator that logs how long the wrapped callable took.

        Works both bare (``@log_execution_time``) and called
        (``@log_execution_time()``). The duration is logged even when the
        wrapped callable raises.
        """

        def decorate(target):
            @functools.wraps(target)
            def wrapper(*args, **kwargs):
                # Resolve the logger lazily: at import time there may be no site context.
                timer_log = get_logger(__name__ + ".timer")
                timer_log.setLevel(logging.INFO)
                started = time.perf_counter()
                try:
                    return target(*args, **kwargs)
                finally:
                    duration = time.perf_counter() - started
                    timer_log.info(f"Execution time of {target.__name__}: {duration} seconds")

            return wrapper

        return decorate if func is None else decorate(func)

    def retry_on_exception():
        """
        Decorator factory that retries the wrapped callable per "Hoox Settings".

        The settings are read on every call rather than at decoration time, so
        importing this module never touches the database and changes to the
        settings take effect without a restart.
        """

        def decorate(target):
            @functools.wraps(target)
            def wrapper(*args, **kwargs):
                cfg = frappe.get_single("Hoox Settings")
                if not cfg.retry_enabled:
                    return target(*args, **kwargs)
                retrying = retry(
                    stop=stop_after_attempt(cfg.retry_stop_after),
                    wait=wait_exponential(
                        multiplier=cfg.retry_backoff, min=cfg.retry_min, max=cfg.retry_max
                    ),
                )
                return retrying(target)(*args, **kwargs)

            return wrapper

        return decorate

    # -----------------------------------------------------------------------------------------

    def __init__(self, request_json: str = None):
        """
        Validate the request and prepare the actions to run.

        :param request_json: JSON payload; falls back to the body of the
            current HTTP request when omitted.
        :raises frappe.ValidationError: (via ``frappe.throw``) when the request
            is not a POST, has no payload, comes from a non-whitelisted IP or
            lacks a secret hash.
        """
        # Lazy caches must exist before any property (cfg, exchange_creds, ...) is read.
        self._cfg = None
        self._exchange_creds = None
        self._telegram_creds = None
        self._homeassistant_creds = None
        self._exchange_connections = {}
        self._json = None
        # receive_alert() re-runs __init__ on a live instance, so reset request state too.
        self.secret_hash = None
        self.action = []
        self.incoming_request = None

        self.data = request_json or frappe.request.data
        self._setup_logger()

        if frappe.local.request.method != "POST" or not self.data:
            frappe.throw("Method not allowed.")
        self.get_client_ip()
        if not self.is_valid_ip():
            frappe.throw(f"Invalid IP address. IP: {self.client_ip_address}")
        self.parse_request_data()

    def _setup_logger(self):
        """Point ``self.log`` at the per-hash logger ("Personal" mode) or the module logger."""
        cfg = self.cfg
        personal = bool(cfg and cfg.log_type == "Personal" and self.secret_hash)
        self.log = get_logger(self.secret_hash if personal else __name__)
        level_name = (cfg.log_level if cfg else None) or "INFO"
        self.log.setLevel(str(level_name).upper())

    def parse_request_data(self):
        """Read the secret hash, work out the actions and record the request."""
        try:
            payload = self.json
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            frappe.throw("Invalid JSON payload.")
        if not isinstance(payload, dict):
            frappe.throw("Invalid JSON payload.")

        self.secret_hash = payload.get("secret_hash")
        if not self.secret_hash:
            frappe.throw("Secret Hash is required.")
        # The hash is only known now, so the "Personal" logger can only be chosen here.
        self._setup_logger()
        self.determine_actions()
        self.save_incoming_request()

    def _section(self, name):
        """Return the payload sub-object ``name``, or ``{}`` when absent or not an object."""
        section = self.json.get(name)
        return section if isinstance(section, dict) else {}

    def determine_actions(self):
        """Fill ``self.action`` with the handler names requested by the payload."""
        payload = self.json
        actions = []
        if payload.get("action") in self.TRADE_ACTIONS:
            actions.append("trade")
        if payload.get("cex"):
            actions.append("trade_cex")
        if payload.get("dex"):
            actions.append("trade_dex")
        if self._section("homeassistant").get("entity_id"):
            actions.append("homeassistant")
        if self._section("telegram").get("message"):
            actions.append("telegram")
        datafield = self._section("datafield")
        # NOTE(review): a falsy value (0, "") does not trigger the datafield action - confirm intended.
        if datafield.get("key") and datafield.get("value"):
            actions.append("datafield")
        self.action = actions

    # -----------------------------------------------------------------------------------------

    @property
    def cfg(self):
        """The "Hoox Settings" single document, loaded on first access."""
        if getattr(self, "_cfg", None) is None:
            self._cfg = frappe.get_single("Hoox Settings")
        return self._cfg

    @property
    def exchange_creds(self):
        """The enabled "Exchange Credential" for the secret hash, or ``None`` if disabled."""
        if getattr(self, "_exchange_creds", None) is None:
            self._exchange_creds = self.get_exchange_creds()
        return self._exchange_creds

    def exchange_connection(self, market_type: str, verbose: bool = True, rate_limit: bool = True, price_required: bool = False):
        """
        Return a cached ccxt client for the given options.

        This is a regular method, not a property: a property getter cannot
        receive arguments. One client is cached per distinct option set.
        """
        cache = self.__dict__.setdefault("_exchange_connections", {})
        cache_key = (market_type, verbose, rate_limit, price_required)
        if cache_key not in cache:
            cache[cache_key] = self.get_exchange_connection(market_type, verbose, rate_limit, price_required)
        return cache[cache_key]

    @property
    def telegram_creds(self):
        """The enabled "Telegram User" of the credential owner, or ``None``."""
        if getattr(self, "_telegram_creds", None) is None:
            self._telegram_creds = self._linked_user_doc("Telegram User")
        creds = self._telegram_creds
        return creds if creds is not None and creds.enabled else None

    @property
    def homeassistant_creds(self):
        """The enabled Home Assistant settings of the credential owner, or ``None``."""
        if getattr(self, "_homeassistant_creds", None) is None:
            # NOTE(review): this reads the "Telegram User" doctype, exactly as before;
            # it looks like a copy-paste of telegram_creds - confirm the intended doctype.
            self._homeassistant_creds = self._linked_user_doc("Telegram User")
        creds = self._homeassistant_creds
        return creds if creds is not None and creds.enabled else None

    def _linked_user_doc(self, doctype):
        """Return the ``doctype`` document linked to the credential owner, or ``None``."""
        creds = self.exchange_creds
        if not creds:
            return None
        name = frappe.db.exists(doctype, {"user": creds.user})
        return frappe.get_doc(doctype, name) if name else None

    @property
    def json(self):
        """The request payload parsed from ``self.data`` (parsed once, then cached)."""
        if getattr(self, "_json", None) is None:
            # Accept an already-decoded payload as well as raw JSON text/bytes.
            self._json = self.data if isinstance(self.data, dict) else json.loads(self.data)
        return self._json

    # -----------------------------------------------------------------------------------------

    def get_client_ip(self):
        """Store the caller's IP address in ``self.client_ip_address``."""
        headers = frappe.local.request.headers
        if "X-Forwarded-For" in headers:
            # Entries are comma separated, with or without a following space.
            # The last entry is used, as before (the hop closest to this server).
            hops = [hop.strip() for hop in headers.get("X-Forwarded-For").split(",")]
            self.client_ip_address = hops[-1]
        else:
            self.client_ip_address = frappe.local.request.remote_addr

    def is_valid_ip(self):
        """Return a truthy value when the client IP (or the ``*`` wildcard) is whitelisted."""
        return frappe.db.exists("IP Whitelist", {"ip": ["in", [self.client_ip_address, "*"]], "enabled": 1})

    # -----------------------------------------------------------------------------------------

    def send_msg(self, message_text: str):
        """Send ``message_text`` to the credential owner via Telegram, if enabled."""
        if not self.cfg.telegram_enabled or not self.telegram_creds:
            return
        send_message(message_text=message_text, user=self.exchange_creds.user, telegram_user_id=self.telegram_creds.telegram_user_id)

    def get_exchange_creds(self):
        """
        Load the "Exchange Credential" matching ``self.secret_hash``.

        :return: the document with ``api_key`` / ``api_secret`` decrypted, or
            ``None`` when the credential is disabled.
        :raises frappe.DoesNotExistError: when no credential matches the hash.
        """
        name = frappe.db.exists("Exchange Credential", {"secret_hash": self.secret_hash})
        if not name:
            frappe.throw(
                f"No exchange credentials found for secret hash {self.secret_hash}",
                frappe.DoesNotExistError,
            )
        exchange_creds = frappe.get_doc("Exchange Credential", name)
        if not exchange_creds.enabled:
            # Do not decrypt secrets that will never be used.
            return None
        exchange_creds.api_key = get_decrypted_password(
            "Exchange Credential", exchange_creds.name, "api_key", raise_exception=False
        )
        exchange_creds.api_secret = get_decrypted_password(
            "Exchange Credential", exchange_creds.name, "api_secret", raise_exception=False
        )
        return exchange_creds

    def get_exchange_connection(self, market_type: str, verbose: bool = True, rate_limit: bool = True, price_required: bool = False):
        """
        Connect via REST to the selected exchange.

        :raises ValueError: when there is no enabled credential or the
            configured exchange id is not known to ccxt.
        """
        creds = self.exchange_creds
        if not creds:
            raise ValueError("No enabled exchange credentials available.")
        if creds.exchange not in ccxt.exchanges:
            raise ValueError(f"Unsupported exchange: {creds.exchange}")

        # NOTE(review): verbose defaults to True, which makes ccxt print every
        # request; consider defaulting to False in production.
        exchange = getattr(ccxt, creds.exchange)({
            "apiKey": creds.api_key,
            "secret": creds.api_secret,
            "enableRateLimit": rate_limit,
            "verbose": verbose,
            "options": {
                "defaultType": market_type,
                "test": creds.testnet,
                "createMarketBuyOrderRequiresPrice": price_required,
                "createMarketSellOrderRequiresPrice": price_required,
            },
        })
        if creds.testnet:
            # Must happen once, before the first API call on this client.
            exchange.set_sandbox_mode(True)
        return exchange

    def save_incoming_request(self):
        """Insert an "Incoming Request" document describing this request."""
        # NOTE(review): the status starts as "Success" and update_success() sets the
        # same value again, so failures are never reflected - confirm the intended states.
        self.incoming_request = frappe.get_doc({
            "doctype": "Incoming Request",
            "method": "/".join(self.action),
            "params": json.dumps(self.json, indent=4),
            "status": "Success",
        })
        self.incoming_request.insert(ignore_permissions=True)

    # -----------------------------------------------------------------------------------------

    @log_execution_time()
    def process_datafield(self):
        """Updates a datafield document based on the request data."""
        try:
            df = self._section("datafield")
            if not df:
                return
            raw_key = df.get("key")
            key = raw_key
            if raw_key and raw_key.startswith("DATA_"):
                # Names have at least three "_"-separated parts; the third is the key.
                # Shorter names are used unchanged instead of raising IndexError.
                parts = raw_key.split("_")
                if len(parts) > 2:
                    key = parts[2]
            value = df.get("value", 0)
            n = df.get("n")
            insert = df.get("insert", False)
            user = self.exchange_creds.user
            if key and value:
                doc = get_doc_from_user_key(user, key, insert)
                if doc is not None:
                    doc.handle_new_data(value, n, self.json)
                    self.log.info(f"Datafield '{key}' updated successfully.")
        except frappe.DoesNotExistError as e:
            self.log.error(f"Datafield not found: {e}")
        except Exception as e:
            self.log.error(f"Error processing datafield: {e}")
            if self.cfg.throw:
                frappe.throw(f"Error processing datafield: {e}")

    @log_execution_time()
    @retry_on_exception()
    def process_trade(self):
        """
        Executes a trade on the exchange based on the request data.

        :return: the order response dict, or ``None`` when nothing was executed
            (credential disabled, required fields missing, or the order failed).
        """
        creds = self.exchange_creds
        if not creds or not creds.enabled:
            return None

        payload = self.json
        is_cex = all(field in payload for field in self.CEX_REQUIRED_FIELDS)
        is_dex = all(field in payload for field in self.DEX_REQUIRED_FIELDS)
        if not (is_cex or is_dex):
            return None
        if payload.get("order_type") == "limit" and "price" not in payload:
            frappe.throw("Price field is required for 'limit' order type.")

        # The order is attempted exactly once, even when the payload satisfies
        # both field sets. NOTE(review): DEX requests go through the same ccxt path.
        response = self._handle_trade_alert()
        self.log.info(response)
        return response

    def _handle_trade_alert(self):
        """Place the order, store it as a "Trade" document and notify via Telegram."""
        payload = self.json
        action = payload.get("action")
        pair = payload.get("pair")
        price = payload.get("price")
        quantity = payload.get("quantity")
        percent = payload.get("percent") or False
        order_type = payload.get("order_type") or "market"
        # NOTE(review): the payload keys "market_type"/"leverage" and the "spot"
        # default are assumptions; the original never defined these values.
        market_type = payload.get("market_type") or "spot"
        leverage = payload.get("leverage")

        trade = frappe.new_doc("Trade")
        trade.exchange = self.exchange_creds.exchange
        trade.symbol = pair
        trade.action = action
        trade.order_type = order_type
        trade.quantity = quantity
        if price:
            trade.price = price
        if percent:
            trade.percent = percent

        response = self._execute_order(action, pair, price, quantity, order_type, market_type, leverage)
        if response is None:
            # The failure has already been logged by _report_order_error().
            return None

        trade.update(response)
        # Save first so that trade.name exists for the messages below.
        trade.save(ignore_permissions=True)
        if response.get("order") and self.cfg.telegram_enabled:
            self.send_msg(f"Trade executed successfully: {action} {pair} (Order ID: {trade.name})")
        self.log.info(f"Trade processed: {trade.name}")
        return response

    def _execute_order(self, action, pair, price, quantity, order_type, market_type, leverage):
        """
        Execute an order on an exchange using CCXT.

        :return: ``{"order": ...}`` on success (empty dict when ``action`` is
            ``None``), or ``None`` when the order failed. Failures are logged,
            not raised, so a timed-out order is never re-submitted by the
            retry decorator.
        """
        try:
            exchange = self.exchange_connection(market_type)

            # exchange.has maps capability names to flags; the key can exist with a falsy value.
            if market_type == "future" and exchange.has.get("setLeverage") and leverage is not None and leverage > 1:
                exchange.set_leverage(leverage, pair)

            if action not in (*self.TRADE_ACTIONS, None):
                raise ValueError(f"Invalid action: {action}")

            response = {}
            if action in ("buy", "sell"):
                order_func_name = self.ORDER_TYPE_FUNCS[action].get(order_type)
                if not order_func_name:
                    raise ValueError(f"Unsupported order type: {order_type}")
                place_order = getattr(exchange, order_func_name)
                if order_type == "limit":
                    response["order"] = place_order(pair, quantity, price)
                else:
                    response["order"] = place_order(pair, quantity)
            elif action == "close":
                # Many exchanges need the symbol to cancel an order, so pass it along.
                response["order"] = [
                    exchange.cancel_order(order["id"], pair)
                    for order in exchange.fetch_open_orders(pair)
                ]

            self.log.debug(response)
            return response
        except Exception as exc:
            self._report_order_error(exc)
            return None

    def _report_order_error(self, exc):
        """Log an order failure to the Error Log and the request logger; return the message."""
        # Ordered from most to least specific; the first matching type wins.
        labels = (
            (ccxt.RequestTimeout, "Request timed out"),
            (ccxt.AuthenticationError, "Authentication error"),
            (ccxt.ExchangeNotAvailable, "Exchange not available"),
            (ccxt.BaseError, "Base error in CCXT"),
            (ValueError, "Value error"),
            (AttributeError, "Attribute error"),
        )
        label = next(
            (text for exc_type, text in labels if isinstance(exc, exc_type)),
            "An unexpected error occurred",
        )
        msg = f"{label}: {exc}"

        reference = {}
        incoming_request = getattr(self, "incoming_request", None)
        if incoming_request is not None and incoming_request.name:
            reference = {
                "reference_doctype": incoming_request.doctype,
                "reference_name": incoming_request.name,
            }
        frappe.log_error(title="Order Execution Error", message=msg, **reference)
        self.log.error(msg)
        return msg

    @log_execution_time()
    @retry_on_exception()
    def process_telegram(self):
        """Sends a Telegram notification based on the request data."""
        message_text = self._section("telegram").get("message")
        if message_text:
            self.send_msg(message_text)
            self.log.info(f"Telegram message sent: {message_text}")

    @log_execution_time()
    @retry_on_exception()
    def process_homeassistant(self):
        """Calls a Home Assistant service based on the request data."""
        section = self._section("homeassistant")
        entity_id = section.get("entity_id")
        if entity_id:
            service = section.get("service")
            data = section.get("data")
            response = send_to_homeassistant(entity_id, service, data)
            self.log.info(f"Home Assistant service called: {entity_id}.{service} - Response: {response}")

    def update_success(self):
        """Updates the status of the Incoming Request document."""
        self.incoming_request.status = "Success"
        self.incoming_request.save(ignore_permissions=True)

    @frappe.whitelist(allow_guest=True)
    def receive_alert(self, request_json=None):
        """
        Entry point for receiving alerts.

        Re-initialises the instance from ``request_json``, runs every
        requested handler and merges dict results into ``frappe.response``.
        Errors raised by the handlers are logged and shown, not re-raised.
        """
        self.__init__(request_json)
        try:
            response = {}
            creds = self.exchange_creds
            if creds and creds.enabled:
                for action in self.action:
                    handler = getattr(self, f"process_{action}", None)
                    if handler is None:
                        # e.g. "trade_cex" / "trade_dex": markers without a handler of their own.
                        self.log.debug(f"No handler for action '{action}', skipping.")
                        continue
                    result = handler()
                    # Handlers such as process_datafield return nothing.
                    if isinstance(result, dict):
                        response.update(result)
            self.update_success()
            frappe.db.commit()
            frappe.response.update(response)
        except Exception as e:
            self.log.error(f"Error: {e}")
            frappe.msgprint(f"Error: {e}")

    # ... (Other existing methods)
if __name__ == "__main__":
    # Placeholder for ad-hoc testing; running the module directly does nothing.
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment