Last active
October 14, 2023 19:34
-
-
Save mbarnes/9f4b95d7c69159f613c91061a2896521 to your computer and use it in GitHub Desktop.
Giant Eagle Digital Coupon Clipper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# | |
# Clip all available digital coupons for a Giant Eagle account. | |
# | |
# To avoid interactive prompts, either set environment variables | |
# GIANT_EAGLE_USERNAME and GIANT_EAGLE_PASSWORD or add credentials | |
# to your ~/.netrc file: | |
# | |
# machine gianteagle.com | |
# login <GIANT_EAGLE_USERNAME> | |
# password <GIANT_EAGLE_PASSWORD> | |
# | |
import base64 | |
import getpass | |
import hashlib | |
import html.parser | |
import http | |
import json | |
import netrc | |
import os | |
import re | |
import urllib | |
# 3rd-party modules | |
import requests | |
# Show HTTP requests and responses | |
http.client.HTTPConnection.debuglevel = 0 | |
USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0' | |
class ScriptExtractor(html.parser.HTMLParser): | |
def reset(self): | |
self.tags = [] | |
self.scripts = [] | |
super().reset() | |
def handle_starttag(self, tag, attrs): | |
self.tags.append(tag) | |
def handle_endtag(self, tag): | |
self.tags.pop() | |
def handle_data(self, data): | |
if self.tags and self.tags[-1] == 'script': | |
self.scripts.append(data) | |
def giant_eagle_session_login(method): | |
def inner(session, *args, **kwargs): | |
"""Log in to GiantEagle.com on first call""" | |
if 'authorization' not in session.headers: | |
def extract_settings(response): | |
se = ScriptExtractor() | |
se.feed(response.content.decode('utf-8')) | |
for script in se.scripts: | |
match = re.search('var SETTINGS = ({.*});', script) | |
if match: | |
return json.loads(match.group(1)) | |
return None | |
b2c = { | |
'auth_domain': 'login.accounts.gianteagle.com', | |
'tenant': '7c5d1338-ef50-4a78-9205-1cf197b3bf9a', | |
'sign_in_policy': 'b2c_1a_prod_signup_signin', | |
} | |
client_id = '7288c76e-7f69-4e12-8148-ece4aaa96223' | |
scopes = ['https://geb2c101.onmicrosoft.com/l7/layer7.dataread', | |
'https://geb2c101.onmicrosoft.com/l7/layer7.datawrite'] | |
base_url = 'https://{auth_domain}/{tenant}/{sign_in_policy}'.format(**b2c) | |
url = base_url + '/v2.0/.well-known/openid-configuration' | |
response = session.get(url) | |
response.raise_for_status() | |
openid_configuration = response.json() | |
# Based on https://www.stefaanlippens.net/oauth-code-flow-pkce.html | |
code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8') | |
code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier) | |
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() | |
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8') | |
code_challenge = code_challenge.replace('=', '') | |
url = openid_configuration['authorization_endpoint'] | |
params = { | |
'client_id': client_id, | |
'scope': ' '.join(scopes), | |
'redirect_uri': 'https://www.gianteagle.com/', | |
'response_mode': 'fragment', | |
'response_type': 'code', | |
'code_challenge': code_challenge, | |
'code_challenge_method': 'S256', | |
} | |
response = session.get(url, params=params) | |
response.raise_for_status() | |
# Giant Eagle is using the Microsoft identity platform. | |
# | |
# It appears to be a single-page JavaScript app that follows the | |
# the OpenID Connect protocol for authorization, and the OAuth 2.0 | |
# authorization code flow for application grants. | |
# | |
# https://learn.microsoft.com/en-us/azure/active-directory/develop/v2-app-types#single-page-apps-javascript | |
# Send sign in credentials. | |
settings = extract_settings(response) | |
while settings: | |
base_url = urllib.parse.urljoin( | |
settings['hosts']['static'], | |
settings['hosts']['tenant']) | |
headers = { | |
'X-CSRF-TOKEN': settings['csrf'] | |
} | |
params = { | |
'tx': settings['transId'], | |
'p': settings['hosts']['policy'] | |
} | |
data = {'request_type': 'RESPONSE'} | |
resource = settings['remoteResource'] | |
if 'login-' in resource: | |
data['signInName'] = session.email | |
data['password'] = session.password | |
elif 'mobile-verification-1-' in resource: | |
data['extension_mobileNumber'] = '+10000000000' | |
data['mobileFlow'] = 'RemindMeLater' | |
else: | |
raise NotImplementedError('Unknown remoteResource: ' + resource) | |
response = session.post( | |
base_url + '/SelfAsserted', | |
headers=headers, params=params, data=data) | |
response.raise_for_status() | |
params = { | |
'csrf_token': settings['csrf'], | |
'p': settings['hosts']['policy'] | |
} | |
response = session.get( | |
base_url + '/api/' + settings['api'] + '/confirmed', | |
params=params) | |
response.raise_for_status() | |
settings = extract_settings(response) | |
# Extract the authorization code from the URL fragment. | |
fragment = urllib.parse.urlparse(response.url).fragment | |
authorization_code = urllib.parse.parse_qs(fragment).get('code').pop() | |
# Request an access token from the token endpoint. | |
url = openid_configuration['token_endpoint'] | |
scopes.extend(['openid', 'profile', 'offline_access']) | |
params = { | |
'client_id': client_id, | |
'grant_type': 'authorization_code', | |
'code': authorization_code, | |
'code_verifier': code_verifier, | |
'redirect_uri': 'https://www.gianteagle.com/', | |
'scope': ' '.join(scopes) | |
} | |
response = session.post(url, params=params) | |
response.raise_for_status() | |
data = response.json() | |
authorization = '{token_type} {access_token}'.format_map(data) | |
session.headers['authorization'] = authorization | |
return method(session, *args, **kwargs) | |
return inner | |
class GiantEagleSession(requests.Session): | |
"""GiantEagle.com REST API session""" | |
base_url = "https://core.shop.gianteagle.com/api/v2" | |
def __init__(self, base_url=None): | |
if base_url: | |
self.base_url = base_url | |
super().__init__() | |
self.headers['Referer'] = 'https://www.gianteagle.com/' | |
self.headers['User-Agent'] = USER_AGENT | |
self.__get_credentials() | |
def __get_credentials(self): | |
self.email = os.environ.get('GIANT_EAGLE_USERNAME') | |
self.password = os.environ.get('GIANT_EAGLE_PASSWORD') | |
if not (self.email and self.password): | |
try: | |
if auth := netrc.netrc().authenticators('gianteagle.com'): | |
self.email, _, self.password = auth | |
except FileNotFoundError: | |
pass | |
if not (self.email and self.password): | |
print('Giant Eagle Login') | |
self.email = input('Email: ').strip() | |
self.password = getpass.getpass('Password: ').strip() | |
def request(self, method, url, *args, **kwargs): | |
"""Send the request after generating the complete URL""" | |
url = self.create_url(url) | |
return super().request(method, url, *args, **kwargs) | |
def create_url(self, url): | |
"""Create the URL based off this partial path""" | |
return urllib.parse.urljoin(self.base_url, url) | |
@giant_eagle_session_login | |
def clip_coupon(self, coupon): | |
"""Clip a digital coupon""" | |
body = { | |
'operationName': 'clipCouponMutation', | |
'variables': { | |
'input': { | |
'id': coupon['id'], | |
} | |
}, | |
'query': """ | |
mutation clipCouponMutation($input: ClipCouponInput!) { | |
clipCoupon(input: $input) { | |
clientMutationId | |
} | |
} | |
""" | |
} | |
response = self.post('', json=body) | |
response.raise_for_status() | |
errors = response.json().get('errors', []) | |
if not errors: | |
return True | |
print('Error clipping {}:'.format(coupon['id'])) | |
for err in errors: | |
print(' {}: {}'.format(err['code'], err['message'])) | |
return False | |
@giant_eagle_session_login | |
def available_coupons(self): | |
"""List available discount coupons""" | |
body = { | |
'operationName': 'AvailableCouponsContainerQuery', | |
'variables': { | |
'count': 100, | |
}, | |
'query': """ | |
query AvailableCouponsContainerQuery($count: Int, $cursor: String) { | |
coupons( | |
first: $count | |
after: $cursor) | |
{ | |
edges { | |
node { | |
description | |
expiryDate | |
id | |
summary | |
} | |
} | |
pageInfo { | |
endCursor | |
hasNextPage | |
} | |
} | |
} | |
""" | |
} | |
has_next_page = True | |
while has_next_page: | |
response = self.post('', json=body) | |
response.raise_for_status() | |
data = response.json()['data']['coupons'] | |
for item in data['edges']: | |
yield item['node'] | |
body['variables']['cursor'] = data['pageInfo']['endCursor'] | |
has_next_page = data['pageInfo']['hasNextPage'] | |
def main(): | |
with GiantEagleSession() as session: | |
clipped = [] | |
for coupon in session.available_coupons(): | |
if session.clip_coupon(coupon): | |
print('CLIPPED:', coupon['summary'], coupon['description']) | |
clipped.append(coupon) | |
print(len(clipped), 'coupons clipped') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment