Skip to content

Instantly share code, notes, and snippets.

@search5
Last active July 27, 2020 16:19
Show Gist options
  • Save search5/1b363c394d3ffca04d88523b2af9b8aa to your computer and use it in GitHub Desktop.
Save search5/1b363c394d3ffca04d88523b2af9b8aa to your computer and use it in GitHub Desktop.
LG UPlus XPay 파이썬(Python3.7) 코드(pycurl, chardet 설치 필요)(세션 부분은 flask 의존)
################################################################
# XPayClient.py
#
# 이 라이브러리를 사용해서 발생하는 책임은 지지 않습니다.
# 이 라이브러리는 LG U+ 전자결제 시스템의 PHP 버전을 파이썬 3.7 기반으로 고친 것입니다.
# 이 라이브러리의 원작(PHP 버전) 저작권은 LG U+에 있으며 이 라이브러리의 저작권은 Apache License 입니다.
# 이 라이브러리 사용 후 발생되는 모든 책임은 저와 LG U+에 없으므로 유의하시기 바랍니다.
#
# 변환자: 이지호([email protected])
# Translate Date: 2020.07.22
################################################################
from pathlib import Path
from io import StringIO, BytesIO
from datetime import datetime
from hashlib import sha1, md5
from urllib.parse import urlparse, urlencode
from json import json_loads
from tempfile import NamedTemporaryFile
from shutil import copyfileobj
import configparser
import pycurl
import uuid
import string
import random
from flask import session
LGD_USER_AGENT = "XPayClient (4.0.0.0/PHP)"
LGD_LOG_FATAL = 0
LGD_LOG_ERROR = 1
LGD_LOG_WARN = 2
LGD_LOG_INFO = 3
LGD_LOG_DEBUG = 4
LGD_ERR_NO_HOME_DIR = "10001"
LGD_ERR_NO_MALL_CONFIG = "10002"
LGD_ERR_NO_LGDACOM_CONFIG = "10003"
LGD_ERR_NO_MID = "10004"
LGD_ERR_OUT_OF_MEMORY = "10005"
LGD_ERR_NO_SECURE_PROTOCOLS = "10007"
LGD_ERR_HTTP_URL = "20001"
LGD_ERR_RESOLVE_HOST = "20002"
LGD_ERR_RESOLVE_PROXY = "20003"
LGD_ERR_CONNECT = "20004"
LGD_ERR_WRITE = "20005"
LGD_ERR_READ = "20006"
LGD_ERR_SEND = "20007"
LGD_ERR_RECV = "20008"
LGD_ERR_TIMEDOUT = "20009"
LGD_ERR_SSL = "20101"
LGD_ERR_CURL = "20201"
LGD_ERR_JSON_DECODE = "40001"
def session_id():
return ''.join([random.choice(string.ascii_letters + string.digits) for i in range(26)])
class XPayClient:
def __init__(self, home_dir, mode="real"):
self.ch = ""
self.debug = False
self.bTest = False
self.error_msg = ""
self.home_dir = ""
self.mode = ""
self.TX_ID = ""
self.MID = ""
self.Auth_Code = ""
self.config = ""
self.Post = {}
self.response_json = ""
self.response_array = ""
self.response_code = ""
self.response_msg = ""
self.log_file = ""
self.err_label = ("FATAL", "ERROR", "WARN ", "INFO ", "DEBUG")
self.INFO = ("LGD_TXID", "LGD_AUTHCODE", "LGD_MID", "LGD_OID", "LGD_TXNAME", "LGD_PAYKEY", "LGD_RESPCODE", "LGD_RESPMSG")
self.DEBUG = ("LGD_TXID", "LGD_AUTHCODE", "LGD_MID", "LGD_TID", "LGD_OID", "LGD_PAYTYPE", "LGD_PAYDATE", "LGD_TXNAME", "LGD_PAYKEY", "LGD_RESPCODE", "LGD_RESPMSG")
# CURL Error Code, Message
self.curl_errno = -99
self.curl_error = ""
self.home_dir = home_dir
path_mall_conf = Path(self.home_dir) / "conf/mall.conf"
path_lgdacom_conf = Path(self.home_dir) / "conf/lgdacom.conf"
if not Path(self.home_dir).exists():
self.response_code = LGD_ERR_NO_HOME_DIR
self.response_msg = f"home_dir [{self.home_dir}] does not exist"
trigger_error(self.response_msg, E_USER_ERROR)
elif not path_mall_conf.exists():
self.response_code = LGD_ERR_NO_MALL_CONFIG
self.response_msg = f"config file [{self.home_dir}/conf/mall.conf] does not exist"
trigger_error(self.response_msg, E_USER_ERROR)
elif not path_lgdacom_conf.exists():
self.response_code = LGD_ERR_NO_LGDACOM_CONFIG
self.response_msg = f"config file [{self.home_dir}/conf/lgdacom.conf] does not exist"
trigger_error(self.response_msg, E_USER_ERROR)
array1 = configparser.ConfigParser()
array2 = configparser.ConfigParser()
dummy_mall = StringIO()
dummy_mall.write("[DEFAULT]\n" + open(path_mall_conf, encoding="euc-kr").read())
dummy_lgdacom = StringIO()
dummy_lgdacom.write("[DEFAULT]\n" + open(path_lgdacom_conf, encoding="euc-kr").read())
array1.read_string(dummy_mall.getvalue())
array2.read_string(dummy_lgdacom.getvalue())
self.config = dict(array1['DEFAULT'].items())
self.config.update(dict(array2['DEFAULT'].items()))
self.log_file = Path(self.config["log_dir"]) / f"log_{datetime.now():%Y%m%d}.log"
Path(self.config["log_dir"]).mkdir("0o777", True, True)
self.log(f"XPayClient initialize [{self.home_dir}] [{mode}]", LGD_LOG_INFO)
if mode.lower() == "test":
self.bTest = True
self.debug = False
self.init()
def init(self):
self.ch = pycurl.Curl()
self.ch.setopt(pycurl.USERAGENT, LGD_USER_AGENT)
self.ch.setopt(pycurl.FOLLOWLOCATION, True)
self.ch.setopt(pycurl.ENCODING, 'gzip, deflate')
if self.config["verify_cert"] == 0:
self.ch.setopt(pycurl.SSL_VERIFYPEER, 0)
self.log("Do not verify server Certificate", LGD_LOG_WARN)
if self.config["verify_host"] == 0:
self.ch.setopt(pycurl.SSL_VERIFYHOST, 0)
self.log("Do not verify host Domain", LGD_LOG_WARN)
# TLS 설정
# ; 512 (TLS1.1) , 2048 (TLS1.2) , 2560 (TLS1.0)
# CURL_SSLVERSION_DEFAULT (0), CURL_SSLVERSION_TLSv1 (1), CURL_SSLVERSION_SSLv2 (2),
# CURL_SSLVERSION_SSLv3 (3), CURL_SSLVERSION_TLSv1_0 (4), CURL_SSLVERSION_TLSv1_1 (5) or CURL_SSLVERSION_TLSv1_2 (6).
if self.config['default_secure_protocols'] == "512":
self.ch.setopt(pycurl.SSLVERSION, 5)
# print(f"default_secure_protocols = {self.config['default_secure_protocols']}", end="<br>")
elif self.config['default_secure_protocols'] == "2048":
self.ch.setopt(pycurl.SSLVERSION, 6)
# print("default_secure_protocols = {self.config['default_secure_protocols']}", end="<br>")
elif self.config['default_secure_protocols'] == "2560":
self.ch.setopt(pycurl.SSLVERSION, 4)
# print("default_secure_protocols = {self.config['default_secure_protocols']}", end="<br>")
else:
self.ch.setopt(pycurl.SSLVERSION, 0)
# print("default_secure_protocols = {self.config['default_secure_protocols']}", end="<br>")
self.ch.setopt(pycurl.CAINFO, Path(self.home_dir / "conf/ca-bundle.crt").resolve())
def IsAcceptLog(self, ParamName, LogLevel):
if LGD_LOG_DEBUG == LogLevel:
if ParamName in self.DEBUG:
return True
elif LGD_LOG_INFO == LogLevel:
if ParamName in self.INFO:
return True
return False
def Init_TX(self, MID):
if self.config[MID] == None:
self.response_code = LGD_ERR_NO_MID
self.response_msg = f"Key for MID [{MID}] does not exist in mall.conf"
self.log(self.response_msg, LGD_LOG_FATAL)
return False
self.TX_ID = self.Gen_TX_ID(MID)
self.MID = MID
self.Auth_Code = self.Gen_Auth_Code(self.TX_ID, MID)
self.Post = {"LGD_TXID": self.TX_ID, "LGD_AUTHCODE": self.Auth_Code, "LGD_MID": MID}
return True
def GenerateGUID(self):
return str(uuid.uuid4()).replace("-", "")
def Get_Unique(self):
if 'tx_counter' in session:
session['tx_counter'] += 1
else:
session['tx_counter'] = 1
return session_id() + session['tx_counter'] + self.GenerateGUID()
def Gen_TX_ID(self, MID):
header = f"{MID}-{self.config['server_id']}-{datetime.now():%Y%m%d%H%M%S}"
return header + sha1(f"{header}{self.Get_Unique()}".encode("utf-8")).hexdigest()
def Gen_Auth_Code(self, tx_id, MID):
return sha1(f"{tx_id}{self.config[MID]}".encode("utf-8")).hexdigest()
def Set(self, name, value):
self.Post[name] = value
def Check_Auto_Rollback(self, errno):
# CURLE_OPERATION_TIMEDOUT
if errno == 28:
return True
else:
return False
def set_curl_error(self):
# 1: CURLE_UNSUPPORTED_PROTOCOL
# 3: CURLE_URL_MALFORMAT
# 6: CURLE_COULDNT_RESOLVE_HOST
# 5: CURLE_COULDNT_RESOLVE_PROXY
# 7: CURLE_COULDNT_CONNECT
# 23: CURLE_WRITE_ERROR
# 26: CURLE_READ_ERROR
# 55: CURLE_SEND_ERROR
# 56: CURLE_RECV_ERROR
# 27: CURLE_OUT_OF_MEMORY
# 28: CURLE_OPERATION_TIMEDOUT
# 35: CURLE_SSL_CONNECT_ERROR
# 51: CURLE_PEER_FAILED_VERIFICATION
# 53: CURLE_SSL_ENGINE_NOTFOUND
# 54: CURLE_SSL_ENGINE_SETFAILED
# 58: CURLE_SSL_CERTPROBLEM
# 59: CURLE_SSL_CIPHER
# 60: CURLE_SSL_CACERT
# 64: CURLE_USE_SSL_FAILED
# 66: CURLE_SSL_ENGINE_INITFAILED
# 77: CURLE_SSL_CACERT_BADFILE
# 80: CURLE_SSL_SHUTDOWN_FAILED
# 82: CURLE_SSL_CRL_BADFILE
# 83: CURLE_SSL_ISSUER_ERROR
errors = {
(1, 3): {
"response_code": LGD_ERR_HTTP_URL,
"response_msg": "URL error"
},
(6,): {
"response_code": LGD_ERR_RESOLVE_HOST,
"response_msg": "Resolve host error"
},
(5,): {
"response_code": LGD_ERR_RESOLVE_PROXY,
"response_msg": "Resolve proxy error"
},
(7,): {
"response_code": LGD_ERR_CONNECT,
"response_msg": "Could not connect error"
},
(23,): {
"response_code": LGD_ERR_WRITE,
"response_msg": "Write error"
},
(26,): {
"response_code": LGD_ERR_READ,
"response_msg": "Read error"
},
(55,): {
"response_code": LGD_ERR_SEND,
"response_msg": "Send error"
},
(56,): {
"response_code": LGD_ERR_RECV,
"response_msg": "Recv error"
},
(27,): {
"response_code": LGD_ERR_OUT_OF_MEMORY,
"response_msg": "Out of memory error"
},
(28,): {
"response_code": LGD_ERR_TIMEDOUT,
"response_msg": "Timeout error"
},
(35, 51, 53, 54, 58, 59, 60, 64, 66, 77, 80, 82, 83): {
"response_code": LGD_ERR_SSL,
"response_msg": "SSL error"
}
}
find_errors = filter(lambda entry: self.curl_errno in entry[0], errors.items())
find_errors = list(find_errors)
if len(find_errors) > 0:
self.response_code = find_errors[1]["response_code"]
self.response_msg = find_errors[1]["response_msg"]
else:
self.response_code = LGD_ERR_CURL
self.response_msg = "CURL error"
self.response_msg += f"; cURL error code = {self.curl_errno} msg = {self.curl_error}"
def TX(self, bRollbackOnError=True):
bTX = True
bRollback = False
strRollbackReason = ""
bReporting = False
bCheckURL = False
strReportStatus = ""
strReportMsg = ""
if bRollbackOnError:
if self.config['auto_rollback'] == 0:
bRollbackOnError = False
if self.bTest:
url = self.config['test_url']
else:
url = self.config['url']
Protocol = urlparse(url).scheme
if Protocol == "":
url = f"https://{url}"
bCheckURL = True
elif Protocol == "http":
bCheckURL = False
elif Protocol == "https":
bCheckURL = True
if bCheckURL == True:
# 여기에서 에러가 떨어지면 잡아오면 됨..
result = self.send_post_data(url, self.Post, None, self.config['timeout'])
else:
result = False
bRollback = False
bReporting = False
self.response_code = LGD_ERR_HTTP_URL
self.response_msg = "http protocol not supported."
self.log(f"TX failed: res code = {self.response_code}; msg = {self.response_msg}", LGD_LOG_FATAL)
if result == False:
if bCheckURL == True:
bTX = False
self.set_curl_error()
self.log(f"TX failed: res code = {self.response_code}; msg = {self.response_msg}", LGD_LOG_FATAL)
if bRollbackOnError and self.Check_Auto_Rollback(self.curl_errno):
bRollback = True
strRollbackReason = "Timeout"
else:
bTX = False
else:
http_res_code = self.ch.getinfo(pycurl.HTTP_CODE)
if http_res_code in range(200, 301):
# http response error
bTX = False
self.response_code = f"{30000 + http_res_code}"
self.response_msg = f"HTTP response code = {http_res_code}"
self.log(f"TX failed: res code = {self.response_code}; msg = {self.response_msg}", LGD_LOG_FATAL)
# report
bReporting = True
strReportStatus = f"HTTP response {http_res_code}"
strReportMsg = result
if bRollbackOnError and http_res_code >= 500:
# rollback
bRollback = True
strRollbackReason = f"HTTP {http_res_code}"
else:
self.response_json = result
self.response_array = json_loads(self.response_json)
if (self.response_array == False) or (len(self.response_array["LGD_RESPCODE"]) == 0):
# JSON decode failed
bTX = False
bReporting = True
strReportStatus = "JSON decode fail"
strReportMsg = result
if bRollbackOnError:
bRollback = True
strRollbackReason = "JSON decode fail"
self.log("JSON Decode failed", LGD_LOG_ERROR)
self.response_array = {}
self.response_code = LGD_ERR_JSON_DECODE
self.response_msg = "JSON Decode Failed"
else:
self.response_code = self.response_array["LGD_RESPCODE"]
if self.config['output_UTF8'] == 1:
self.response_msg = self.response_array["LGD_RESPMSG"]
else:
# TODO 아래 코드는 원래 iconv를 사용해 utf-8 문자열을 euc-kr로 바꾸는 코드였다.(어차피 로그 남기는 건데 뭐..)
self.response_msg = self.response_array["LGD_RESPMSG"].decode("utf-8")
# TODO 아래 코드는 원래 iconv를 사용해 utf-8 문자열을 euc-kr로 바꾸는 코드였다.(어차피 로그 남기는 건데 뭐..)
self.log(f"Response Code=[{self.response_code}], Msg=[{self.response_array["LGD_RESPMSG"].decode('utf-8')}], Count={self.Response_Count()}", LGD_LOG_INFO)
keys = self.Response_Names()
for i in range(self.Response_Count()):
for name in keys:
if self.IsAcceptLog(name, LGD_LOG_DEBUG):
self.log(f"Response ({name}, {i}) = {self.Response(name, i)}", LGD_LOG_DEBUG)
if bRollback:
# try auto-rollback
tx_id = self.TX_ID
code = self.response_code
msg = self.response_msg
self.init()
self.Rollback(strRollbackReason)
# restore previous info
self.TX_ID = tx_id
self.response_code = code
self.response_msg = msg
if bReporting:
self.Report(strReportStatus, strReportMsg)
return bTX
def Report_TX(self):
url = self.config['aux_url']
result = self.send_post_data(url, self.Post, None, self.config['timeout'])
if result == False:
self.log(f"Reporting failed: res code = {self.response_code}; msg = {self.response_msg}", LGD_LOG_ERROR)
return False
http_res_code = self.ch.getinfo(pycurl.HTTP_CODE)
if http_res_code in range(200, 301):
# http response error
self.log(f"Reporting failed: HTTP response code = {http_res_code}", LGD_LOG_ERROR)
return False
response_array = json_loads(result)
if response_array == False:
# JSON decode failed
self.log("Report JSON Decode failed", LGD_LOG_ERROR)
return False
response_code = response_array["LGD_RESPCODE"]
# 아래 코드는 원래 UTF-8 문자열을 EUC-KR로 바꾸는 코드임(TODO)
response_msg = response_array["LGD_RESPMSG"].decode("utf-8")
self.log(f"Report Response Code=[{response_code}], Msg=[{response_msg}]", LGD_LOG_INFO)
return True
def Patch_TX(self, filename):
url = self.config['aux_url']
fp = NamedTemporaryFile("w+t", prefix="tmp", dir=(Path(self.home_dir) / "conf"))
result = self.post_into_file(url, self.Post, fp, None, self.config['timeout'])
fp.seek(0)
if result == False:
fp.close()
self.log(f"Patch failed: file = {filename}", LGD_LOG_ERROR)
return False
else:
self.log(f"Patch success: file = {filename}", LGD_LOG_INFO)
type_ = self.ch.getinfo(pycurl.CONTENT_TYPE)
if type_[:10].lower() == "text/plain":
self.log(f"Patch success: Content-Type = {type}", LGD_LOG_INFO)
copyfileobj(fp, Path(self.home_dir + "/conf/" + filename).open("w+t"))
fp.close()
return True
else:
# error
content = fp.read(512)
self.Report(f"Patch error : {filename}", content)
fp.close()
return False
def Rollback(self, reason):
RBTX = self.TX_ID
self.Init_TX(self.MID)
self.Set("LGD_TXNAME", "Rollback")
self.Set("LGD_RB_TXID", RBTX)
self.Set("LGD_RB_REASON", reason)
if self.TX(False) == False:
return False
if self.response_code == "0000":
return True
return False
def Report(self, status, msg):
if self.config['report_error'] != 1:
return False
self.Init_TX(self.MID)
self.Set("LGD_TXNAME", "Report")
self.Set("LGD_STATUS", status)
self.Set("LGD_MSG", msg)
return self.Report_TX()
def Patch(self, filename):
self.Init_TX(self.MID)
self.Set("LGD_TXNAME", "Patch")
self.Set("LGD_FILE", filename)
return self.Patch_TX(filename)
def Response_Json(self):
return self.response_json
def Response_Count(self):
if self.response_array["LGD_RESPONSE"] == None:
return 0
return len(self.response_array["LGD_RESPONSE"])
def Response_Code(self):
return self.response_code
def Response_Msg(self):
return self.response_msg
def Response_Names(self):
if not self.Response_Count():
return None
return self.response_array["LGD_RESPONSE"][0].keys()
def Response(self, name, index=0):
if self.config['output_UTF8'] == 1:
return self.response_array["LGD_RESPONSE"][index][name]
else:
# TODO: 아래 코드는 원래 utf-8을 euc-kr로 변환하는 코드였음(근데 필요한가?? euc-kr 환경에선 테스트 안해봄)
return self.response_array["LGD_RESPONSE"][index][name].decode("utf-8")
def log(self, msg, level=LGD_LOG_FATAL):
if level > self.config["log_level"]:
return False
if 'chardet' in locals().keys():
if (chardet.detect(msg)['encoding'] == "UTF-8" and self.config['output_UTF8'] == 0):
# 아래 msg는 utf-8을 euc-kr로 바꾸는 코드였음
err_msg = datetime.now().strftime("%Y%m%d %H:%M:%S") + f" [{self.err_label[level]}] [{self.TX_ID}] {msg.decode('utf-8')}\n"
else:
err_msg = datetime.now().strftime("%Y%m%d %H:%M:%S") + f" [{self.err_label[level]}] [{self.TX_ID}] {msg}\n"
else:
err_msg = datetime.now().strftime("%Y%m%d %H:%M:%S") + f" [{self.err_label[level]}] [{self.TX_ID}] {msg}\n"
print(err_msg. end='', file=self.log_file)
def set_credentials(self, username, password):
self.ch.setopt(pycurl.USERPWD, f"{username}:{password}")
def set_referrer(self, referrer_url):
self.ch.setopt(pycurl.REFERER, referrer_url)
def set_user_agent(self, useragent):
self.ch.setopt(pycurl.USERAGENT, useragent)
def include_response_headers(self, value):
self.ch.setopt(pycurl.HEADER, value)
def set_proxy(self, proxy):
self.ch.setopt(pycurl.PROXY, proxy)
def perform(self):
ret = {'data': ''}
buffer = BytesIO()
self.ch.setopt(pycurl.WRITEDATA, buffer)
try:
self.ch.perform()
except pycurl.error as e:
self.curl_errno = ret['errno'] = a.args[0]
self.curl_error = ret['error'] = a.args[1]
finally:
ret['data'] = buffer.getvalue()
return ret
def send_post_data(self, url, postdata, ip=None, timeout=10):
self.ch.setopt(pycurl.URL, url)
self.ch.setopt(pycurl.RETURNTRANSFER, True)
if ip:
if self.debug:
print(f"Binding to ip {ip}\n")
self.ch.setopt(pycurl.INTERFACE, ip)
self.ch.setopt(pycurl.TIMEOUT, timeout)
self.ch.setopt(pycurl.POST, True)
post_array = {}
if type(postdata) == dict:
for key, value in postdata.items():
post_array[key] = value
if (self.IsAcceptLog(key, LGD_LOG_DEBUG)):
self.log(f"Post [{key}] = [{value}]", LGD_LOG_DEBUG)
post_string = urlencode(postdata)
else:
post_string = postdata
self.ch.setopt(pycurl.POSTFIELDS, post_string)
ret = self.perform()
if ret['errno'] > -1:
if self.debug:
print("Error Occured in Curl\n")
print(f"Error number: {ret['errno']}\n"
print(f"Error message: {ret['error']}\n"
return False
else:
return ret['data']
def fetch_url(self, url, ip=None, timeout=5):
self.ch.setopt(pycurl.URL, url)
self.ch.setopt(pycurl.HTTPGET, True)
self.ch.setopt(pycurl.RETURNTRANSFER, True)
if ip:
if self.debug:
print(f"Binding to ip {ip}\n")
self.ch.setopt(pycurl.INTERFACE, ip)
self.ch.setopt(pycurl.TIMEOUT, timeout)
ret = self.perform()
if ret['errno'] > -1:
if self.debug:
print("Error Occured in Curl\n")
print(f"Error number: {ret['errno']}\n"
print(f"Error message: {ret['error']}\n"
def fetch_into_file(self, url, fp, ip=None, timeout=5):
self.ch.setopt(pycurl.URL, url)
self.ch.setopt(pycurl.HTTPGET, True)
self.ch.setopt(pycurl.FILE, fp)
if ip:
if self.debug:
print(f"Binding to ip {ip}\n")
self.ch.setopt(pycurl.INTERFACE, ip)
self.ch.setopt(pycurl.TIMEOUT, timeout)
ret = self.perform()
if ret['errno'] > -1:
if self.debug:
print("Error Occured in Curl\n")
print(f"Error number: {ret['errno']}\n"
print(f"Error message: {ret['error']}\n"
return False
else:
return True
def post_into_file(self, url, postdata, fp, ip=None, timeout=10):
self.ch.setopt(pycurl.URL, url)
self.ch.setopt(pycurl.FILE, fp)
if ip:
if self.debug:
print(f"Binding to ip {ip}\n")
self.ch.setopt(pycurl.INTERFACE, ip)
self.ch.setopt(pycurl.TIMEOUT, timeout)
self.ch.setopt(pycurl.POST, True)
post_array = {}
if type(postdata) == dict:
for key, value in postdata.items():
post_array[key] = value
if (self.IsAcceptLog(key, LGD_LOG_DEBUG)):
self.log(f"Post [{key}] = [{value}]", LGD_LOG_DEBUG)
post_string = urlencode(postdata)
if self.debug:
print(f"Url: {url}\nPost String: {post_string}\n")
else:
post_string = postdata
self.ch.setopt(pycurl.POSTFIELDS, post_string)
ret = self.perform()
if ret['errno'] > -1:
if self.debug:
print("Error Occured in Curl\n")
print(f"Error number: {ret['errno']}\n"
print(f"Error message: {ret['error']}\n"
return False
else:
return ret['data']
def send_multipart_post_data(self, url, postdata, file_field_array={}, ip=None, timeout=30):
self.ch.setopt(pycurl.URL, url)
self.ch.setopt(pycurl.RETURNTRANSFER, True)
if ip:
if self.debug:
print(f"Binding to ip {ip}\n")
self.ch.setopt(pycurl.INTERFACE, ip)
self.ch.setopt(pycurl.TIMEOUT, timeout)
self.ch.setopt(pycurl.POST, True)
headers = ("Expect: ",)
self.ch.setopt(pycurl.HTTPHEADER, headers)
post_array = {}
post_string_array = {}
if not (type(postdata) == dict):
return False
for key, value in postdata.items():
post_array[key] = value
post_string_array[key] = value
post_string = urlencode(post_string_array)
if self.debug:
print(f"Post String: {post_string}\n")
if len(file_field_array) > 0:
for var_name, var_value in file_field_array.items():
if sys.platform == "win32":
# win hack
var_value = var_value.replace("/", "\\")
file_field_array[var_name] = f"@{var_value}"
result_post = post_array.copy()
result_post.update(file_field_array)
self.ch.setopt(pycurl.POSTFIELDS, result_post)
ret = self.perform()
if ret['errno'] > -1:
if self.debug:
print("Error Occured in Curl\n")
print(f"Error number: {ret['errno']}\n"
print(f"Error message: {ret['error']}\n"
return False
else:
return ret['data']
def store_cookies(self, cookie_file):
self.ch.setopt(pycurl.COOKIEJAR, cookie_file)
self.ch.setopt(pycurl.COOKIEFILE, cookie_file)
def set_cookie(self, cookie):
self.ch.setopt(pycurl.COOKIE, cookie)
def get_effective_url(self):
return curl_getinfo(self.ch, CURLINFO_EFFECTIVE_URL)
def get_http_response_code(self):
return curl_getinfo(self.ch, CURLINFO_HTTP_CODE)
def get_error_msg(self):
err = f"Error number: {self.curl_errno}\n"
err += f"Error message: {self.curl_error}\n"
return err
def close(self):
self.ch.close()
def GetTimeStamp(self):
return f"{datetime.now():%Y%m%d%H%M%S}"
def GetHashData(self, LGD_MID, LGD_OID, LGD_AMOUNT, LGD_TIMESTAMP):
return md5(f"{LGD_MID}{LGD_OID}{LGD_AMOUNT}{LGD_TIMESTAMP}{self.config[LGD_MID]}".encode("utf-8")).hexdigest()
def GetHashDataOpenpay(self, LGD_VENDERNO, LGD_MID, LGD_OPENPAY_MER_UID, LGD_OPENPAY_TOKEN, LGD_TIMESTAMP):
InputData = f"{LGD_MID}{LGD_VENDERNO}{LGD_OPENPAY_MER_UID}"
if len(LGD_OPENPAY_TOKEN) == 0:
InputData = f"{InputData}{LGD_TIMESTAMP}{self.config[LGD_MID]}"
else:
InputData = f"{InputData}{LGD_OPENPAY_TOKEN}{LGD_TIMESTAMP}{self.config[LGD_MID]}"
return sha512(InputData.encode("utf-8")).hexdigest()
def GetHashDataCas(self, LGD_MID, LGD_OID, LGD_AMOUNT, LGD_RESPCODE, LGD_TIMESTAMP):
return md5(f"{LGD_MID}{LGD_OID}{LGD_AMOUNT}{LGD_RESPCODE}{LGD_TIMESTAMP}{self.config[LGD_MID]}".encode("utf-8")).hexdigest()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment