Last active
July 27, 2020 16:19
-
-
Save search5/1b363c394d3ffca04d88523b2af9b8aa to your computer and use it in GitHub Desktop.
LG UPlus XPay 파이썬(Python3.7) 코드(pycurl, chardet 설치 필요)(세션 부분은 flask 의존)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
################################################################
# XPayClient.py
#
# 이 라이브러리를 사용해서 발생하는 책임은 지지 않습니다.
# 이 라이브러리는 LG U+ 전자결제 시스템의 PHP 버전을 파이썬 3.7 기반으로 고친 것입니다.
# 이 라이브러리의 원작(PHP 버전) 저작권은 LG U+에 있으며 이 라이브러리의 저작권은 Apache License 입니다.
# 이 라이브러리 사용 후 발생되는 모든 책임은 저와 LG U+에 없으므로 유의하시기 바랍니다.
#
# 변환자: 이지호([email protected])
# Translate Date: 2020.07.22
################################################################
import configparser
import random
import secrets
import string
import sys
import uuid
from datetime import datetime
from hashlib import md5, sha1, sha512
from io import BytesIO, StringIO
from json import loads as json_loads
from pathlib import Path
from shutil import copyfileobj
from tempfile import NamedTemporaryFile
from urllib.parse import urlencode, urlparse

import pycurl
from flask import session
# User-Agent header sent with every gateway request (kept identical to the
# PHP client this module was ported from).
LGD_USER_AGENT = "XPayClient (4.0.0.0/PHP)"

# Log levels; a message is written when its level <= the configured log_level.
LGD_LOG_FATAL = 0
LGD_LOG_ERROR = 1
LGD_LOG_WARN = 2
LGD_LOG_INFO = 3
LGD_LOG_DEBUG = 4

# 1xxxx: local setup / configuration errors.
LGD_ERR_NO_HOME_DIR = "10001"
LGD_ERR_NO_MALL_CONFIG = "10002"
LGD_ERR_NO_LGDACOM_CONFIG = "10003"
LGD_ERR_NO_MID = "10004"
LGD_ERR_OUT_OF_MEMORY = "10005"
LGD_ERR_NO_SECURE_PROTOCOLS = "10007"

# 2xxxx: transport (cURL) errors.
LGD_ERR_HTTP_URL = "20001"
LGD_ERR_RESOLVE_HOST = "20002"
LGD_ERR_RESOLVE_PROXY = "20003"
LGD_ERR_CONNECT = "20004"
LGD_ERR_WRITE = "20005"
LGD_ERR_READ = "20006"
LGD_ERR_SEND = "20007"
LGD_ERR_RECV = "20008"
LGD_ERR_TIMEDOUT = "20009"
LGD_ERR_SSL = "20101"
LGD_ERR_CURL = "20201"

# 4xxxx: response parsing errors.
LGD_ERR_JSON_DECODE = "40001"
def session_id():
    """Return a fresh 26-character random identifier of ASCII letters and digits.

    Stands in for PHP's ``session_id()``: the value is only used as extra
    entropy when building a transaction ID, so it is drawn from the
    operating system's CSPRNG rather than the ``random`` module.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(26))
class XPayClient:
    """Client for the LG U+ XPay payment gateway (port of the PHP XPayClient).

    The client reads ``conf/mall.conf`` and ``conf/lgdacom.conf`` below the
    given home directory, talks to the gateway through a single pycurl
    handle (``self.ch``) and appends a log line per event to a daily log
    file inside the configured ``log_dir``.
    """

    # libcurl error numbers grouped by the gateway error code they map to.
    #  1: CURLE_UNSUPPORTED_PROTOCOL     3: CURLE_URL_MALFORMAT
    #  5: CURLE_COULDNT_RESOLVE_PROXY    6: CURLE_COULDNT_RESOLVE_HOST
    #  7: CURLE_COULDNT_CONNECT         23: CURLE_WRITE_ERROR
    # 26: CURLE_READ_ERROR              27: CURLE_OUT_OF_MEMORY
    # 28: CURLE_OPERATION_TIMEDOUT      55: CURLE_SEND_ERROR
    # 56: CURLE_RECV_ERROR
    # 35/51/53/54/58/59/60/64/66/77/80/82/83: SSL related errors
    _CURL_ERRORS = (
        ((1, 3), LGD_ERR_HTTP_URL, "URL error"),
        ((6,), LGD_ERR_RESOLVE_HOST, "Resolve host error"),
        ((5,), LGD_ERR_RESOLVE_PROXY, "Resolve proxy error"),
        ((7,), LGD_ERR_CONNECT, "Could not connect error"),
        ((23,), LGD_ERR_WRITE, "Write error"),
        ((26,), LGD_ERR_READ, "Read error"),
        ((55,), LGD_ERR_SEND, "Send error"),
        ((56,), LGD_ERR_RECV, "Recv error"),
        ((27,), LGD_ERR_OUT_OF_MEMORY, "Out of memory error"),
        ((28,), LGD_ERR_TIMEDOUT, "Timeout error"),
        ((35, 51, 53, 54, 58, 59, 60, 64, 66, 77, 80, 82, 83), LGD_ERR_SSL, "SSL error"),
    )

    # mall.conf "default_secure_protocols" value -> CURLOPT_SSLVERSION value.
    # 512 = TLS1.1 (5), 2048 = TLS1.2 (6), 2560 = TLS1.0 (4); anything else
    # falls back to CURL_SSLVERSION_DEFAULT (0).
    _SSL_VERSIONS = {"512": 5, "2048": 6, "2560": 4}

    def __init__(self, home_dir, mode="real"):
        """Load the configuration below *home_dir* and create the cURL handle.

        Args:
            home_dir: Directory that contains ``conf/mall.conf``,
                ``conf/lgdacom.conf`` and ``conf/ca-bundle.crt``.
            mode: ``"test"`` (case-insensitive) makes ``TX`` use the
                configured ``test_url``; any other value uses ``url``.

        Raises:
            FileNotFoundError: If *home_dir* or one of the two config files
                does not exist. ``response_code`` / ``response_msg`` are set
                before raising.
        """
        self.ch = ""
        self.debug = False
        self.bTest = False
        self.error_msg = ""
        self.home_dir = ""
        self.mode = ""
        self.TX_ID = ""
        self.MID = ""
        self.Auth_Code = ""
        self.config = {}
        self.Post = {}
        self.response_json = ""
        self.response_array = {}
        self.response_code = ""
        self.response_msg = ""
        self.log_file = ""
        self.err_label = ("FATAL", "ERROR", "WARN ", "INFO ", "DEBUG")
        self.INFO = ("LGD_TXID", "LGD_AUTHCODE", "LGD_MID", "LGD_OID", "LGD_TXNAME", "LGD_PAYKEY", "LGD_RESPCODE", "LGD_RESPMSG")
        self.DEBUG = ("LGD_TXID", "LGD_AUTHCODE", "LGD_MID", "LGD_TID", "LGD_OID", "LGD_PAYTYPE", "LGD_PAYDATE", "LGD_TXNAME", "LGD_PAYKEY", "LGD_RESPCODE", "LGD_RESPMSG")
        # CURL Error Code, Message
        self.curl_errno = -99
        self.curl_error = ""

        self.home_dir = home_dir
        home = Path(self.home_dir)
        path_mall_conf = home / "conf" / "mall.conf"
        path_lgdacom_conf = home / "conf" / "lgdacom.conf"
        if not home.exists():
            self._fail_init(LGD_ERR_NO_HOME_DIR, f"home_dir [{self.home_dir}] does not exist")
        if not path_mall_conf.exists():
            self._fail_init(LGD_ERR_NO_MALL_CONFIG, f"config file [{self.home_dir}/conf/mall.conf] does not exist")
        if not path_lgdacom_conf.exists():
            self._fail_init(LGD_ERR_NO_LGDACOM_CONFIG, f"config file [{self.home_dir}/conf/lgdacom.conf] does not exist")

        # lgdacom.conf entries override mall.conf entries with the same key.
        self.config = self._read_config(path_mall_conf)
        self.config.update(self._read_config(path_lgdacom_conf))

        log_dir = Path(self.config["log_dir"])
        log_dir.mkdir(mode=0o777, parents=True, exist_ok=True)
        self.log_file = log_dir / f"log_{datetime.now():%Y%m%d}.log"
        self.log(f"XPayClient initialize [{self.home_dir}] [{mode}]", LGD_LOG_INFO)
        if mode.lower() == "test":
            self.bTest = True
            self.debug = False
        self.init()

    def _fail_init(self, code, message):
        """Record a fatal setup error and abort construction."""
        self.response_code = code
        self.response_msg = message
        raise FileNotFoundError(message)

    @staticmethod
    def _read_config(path):
        """Parse a section-less ``key = value`` file (EUC-KR) into a dict.

        Keys keep their original spelling (needed for ``output_UTF8`` and
        merchant IDs); a lower-cased alias is added for every key so lookups
        that relied on configparser's default lower-casing keep working.
        """
        parser = configparser.ConfigParser(interpolation=None)
        parser.optionxform = str  # do not lower-case keys
        parser.read_string("[DEFAULT]\n" + Path(path).read_text(encoding="euc-kr"))
        values = {}
        for key, value in parser["DEFAULT"].items():
            values[key] = value
            values.setdefault(key.lower(), value)
        return values

    def _config_int(self, key, default=0):
        """Return config value *key* as an int, or *default* if absent/invalid.

        Config values are read from text files and are therefore strings;
        numeric switches must be converted before they are compared.
        """
        try:
            return int(str(self.config[key]).strip())
        except (KeyError, TypeError, ValueError):
            return default

    def init(self):
        """(Re)create the cURL handle and apply the TLS-related settings."""
        if hasattr(self.ch, "close"):
            # Called again before an auto-rollback: release the old handle.
            self.ch.close()
        self.ch = pycurl.Curl()
        self.ch.setopt(pycurl.USERAGENT, LGD_USER_AGENT)
        self.ch.setopt(pycurl.FOLLOWLOCATION, True)
        self.ch.setopt(pycurl.ENCODING, 'gzip, deflate')
        # Verification stays on unless the config explicitly sets the flag to 0.
        if self._config_int("verify_cert", 1) == 0:
            self.ch.setopt(pycurl.SSL_VERIFYPEER, 0)
            self.log("Do not verify server Certificate", LGD_LOG_WARN)
        if self._config_int("verify_host", 1) == 0:
            self.ch.setopt(pycurl.SSL_VERIFYHOST, 0)
            self.log("Do not verify host Domain", LGD_LOG_WARN)
        # TLS version selection, see _SSL_VERSIONS.
        protocols = str(self.config.get('default_secure_protocols', '')).strip()
        self.ch.setopt(pycurl.SSLVERSION, self._SSL_VERSIONS.get(protocols, 0))
        ca_bundle = (Path(self.home_dir) / "conf" / "ca-bundle.crt").resolve()
        self.ch.setopt(pycurl.CAINFO, str(ca_bundle))

    def IsAcceptLog(self, ParamName, LogLevel):
        """Return True if parameter *ParamName* may be logged at *LogLevel*.

        Only the DEBUG and INFO levels have a whitelist; every other level
        yields False.
        """
        if LogLevel == LGD_LOG_DEBUG:
            return ParamName in self.DEBUG
        if LogLevel == LGD_LOG_INFO:
            return ParamName in self.INFO
        return False

    def Init_TX(self, MID):
        """Start a new transaction for merchant *MID*.

        Generates a fresh TX ID and auth code and resets ``self.Post`` to
        the three mandatory fields.

        Returns:
            False (with ``response_code`` = LGD_ERR_NO_MID) when the config
            holds no key for *MID*, otherwise True.
        """
        if self.config.get(MID) is None:
            self.response_code = LGD_ERR_NO_MID
            self.response_msg = f"Key for MID [{MID}] does not exist in mall.conf"
            self.log(self.response_msg, LGD_LOG_FATAL)
            return False
        self.TX_ID = self.Gen_TX_ID(MID)
        self.MID = MID
        self.Auth_Code = self.Gen_Auth_Code(self.TX_ID, MID)
        self.Post = {"LGD_TXID": self.TX_ID, "LGD_AUTHCODE": self.Auth_Code, "LGD_MID": MID}
        return True

    def GenerateGUID(self):
        """Return a random UUID4 as 32 hex characters (no dashes)."""
        return uuid.uuid4().hex

    def Get_Unique(self):
        """Return a unique string: random id + per-session counter + GUID.

        Increments ``tx_counter`` in the Flask session, so it must be called
        inside a request context.
        """
        session['tx_counter'] = session.get('tx_counter', 0) + 1
        return f"{session_id()}{session['tx_counter']}{self.GenerateGUID()}"

    def Gen_TX_ID(self, MID):
        """Build a transaction ID: ``MID-server_id-timestamp`` + SHA-1 hex digest."""
        header = f"{MID}-{self.config['server_id']}-{datetime.now():%Y%m%d%H%M%S}"
        return header + sha1(f"{header}{self.Get_Unique()}".encode("utf-8")).hexdigest()

    def Gen_Auth_Code(self, tx_id, MID):
        """Return the SHA-1 hex digest of *tx_id* followed by the merchant key."""
        return sha1(f"{tx_id}{self.config[MID]}".encode("utf-8")).hexdigest()

    def Set(self, name, value):
        """Set POST field *name* to *value* for the current transaction."""
        self.Post[name] = value

    def Check_Auto_Rollback(self, errno):
        """Return True if cURL error *errno* should trigger an auto-rollback.

        Only 28 (CURLE_OPERATION_TIMEDOUT) qualifies.
        """
        return errno == 28

    def set_curl_error(self):
        """Translate ``self.curl_errno`` into ``response_code`` / ``response_msg``."""
        for codes, response_code, response_msg in self._CURL_ERRORS:
            if self.curl_errno in codes:
                self.response_code = response_code
                self.response_msg = response_msg
                break
        else:
            self.response_code = LGD_ERR_CURL
            self.response_msg = "CURL error"
        self.response_msg += f"; cURL error code = {self.curl_errno} msg = {self.curl_error}"

    def TX(self, bRollbackOnError=True):
        """Send the current transaction (``self.Post``) to the gateway.

        Args:
            bRollbackOnError: Attempt an automatic rollback on timeout,
                HTTP 5xx or an undecodable response. Ignored when the
                config sets ``auto_rollback`` to 0 (or omits it).

        Returns:
            True when a JSON response carrying ``LGD_RESPCODE`` was
            received, False otherwise. ``response_code`` / ``response_msg``
            describe the outcome in both cases.
        """
        bTX = True
        bRollback = False
        strRollbackReason = ""
        bReporting = False
        strReportStatus = ""
        strReportMsg = ""

        if bRollbackOnError and self._config_int('auto_rollback') == 0:
            bRollbackOnError = False

        url = self.config['test_url'] if self.bTest else self.config['url']
        Protocol = urlparse(url).scheme
        if Protocol == "":
            url = f"https://{url}"
            bCheckURL = True
        else:
            # Only https is accepted; plain http (or anything else) is refused.
            bCheckURL = Protocol == "https"

        if bCheckURL:
            result = self.send_post_data(url, self.Post, None, self._config_int('timeout', 10))
        else:
            result = False
            self.response_code = LGD_ERR_HTTP_URL
            self.response_msg = "http protocol not supported."
            self.log(f"TX failed: res code = {self.response_code}; msg = {self.response_msg}", LGD_LOG_FATAL)

        if result is False:
            bTX = False
            if bCheckURL:
                self.set_curl_error()
                self.log(f"TX failed: res code = {self.response_code}; msg = {self.response_msg}", LGD_LOG_FATAL)
                if bRollbackOnError and self.Check_Auto_Rollback(self.curl_errno):
                    bRollback = True
                    strRollbackReason = "Timeout"
        else:
            http_res_code = self.ch.getinfo(pycurl.HTTP_CODE)
            if not 200 <= http_res_code < 300:
                # http response error
                bTX = False
                self.response_code = f"{30000 + http_res_code}"
                self.response_msg = f"HTTP response code = {http_res_code}"
                self.log(f"TX failed: res code = {self.response_code}; msg = {self.response_msg}", LGD_LOG_FATAL)
                # report
                bReporting = True
                strReportStatus = f"HTTP response {http_res_code}"
                strReportMsg = result
                if bRollbackOnError and http_res_code >= 500:
                    # rollback
                    bRollback = True
                    strRollbackReason = f"HTTP {http_res_code}"
            else:
                self.response_json = result
                try:
                    self.response_array = json_loads(self.response_json)
                except ValueError:
                    # Covers JSONDecodeError and UnicodeDecodeError.
                    self.response_array = None
                if not isinstance(self.response_array, dict) or not self.response_array.get("LGD_RESPCODE"):
                    # JSON decode failed
                    bTX = False
                    bReporting = True
                    strReportStatus = "JSON decode fail"
                    strReportMsg = result
                    if bRollbackOnError:
                        bRollback = True
                        strRollbackReason = "JSON decode fail"
                    self.log("JSON Decode failed", LGD_LOG_ERROR)
                    self.response_array = {}
                    self.response_code = LGD_ERR_JSON_DECODE
                    self.response_msg = "JSON Decode Failed"
                else:
                    self.response_code = self.response_array["LGD_RESPCODE"]
                    # The PHP original converted the message to EUC-KR when
                    # output_UTF8 was off; a Python str needs no conversion
                    # (the log file encoding is chosen in log()).
                    self.response_msg = self.response_array.get("LGD_RESPMSG", "")
                    row_count = self.Response_Count()
                    self.log(f"Response Code=[{self.response_code}], Msg=[{self.response_msg}], Count={row_count}", LGD_LOG_INFO)
                    keys = self.Response_Names() or ()
                    for i in range(row_count):
                        for name in keys:
                            if self.IsAcceptLog(name, LGD_LOG_DEBUG):
                                self.log(f"Response ({name}, {i}) = {self.Response(name, i)}", LGD_LOG_DEBUG)

        if bRollback:
            # try auto-rollback
            tx_id = self.TX_ID
            code = self.response_code
            msg = self.response_msg
            self.init()
            self.Rollback(strRollbackReason)
            # restore previous info
            self.TX_ID = tx_id
            self.response_code = code
            self.response_msg = msg
        if bReporting:
            self.Report(strReportStatus, strReportMsg)
        return bTX

    def Report_TX(self):
        """Post the current fields to ``aux_url`` as an error report.

        Returns:
            True when the report endpoint answered with a 2xx status and a
            JSON object, False otherwise.
        """
        url = self.config['aux_url']
        result = self.send_post_data(url, self.Post, None, self._config_int('timeout', 10))
        if result is False:
            self.log(f"Reporting failed: res code = {self.response_code}; msg = {self.response_msg}", LGD_LOG_ERROR)
            return False
        http_res_code = self.ch.getinfo(pycurl.HTTP_CODE)
        if not 200 <= http_res_code < 300:
            # http response error
            self.log(f"Reporting failed: HTTP response code = {http_res_code}", LGD_LOG_ERROR)
            return False
        try:
            response_array = json_loads(result)
        except ValueError:
            response_array = None
        if not isinstance(response_array, dict):
            # JSON decode failed
            self.log("Report JSON Decode failed", LGD_LOG_ERROR)
            return False
        response_code = response_array.get("LGD_RESPCODE")
        response_msg = response_array.get("LGD_RESPMSG", "")
        self.log(f"Report Response Code=[{response_code}], Msg=[{response_msg}]", LGD_LOG_INFO)
        return True

    def Patch_TX(self, filename):
        """Download *filename* from ``aux_url`` into ``<home_dir>/conf``.

        The response is first stored in a temporary file and only copied
        over the target when the server labels it ``text/plain``; any other
        content type is reported as an error (first 512 bytes attached).

        Returns:
            True when the file was replaced, False otherwise.
        """
        url = self.config['aux_url']
        conf_dir = Path(self.home_dir) / "conf"
        # Binary mode: the body is stored exactly as received.
        with NamedTemporaryFile("w+b", prefix="tmp", dir=str(conf_dir)) as fp:
            result = self.post_into_file(url, self.Post, fp, None, self._config_int('timeout', 10))
            if result is False:
                self.log(f"Patch failed: file = {filename}", LGD_LOG_ERROR)
                return False
            fp.seek(0)
            self.log(f"Patch success: file = {filename}", LGD_LOG_INFO)
            type_ = self.ch.getinfo(pycurl.CONTENT_TYPE) or ""
            if type_[:10].lower() == "text/plain":
                self.log(f"Patch success: Content-Type = {type_}", LGD_LOG_INFO)
                with (conf_dir / filename).open("wb") as target:
                    copyfileobj(fp, target)
                return True
            # error
            content = fp.read(512)
            self.Report(f"Patch error : {filename}", content)
            return False

    def Rollback(self, reason):
        """Send a Rollback transaction for the current TX ID.

        Returns:
            True only when the gateway answers with response code "0000".
        """
        RBTX = self.TX_ID
        self.Init_TX(self.MID)
        self.Set("LGD_TXNAME", "Rollback")
        self.Set("LGD_RB_TXID", RBTX)
        self.Set("LGD_RB_REASON", reason)
        if not self.TX(False):
            return False
        return self.response_code == "0000"

    def Report(self, status, msg):
        """Report *status* / *msg* to the gateway if ``report_error`` is 1."""
        if self._config_int('report_error') != 1:
            return False
        self.Init_TX(self.MID)
        self.Set("LGD_TXNAME", "Report")
        self.Set("LGD_STATUS", status)
        self.Set("LGD_MSG", msg)
        return self.Report_TX()

    def Patch(self, filename):
        """Request config file *filename* from the gateway and install it."""
        self.Init_TX(self.MID)
        self.Set("LGD_TXNAME", "Patch")
        self.Set("LGD_FILE", filename)
        return self.Patch_TX(filename)

    def Response_Json(self):
        """Return the raw body of the last successful TX response."""
        return self.response_json

    def Response_Count(self):
        """Return the number of rows in ``LGD_RESPONSE`` (0 if absent)."""
        if not isinstance(self.response_array, dict):
            return 0
        rows = self.response_array.get("LGD_RESPONSE")
        return len(rows) if rows else 0

    def Response_Code(self):
        """Return the last response code."""
        return self.response_code

    def Response_Msg(self):
        """Return the last response message."""
        return self.response_msg

    def Response_Names(self):
        """Return the field names of the first response row, or None."""
        if not self.Response_Count():
            return None
        return self.response_array["LGD_RESPONSE"][0].keys()

    def Response(self, name, index=0):
        """Return field *name* of response row *index*.

        The PHP original re-encoded the value to EUC-KR when ``output_UTF8``
        was off; decoded JSON strings are already Unicode ``str`` here, so
        the value is returned unchanged in both cases.
        """
        return self.response_array["LGD_RESPONSE"][index][name]

    def log(self, msg, level=LGD_LOG_FATAL):
        """Append *msg* to the daily log file if *level* passes ``log_level``.

        The file is written as UTF-8 when ``output_UTF8`` is 1 and as EUC-KR
        otherwise (characters not representable are replaced).

        Returns:
            False when the message was filtered out, otherwise None.
        """
        if level > self._config_int("log_level", LGD_LOG_FATAL):
            return False
        if isinstance(msg, bytes):
            msg = msg.decode("utf-8", errors="replace")
        err_msg = datetime.now().strftime("%Y%m%d %H:%M:%S") + f" [{self.err_label[level]}] [{self.TX_ID}] {msg}\n"
        encoding = "utf-8" if self._config_int("output_UTF8") == 1 else "euc-kr"
        with open(self.log_file, "a", encoding=encoding, errors="replace") as log_fp:
            log_fp.write(err_msg)

    def set_credentials(self, username, password):
        """Set HTTP authentication credentials."""
        self.ch.setopt(pycurl.USERPWD, f"{username}:{password}")

    def set_referrer(self, referrer_url):
        """Set the Referer header."""
        self.ch.setopt(pycurl.REFERER, referrer_url)

    def set_user_agent(self, useragent):
        """Override the User-Agent header."""
        self.ch.setopt(pycurl.USERAGENT, useragent)

    def include_response_headers(self, value):
        """Include (truthy) or omit response headers in the returned body."""
        self.ch.setopt(pycurl.HEADER, value)

    def set_proxy(self, proxy):
        """Route requests through *proxy*."""
        self.ch.setopt(pycurl.PROXY, proxy)

    def perform(self):
        """Execute the prepared request.

        Returns:
            dict with ``data`` (response body as bytes), ``errno`` (cURL
            error number, or -1 on success) and ``error`` (cURL message).
            ``self.curl_errno`` / ``self.curl_error`` mirror the outcome and
            are reset to -99 / "" on success.
        """
        ret = {'data': b'', 'errno': -1, 'error': ''}
        self.curl_errno = -99
        self.curl_error = ""
        buffer = BytesIO()
        self.ch.setopt(pycurl.WRITEDATA, buffer)
        try:
            self.ch.perform()
        except pycurl.error as e:
            self.curl_errno = ret['errno'] = e.args[0]
            self.curl_error = ret['error'] = e.args[1] if len(e.args) > 1 else ""
        finally:
            ret['data'] = buffer.getvalue()
        return ret

    def _apply_transfer_options(self, ip, timeout):
        """Bind to the optional local interface *ip* and set the timeout."""
        if ip:
            if self.debug:
                print(f"Binding to ip {ip}\n")
            self.ch.setopt(pycurl.INTERFACE, ip)
        # Timeouts may arrive as strings from the config file.
        self.ch.setopt(pycurl.TIMEOUT, int(timeout))

    def _encode_post(self, postdata, debug_url=None):
        """Return *postdata* as a POST body, logging whitelisted fields.

        A dict is URL-encoded; anything else is passed through unchanged.
        """
        if not isinstance(postdata, dict):
            return postdata
        for key, value in postdata.items():
            if self.IsAcceptLog(key, LGD_LOG_DEBUG):
                self.log(f"Post [{key}] = [{value}]", LGD_LOG_DEBUG)
        post_string = urlencode(postdata)
        if self.debug and debug_url is not None:
            print(f"Url: {debug_url}\nPost String: {post_string}\n")
        return post_string

    def _transfer_failed(self, ret):
        """Return True if *ret* (from perform) carries a cURL error."""
        if ret['errno'] > -1:
            if self.debug:
                print("Error Occured in Curl\n")
                print(f"Error number: {ret['errno']}\n")
                print(f"Error message: {ret['error']}\n")
            return True
        return False

    @staticmethod
    def _write_body(fp, data):
        """Write response bytes to *fp*, decoding first for text-mode files."""
        try:
            fp.write(data)
        except TypeError:
            encoding = getattr(fp, "encoding", None) or "utf-8"
            fp.write(data.decode(encoding, errors="replace"))

    def send_post_data(self, url, postdata, ip=None, timeout=10):
        """POST *postdata* (dict or pre-encoded string) to *url*.

        Returns:
            The response body as bytes, or False on a cURL error.
        """
        self.ch.setopt(pycurl.URL, url)
        self._apply_transfer_options(ip, timeout)
        self.ch.setopt(pycurl.POST, True)
        self.ch.setopt(pycurl.POSTFIELDS, self._encode_post(postdata))
        ret = self.perform()
        if self._transfer_failed(ret):
            return False
        return ret['data']

    def fetch_url(self, url, ip=None, timeout=5):
        """GET *url*.

        Returns:
            The response body as bytes, or False on a cURL error.
        """
        self.ch.setopt(pycurl.URL, url)
        self.ch.setopt(pycurl.HTTPGET, True)
        self._apply_transfer_options(ip, timeout)
        ret = self.perform()
        if self._transfer_failed(ret):
            return False
        return ret['data']

    def fetch_into_file(self, url, fp, ip=None, timeout=5):
        """GET *url* and write the body into the open file object *fp*.

        Returns:
            True on success, False on a cURL error (nothing is written).
        """
        self.ch.setopt(pycurl.URL, url)
        self.ch.setopt(pycurl.HTTPGET, True)
        self._apply_transfer_options(ip, timeout)
        ret = self.perform()
        if self._transfer_failed(ret):
            return False
        self._write_body(fp, ret['data'])
        return True

    def post_into_file(self, url, postdata, fp, ip=None, timeout=10):
        """POST *postdata* to *url* and write the body into *fp*.

        Returns:
            The response body as bytes, or False on a cURL error (nothing
            is written in that case).
        """
        self.ch.setopt(pycurl.URL, url)
        self._apply_transfer_options(ip, timeout)
        self.ch.setopt(pycurl.POST, True)
        self.ch.setopt(pycurl.POSTFIELDS, self._encode_post(postdata, debug_url=url))
        ret = self.perform()
        if self._transfer_failed(ret):
            return False
        self._write_body(fp, ret['data'])
        return ret['data']

    def send_multipart_post_data(self, url, postdata, file_field_array=None, ip=None, timeout=30):
        """POST *postdata* plus optional file uploads as multipart/form-data.

        Args:
            url: Target URL.
            postdata: dict of ordinary form fields; any other type makes
                the method return False.
            file_field_array: optional dict mapping field name to the path
                of a file to upload. The dict is not modified.
            ip: Optional local interface to bind to.
            timeout: Transfer timeout in seconds.

        Returns:
            The response body as bytes, or False on bad input / cURL error.
        """
        self.ch.setopt(pycurl.URL, url)
        self._apply_transfer_options(ip, timeout)
        self.ch.setopt(pycurl.POST, True)
        # An empty Expect header stops libcurl from sending "Expect: 100-continue".
        self.ch.setopt(pycurl.HTTPHEADER, ["Expect: "])
        if not isinstance(postdata, dict):
            return False
        if self.debug:
            print(f"Post String: {urlencode(postdata)}\n")

        form_fields = []
        for key, value in postdata.items():
            if not isinstance(value, (str, bytes)):
                value = str(value)
            form_fields.append((key, value))
        for var_name, var_value in (file_field_array or {}).items():
            if sys.platform == "win32":
                # win hack
                var_value = var_value.replace("/", "\\")
            form_fields.append((var_name, (pycurl.FORM_FILE, var_value)))
        self.ch.setopt(pycurl.HTTPPOST, form_fields)

        ret = self.perform()
        if self._transfer_failed(ret):
            return False
        return ret['data']

    def store_cookies(self, cookie_file):
        """Load cookies from and save cookies to *cookie_file*."""
        self.ch.setopt(pycurl.COOKIEJAR, cookie_file)
        self.ch.setopt(pycurl.COOKIEFILE, cookie_file)

    def set_cookie(self, cookie):
        """Set the Cookie header."""
        self.ch.setopt(pycurl.COOKIE, cookie)

    def get_effective_url(self):
        """Return the last effective URL of the handle."""
        return self.ch.getinfo(pycurl.EFFECTIVE_URL)

    def get_http_response_code(self):
        """Return the HTTP status code of the last transfer."""
        return self.ch.getinfo(pycurl.HTTP_CODE)

    def get_error_msg(self):
        """Return the last cURL error number and message as text."""
        err = f"Error number: {self.curl_errno}\n"
        err += f"Error message: {self.curl_error}\n"
        return err

    def close(self):
        """Release the cURL handle."""
        self.ch.close()

    def GetTimeStamp(self):
        """Return the current local time as ``YYYYMMDDHHMMSS``."""
        return f"{datetime.now():%Y%m%d%H%M%S}"

    def GetHashData(self, LGD_MID, LGD_OID, LGD_AMOUNT, LGD_TIMESTAMP):
        """Return the MD5 hex digest of MID, OID, amount, timestamp and merchant key."""
        return md5(f"{LGD_MID}{LGD_OID}{LGD_AMOUNT}{LGD_TIMESTAMP}{self.config[LGD_MID]}".encode("utf-8")).hexdigest()

    def GetHashDataOpenpay(self, LGD_VENDERNO, LGD_MID, LGD_OPENPAY_MER_UID, LGD_OPENPAY_TOKEN, LGD_TIMESTAMP):
        """Return the SHA-512 hex digest used for Openpay requests.

        The token is part of the hashed string only when it is non-empty.
        """
        token = LGD_OPENPAY_TOKEN or ""
        InputData = f"{LGD_MID}{LGD_VENDERNO}{LGD_OPENPAY_MER_UID}{token}{LGD_TIMESTAMP}{self.config[LGD_MID]}"
        return sha512(InputData.encode("utf-8")).hexdigest()

    def GetHashDataCas(self, LGD_MID, LGD_OID, LGD_AMOUNT, LGD_RESPCODE, LGD_TIMESTAMP):
        """Return the MD5 hex digest of MID, OID, amount, response code, timestamp and merchant key."""
        return md5(f"{LGD_MID}{LGD_OID}{LGD_AMOUNT}{LGD_RESPCODE}{LGD_TIMESTAMP}{self.config[LGD_MID]}".encode("utf-8")).hexdigest()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment