This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Callable, Iterator, Union, Optional, List | |
import boto3 | |
def get_named_queries() -> List[dict]: | |
client = boto3.client('athena') | |
query_id_resp = client.list_named_queries( | |
MaxResults=50, # max 50 per page | |
) | |
saved_queries = client.batch_get_named_query(NamedQueryIds=query_id_resp['NamedQueryIds'])['NamedQueries'] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import csv | |
def get_csv_from_s3_as_list_of_dictionaries(bucket_name, filename):
    """Fetch a CSV object from S3 and return its rows as a list of dicts.

    Args:
        bucket_name: Name of the S3 bucket that holds the object.
        filename: Object key; any leading "/" characters are stripped
            before the lookup.

    Returns:
        One plain ``dict`` per data row, keyed by the CSV header row.
    """
    s3_client = boto3.client("s3")
    response = s3_client.get_object(Bucket=bucket_name, Key=filename.lstrip("/"))
    # The whole body is read into memory and decoded as UTF-8;
    # splitlines(True) keeps the line terminators for the csv parser.
    csv_lines = response["Body"].read().decode('utf-8').splitlines(True)
    reader = csv.DictReader(csv_lines, skipinitialspace=True)
    return [dict(row) for row in reader]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
def query_ad_interest_api(query, api_type): | |
query_url = "https://graph.facebook.com/v9.0/search?locale=en_US&limit=1000" | |
query_url += '&access_token=' + os.environ['FB_ACCESS_TOKEN'] | |
if api_type.title() == "Suggestion": | |
query_url += '&type=adinterestsuggestion&interest_list=["' + str(query.title()) + '"]' | |
elif api_type.title() == "Search": |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def trim_url(url, **kwargs): | |
if not url: | |
return url | |
if "://" in url: | |
url = url.split("://")[1] | |
if "www." in url: | |
url = url.split("www.", 1) | |
if kwargs.get("remove_subsite"): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
.aaa | |
.aarp | |
.abarth | |
.abb | |
.abbott | |
.abbvie | |
.abc | |
.able | |
.abogado | |
.abudhabi |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
def construct_api_call(date_tuple, structure_level, time_level): | |
url = f"https://graph.facebook.com/v10.0/{os.environ['FB_ACCOUNT_ID']}/insights" | |
url += "?action_attribution_windows=['7d_click','1d_view']" | |
url += f"&time_range[since]={date_tuple[0]}" | |
url += f"&time_range[until]={date_tuple[1]}" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
adset_id = "WRITE ADSET ID HERE" | |
budget = "WRITE BUDGET HERE" | |
def change_adset_budget(adset_id, budget): | |
url_endpoint = "https://graph.facebook.com/v10.0/" + str(adset_id) | |
url_endpoint += "?daily_budget=" + str(budget) | |
resp = requests.post( |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- sum / case | |
sum(case when col1 is not null then 1 else 0 end) AS count_col1 | |
-- COALESCE cols into one | |
COALESCE(col1, COALESCE(col2, default)) -- etc
-- GROUP BY / HAVING | |
GROUP BY user_id, log_type | |
HAVING log_type = 'products' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
from pprint import pprint | |
def make_sourcestack_query(endpoint, params_dict): | |
querystrings = "?" | |
for k,v in params_dict.items(): | |
querystrings += k + "=" + str(v) + "&" | |
querystrings = querystrings.rstrip("&") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
""" | |
Merging happens at the dict level: if only some of the dicts in a lod (list of dicts) have a given key,
only the resulting dicts whose primary_key value matches one of those dicts will have that k:v pair.
When both lods have a given (non-primary) key, the lod_2 value takes priority.
Originally from https://stackoverflow.com/questions/5501810/join-two-lists-of-dictionaries-on-a-single-key | |
""" | |
def zip_lods(lod_1, lod_2, primary_key, **kwargs): |