-
-
Save grepwood/839a58bd1846e02c2cfa72d026bd77a9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3.6 | |
import requests | |
import json | |
import re | |
from datetime import datetime | |
from datetime import timedelta | |
def token(): | |
headers = { | |
'Accept': 'application/json', | |
'Accept-Language': 'en_US', | |
} | |
data = [ | |
('grant_type', 'client_credentials'), | |
] | |
response = requests.post('https://api.paypal.com/v1/oauth2/token', headers=headers, data=data, auth=( | |
client_id, | |
secret_id)) | |
data = json.loads(response.text) | |
return data['access_token'] | |
def transaction_history(token, start_time, end_time): | |
headers = { | |
'Content-Type': 'application/json', | |
'Authorization': 'Bearer ' + token, | |
} | |
params = ( | |
('start_date', start_time), # dates must look like this: '2018-06-10T23:20:50.52Z' | |
('end_date', end_time), | |
('fields', 'transaction_info, payer_info, cart_info'), #all | |
) | |
results = [] | |
response = requests.get('https://api.paypal.com/v1/reporting/transactions', headers=headers, params=params) | |
if response.status_code != 200: | |
print('transaction_history: failed with HTTP code',response.status_code) | |
return results | |
data = json.loads(response.text) | |
print(data) | |
return 0 | |
for transaction in data['transaction_details']: | |
if "T0006" not in transaction['transaction_info']['transaction_event_code'] \ | |
and bool(transaction['payer_info']['payer_name'])\ | |
and float(transaction['transaction_info']['transaction_amount']['value']) > 0: | |
results.append({ | |
'item': transaction['cart_info']['item_details'][0]['item_name'] if len(transaction['cart_info']['item_details']) > 0 and 'item_name' in transaction['cart_info']['item_details'][0] else '', | |
'date': transaction['transaction_info']['transaction_initiation_date'] if 'transaction_initiation_date' in transaction['transaction_info'] else '', | |
'name': transaction['payer_info']['payer_name']['alternate_full_name'] if 'payer_name' in transaction['payer_info'] and 'alternate_full_name' in transaction['payer_info']['payer_name'] else '', | |
'value': transaction['transaction_info']['transaction_amount']['value'], | |
'currency': transaction['transaction_info']['transaction_amount']['currency_code'] | |
}) | |
return results | |
i = token() | |
time_end = datetime.utcnow() | |
time_start = time_end + timedelta(days=-30) | |
time_start = re.sub(r" +","T",time_start.strftime('%Y-%m-%dT%H:%M:%S.%f'))[:-4] + 'Z' | |
time_end = re.sub(r" +","T",time_end.strftime('%Y-%m-%dT%H:%M:%S.%f'))[:-4] + 'Z' | |
x = transaction_history(i,time_start,time_end) |
@Nosskirneh, sorry, no idea on what would work.
My code is slightly different, but it works just fine...
If the code below (or the above), doesn't work, I would guess it may be a PayPal configuration issue maybe...
Maybe the token is for the sandbox environment, only?
`
import requests
import json
import re
from dateutil import parser
from datetime import datetime
from datetime import timedelta
from backend import db, config
from backend.database.models import PayPal
now = datetime.now()
def token():
headers = {
'Accept': 'application/json',
'Accept-Language': 'en_US',
}
data = [
('grant_type', 'client_credentials'),
]
response = requests.post('https://api.paypal.com/v1/oauth2/token',
headers=headers,
data=data,
auth=(
PAYPAL_ID,
PAYPAL_SECRET))
data = json.loads(response.text)
return data['access_token']
def transaction_history(token, start_time):
time_end = start_time + timedelta(days=30)
start_time = convert_date_format(start_time)
time_end = convert_date_format(time_end)
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + token,
}
params = (
('start_date', start_time), # dates must look like this: '2018-06-10T23:20:50.52Z'
('end_date', time_end),
('fields', 'transaction_info, payer_info, cart_info'), #all
)
response = requests.get('https://api.paypal.com/v1/reporting/transactions', headers=headers, params=params)
if response.status_code != 200:
print('Request failed with HTTP code',response.status_code)
return response
return response
def get_transactions(time_start=datetime(2018,1,1)):
all_transactions = []
while time_start < datetime.utcnow():
print(f'Downloading data from {time_start}...', end='\r')
i = token()
response = transaction_history(i, time_start)
if response.status_code != 200:
return response
break
data = json.loads(response.text)
data = data['transaction_details']
if data is not None:
all_transactions.extend(data)
time_start = time_start + timedelta(days=30)
else:
time_start = time_start + timedelta(days=30)
return all_transactions
def process_data(all_transactions):
print('Starting database inserts...')
entries = []
for transaction in all_transactions:
transaction_info = transaction['transaction_info']
payer_info = transaction['payer_info']
paypal_account_id = transaction_info.get('paypal_account_id', None)
transaction_id = transaction_info.get('transaction_id', None)
transaction_event_code = transaction_info.get('transaction_event_code', None)
transaction_initiation_date = transaction_info.get('transaction_initiation_date', None)
transaction_updated_date = transaction_info.get('transaction_updated_date', None)
transaction_amount = transaction_info.get('transaction_amount', {}).get('value', None)
fee_amount = transaction_info.get('fee_amount', {}).get('value', None)
transaction_status = transaction_info.get('transaction_status', None)
invoice_id = transaction_info.get('invoice_id', None)
custom_field = transaction_info.get('custom_field', None)
account_id = payer_info.get('account_id', None)
email_address = payer_info.get('email_address', None)
address_status = payer_info.get('address_status', None)
payer_status = payer_info.get('payer_status', None)
payer_fname = payer_info.get('payer_name', {}).get('given_name', None)
payer_lname = payer_info.get('payer_name', {}).get('surname', None)
payer_fullname = payer_info.get('payer_name', {}).get('alternate_full_name', None)
if transaction_initiation_date is not None:
transaction_initiation_date = parser.parse(transaction_initiation_date)
else:
transaction_initiation_date = None
if transaction_updated_date is not None:
transaction_updated_date = parser.parse(transaction_updated_date)
else:
transaction_updated_date = None
single_transaction = PayPal(paypal_account_id=paypal_account_id,
transaction_id=transaction_id,
event_date=transaction_event_code,
initiation_date=transaction_initiation_date,
updated_date=transaction_updated_date,
amount_gross=transaction_amount,
amount_fee=fee_amount,
status=transaction_status,
invoice_id=invoice_id,
custom=custom_field,
customer_account=account_id,
customer_email=email_address,
customer_add_status=address_status,
customer_payer_status=payer_status,
customer_fname=payer_fname,
customer_lname=payer_lname,
customer_fullname=payer_fullname,
timestamp_db=now
)
entries.append(single_transaction)
try:
db.session.add_all(entries)
db.session.commit()
db.session.close()
print('Starting database inserts... Done.', end='\r')
return 'Ok'
except Exception as e:
print(e)
return 'Error'
def convert_date_format(date):
date = re.sub(r" +","T",date.strftime('%Y-%m-%dT%H:%M:%S.%f'))[:-4] + 'Z'
return date
def make_history():
results = get_transactions()
response = process_data(results)
if response == 'Ok':
return ('Ok', 200)
else:
return ('Error', 999)
def last_entry():
query = """
SELECT initiation_date
FROM fin_paypal
ORDER BY initiation_date DESC
LIMIT 1
"""
result = db.engine.execute(query).fetchone()
if result is None:
return 0
else:
return result[0]
def update_db():
last_entry = last_entry()
results = get_transactions(last_entry)
response = process_data(results)
if response == 'Ok':
return ('Ok', 200)
else:
return ('Error', 500)
`
@lowercase00 I enabled Transaction Search for my application but I am still getting 403. Any ideas?