Last active
July 24, 2024 17:11
-
-
Save essoen/5fddb415b531ad91759fbb2590b8927b to your computer and use it in GitHub Desktop.
Script to move Readwise Reader "Later" items to Pocket. Used with Zapier.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import requests | |
# Result container for this code step; main() stores the filtered
# document list under the "Results" key.
output = {"Results": []}
def fetch_reader_document_list_api(token=None, updated_after=None, location=None, category=None):
    """
    Fetch all pages of the Readwise Reader document list.

    Requests are repeated, following ``nextPageCursor``, until the API stops
    returning a cursor. On any failure (network/HTTP error, unparsable JSON,
    missing ``results``) the documents collected so far are returned.

    :param token: Readwise access token sent in the ``Authorization`` header
    :param updated_after: Value for the ``updatedAfter`` query parameter
    :param location: Value for the ``location`` query parameter
    :param category: Value for the ``category`` query parameter
    :return: List with the ``results`` entries of every page fetched
    """
    # Filters are identical on every page, so build them once. Falsy values
    # are omitted, exactly as if the parameter had not been passed.
    candidate_filters = (
        ("updatedAfter", updated_after),
        ("location", location),
        ("category", category),
    )
    static_filters = {name: value for name, value in candidate_filters if value}

    full_data = []
    next_page_cursor = None
    while True:
        params = {}
        if next_page_cursor:
            params["pageCursor"] = next_page_cursor
        params.update(static_filters)
        print("Making export API request with params " + str(params) + "...")
        try:
            # TLS certificate verification is left at the library default
            # (enabled) because the request carries the API token; the timeout
            # keeps a stalled connection from hanging the whole step.
            response = requests.get(
                url="https://readwise.io/api/v3/list/",
                params=params,
                headers={"Authorization": f"Token {token}"},
                timeout=30,
            )
            response.raise_for_status()  # Raises an HTTPError for 4xx/5xx responses
            try:
                data = response.json()
            except ValueError:
                print("Error: Unable to parse JSON response")
                return full_data
        except requests.RequestException as e:
            print(f"Error: {e}")
            return full_data

        # A payload that is not a JSON object cannot contain 'results'.
        results = data.get("results") if isinstance(data, dict) else None
        if results is None:
            print("Error: 'results' key not found in the response")
            return full_data
        full_data.extend(results)

        next_page_cursor = data.get("nextPageCursor")
        if not next_page_cursor:
            break
    return full_data
def get_readwise_data():
    """
    Fetch Readwise Reader articles in the "later" location updated in the last hour.

    The token is read from ``input_data["readwise_token"]``; ``input_data`` is
    not defined in this script and is presumably injected by the host
    (Zapier code step) -- TODO confirm.

    :return: List of documents, or an empty list if anything went wrong
    """
    readwise_results = []
    try:
        # Use an explicit UTC timestamp so the cutoff carries its offset and
        # does not depend on the local timezone of the machine running this.
        docs_after_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
        readwise_results = fetch_reader_document_list_api(
            token=input_data["readwise_token"],
            updated_after=docs_after_date.isoformat(),
            location="later",
            category="article",
        )
        # This query targets the "later" location, not the archive.
        print(f"Later data from Readwise: {readwise_results}")
    except Exception as e:
        # Best-effort boundary: report the problem and fall back to an empty list.
        print(f"Unexpected error: {e}")
    return readwise_results
def clean_list_for_value(obj_list, key, value):
    """
    Drop every dictionary whose ``key`` entry equals ``value``.

    Dictionaries that do not contain ``key`` at all are kept, unless
    ``value`` itself is ``None``.

    :param obj_list: List of dictionaries to filter
    :param key: The key to look up in each dictionary
    :param value: The value that marks a dictionary for removal
    :return: New list with the dictionaries whose ``key`` is not ``value``
    """
    kept = []
    for entry in obj_list:
        if entry.get(key) != value:
            kept.append(entry)
    return kept
def main():
    """Store the recent "later" documents that did not come from Pocket in ``output``."""
    later_docs = get_readwise_data()  # documents currently in "later"
    if later_docs:
        # Skip documents whose source is Pocket.
        output["Results"] = clean_list_for_value(later_docs, "source", "pocket")
    else:
        output["Results"] = []


main()  # adding the results to Pocket is done in the zap
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import requests | |
def fetch_reader_document_list_api(token=None, updated_after=None, location=None, category=None, source=None):
    """
    Fetch all pages of the Readwise Reader document list.

    Requests are repeated, following ``nextPageCursor``, until the API stops
    returning a cursor. On any failure (network/HTTP error, unparsable JSON,
    missing ``results``) the documents collected so far are returned.

    :param token: Readwise access token sent in the ``Authorization`` header
    :param updated_after: Value for the ``updatedAfter`` query parameter
    :param location: Value for the ``location`` query parameter
    :param category: Value for the ``category`` query parameter
    :param source: Value for the ``source`` query parameter
    :return: List with the ``results`` entries of every page fetched
    """
    # Filters are identical on every page, so build them once. Falsy values
    # are omitted, exactly as if the parameter had not been passed.
    candidate_filters = (
        ("updatedAfter", updated_after),
        ("location", location),
        ("category", category),
        ("source", source),
    )
    static_filters = {name: value for name, value in candidate_filters if value}

    full_data = []
    next_page_cursor = None
    while True:
        params = {}
        if next_page_cursor:
            params["pageCursor"] = next_page_cursor
        params.update(static_filters)
        print("Making export API request with params " + str(params) + "...")
        try:
            # TLS certificate verification is left at the library default
            # (enabled) because the request carries the API token; the timeout
            # keeps a stalled connection from hanging the whole step.
            response = requests.get(
                url="https://readwise.io/api/v3/list/",
                params=params,
                headers={"Authorization": f"Token {token}"},
                timeout=30,
            )
            response.raise_for_status()  # Raises an HTTPError for 4xx/5xx responses
            try:
                data = response.json()
            except ValueError:
                print("Error: Unable to parse JSON response")
                return full_data
        except requests.RequestException as e:
            print(f"Error: {e}")
            return full_data

        # A payload that is not a JSON object cannot contain 'results'.
        results = data.get("results") if isinstance(data, dict) else None
        if results is None:
            print("Error: 'results' key not found in the response")
            return full_data
        full_data.extend(results)

        next_page_cursor = data.get("nextPageCursor")
        if not next_page_cursor:
            break
    return full_data
def fetch_pocket_data(consumer_key, access_token, state=None, favorite=None, tag=None, contentType=None, sort=None, detailType="complete", search=None, domain=None, since=None, count=None, offset=None):
    """
    Retrieve items from the Pocket ``/v3/get`` endpoint.

    ``consumer_key``, ``access_token`` and ``detailType`` are always sent;
    every other argument is added to the request body only when it is not
    ``None``.

    :param consumer_key: Pocket consumer key
    :param access_token: Pocket access token
    :return: The ``list`` entry of the JSON response, or ``None`` when the
        request fails, the status is not 200, or the payload has no ``list``
    """
    url = "https://getpocket.com/v3/get"

    # Required parameters first, then whichever optional filters were supplied.
    params = {
        "consumer_key": consumer_key,
        "access_token": access_token,
        "detailType": detailType,
    }
    optional_filters = {
        "state": state,
        "favorite": favorite,
        "tag": tag,
        "contentType": contentType,
        "sort": sort,
        "search": search,
        "domain": domain,
        "since": since,
        "count": count,
        "offset": offset,
    }
    params.update({name: value for name, value in optional_filters.items() if value is not None})

    # Send the request to the Pocket API.
    try:
        response = requests.post(
            url,
            json=params,
            headers={"Content-Type": "application/json", "X-Accept": "application/json"},
            timeout=30,  # do not hang forever on a stalled connection
        )
    except requests.RequestException as e:
        print(f"Unexpected error: {e}")
        return None

    # Report unsuccessful responses instead of failing silently.
    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    try:
        return response.json()["list"]
    except (ValueError, KeyError, TypeError) as e:
        # Invalid JSON, or a payload without a usable 'list' entry.
        print(f"Unexpected error: {e}")
        return None
# Module-level queue of Pocket actions, filled by add_to_pocket_actions().
pocket_actions = []


def add_to_pocket_actions(item_id):
    """
    Queue an "archive" action for one Pocket item.

    :param item_id: Identifier of the Pocket item to archive
    """
    archive_action = dict(action="archive", item_id=item_id)
    pocket_actions.append(archive_action)
def archive_pocket_articles(consumer_key, access_token, actions):
    """
    Send a batch of actions to the Pocket ``/v3/send`` endpoint.

    :param consumer_key: Pocket consumer key
    :param access_token: Pocket access token
    :param actions: List of action dictionaries to submit
    :return: Parsed JSON response on HTTP 200; ``None`` when there is nothing
        to send, the request fails, or the status is not 200
    """
    # Python lists have no ``.length`` attribute; an emptiness check is the
    # correct way to detect that there is nothing to send.
    if not actions:
        print("No actions for pocket")
        return None

    url = "https://getpocket.com/v3/send"
    params = {
        "consumer_key": consumer_key,
        "access_token": access_token,
        "actions": actions,
    }

    # Send the request to the Pocket API.
    try:
        response = requests.post(
            url,
            json=params,
            headers={"Content-Type": "application/json", "X-Accept": "application/json"},
            timeout=30,  # do not hang forever on a stalled connection
        )
    except requests.RequestException as e:
        print(f"Error: {e}")
        return None

    if response.status_code == 200:
        return response.json()
    print(f"Error: {response.status_code} - {response.text}")
    return None
def get_readwise_data():
    """
    Fetch Readwise Reader documents in the "archive" location updated in the last hour.

    The token is read from ``input_data["readwise_token"]``; ``input_data`` is
    not defined in this script and is presumably injected by the host
    (Zapier code step) -- TODO confirm.

    :return: List of documents, or an empty list if anything went wrong
    """
    readwise_results = []
    try:
        # Use an explicit UTC timestamp so the cutoff carries its offset and
        # does not depend on the local timezone of the machine running this.
        docs_after_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
        readwise_results = fetch_reader_document_list_api(
            token=input_data["readwise_token"],
            updated_after=docs_after_date.isoformat(),
            location="archive",
        )
        print("Successfuly got readwise data.")
    except Exception as e:
        # Best-effort boundary: report the problem and fall back to an empty list.
        print(f"Unexpected error: {e}")
    return readwise_results
def string_has_substring(string, substring, length):
    """
    Check if any substring of 'substring' longer than the specified 'length' is in 'string'.

    :param string: The string to search within
    :param substring: The string from which to generate substrings
    :param length: Minimum length of the substrings to check (exclusive)
    :return: True if any substring longer than the specified length is found within 'string', False otherwise
    """
    # The shortest qualifying piece has length + 1 characters. Any longer
    # piece found in 'string' contains such a window that is also found, so
    # checking the minimal windows alone is sufficient.
    window = length + 1
    if window > len(substring):
        return False
    if window <= 0:
        # The empty string qualifies and is contained in every string.
        return True
    return any(
        substring[start:start + window] in string
        for start in range(len(substring) - window + 1)
    )
def convert_dict_to_list(dictionary):
    """
    Return the values of a dictionary as a list, rejecting invalid input.

    :param dictionary: Dictionary whose values are wanted
    :return: List of the dictionary's values, in the dictionary's own order
    :raises ValueError: If ``dictionary`` is None
    :raises TypeError: If ``dictionary`` is anything else that is not a dict
    """
    if isinstance(dictionary, dict):
        return [*dictionary.values()]
    # Not a dict: tell a missing value apart from a wrong type.
    if dictionary is None:
        raise ValueError("The provided dictionary is None")
    raise TypeError("The provided input is not a dictionary")
def needs_archive_sync(readwise_article, pocket_article):
    """
    Tell whether a Pocket item matches a Readwise article and still has status "0".

    :param readwise_article: Readwise document dict; its ``source_url`` is compared
    :param pocket_article: Pocket item dict; its ``given_url`` and ``status`` are read
    :return: True when both URLs are equal and the Pocket status is "0"
    """
    if readwise_article["source_url"] != pocket_article["given_url"]:
        return False
    # Same URL: sync is needed only while the Pocket status is still "0"
    # (archived in Readwise, not yet archived in Pocket).
    return pocket_article["status"] == "0"
def main():
    """
    Archive in Pocket every unread item whose URL was recently archived in Readwise.

    Credentials are read from ``input_data``, which is not defined in this
    script and is presumably injected by the host (Zapier code step) -- TODO
    confirm.
    """
    consumer_key = input_data["pocket_consumer_key"]
    access_token = input_data["pocket_access_token"]

    pocket_data = fetch_pocket_data(consumer_key, access_token, state="unread")
    if not pocket_data:
        print("Failed to retrieve Pocket data.")
        return
    print("Pocket Data Retrieved Successfully")

    try:
        pocket_data = convert_dict_to_list(pocket_data)
    except (ValueError, TypeError) as e:
        # Without a proper list of items there is nothing reliable to compare.
        print(f"Error: {e}")
        return

    readwise_data = get_readwise_data()
    for readwise_article in readwise_data:
        for pocket_article in pocket_data:
            if needs_archive_sync(readwise_article, pocket_article):
                # Log the matched URL with the Readwise title; ``.get`` avoids
                # a KeyError when the document has no title.
                print(pocket_article["given_url"], readwise_article.get("title"))
                add_to_pocket_actions(pocket_article["item_id"])

    archive_pocket_articles(consumer_key, access_token, pocket_actions)


main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment