Skip to content

Instantly share code, notes, and snippets.

@mbarnes
Last active October 14, 2023 19:34
Show Gist options
  • Save mbarnes/9f4b95d7c69159f613c91061a2896521 to your computer and use it in GitHub Desktop.
Save mbarnes/9f4b95d7c69159f613c91061a2896521 to your computer and use it in GitHub Desktop.
Giant Eagle Digital Coupon Clipper
#!/usr/bin/python3
#
# Clip all available digital coupons for a Giant Eagle account.
#
# To avoid interactive prompts, either set environment variables
# GIANT_EAGLE_USERNAME and GIANT_EAGLE_PASSWORD or add credentials
# to your ~/.netrc file:
#
# machine gianteagle.com
# login <GIANT_EAGLE_USERNAME>
# password <GIANT_EAGLE_PASSWORD>
#
import base64
import functools
import getpass
import hashlib
import html.parser
import http
import http.client
import json
import netrc
import os
import re
import urllib
import urllib.parse

# 3rd-party modules
import requests
# HTTP wire-level debug output is disabled (debuglevel 0); set this to 1
# to show HTTP requests and responses while troubleshooting.
http.client.HTTPConnection.debuglevel = 0
# Value of the User-Agent header that GiantEagleSession sets on itself.
USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0'
class ScriptExtractor(html.parser.HTMLParser):
    """Collect the text content of every ``<script>`` element in a document.

    After ``feed()`` has been called, ``scripts`` holds the data chunks that
    were seen while a ``script`` element was the innermost open tag, in
    document order.  ``tags`` is the stack of currently open tag names.
    """

    def reset(self):
        """Reset the parser, clearing the open-tag stack and collected scripts."""
        # HTMLParser.__init__() calls reset(), so this also serves as the
        # initialiser for both attributes.
        self.tags = []
        self.scripts = []
        super().reset()

    def handle_starttag(self, tag, attrs):
        """Push the newly opened tag onto the open-tag stack."""
        self.tags.append(tag)

    def handle_endtag(self, tag):
        """Close ``tag`` and any unclosed tags opened after it.

        Real-world HTML is rarely balanced: void elements such as ``<meta>``
        or ``<br>`` never get an end tag, and stray end tags do occur.  The
        stack is therefore unwound to the most recent matching start tag,
        and an end tag with no matching start tag is ignored instead of
        raising IndexError on an empty stack.
        """
        for index in range(len(self.tags) - 1, -1, -1):
            if self.tags[index] == tag:
                del self.tags[index:]
                return

    def handle_data(self, data):
        """Record ``data`` when the innermost open element is a script."""
        if self.tags and self.tags[-1] == 'script':
            self.scripts.append(data)
def _extract_settings(response):
    """Return the ``SETTINGS`` object embedded in a sign-in page.

    Every inline script in the response body is searched for a
    ``var SETTINGS = {...};`` assignment and the first match is decoded
    as JSON.

    Returns:
        The decoded settings, or None when no script contains the
        assignment.
    """
    extractor = ScriptExtractor()
    extractor.feed(response.content.decode('utf-8'))
    for script in extractor.scripts:
        match = re.search(r'var SETTINGS = ({.*});', script)
        if match:
            return json.loads(match.group(1))
    return None


def _pkce_pair():
    """Return a fresh PKCE ``(code_verifier, code_challenge)`` pair (S256)."""
    # Based on https://www.stefaanlippens.net/oauth-code-flow-pkce.html
    code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
    code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
    code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
    code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
    code_challenge = code_challenge.replace('=', '')
    return code_verifier, code_challenge


def _giant_eagle_login(session):
    """Sign in to GiantEagle.com and store the access token on ``session``.

    Runs an OAuth 2.0 authorization code flow with PKCE against the
    sign-in service, using ``session.email`` and ``session.password``.
    On success ``session.headers['authorization']`` is set to
    ``"<token_type> <access_token>"``.

    Raises:
        requests.HTTPError: if any HTTP request returns an error status.
        NotImplementedError: if the sign-in service presents a page this
            script does not know how to answer.
        RuntimeError: if the sign-in pages keep repeating, or if the
            final redirect carries no authorization code.
    """
    b2c = {
        'auth_domain': 'login.accounts.gianteagle.com',
        'tenant': '7c5d1338-ef50-4a78-9205-1cf197b3bf9a',
        'sign_in_policy': 'b2c_1a_prod_signup_signin',
    }
    client_id = '7288c76e-7f69-4e12-8148-ece4aaa96223'
    scopes = ['https://geb2c101.onmicrosoft.com/l7/layer7.dataread',
              'https://geb2c101.onmicrosoft.com/l7/layer7.datawrite']
    redirect_uri = 'https://www.gianteagle.com/'
    # The original flow needs one page (login) or two (login followed by
    # the mobile-verification prompt); anything far beyond that means the
    # service keeps re-presenting a page.
    # NOTE(review): presumably this happens on rejected credentials; confirm.
    max_sign_in_steps = 10

    base_url = 'https://{auth_domain}/{tenant}/{sign_in_policy}'.format(**b2c)
    response = session.get(base_url + '/v2.0/.well-known/openid-configuration')
    response.raise_for_status()
    openid_configuration = response.json()

    code_verifier, code_challenge = _pkce_pair()

    params = {
        'client_id': client_id,
        'scope': ' '.join(scopes),
        'redirect_uri': redirect_uri,
        'response_mode': 'fragment',
        'response_type': 'code',
        'code_challenge': code_challenge,
        'code_challenge_method': 'S256',
    }
    response = session.get(
        openid_configuration['authorization_endpoint'], params=params)
    response.raise_for_status()

    # Giant Eagle is using the Microsoft identity platform.
    #
    # It appears to be a single-page JavaScript app that follows the
    # OpenID Connect protocol for authorization, and the OAuth 2.0
    # authorization code flow for application grants.
    #
    # https://learn.microsoft.com/en-us/azure/active-directory/develop/v2-app-types#single-page-apps-javascript

    # Answer each sign-in page in turn.  A page is recognised by the
    # SETTINGS object it embeds; the flow is finished once a response no
    # longer contains one.
    settings = _extract_settings(response)
    steps = 0
    while settings:
        steps += 1
        if steps > max_sign_in_steps:
            raise RuntimeError(
                'Giant Eagle login did not complete after {} sign-in '
                'steps; check the account credentials'.format(
                    max_sign_in_steps))

        base_url = urllib.parse.urljoin(
            settings['hosts']['static'],
            settings['hosts']['tenant'])
        headers = {
            'X-CSRF-TOKEN': settings['csrf']
        }
        params = {
            'tx': settings['transId'],
            'p': settings['hosts']['policy']
        }

        data = {'request_type': 'RESPONSE'}
        resource = settings['remoteResource']
        if 'login-' in resource:
            # Send sign in credentials.
            data['signInName'] = session.email
            data['password'] = session.password
        elif 'mobile-verification-1-' in resource:
            # NOTE(review): presumably this dismisses the "add a mobile
            # number" prompt with a placeholder number; confirm.
            data['extension_mobileNumber'] = '+10000000000'
            data['mobileFlow'] = 'RemindMeLater'
        else:
            raise NotImplementedError('Unknown remoteResource: ' + resource)

        response = session.post(
            base_url + '/SelfAsserted',
            headers=headers, params=params, data=data)
        response.raise_for_status()

        params = {
            'csrf_token': settings['csrf'],
            'p': settings['hosts']['policy']
        }
        response = session.get(
            base_url + '/api/' + settings['api'] + '/confirmed',
            params=params)
        response.raise_for_status()
        settings = _extract_settings(response)

    # Extract the authorization code from the URL fragment.
    fragment = urllib.parse.urlparse(response.url).fragment
    fragment_params = urllib.parse.parse_qs(fragment)
    codes = fragment_params.get('code')
    if not codes:
        # Fail with a readable message instead of an AttributeError on None.
        detail = (fragment_params.get('error_description')
                  or fragment_params.get('error')
                  or ['no authorization code in the redirect URL'])
        raise RuntimeError('Giant Eagle login failed: ' + detail[-1])
    authorization_code = codes[-1]

    # Request an access token from the token endpoint.
    token_scopes = scopes + ['openid', 'profile', 'offline_access']
    params = {
        'client_id': client_id,
        'grant_type': 'authorization_code',
        'code': authorization_code,
        'code_verifier': code_verifier,
        'redirect_uri': redirect_uri,
        'scope': ' '.join(token_scopes)
    }
    # NOTE(review): the parameters are sent in the query string, as the
    # original code did; OAuth 2.0 describes a form-encoded request body.
    # Confirm the endpoint accepts data= before changing this.
    response = session.post(
        openid_configuration['token_endpoint'], params=params)
    response.raise_for_status()
    data = response.json()
    session.headers['authorization'] = (
        '{token_type} {access_token}'.format_map(data))


def giant_eagle_session_login(method):
    """Decorate a session method so that it logs in on first use.

    The wrapped method's first argument must be the session object.  If
    the session has no ``authorization`` header yet, a full sign-in is
    performed before the method is called; later calls reuse the header.

    Args:
        method: the session method to wrap.

    Returns:
        A wrapper with the same name, docstring and call signature as
        ``method``.
    """
    @functools.wraps(method)
    def inner(session, *args, **kwargs):
        """Log in to GiantEagle.com on first call"""
        if 'authorization' not in session.headers:
            _giant_eagle_login(session)
        return method(session, *args, **kwargs)
    return inner
class GiantEagleSession(requests.Session):
    """GiantEagle.com REST API session

    A ``requests.Session`` whose request URLs are resolved against
    ``base_url``.  Credentials are gathered when the session is created;
    the actual sign-in happens lazily, the first time a method decorated
    with ``giant_eagle_session_login`` is called.
    """

    base_url = "https://core.shop.gianteagle.com/api/v2"

    def __init__(self, base_url=None):
        """Create the session.

        Args:
            base_url: optional replacement for the class-level API URL.
        """
        if base_url:
            self.base_url = base_url
        super().__init__()
        self.headers['Referer'] = 'https://www.gianteagle.com/'
        self.headers['User-Agent'] = USER_AGENT
        self.__get_credentials()

    def __get_credentials(self):
        """Populate ``self.email`` and ``self.password``.

        Sources are tried in order: the GIANT_EAGLE_USERNAME and
        GIANT_EAGLE_PASSWORD environment variables, the ``gianteagle.com``
        entry of the user's netrc file, and finally an interactive prompt.
        """
        self.email = os.environ.get('GIANT_EAGLE_USERNAME')
        self.password = os.environ.get('GIANT_EAGLE_PASSWORD')
        if not (self.email and self.password):
            try:
                if auth := netrc.netrc().authenticators('gianteagle.com'):
                    self.email, _, self.password = auth
            except FileNotFoundError:
                # No netrc file: fall through to the interactive prompt.
                pass
        if not (self.email and self.password):
            print('Giant Eagle Login')
            self.email = input('Email: ').strip()
            self.password = getpass.getpass('Password: ').strip()

    def request(self, method, url, *args, **kwargs):
        """Send the request after generating the complete URL"""
        url = self.create_url(url)
        return super().request(method, url, *args, **kwargs)

    def create_url(self, url):
        """Create the URL based off this partial path"""
        # urljoin() leaves absolute URLs untouched and maps '' to base_url.
        return urllib.parse.urljoin(self.base_url, url)

    @giant_eagle_session_login
    def clip_coupon(self, coupon):
        """Clip a digital coupon

        Args:
            coupon: a mapping with at least an ``id`` key, such as the
                items yielded by ``available_coupons()``.

        Returns:
            True when the response reports no errors; otherwise the errors
            are printed and False is returned.

        Raises:
            requests.HTTPError: if the HTTP request returns an error status.
        """
        body = {
            'operationName': 'clipCouponMutation',
            'variables': {
                'input': {
                    'id': coupon['id'],
                }
            },
            'query': """
                mutation clipCouponMutation($input: ClipCouponInput!) {
                  clipCoupon(input: $input) {
                    clientMutationId
                  }
                }
            """
        }
        response = self.post('', json=body)
        response.raise_for_status()
        errors = response.json().get('errors', [])
        if not errors:
            return True
        print('Error clipping {}:'.format(coupon['id']))
        for err in errors:
            # Only "message" is a standard GraphQL error field; look for the
            # code in "extensions" as well so that reporting an error can
            # never itself raise KeyError.
            code = (err.get('code')
                    or (err.get('extensions') or {}).get('code')
                    or 'error')
            print(' {}: {}'.format(code, err.get('message', '')))
        return False

    @giant_eagle_session_login
    def available_coupons(self):
        """List available discount coupons

        Yields:
            One mapping per coupon with the keys ``description``,
            ``expiryDate``, ``id`` and ``summary``.  Pages of up to 100
            coupons are fetched lazily as the generator is consumed.

        Raises:
            requests.HTTPError: if an HTTP request returns an error status.
            RuntimeError: if a response carries no coupon data.
        """
        body = {
            'operationName': 'AvailableCouponsContainerQuery',
            'variables': {
                'count': 100,
            },
            'query': """
                query AvailableCouponsContainerQuery($count: Int, $cursor: String) {
                  coupons(
                    first: $count
                    after: $cursor)
                  {
                    edges {
                      node {
                        description
                        expiryDate
                        id
                        summary
                      }
                    }
                    pageInfo {
                      endCursor
                      hasNextPage
                    }
                  }
                }
            """
        }
        has_next_page = True
        while has_next_page:
            response = self.post('', json=body)
            response.raise_for_status()
            payload = response.json()
            data = (payload.get('data') or {}).get('coupons')
            if data is None:
                # Surface the server's explanation rather than failing with
                # an opaque TypeError/KeyError on the missing data.
                raise RuntimeError(
                    'Coupon query returned no data: {}'.format(
                        payload.get('errors')))
            for item in data['edges']:
                yield item['node']
            # Continue after the last coupon of the page just consumed.
            body['variables']['cursor'] = data['pageInfo']['endCursor']
            has_next_page = data['pageInfo']['hasNextPage']
def main():
    """Clip every available coupon and report how many were clipped."""
    with GiantEagleSession() as session:
        clipped_count = 0
        for coupon in session.available_coupons():
            if not session.clip_coupon(coupon):
                # clip_coupon() has already printed the reason.
                continue
            print('CLIPPED:', coupon['summary'], coupon['description'])
            clipped_count += 1
        print(clipped_count, 'coupons clipped')


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment