Created
October 8, 2023 16:01
-
-
Save Robin-Lord/fda8ac325879198903143af461e60363 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import os
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Union, Any, Optional

import tweepy
from google.cloud import bigquery
# Load auth from env for security.
# NOTE: only the consumer key/secret are supplied here; no access token is
# attached to this module-level handler.
auth: tweepy.OAuthHandler = tweepy.OAuthHandler(os.getenv("consumer_key"), os.getenv("consumer_secret"))

# Global flags to control printing (read by the `timeit` decorator on every call)
print_start: bool = True  # print "<<function name>>" before the function runs
print_time: bool = True  # print the elapsed time after the function returns
def timeit(method):
    """
    Decorator to print the name of the function when it's called and the time it took to execute.

    Printing is controlled by the module-level flags ``print_start`` and
    ``print_time``, which are looked up on every call, so they can be toggled
    at runtime.

    Args:
        method (function): The function to be timed and printed.
    Returns:
        function: Wrapped function that prints its name and execution time and
            returns whatever ``method`` returns.
    """
    # functools.wraps keeps the wrapped function's __name__/__doc__ so that
    # decorated functions remain identifiable in tracebacks and introspection.
    @functools.wraps(method)
    def timed(*args, **kw):
        if print_start:
            print(f"<<{method.__name__}>>")
        ts: datetime = datetime.now()
        result = method(*args, **kw)
        te: datetime = datetime.now()
        if print_time:
            print(f"<<{method.__name__}>> time to execute: {te - ts}\n-----------------------------------------")
        return result

    return timed
@timeit
def get_recent_updates(api: object, threshold_time: datetime) -> Tuple[Dict[str, Dict[str, List[str]]], List[str]]:
    """
    Get recent direct messages, filter them by timestamp, and process the contents for updating usernames.

    For every mentioned screen name in a recent DM, the sender is recorded
    under "remove" when the message text contains "stop" or "remove", and
    under "add" otherwise. The sender is sent a confirmation DM for each
    mention via ``message_user``.

    Args:
        api (object): An instance of the API object for making requests. Must
            provide ``list_direct_messages()``.
        threshold_time (datetime): Only direct messages created strictly after
            this time are processed.
    Returns:
        Tuple[Dict[str, Dict[str, List[str]]], List[str]]:
            - A dictionary mapping lower-cased screen names to
              ``{"add": [sender ids], "remove": [sender ids]}``.
            - A list of the screen names in that dictionary.
    """
    # Get direct messages
    dms = api.list_direct_messages()

    usernames_to_update: Dict[str, Dict[str, List[str]]] = {}
    for dm in dms:
        # created_timestamp is treated as epoch milliseconds. `datetime` here is
        # the class imported via `from datetime import datetime`, so the call is
        # datetime.fromtimestamp (datetime.datetime.fromtimestamp would raise).
        created_at = datetime.fromtimestamp(int(dm.created_timestamp) / 1000)
        if created_at <= threshold_time:
            continue

        print(">>>")
        message_body = dm.message_create
        sender_id = message_body["sender_id"]
        # Check sender_id to exclude a specific id
        if sender_id == "id_to_remove":
            continue

        message_data = message_body["message_data"]
        message_contents = message_data["text"].lower()
        print(message_contents)
        user_mentions = message_data["entities"]["user_mentions"]
        # Check if the message contains stop or remove keywords
        wants_removal = "stop" in message_contents or "remove" in message_contents

        # Process user mentions in the message
        for user_mention in user_mentions or []:
            screen_name = user_mention["screen_name"].lower()
            # Initialize the user if not in the dictionary
            actions = usernames_to_update.setdefault(screen_name, {"add": [], "remove": []})
            if wants_removal:
                if sender_id not in actions["remove"]:
                    actions["remove"].append(sender_id)
                message_to_user = f"Thanks, I've got your request to unfollow {screen_name}."
            else:
                if sender_id not in actions["add"]:
                    actions["add"].append(sender_id)
                message_to_user = (
                    f"Thanks, I've got your request to follow {screen_name}. You can follow a maximum of 20.\n"
                    f"If you'd like to stop following that name, just send another message "
                    f"containing \"stop following @{screen_name}\""
                )
            message_user(sender_id, message_to_user, api)

    changing_usernames = list(usernames_to_update.keys())
    return usernames_to_update, changing_usernames
@timeit
def get_records(client: object, table_name: str, identifier_column: str) -> Tuple[Dict[str, Dict[str, Any]], List[str]]:
    """
    Fetch records from a BigQuery table and process them into a dictionary and a list.

    Args:
        client (object): BigQuery client instance to execute queries.
        table_name (str): Name of the BigQuery table. It is interpolated
            directly into the SQL text, so it must come from trusted
            configuration.
        identifier_column (str): The column to be used as the main identifier for the dictionary.
    Returns:
        Tuple[Dict[str, Dict[str, Any]], List[str]]:
            - A dictionary of records with the identifier_column value as key
              and the remaining columns as the value.
            - A list of identifier values, in row order.
    Raises:
        Exception: Any error raised while reading rows is re-raised after the
            table name and the last row read are printed.
    """
    # Constructing the query
    query = f"""
    SELECT *
    FROM {table_name}
    """
    query_job = client.query(query)
    results = query_job.result()  # Waits for job to complete.

    results_dict: Dict[str, Dict[str, Any]] = {}
    names_list: List[str] = []
    # Initialised up front so the error handler can always print it, even when
    # the failure happens before the first row has been read.
    row_dict: Optional[Dict[str, Any]] = None
    try:
        for row in results:
            row_dict = dict(row)
            col_key = row_dict[identifier_column]
            names_list.append(col_key)
            # Every column except the identifier becomes part of the record
            results_dict[col_key] = {
                key: value for key, value in row_dict.items() if key != identifier_column
            }
    except Exception:
        print(table_name)
        print(row_dict)
        raise  # bare raise keeps the original traceback
    print(results_dict)
    return results_dict, names_list
@timeit
def get_to_change(columns_to_check: str, usernames_tuple: Tuple[str, ...],
                  client: Optional[bigquery.Client] = None) -> List[Dict[str, Any]]:
    """
    Fetch records from a BigQuery table based on certain columns and usernames.

    Args:
        columns_to_check (str): Column name to be used for filtering results.
            It is interpolated into the SQL text, so it must be trusted.
        usernames_tuple (Tuple[str, ...]): Tuple containing the usernames to be checked.
        client (Optional[bigquery.Client]): Client used to run the query. A new
            ``bigquery.Client()`` is created when omitted.
    Returns:
        List[Dict[str, Any]]: A list of records from the table.
    """
    if client is None:
        client = bigquery.Client()

    # The usernames are passed as an array query parameter rather than being
    # formatted into the SQL: a Python tuple repr is not valid SQL for a single
    # element ("('a',)") and string-building would allow injection.
    query = f"""
    SELECT *
    FROM twitter_name_change.test_table
    WHERE {columns_to_check} IN UNNEST(@usernames)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("usernames", "STRING", list(usernames_tuple)),
        ]
    )
    query_job = client.query(query, job_config=job_config)
    results = query_job.result()  # Waits for job to complete.
    return [dict(row) for row in results]
@timeit
def get_dm(id_value: str, username: str, previous_username: str, profile_image: str, previous_display_url: str) -> str:
    """
    Build the direct-message text describing a username and/or profile image change.

    The message is assembled from two halves: the first half covers the
    username and ends with "T" or "t", the second half starts with "heir ..."
    and covers the profile picture.

    Args:
        id_value (str): Identifier of the account; lower-cased in the message.
        username (str): Current username.
        previous_username (str): Previous username.
        profile_image (str): Current profile image URL.
        previous_display_url (str): Previous profile image URL.
    Returns:
        str: Generated direct message.
    """
    name_changed = username != previous_username
    picture_changed = profile_image != previous_display_url

    # First half: what happened to the username
    opening = (
        f"{id_value.lower()} was {previous_username} and is now {username}. T"
        if name_changed
        else f"{username} hasn't changed name but t"
    )
    # Second half: what happened to the profile picture
    closing = (
        f"heir profile picture changed from {previous_display_url} to {profile_image}"
        if picture_changed
        else "heir profile picture hasn't changed."
    )
    return opening + closing
@timeit
def get_tweet(id_value: str, username: str, previous_username: str, profile_image: str, previous_display_url: str, user_length: int) -> str:
    """
    Generate a tweet message based on username and profile image changes and the number of users informed.

    Args:
        id_value (str): Identifier of the account; lower-cased in the message.
        username (str): Current username.
        previous_username (str): Previous username.
        profile_image (str): Current profile image URL.
        previous_display_url (str): Previous profile image URL.
        user_length (int): Number of users informed. A numeric string is also
            accepted and converted.
    Returns:
        str: Generated tweet message.
    Raises:
        ValueError: If ``user_length`` cannot be converted to an integer.
    """
    # check_on_twitter passes this count as str(len(...)); comparing a str with
    # an int raises TypeError, so normalise to int first.
    user_count = int(user_length)
    # Determine if the message should use "user" or "users"
    users_or_user = "users" if user_count > 1 else "user"

    # Construct the message based on the change in username
    if username != previous_username:
        message = f"I just DMd {user_count} {users_or_user} about {id_value.lower()} changing username from {previous_username} to {username}."
    else:
        message = f"I just DMd {user_count} {users_or_user} about {id_value.lower()}."

    # Append the change in profile picture, if any
    if profile_image != previous_display_url:
        message += f" Their profile picture changed from {previous_display_url} to {profile_image}."
    return message
@timeit
def tweet_message(message: str, api: Any) -> None:
    """
    Post a tweet using the provided API instance.

    Posting is best-effort: a failure is printed and not re-raised.

    Args:
        message (str): Message to be tweeted.
        api (Any): Tweepy API instance; must provide ``update_status``.
    Returns:
        None
    """
    try:
        api.update_status(message)
    except Exception as e:
        # `except Exception` (rather than a bare except) lets KeyboardInterrupt
        # and SystemExit propagate, and the error detail is reported.
        print(f"Error posting message: {e}")
@timeit
def message_user(user: str, message: str, api: Any = tweepy.API(auth)) -> None:
    """
    Send a direct message to a user using the provided API instance.

    Sending is best-effort: a failure (including a ``user`` value that cannot
    be converted to an integer) is printed and not re-raised.

    Args:
        user (str): ID of the recipient; converted with ``int()`` before sending.
        message (str): Message to be sent.
        api (Any): Tweepy API instance. Defaults to an instance built from the
            module-level ``auth`` handler.
    Returns:
        None
    """
    try:
        api.send_direct_message(int(user), text=message)
    except Exception as e:
        # Include the error itself so failed deliveries can be diagnosed
        print(f"Failed to message user: {type(user)} -- {user}: {e}")
from typing import Dict, Tuple, Union, Optional


@timeit
def check_on_twitter(record_to_check_key: str,
                     record_to_check_values: Dict[str, Union[str, None]],
                     username_col: str,
                     display_img_col: str,
                     api: Optional[tweepy.API] = tweepy.API(auth)
                     ) -> Tuple[Optional[Dict[str, Union[str, None]]], Optional[bool], Optional[bool]]:
    """
    Queries Twitter to get user details based on a record key and compares it against existing data.
    Sends messages and tweets based on changes detected.

    When a change is detected and both previous values are present, every
    unique, non-empty entry of the record's comma-separated
    "interested_handles" value is sent a DM, and a summary tweet is posted.
    The record dictionary is updated in place with the new values.

    Args:
        record_to_check_key: The key used to check the record; passed to ``api.get_user``.
        record_to_check_values: The dictionary containing values associated with the record.
        username_col: The column/key in the dictionary indicating the username.
        display_img_col: The column/key in the dictionary indicating the display image URL.
        api: The tweepy API instance to use for Twitter interactions.
    Returns:
        Tuple containing the updated record, a flag if the record has changed, and a flag
        if it's a new record (changed, but no complete previous data). If the Twitter
        lookup fails, ``(None, None, None)`` is returned.
    """
    # Get the ID value from the record key
    id_value = record_to_check_key

    # Try to retrieve user information from Twitter
    try:
        new_record = api.get_user(id_value)
    except Exception as e:
        print("username {} retrieval failed: {}".format(id_value, e))
        return None, None, None

    # Extract user details from the response
    user_info = new_record._json
    username = user_info["name"]
    profile_image = user_info["profile_image_url_https"]

    # Get previous username and display URL; default to None if not found
    try:
        previous_username = record_to_check_values[username_col]
        previous_display_url = record_to_check_values[display_img_col]
    except (KeyError, TypeError):
        previous_username = None
        previous_display_url = None

    # Nothing differs from what is stored: no notifications, no update
    if username == previous_username and profile_image == previous_display_url:
        return record_to_check_values, False, False

    # If both previous data are present, send messages and tweets
    if previous_username and previous_display_url:
        isnew = False
        dm = get_dm(id_value, username, previous_username, profile_image, previous_display_url)
        interested_handles = record_to_check_values.get("interested_handles") or ""
        print("Interested handles: {}".format(interested_handles))
        # Unique, non-blank recipients only, so the number reported in the tweet
        # matches the number of DMs actually attempted.
        recipients = sorted({handle.strip() for handle in interested_handles.split(",") if handle.strip()})
        print("users_to_message_list: {}".format(recipients))
        for user in recipients:
            print("User:{}".format(user))
            message_user(user, dm, api)
        if recipients:
            # Passed as an int: get_tweet compares the count numerically.
            tweet_to_send = get_tweet(id_value, username, previous_username, profile_image,
                                      previous_display_url, len(recipients))
            tweet_message(tweet_to_send, api)
    else:
        isnew = True

    # Update the record with the new values
    record_to_check_values[username_col] = username
    record_to_check_values[display_img_col] = profile_image
    return record_to_check_values, True, isnew
@timeit
def make_updates(client: Optional[bigquery.Client] = None,
                 results_dict: Optional[Dict[str, Dict[str, Union[str, List[str]]]]] = None,
                 changing_usernames: Optional[List[str]] = None,
                 table_name: Optional[str] = None) -> None:
    """
    Updates specified records in a BigQuery table based on provided changes.

    One UPDATE statement is run per changed handle, setting the
    interested_handles, display_name and display_image columns of the row
    whose handle column matches. Values are sent as query parameters.

    Args:
        client: The BigQuery client instance used for updating the records.
            A new ``bigquery.Client()`` is created when omitted.
        results_dict: Dictionary containing keys as handles and values as their attributes
            ("interested_handles", "display_name", "display_image").
        changing_usernames: Usernames/handles that have changes; any container
            supporting ``in`` works.
        table_name: Name of the BigQuery table to update. Defaults to the
            "table_name" environment variable. It is interpolated into the
            SQL text, so it must be trusted.
    Returns:
        None
    """
    if not results_dict or not changing_usernames:
        return
    # Resolved lazily so that importing the module neither needs credentials
    # nor reads the environment.
    if client is None:
        client = bigquery.Client()
    if table_name is None:
        table_name = os.getenv("table_name")

    # Construct the DML statement to update the records in BigQuery
    dml_statement = f"""UPDATE {table_name}
    SET interested_handles = @interested_handles,
        display_name = @display_name,
        display_image = @display_image
    WHERE handle = @handle"""

    # Iterate through results dictionary
    for handle, value in results_dict.items():
        # Only process keys/handles that have changes
        if handle not in changing_usernames:
            continue

        interested_handles = value["interested_handles"]
        # Ensure uniqueness and proper format for interested handles: a
        # collection is stored as a comma-separated string, not its Python repr.
        if isinstance(interested_handles, (list, tuple, set)):
            interested_handles = ",".join(dict.fromkeys(str(item) for item in interested_handles))

        # Parameters avoid broken statements when a value contains a quote
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("interested_handles", "STRING", interested_handles),
                bigquery.ScalarQueryParameter("display_name", "STRING", value["display_name"]),
                bigquery.ScalarQueryParameter("display_image", "STRING", value["display_image"]),
                bigquery.ScalarQueryParameter("handle", "STRING", handle),
            ]
        )
        # Execute the DML statement and wait for it to finish
        query_job = client.query(dml_statement, job_config=job_config)  # API request
        query_job.result()  # Waits for statement to finish
@timeit
def add_names(client: Optional[bigquery.Client] = None,
              results_dict: Optional[Dict[str, Dict[str, str]]] = None,
              new_usernames: Optional[List[str]] = None,
              table_name: Optional[str] = None) -> None:
    """
    Adds new records to a BigQuery table based on the provided data.

    One INSERT statement is run per new handle. The column list is fixed to
    handle, interested_handles, display_name, display_image so that it always
    lines up with the values supplied. Values are sent as query parameters.

    Args:
        client: The BigQuery client instance used for adding the records.
            A new ``bigquery.Client()`` is created when omitted.
        results_dict: Dictionary containing keys as handles and values as their attributes
            ("interested_handles", "display_name", "display_image").
        new_usernames: New usernames/handles to be added; any container
            supporting ``in`` works.
        table_name: Name of the BigQuery table to add the records to. Defaults
            to the "table_name" environment variable. It is interpolated into
            the SQL text, so it must be trusted.
    Returns:
        None
    """
    if not results_dict or not new_usernames:
        return
    # Resolved lazily so that importing the module neither needs credentials
    # nor reads the environment.
    if client is None:
        client = bigquery.Client()
    if table_name is None:
        table_name = os.getenv("table_name")

    # Explicit column order: deriving it from the record's key order could
    # disagree with the order of the VALUES list.
    columns_string = ",".join(("handle", "interested_handles", "display_name", "display_image"))
    dml_statement = f"""INSERT {table_name} ({columns_string})
    VALUES(@handle, @interested_handles, @display_name, @display_image)"""

    # Iterate through results dictionary
    for handle, value in results_dict.items():
        # Only process new usernames/handles
        if handle not in new_usernames:
            continue

        interested_handles = value["interested_handles"]
        if isinstance(interested_handles, (list, tuple, set)):
            interested_handles = ",".join(dict.fromkeys(str(item) for item in interested_handles))

        # Parameters avoid broken statements when a value contains a quote
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("handle", "STRING", handle),
                bigquery.ScalarQueryParameter("interested_handles", "STRING", interested_handles),
                bigquery.ScalarQueryParameter("display_name", "STRING", value["display_name"]),
                bigquery.ScalarQueryParameter("display_image", "STRING", value["display_image"]),
            ]
        )
        # Print the DML statement for debugging or logging
        print(dml_statement)
        print("handle: {}".format(handle))
        # Execute the DML statement and wait for it to finish
        query_job = client.query(dml_statement, job_config=job_config)  # API request
        query_job.result()  # Waits for statement to finish
@timeit
def remove_from_bgq(table_name: str,
                    column_to_change: str,
                    criteria_list: List[str],
                    client: Optional[bigquery.Client] = None) -> None:
    """
    Removes records from a BigQuery table based on the provided criteria.

    Args:
        table_name: Name of the BigQuery table to remove records from.
        column_to_change: The column name which is used to determine which records to delete.
        criteria_list: List of criteria values. Records matching these values in the
            specified column will be removed. Nothing is executed when it is empty.
        client: The BigQuery client instance used for removing the records.
            A new ``bigquery.Client()`` is created when omitted.
    Returns:
        None

    Note:
        ``table_name`` and ``column_to_change`` are interpolated into the SQL
        text and must be trusted; the criteria values are sent as an array
        query parameter.
    """
    if not criteria_list:
        return
    # Created lazily so that importing the module does not need credentials
    if client is None:
        client = bigquery.Client()

    # Construct the DELETE query statement based on the provided parameters
    query = f"""DELETE FROM
    {table_name}
    WHERE {column_to_change} IN UNNEST(@criteria_list)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("criteria_list", "STRING", list(criteria_list)),
        ]
    )
    # Print the DELETE query statement for debugging or logging
    print(query)
    print("criteria_list: {}".format(criteria_list))
    # Execute the DELETE statement and remove records from the BigQuery table
    query_job = client.query(query, job_config=job_config)  # API request
    query_job.result()  # Waits for statement to finish
def _split_handles(raw) -> List[str]:
    """Return the non-blank entries of a comma-separated string (or a collection) of handles."""
    if raw is None:
        return []
    if isinstance(raw, (list, tuple, set)):
        parts = [str(part) for part in raw]
    else:
        parts = str(raw).split(",")
    return [part.strip() for part in parts if part.strip()]


@timeit
def make_changes(update_dict: dict,
                 usernames_tuple: tuple,
                 results_dict: dict,
                 names_returned: list,
                 id_column: str = "display_name",
                 column_to_change: str = "interested_handles",
                 max_follows: int = 20) -> Tuple[dict, bool, list]:
    """
    Processes changes based on the provided update dictionary and updates results_dict accordingly.

    Args:
        update_dict: Dictionary keyed like results_dict, whose values hold an "add" list
            and a "remove" list of names to add to / remove from that record's
            "interested_handles".
        usernames_tuple: Tuple of usernames. Not read by this function.
        results_dict: Dictionary with existing information; each value holds a
            comma-separated "interested_handles" string. Modified in place.
        names_returned: List of names returned from some external source. Not read by this function.
        id_column: Name of the column which represents display names. Defaults to 'display_name'.
            Not read by this function.
        column_to_change: Name of the column that will be changed. Defaults to 'interested_handles'.
            Not read by this function.
        max_follows: Maximum number of records a single name may appear in; further
            additions for that name are skipped. Defaults to 20.
    Returns:
        results_dict: Updated dictionary after processing the changes.
        removing_any: Boolean indicating if any record ended up with no interested handles.
        to_remove: List of keys whose interested handles are now empty.
    """
    print("Update dict: {}".format(update_dict))
    to_remove = []

    # Number of records each name currently appears in, computed once instead
    # of re-scanning every record for every name.
    follow_counts: Dict[str, int] = {}
    for record in results_dict.values():
        for handle in set(_split_handles(record.get("interested_handles"))):
            follow_counts[handle] = follow_counts.get(handle, 0) + 1

    for key, value in update_dict.items():
        # Process additions
        for name in value["add"]:
            name = str(name)
            # Skip if there are too many existing requests for a name
            if follow_counts.get(name, 0) >= max_follows:
                continue

            record = results_dict.get(key)
            if record is None:
                # New record: copy the column names of an existing record (if
                # any) and initialise every other attribute to None.
                template = next(iter(results_dict.values()), {})
                record = {"interested_handles": name}
                for column in template:
                    if column != "interested_handles":
                        record[column] = None
                results_dict[key] = record
            else:
                current = _split_handles(record.get("interested_handles"))
                if name in current:
                    continue  # already subscribed; avoid duplicate entries
                current.append(name)
                # Always stored as a comma-separated string, never a list
                record["interested_handles"] = ",".join(current)
            follow_counts[name] = follow_counts.get(name, 0) + 1

        # Process removals. Exact matching against a set: a substring test
        # against str(list) would also remove "12" when "123" was requested.
        names_to_remove = {str(name) for name in value["remove"]}
        record = results_dict.get(key)
        if record is not None:
            remaining = [
                handle for handle in _split_handles(record.get("interested_handles"))
                if handle not in names_to_remove
            ]
            # Nobody is interested any more: flag the record for removal
            if not remaining:
                to_remove.append(key)
            record["interested_handles"] = ",".join(remaining)

    print("Removing: {}".format(to_remove))
    return results_dict, bool(to_remove), to_remove
@timeit
def query_bq(request: dict,
             id_column: str = os.getenv("id_col"),
             column_to_change: str = os.getenv("column_to_change")) -> None:
    """
    Function to query BigQuery based on given request, and update records based on
    Twitter information and recent DMs.

    Steps: read recent DMs, apply the requested subscription changes to the
    records fetched from BigQuery, delete rows that no longer have anyone
    interested, check every remaining record against Twitter, then write
    changed rows back and insert new ones.

    Args:
        request: Dictionary containing information regarding the query. Not read by this function.
        id_column: Column name for the ID, fetched from environment variables by default.
        column_to_change: Column name to be changed, fetched from environment variables by default.
    Returns:
        None
    Raises:
        KeyError: If the "threshold" environment variable is not set.
        ValueError: If the "threshold" environment variable is not an integer.
    """
    # Initializing Twitter and BigQuery client variables from environment variables
    consumer_key = os.getenv("consumer_key")
    consumer_secret = os.getenv("consumer_secret")
    access_token = os.getenv("access_token")
    access_secret = os.getenv("access_secret")

    # Recording when the process starts. `datetime` is the class imported via
    # `from datetime import datetime`, hence datetime.now() / timedelta(...).
    start_time = datetime.now()

    # Setting the threshold (in minutes) for how far back to check DMs for new requests
    threshold = int(os.environ["threshold"])
    threshold_time = start_time - timedelta(minutes=threshold)

    # Initializing BigQuery client and setting table name
    client = bigquery.Client()
    table_name = os.getenv("table_name")

    # Setting up Twitter authorization
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    api = tweepy.API(auth)

    # Checking for recent DMs to determine changes to make
    to_update, dm_usernames = get_recent_updates(api, threshold_time)

    dm_touched = set()  # keys whose subscriptions were changed by a DM
    removed = set()  # keys whose rows were deleted from BigQuery

    # Process updates if any are found
    if len(to_update) > 0:
        results_dict, names_list = get_records(client, table_name, id_column)
        updated_dict, removing_any, to_remove = make_changes(
            update_dict=to_update,
            usernames_tuple=dm_usernames,
            results_dict=results_dict,
            names_returned=names_list,
            id_column=id_column,
            column_to_change=column_to_change)
        dm_touched = set(to_update)
        removed = set(to_remove)
        # If any records need to be removed, remove them from BigQuery
        if removing_any:
            remove_from_bgq(table_name, column_to_change="handle", criteria_list=to_remove, client=client)
    else:
        updated_dict, names_list = get_records(client, table_name, id_column)

    # Dictionaries to hold changing and new usernames
    changing_usernames = {}
    new_usernames = {}

    # Check each record on Twitter and classify if it's new or changing
    for key, value in updated_dict.items():
        # The row has just been deleted; nothing left to check or write back
        if key in removed:
            continue
        checked_value, changed, isnew = check_on_twitter(
            record_to_check_key=key,
            record_to_check_values=value,
            username_col="display_name",
            display_img_col="display_image",
            api=api)
        if checked_value is None:
            continue
        if changed and isnew:
            new_usernames[key] = checked_value
        elif changed or key in dm_touched:
            # Also persist records whose subscriptions were changed by a DM even
            # though the Twitter profile is unchanged; otherwise the follow /
            # unfollow request would be lost.
            changing_usernames[key] = checked_value

    # Update and add records to BigQuery as required
    make_updates(client, results_dict=updated_dict,
                 changing_usernames=changing_usernames, table_name=table_name)
    add_names(client, results_dict=updated_dict,
              new_usernames=new_usernames, table_name=table_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment