Last active
August 10, 2024 18:04
toodledo-agenda.py - Prepare your daily agenda in Toodledo
#!/usr/bin/env python3

'''toodledo-agenda.py - Prepare your daily agenda in Toodledo

Toodledo doesn't _quite_ do the right thing, at least for me, whether I sort by
date and then priority, or by priority and then date.

* If I sort by date first, then top-priority items that aren't due by a
  specific date get pushed all the way to the bottom of my to-do list and I
  never see them.
* If I sort by priority first, then anything that isn't top priority gets
  pushed to the bottom even if it's due soon.

In addition, Toodledo's prioritization isn't granular enough. At least for the
things I hope to work on _today_, I need to be able to prioritize within the
high-level priority: Top, High, etc. just isn't specific enough.

Here's how I've addressed these issues, with the assistance of this script.
This takes a lot of words to describe but it's actually quite straightforward
once you get into the habit of using the script every day to create your daily
agenda.

Display tasks in the app in priority and then date order.

By definition anything with a due date is Top priority, and everything that's
Top priority must have a due date, so when you run the script it changes the
priority of any non-Top tasks with dates to Top, and changes any Top-priority
tasks without due dates to High.
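
As a rough sketch of that rule (an illustration only, not the code the script
itself runs), the normalization could look like this. The `Priority` enum and
`SketchTask` class are hypothetical stand-ins for the toodledo library's types,
and the numeric priority values are assumed:

```python
from dataclasses import dataclass
from datetime import date
from enum import IntEnum
from typing import Optional


class Priority(IntEnum):
    # Hypothetical stand-in for toodledo.Priority; values are assumed.
    NEGATIVE = -1
    LOW = 0
    MEDIUM = 1
    HIGH = 2
    TOP = 3


@dataclass
class SketchTask:
    # Hypothetical minimal task, not the library's Task class.
    title: str
    priority: Priority
    due_date: Optional[date] = None


def normalize_priority(task):
    # Dated tasks are always Top; undated Top tasks drop to High.
    if task.due_date is not None:
        task.priority = Priority.TOP
    elif task.priority == Priority.TOP:
        task.priority = Priority.HIGH
    return task
```

Undated tasks that are already High or lower are left alone, which is what
keeps the Top section of the list reserved for things with dates.
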

When run, the script assembles a list of tasks in a text file for you to edit,
as follows (N.B. all of this works within a single folder which you specify
when configuring the script, and subtasks are ignored):

* Tasks due today or earlier with due times, sorted by due date and time, with
  any due times after 11:00pm erased (see below).
* Tasks due today or earlier without due times, sorted by due date and then the
  user's previously indicated granular priority (more on that below).
* Tasks due BY (not ON) any date in the next week, sorted by due date, due
  time, and granular order.
* Any tasks pulled in with --include-regexp not included in the above.
* All undated tasks not pulled in via --include-regexp, sorted by priority and
  then granular order.

If you have calendars configured as described below, today's events from those
calendars are pulled into the file, commented out, for you to review and add to
today's agenda as you deem appropriate.

Then the file is loaded into your $EDITOR or $VISUAL for you to edit. You can
do the following:

* Put "=" in front of a task to tell the script not to make any changes to it
  whatsoever, i.e., it is excluded from all of the following logic.
* Put "-" in front of a task to mark it complete. If it's a repeating task, you
  can also move it around within the text file to reprioritize the _next_
  instance of the task, i.e., the one after the one you're marking complete.
  Note that when you edit the date on a task and mark it complete, the date you
  specify is used as the completion date of the task, rather than changing its
  due date.
* Put "*" in front of a repeating task to explode it, i.e., separate the next
  upcoming instance of the task so you can edit it independently. For example,
  if you explode a task due today that repeats weekly from its completion date,
  you'll end up with two tasks, one non-repeating task due today and one
  repeating task due in a week. Any changes you make to an exploded task apply
  to the non-repeating task, not the subsequent repeating task.
* Add or edit the due date and/or due time by putting them inside the square
  brackets at the start of the task. Dates are specified in YYYY-MM-DD format.
  You can use "Yesterday", "Today", and "Tomorrow" as shorthand.
* Edit the priority of the task.
* Edit the "repeat" and "meta" metadata of the task, if you specified
  "--edit-metadata" when invoking the script.
* Add new tasks.
* Change the order of tasks to indicate their granular priorities.
* Put "!calendar" on a line by itself to skip calendar pulls (see below)
  until tomorrow.
* Put "!reedit" (or "!edit") on a line by itself to edit the (updated) agenda
  again after your changes in this round of editing are processed. This may be
  useful, e.g., if you mark complete a repeating task and you want to edit or
  reprioritize the next instance of it.
* Put "!abort" on a line by itself to abort the agenda edit without changing
  anything in Toodledo.
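
To make the bracket syntax concrete, here is a minimal sketch of how the text
inside the square brackets might be interpreted. It is not the script's own
parser: `parse_schedule` is a hypothetical helper, and it assumes 24-hour HH:MM
times and case-insensitive shorthand words, neither of which is guaranteed to
match what the script accepts:

```python
import datetime


def parse_schedule(spec, today):
    # spec is the text between the brackets, e.g. "2024-08-10 14:30",
    # "Tomorrow", or "09:00". Returns (due_date, due_time); either may
    # be None when that part was not given.
    shorthand = {
        "yesterday": today - datetime.timedelta(days=1),
        "today": today,
        "tomorrow": today + datetime.timedelta(days=1),
    }
    due_date = None
    due_time = None
    for token in spec.split():
        if token.lower() in shorthand:
            due_date = shorthand[token.lower()]
        elif ":" in token:
            # Assumed 24-hour HH:MM format.
            due_time = datetime.datetime.strptime(token, "%H:%M").time()
        else:
            due_date = datetime.datetime.strptime(token, "%Y-%m-%d").date()
    return due_date, due_time
```

For example, `parse_schedule("Tomorrow 09:00", datetime.date(2024, 8, 10))`
yields August 11, 2024 as the date and 09:00 as the time, while a bare
`"14:30"` yields a time with no date.
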

Note that you can't change the title of a task; that'll just delete the old
task and create a new one with the new title.

Note also that the script can't handle multiple tasks with the same title, so
if it encounters more than one task with the same title it'll warn you and
exclude all but the first.

Once you finish editing the agenda the script processes the updated tasks as
follows:

* Tasks' metadata is updated to reflect the granular sort order you indicated
  when you edited the file.
* The priorities of all tasks with due dates are changed to Top as needed.
* The due date of any task due before today is changed to today.
* Any repeat-from-due-date task whose due date has been changed is
  automatically exploded before further changes are made to the current
  instance of it.
* Tasks you marked for explosion are also exploded.
* The priorities of any tasks that need to be changed to preserve the granular
  sort order are changed.
* Tasks without priorities are defaulted to Top.
* For tasks due today only that do not have due times, the granular sort order
  is replicated into the tasks by assigning them due times between 11:00pm and
  midnight in the order you specified. This way you will be able to see the
  granular sort order in the app so you can see the order in which you want to
  do things.
* These changes are all propagated to the server.
* If you specified "!reedit", the script goes around again.
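
The 11:00pm trick is easier to see in code. The following is a simplified
sketch of the idea, using a hypothetical `assign_fake_due_times` helper that
works on task titles instead of real task objects:

```python
import datetime


def assign_fake_due_times(titles, day):
    # titles: untimed tasks due on `day`, in the order you want to do them.
    # Returns {title: datetime}, packing the times just before midnight so
    # the last task lands on 23:59 and earlier tasks on earlier minutes.
    count = len(titles)
    # With more than 59 tasks, several tasks have to share a minute.
    decrement = 59 / count if count > 59 else 1
    minute = 59.0
    times = {}
    for title in reversed(titles):
        times[title] = datetime.datetime.combine(
            day, datetime.time(23, int(minute)))
        minute -= decrement
    return times
```

With three tasks the result is 23:57, 23:58, and 23:59, so sorting by due time
in the app reproduces the order from the agenda file. This is also why genuine
due times after 11:00pm can't be distinguished from granular-order markers and
get erased.
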

INSTALLING AND CONFIGURING THE SCRIPT

Save this script somewhere local and make the file executable.

This script uses the Python Toodledo API library. How to install Python
libraries is too much for me to document here, but there's a lot of info about
it online, e.g., https://docs.python.org/3/installing/index.html . In a
nutshell, `pip install toodledo` is likely to do the right thing.

Once you've got the toodledo-python library installed, you need to register an
"app" in your Toodledo account for the script to talk to. Log into Toodledo, go
to https://api.toodledo.com/3/account/doc_register.php , and fill in the form
there. It doesn't really matter what you put into any of the fields as long as
it accepts your values. You can use fake URLs like https://localhost/ in the
Website and Redirect URI fields.

Once you've registered your app, run the script and it will prompt you for the
client ID and client secret of your app from the registration page, as well as
for the Toodledo folder you want the script to work in, and save the values you
specify into a config file so you don't need to specify them every time.

The folder whose name you specify is assumed to be the folder you use for your
main to-do list in general. If you have Top-priority tasks in that folder with
dates on them, then they'll get pulled in by the agenda script automatically
every day as appropriate. If not, you'll be creating your agenda pretty much
from scratch every day. :shrug:

Once you've done all of the above, you should be able to run the script and it
should just work. Let me know if you run into any trouble! Heck, let me know if
you're using it successfully so I don't feel like I'm just screaming into the
void here.

The first time you run the script it'll display a URL for you to paste into
your browser to authenticate the script. Do that and then click the sign-in
button, which will redirect you to the bogus link you entered when registering
the app, with a bunch of query parameters. It doesn't matter that the link is
bogus; just copy the URL and feed it as input back into the script, and it'll
finish logging you in and saving your authentication information so you don't
have to keep doing that.

PULLING EVENTS FROM ICAL CALENDARS

The script supports pulling today's events in from one or multiple iCal URLs.
These events will appear as comments in the agenda edit file for you to
uncomment and add to your agenda as you see fit.

For this to work, the icalendar and recurring_ical_events modules need to be
installed. As above, you can install them with pip.

To add a calendar, run the script with the --add-calendar argument followed by
the URL of the calendar. To edit your calendars, load ~/.agenda-config.json
into a text editor and edit as appropriate.
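
If you'd rather not hand-edit the JSON, the shape of the config is simple
enough to manipulate in a few lines. This is only a sketch: the key names are
the ones the script reads, but the values and the `remove_calendar` helper are
made up for illustration:

```python
import json

# Hypothetical example values; only the key names come from the script.
example_config = {
    "clientId": "your-client-id",
    "clientSecret": "your-client-secret",
    "folderName": "To Do",
    "calendars": [
        "https://example.com/work.ics",
        "https://example.com/home.ics",
    ],
}


def remove_calendar(config, url):
    # Return a copy of the config without the given calendar URL.
    updated = dict(config)
    updated["calendars"] = [
        c for c in config.get("calendars", []) if c != url]
    return updated


trimmed = remove_calendar(example_config, "https://example.com/home.ics")
print(json.dumps(trimmed, indent=2))
```

The "calendars" value is just a list of URLs, so deleting or reordering
entries in a text editor works equally well.
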

COPYRIGHT

Copyright 2023 Jonathan Kamens <[email protected]>

This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License at
<https://www.gnu.org/licenses/> for more details.
'''

# pylint: disable=invalid-name,missing-function-docstring,global-statement
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
# pylint: disable=too-many-statements,too-many-lines

import argparse
from collections import defaultdict, OrderedDict
import datetime
from itertools import chain
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import threading

from toodledo import (
    CommandLineAuthorization,
    DueDateModifier,
    Priority,
    Task,
    TaskCache,
    TokenStorageFile,
    Toodledo,
)

try:
    from icalendar import Calendar
    import recurring_ical_events
    import requests
    calendars_active = True
except ModuleNotFoundError:
    print('Calendar module import failed, calendars disabled')
    calendars_active = False

config = None
tokenFile = os.path.expanduser('~/.agenda-token.json')
configFile = os.path.expanduser('~/.agenda-config.json')
cacheFile = os.path.expanduser('~/.agenda-cache.pickle')
scope = 'basic tasks notes folders write'
tokenStorage = TokenStorageFile(tokenFile)
beginningOfTime = datetime.date(1970, 1, 1)
today = datetime.date.today()
# Zero the microseconds too so these are exact whole-second boundaries.
beginningOfDay = datetime.datetime.today().replace(
    hour=0, minute=0, second=0, microsecond=0)
endOfDay = datetime.datetime.today().replace(
    hour=23, minute=59, second=59, microsecond=0)
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
one_week_away = today + datetime.timedelta(days=7)
taskFieldsNeeded = \
    'folder,priority,duedate,duedatemod,duetime,repeat,parent,meta'
customProperties = defaultdict(dict)
DUE_BY = DueDateModifier.DUE_BY
meta_types = {
    'jkaorder': float,
}


def read_config(args):
    global config
    if not os.path.exists(args.config_file):
        config = {}
    else:
        with open(args.config_file, encoding='ascii') as f:
            config = json.load(f)
    if 'calendars' not in config:
        config['calendars'] = []


def write_config(args):
    config_file = os.path.realpath(args.config_file)
    if os.path.exists(config_file):
        shutil.copyfile(config_file, f'{config_file}.bak')
    try:
        with open(config_file, 'w', encoding='ascii') as f:
            json.dump(config, f)
    except Exception:
        print(f'Error saving {config_file}! Backup preserved in '
              f'{config_file}.bak', file=sys.stderr)
        raise


def parse_args():
    parser = argparse.ArgumentParser(
        description='Set my daily agenda in Toodledo')
    parser.add_argument('--dryrun', action='store_true', default=False)
    parser.add_argument('--confirm', action=argparse.BooleanOptionalAction,
                        default=False,
                        help='Confirm changes before making them')
    parser.add_argument('--configure', action='store_true', default=False,
                        help='Configure or reconfigure the script')
    parser.add_argument('--config-file', action='store', default=configFile,
                        help=f'Configuration file path (default {configFile})')
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--no-cache', dest='cache', action='store_false',
                       default=True, help='Disable task cache')
    group.add_argument('--clear-cache', action='store_true', default=False,
                       help='Clear task cache and reload from server')
    parser.add_argument('--folder', action='store', help='Override the '
                        'default configured Toodledo folder')
    parser.add_argument('--add-calendar', metavar='URL', action='append',
                        default=[], help="URL(s) of iCal calendar(s) to scan "
                        "for today's events")
    parser.add_argument('--skip-calendars', default=None,
                        action=argparse.BooleanOptionalAction,
                        help="Don't read events from configured calendars")
    parser.add_argument('--unprioritize', action='store_true', default=False,
                        help='Remove due times after 11pm')
    parser.add_argument('--edit-metadata', action='store_true', default=False,
                        help='Include metadata in tasks so it can be edited')
    parser.add_argument('--include-regexp', action='append', default=[],
                        type=re.compile, help="Include matching tasks in "
                        "agenda even if they're undated or in the future")
    args = parser.parse_args()
    return args


def get_yorn(prompt):
    while True:
        response = input(prompt)
        if re.match(r'[Yy]', response):
            return True
        if re.match(r'[Nn]', response):
            return False
        print('Unrecognized response.', file=sys.stderr)


def configure(args):
    config['clientId'] = prompt_with_default('Enter Toodledo client ID',
                                             config.get('clientId', None))
    config['clientSecret'] = prompt_with_default(
        'Enter Toodledo client secret', config.get('clientSecret', None))
    config['folderName'] = prompt_with_default('Enter Toodledo folder name',
                                               config.get('folderName', None))
    write_config(args)


def prompt_with_default(prompt, value):
    if value:
        prompt += f' (Enter for {value})'
    prompt += ': '
    # Fall back to the existing value when the user just presses Enter.
    return input(prompt) or value


def task_key(t):
    # Due times at or after 11pm are granular-order markers, not real times,
    # so such tasks sort as if they were untimed.
    return (t.dueDate or beginningOfTime,
            t.dueTime if t.dueTime and t.dueTime.hour < 23 else endOfDay,
            get_meta(t, 'jkaorder') or 0,
            t.title)


def undated_task_key(t):
    return (-t.priority.value, get_meta(t, 'jkaorder') or 0)


def get_meta(t, tag):
    if cp(t, tag):
        return cp(t, tag)
    if t.meta:
        # Search rather than match, since the tag need not come first in the
        # meta field (set_meta also handles it anywhere in the field).
        match = re.search(r';' + tag + r'\s*=\s*([^;]*)', t.meta)
        if match:
            value = meta_types.get(tag, str)(match[1])
            cp(t, tag, value)
            return value
    return None


def set_meta(t, tag, value):
    # An earlier bug in the script caused the metadata value to be inserted
    # into the task field multiple times. We need to clean that up.
    old_values = list(re.finditer(r';' + tag + r'\s*=\s*[^;]*', t.meta or ''))
    if len(old_values) > 1:
        old_values = reversed(old_values[1:])
        for old_value in old_values:
            t.meta = t.meta[0:old_value.start()] + t.meta[old_value.end():]
    if get_meta(t, tag) == value:
        return
    cp(t, tag, value)
    fragment = f';{tag}={value}'
    if not t.meta:
        t.meta = fragment
        return
    subbed = re.sub(r';' + tag + r'\s*=\s*[^;]*', fragment, t.meta)
    if subbed == t.meta:
        t.meta += fragment
    else:
        t.meta = subbed


def get_toodledo_tasks(args, threads, threadNum, toodledo, folderId,
                       tasks_return):
    tasks = [add_missing_fields(t) for t in
             toodledo.GetTasks(comp=0, fields=taskFieldsNeeded)
             if t.folderId == folderId and t.parent is None]
    tasks_return.extend(sorted(
        (t for t in tasks if t.dueDate and t.dueDate <= today),
        key=task_key))
    tasks_return.extend(sorted(
        (t for t in tasks if t not in tasks_return and
         t.dueDate and t.dueDate <= today),
        key=task_key))
    tasks_return.extend(sorted(
        (t for t in tasks if t not in tasks_return and
         not any(prior.title == t.title for prior in tasks_return) and
         t.dueDate and t.dueDate < one_week_away and
         t.dueDateModifier == DUE_BY),
        key=task_key))
    tasks_return.extend(sorted(
        (t for t in tasks if t not in tasks_return and
         not any(prior.title == t.title for prior in tasks_return) and
         any(reg.search(t.title) for reg in args.include_regexp)),
        key=task_key))
    tasks_return.extend(sorted(
        (t for t in tasks if t not in tasks_return and not t.dueDate and
         not any(prior.title == t.title for prior in tasks_return)),
        key=undated_task_key))
    threads[threadNum][2] = True


def get_calendar_tasks(threads, threadNum, calendar, tasks):
    response = requests.get(calendar, timeout=30)
    response.raise_for_status()
    cal = Calendar.from_ical(response.text)
    for event in recurring_ical_events.of(cal).at(datetime.date.today()):
        dt = event.decoded('DTSTART')
        if isinstance(dt, datetime.datetime):
            # Convert to local time first so the date and time agree even
            # when the event is stored in another time zone.
            local = dt.astimezone()
            dueDate = local.date()
            dueTime = local.replace(tzinfo=None)
        else:
            # All-day event: DTSTART is a plain date.
            dueDate = dt
            dueTime = None
        title = event.decoded('SUMMARY').decode()
        tasks.append(Task(dueDate=dueDate, dueTime=dueTime, title=title,
                          dueDateModifier=DUE_BY))
    threads[threadNum][2] = True


def launch_calendars(args, threads, calendarTasks):
    if not calendars_active or args.skip_calendars is True or \
       ((args.skip_calendars is None and
         config.get('calendar_skip') == str(today))):
        return
    for calendar in config['calendars']:
        threadNum = len(threads)
        calendarTasks.append([])
        threads.append([
            threading.Thread(
                target=get_calendar_tasks,
                args=(threads, threadNum, calendar, calendarTasks[threadNum])),
            f'Calendar {calendar}',
            None])
        threads[threadNum][0].daemon = True
        threads[threadNum][0].start()


def main():
    # Token and config files should be protected by default.
    os.umask(0o077)
    args = parse_args()
    read_config(args)
    if args.configure or not config.get('clientId', None) or \
       not config.get('clientSecret', None) or \
       not config.get('folderName', None):
        configure(args)
    for calendar in args.add_calendar:
        if calendar in config['calendars']:
            sys.exit(f'Calendar {calendar} is already configured')
        config['calendars'].append(calendar)
    if args.add_calendar:
        write_config(args)
    clientId = config['clientId']
    clientSecret = config['clientSecret']
    folderName = args.folder or config['folderName']
    if not os.path.exists(tokenFile):
        CommandLineAuthorization(clientId, clientSecret, scope, tokenStorage)
    toodledo = Toodledo(
        clientId=clientId,
        clientSecret=clientSecret,
        tokenStorage=tokenStorage,
        scope=scope)
    if args.cache:
        toodledo = TaskCache(toodledo, cacheFile, comp=0,
                             fields=taskFieldsNeeded, clear=args.clear_cache)
    folders = toodledo.GetFolders()
    folderId = next(f for f in folders
                    if not f.archived and f.name == folderName).id_
    while format_and_edit(args, toodledo, folderId):
        pass


def format_and_edit(args, toodledo, folderId):
    customProperties.clear()
    threads = []
    threadTasks = []
    threadNum = len(threads)
    threadTasks.append([])
    threads.append(
        [threading.Thread(
            target=get_toodledo_tasks,
            args=(args, threads, threadNum, toodledo, folderId,
                  threadTasks[threadNum])),
         'Toodledo', None])
    threads[threadNum][0].daemon = True
    threads[threadNum][0].start()
    launch_calendars(args, threads, threadTasks)
    for thread in threads:
        thread[0].join()
        if not thread[2]:
            print(f'{thread[1]} task fetch failed', file=sys.stderr)
    if any(True for t in threads if not t[2]):
        sys.exit(1)
    if duplicates := check_duplicates(threadTasks[0]):
        sys.exit(f"Can't edit agenda with duplicate task titles: {duplicates}")
    tasks = threadTasks[0]
    last_priority = None
    last_due = None
    last_time = None
    with tempfile.NamedTemporaryFile(
            mode='w+', encoding='UTF-8', delete=False) as f:
        for t in tasks:
            formatted = unparse_task(args, t)
            # Separate groups of tasks with at most one blank line.
            spaced = False
            if not spaced and last_priority is not None and \
               t.priority != last_priority:
                print('', file=f)
                spaced = True
            last_priority = t.priority
            if not spaced and last_due and t.dueDate != last_due:
                print('', file=f)
                spaced = True
            last_due = t.dueDate
            if not spaced and last_time and not due_time(t):
                print('', file=f)
                spaced = True
            last_time = due_time(t)
            print(formatted, file=f)
        # Skip calendar events already in the agenda. All-day events have no
        # due time, so guard against calling replace() on None.
        calendarEvents = [
            e for e in chain.from_iterable(threadTasks[1:])
            if not any(True for t in tasks
                       if t.title == e.title and t.dueDate == e.dueDate and
                       t.dueTime == (e.dueTime.replace(tzinfo=None)
                                     if e.dueTime else None))]
        if calendarEvents:
            calendarEvents.sort(key=lambda t: t.dueTime or beginningOfDay)
            print('\n# Calendar events:', file=f)
            print('# Uncomment the next line to skip until tomorrow.', file=f)
            print('#!calendar', file=f)
            for event in calendarEvents:
                print(f'# {unparse_task(args, event)}', file=f)
            print('', file=f)
        print('''
### Everything after this line will be ignored.
# Other lines starting with "#" will be ignored.
#
# You can:
#
# * Delete a task from this list to delete it entirely.
# * Put "=" in front of a task to prevent the script from modifying it in any
#   way.
# * Put "-" in front of a task to mark it complete. If it's a repeating task,
#   you can also prioritize it in the list for today.
#   NOTE: When you both change the date on a task and mark it complete, the new
#   date you specify is used as the completion date, rather than changing the
#   due date.
# * Put "*" in front of a repeating task to "explode" it, i.e., separate
#   today's task from the next repeating instance. Any edits you make to an
#   exploded task (e.g., priority, date, time, sort order) apply to today's,
#   not to the next repeating instance.
# * Put [YYYY-MM-DD], [YYYY-MM-DD HH:MM], or [HH:MM] at the start of a task
#   to schedule it. You can specify "Yesterday" or "Tomorrow" as shorthand.
# * Add a new task, optionally scheduled as above.
# * Reorder tasks to prioritize them.
# * Put "!calendar" on a line by itself to skip calendar updates until
#   tomorrow.
# * Put "!reedit" (or "!edit") on a line by itself to edit the (updated)
#   agenda again after your changes are sent to the server. This may be
#   necessary, e.g., if you mark complete a repeating task from a previous day
#   and you want to reprioritize it for today.
# * Put "!abort" on a line by itself to abort the agenda edit without changing
#   anything in Toodledo.
#
# N.B. Changing the title of a task will mark the old one complete and create
# a new one with the specified title, which may not be what you want!''',
              file=f)
        f.flush()
        try:
            while True:
                result = edit_and_process(
                    args, toodledo, folderId, tasks, f)
                if result == 'reedit':  # pylint: disable=no-else-return
                    os.unlink(f.name)
                    return True
                elif result == 'abort':
                    sys.exit(f'Aborting as requested, edit saved in {f.name}')
                elif result == 'success':
                    os.unlink(f.name)
                    return False
                elif result == 'failure':
                    again = get_yorn(
                        'Task editing/processing failed. Edit again? ')
                    if not again:
                        sys.exit(f'Edit saved in {f.name}')
                else:
                    raise Exception(f'Internal error: unrecognized response '
                                    f'{result} from edit_and_process')
        except Exception:
            print(f'Task editing/processing failed, edit saved in {f.name}',
                  file=sys.stderr)
            raise


def edit_and_process(  # pylint: disable=too-many-return-statements
        args, toodledo, folderId, tasks, f):
    return_value = 'success'
    editor = os.environ.get('EDITOR', os.environ.get('VISUAL', None))
    if not editor:
        print('You must set EDITOR or VISUAL environment variable',
              file=sys.stderr)
        return 'failure'
    try:
        subprocess.run((editor, f.name), encoding='UTF-8', check=True)
        f.seek(0)
        editedTasks = list(parse_tasks(args, f, folderId=folderId))
        if 'abort' in editedTasks:
            return 'abort'
        if 'reedit' in editedTasks:
            editedTasks = [t for t in editedTasks if t != 'reedit']
            return_value = 'reedit'
    except subprocess.CalledProcessError as e:
        print(f'Editor failed ({e})', file=sys.stderr)
        return 'failure'
    except ValueError as e:
        print(str(e))
        return 'failure'
    if duplicates := check_duplicates(editedTasks):
        print(f"Can't duplicate task titles in agenda: {duplicates}",
              file=sys.stderr)
        return 'failure'

    # From this point on, we're going to have to cross-reference tasks in
    # several different sets, so to make that easier we're going to use
    # OrderedDicts for all of them. Strictly speaking OrderedDicts aren't
    # necessary for all of them, but for consistency and to reduce confusion
    # we'll use them across the board so we don't have to deal with them being
    # a mixture of lists and OrderedDicts.
    tasks = OrderedDict((t.title, t) for t in tasks)
    editedTasks = OrderedDict((t.title, t) for t in editedTasks
                              if not cp(t, 'preserved'))

    # Copy fields the user was unable to edit from old to new tasks.
    if not args.edit_metadata:
        for t in tasks.values():
            if found := editedTasks.get(t.title, None):
                found.meta = t.meta
                found.repeat = t.repeat
    for t in tasks.values():
        if found := editedTasks.get(t.title, None):
            found.id_ = t.id_

    renumber_tasks(editedTasks.values())

    # The dates on tasks we're marking complete are actually used as the
    # completion date, not the due date.
    for title, t in ((title, t) for title, t in editedTasks.items()
                     if cp(t, 'completed')):
        t.completedDate = t.dueDate or today
        t.dueDate = tasks[title].dueDate if title in tasks else None
        fix_due_time(t)

    # Pull tasks due before today to today
    for t in (t for t in editedTasks.values() if not cp(t, 'completed')):
        if t.dueDate and t.dueDate < today:
            try:
                ot = tasks[t.title]
            except KeyError:
                # Task created by user with explicit due date.
                continue
            if ot.dueDate != t.dueDate:
                # If the user explicitly changed the due date,
                # don't mess with it.
                continue
            t.dueDate = today
            fix_due_time(t)

    # Explode repeat-from-due-date tasks whose due dates have changed.
    for title, t in editedTasks.items():
        if not t.repeat or 'FROMCOMP' in t.repeat or title not in tasks or \
           cp(t, 'completed'):
            continue
        ot = tasks[title]
        if not ot.repeat:
            # User is editing metadata and converted it into a repeating task,
            # no need to explode.
            continue
        if ot.dueDate != t.dueDate:
            cp(t, 'exploded', True)

    # Tasks with due dates are always Top priority, others default to Top.
    for t in (t for t in editedTasks.values() if t.priority != Priority.TOP):
        if t.dueDate:
            t.priority = Priority.TOP
        elif t.priority is None:
            t.priority = Priority.TOP

    if not check_task_priorities(editedTasks.values()):
        return 'failure'

    # Handle all the exploding
    for t in (t for t in editedTasks.values() if cp(t, 'exploded')):
        if not t.id_:
            raise Exception("Can't explode a task you're adding!")
        if not t.repeat:
            raise Exception("Can't explode a non-repeating task!")
        explode_task(args, toodledo, tasks[t.title], t)

    clean_meta(editedTasks.values())

    # Clear fake due times before adding them back on the tasks that need them.
    working_set = (t for t in editedTasks.values()
                   if t.dueTime and t.dueTime.hour > 22)
    for t in working_set:
        t.dueTime = None

    # Put in fake due times
    working_set = (t for t in editedTasks.values() if t.dueDate == today)
    working_set = (t for t in working_set if not t.dueTime)
    working_set = sorted(working_set, key=task_key, reverse=True)
    minute = 59
    if len(working_set) > 59:
        decrement = 59 / len(working_set)
    else:
        decrement = 1
    for t in working_set:
        t.dueTime = datetime.datetime.combine(
            today, datetime.time(23, int(minute)))
        minute -= decrement

    deletedTasks = OrderedDict((title, t) for title, t in tasks.items()
                               if title not in editedTasks
                               and not cp(t, 'preserved'))
    delete_tasks(args, toodledo, deletedTasks)

    addedTasks = OrderedDict((title, t) for title, t in editedTasks.items()
                             if title not in tasks)
    add_tasks(args, toodledo, addedTasks)

    # This is a list, not an OrderedDict, and the tasks in the list contain
    # only the id_ field and the other fields that have changed, not all of
    # the task fields.
    changes = find_changes(tasks, editedTasks)
    make_changes = confirm_changes(args, tasks, changes)
    if make_changes:
        for t in changes:
            t.reschedule = 1
        completed_titles = [t.title for t in editedTasks.values()
                            if t.completedDate]
        toodledo.EditTasks(changes)
        if completed_titles:
            account = toodledo.GetAccount()
            relevant = relevant_rescheduled_tasks(
                toodledo, account, completed_titles)
            if relevant:
                print('Reediting because completed and rescheduled '
                      'task(s) is/are still in agenda range:')
                for task in relevant:
                    print(f' {unparse_task(args, task)}')
                return 'reedit'
    return return_value


def find_changes(oldTasks, newTasks):
    '''Return a list of tasks containing id_ and changed fields.'''
    changes = []
    for title, newTask in newTasks.items():
        try:
            oldTask = oldTasks[title]
        except KeyError:
            continue
        differences = {key: value for key, value in newTask.__dict__.items()
                       if value != getattr(oldTask, key)}
        if differences:
            changes.append(Task(id_=newTask.id_, **differences))
    return changes


def confirm_changes(args, old, changes):
    if not changes:
        return False
    change_lines = []
    by_id = {t.id_: t for t in old.values()}
    for c in changes:
        old = by_id[c.id_]
        change_strings = []
        for key, value in c.__dict__.items():
            label = f'{key} '
            old_value = getattr(old, key)
            if key in ('id_', 'meta'):
                continue
            if key == 'priority':
                if value:
                    value = value.name.title()
                old_value = old.priority.name.title() if old.priority else None
                label = ''
            elif key == 'dueDateModifier':
                value = value.name.title()
                old_value = old.dueDateModifier.name.title()
                label = ''
            elif key == 'dueDate':
                label = 'due '
            elif key == 'completedDate':
                label = 'completed '
            elif key == 'dueTime':
                # Due times at 11pm or later are granular-order markers,
                # so don't report them as real changes.
                if value and value.hour > 22:
                    value = None
                if old_value and old_value.hour > 22:
                    old_value = None
                value = format_time(value) if value else None
                old_value = format_time(old_value) if old_value else None
                label = ''
            if value == old_value:
                continue
            change_strings.append(f'{label}{old_value} -> {value}')
        if not change_strings:
            continue
        change_lines.append(
            f' {old.title}: {", ".join(change_strings)}')
    if change_lines:
        print('Changes:\n')
        for line in change_lines:
            print(line)
    else:
        print('Only metadata are changing.')
    if args.dryrun:
        return False
    if args.confirm and not get_yorn('Proceed? '):
        print('OK, skipping.')
        return False
    return True


def renumber_tasks(tasks, force=False):
    '''Assign increasing 'jkaorder' metadata values to tasks in list order.

    A task whose existing value already exceeds the previous one keeps it.
    Any other task gets a value interpolated between the previous value and
    the next larger one found later in the list; if there is no larger
    value, it and all remaining tasks are numbered consecutively. If more
    than half of the tasks had to change, everything is renumbered from 1.
    '''
    tasks = list(tasks)  # We need random access
    if force or not any(get_meta(t, 'jkaorder') for t in tasks):
        for i, t in enumerate(tasks):
            set_meta(t, 'jkaorder', i + 1)
        return
    last_ord = 0.0
    changed_count = 0
    for i, t in enumerate(tasks):
        if get_meta(t, 'jkaorder') and get_meta(t, 'jkaorder') > last_ord:
            last_ord = get_meta(t, 'jkaorder')
            # Bug workaround: multiple jkaorder values in some tasks.
            # See code in set_meta.
            set_meta(t, 'jkaorder', last_ord)
            continue
        value, count = next_ord(tasks, i, last_ord)
        if not value:
            for t in tasks[i:]:
                last_ord += 1
                set_meta(t, 'jkaorder', last_ord)
                changed_count += 1
            break
        set_meta(t, 'jkaorder', last_ord + (value - last_ord) / (count + 1))
        last_ord = get_meta(t, 'jkaorder')
        changed_count += 1
    if changed_count > len(tasks) / 2:
        # Too many changes, renumber everything
        renumber_tasks(tasks, force=True)


def check_task_priorities(tasks):
    tasks = list(t for t in tasks if not t.dueDate)
    last_prio = Priority.HIGH
    valid = True
    for t in tasks:
        if t.priority.value > last_prio.value:
            print(f'Task {t.title} is out of order given its priority')
            valid = False
        last_prio = t.priority
    return valid


def clean_meta(tasks):
    for t in tasks:
        if not t.meta:
            continue
        # Obsolete metadata we're no longer using
        t.meta = re.sub(r'(^|\s*;)\s*preserve\s*=[^;]+', '', t.meta)
        t.meta = re.sub(r';;+', ';', t.meta)


def next_ord(tasks, i, last_ord):
    skipped = 0
    for t in tasks[i+1:]:
        skipped += 1
        if (get_meta(t, 'jkaorder') or 0) > last_ord:
            return get_meta(t, 'jkaorder'), skipped
    return 0, 0


def parse_tasks(args, file, **kwargs):
    for t in file:
        if t.startswith('### Everything after'):
            return
        if t.startswith('#'):
            continue
        t = t.strip()
        if not t:
            continue
        task = parse_task(args, t, **kwargs)
        if task is not None:
            yield task


def check_duplicates(tasks):
    titles = defaultdict(int)
    for task in tasks:
        titles[task.title] += 1
    dups = sorted(title for title, count in titles.items() if count > 1)
    return ', '.join(f'"{title}"' for title in dups)


def changed(args, old_task, task):
    # pylint: disable=consider-using-in
    if old_task.dueDate != task.dueDate and task.dueDate != today:
        print(f'Moving {task.title} to {task.dueDate}')
    # pylint: enable=consider-using-in
    # Previously set -> Currently same
    # Previously unset -> Currently unset
    if due_time(old_task) == due_time(task):
        pass
    # Previously unset -> Currently set
    elif not due_time(old_task) and due_time(task):
        print(f'Setting due time of {task.title} to {due_time(task)}')
    # Previously set -> Currently unset
    elif due_time(old_task) and not due_time(task):
        print(f'Removing due time from {task.title}')
    # Previously set -> Currently different
    else:
        print(f'Changing due time of {task.title} to {due_time(task)}')
    if args.edit_metadata:
        if old_task.meta is not None and task.meta is None:
            task.meta = ''
        if old_task.repeat is not None and task.repeat is None:
            task.repeat = ''
        if old_task.repeat != task.repeat:
            return True
    return (task.completedDate or
            old_task.meta != task.meta or
            old_task.dueDate != task.dueDate or
            old_task.dueTime != task.dueTime)


def due_time(task):
    if not task.dueTime:
        return None
    if task.dueTime.hour > 22:
        return None
    return f'{task.dueTime.hour}:{task.dueTime.minute:02}'


def relevant_rescheduled_tasks(toodledo, account, titles):
    """Check if any tasks were rescheduled for soon enough to be relevant"""
    modifiedTasks = toodledo.GetTasks(after=account.lastEditTask,
                                      fields="duedate", comp=0)
    return list(task for task in modifiedTasks
                if task.title in titles and task.dueDate and
                task.dueDate <= today)


def delete_tasks(args, toodledo, tasks):
    if not tasks:
        return
    print('Deleting tasks:')
    print('', '\n '.join(tasks.keys()))
    if args.dryrun:
        return
    if args.confirm and not get_yorn('Proceed? '):
        print('OK, skipping.')
        return
    toodledo.DeleteTasks([Task(id_=t.id_) for t in tasks.values()])


def add_tasks(args, toodledo, tasks):
    if not tasks:
        return
    print('Adding tasks:')
    print('', '\n '.join(tasks.keys()))
    if args.dryrun:
        return
    if args.confirm and not get_yorn('Proceed? '):
        print('OK, skipping.')
        return
    toodledo.AddTasks(tasks.values())


def explode_task(args, toodledo, oldTask, task):
    if not (args.dryrun or args.confirm):
        print(f'Exploding "{task.title}"')
    if args.dryrun:
        return
    # 1. Mark the repeating task completed, which will cause a NEW task
    #    (different ID) to be created which is marked completed, and the
    #    OLD task to be rescheduled automatically.
    # 2. Find the new task.
    # 3. Uncomplete it.
    # 4. Update our ID to match the ID of the new task.
    if args.confirm and not get_yorn(
            f'Proceed with exploding "{task.title}"? '):
        print('OK, skipping.\n')
        return
    with toodledo.caching_everything():
        while True:  # See below for explanation of why this is a loop
            toodledo.EditTasks(
                [Task(id_=task.id_, completedDate=today, reschedule=1)])
            taskList = toodledo.GetTasks(
                # Five minutes in the past as a clock skew safety margin
                after=datetime.datetime.now().timestamp() - 300,
                fields='duedate')
            taskList.sort(key=lambda t: t.modified, reverse=True)
            completeTask = next(t for t in taskList
                                if t.title == task.title and
                                t.completedDate is not None)
            incompleteTask = next(t for t in taskList
                                  if t.title == task.title and
                                  t.completedDate is None)
            # What happens, e.g., if there's a daily from-due-date task due on
            # Monday, and we don't run agenda.py on Tuesday, and then we run it
            # on Wednesday? The script will change the due date of the task to
            # Wednesday, which will cause it to be exploded automatically, but
            # then the recurring task will be due _Tuesday_, the day before the
            # task newly created by the explode. If this happens, we want to
            # keep exploding the task until its due date is on or after the
            # due date of the newly created task.
            if incompleteTask.dueDate >= task.dueDate:
                break
            print(f'Recurring task "{task.title}" due date is before desired '
                  f'due date after explode; exploding again.')
            toodledo.DeleteTasks([completeTask])
        if task.dueDate == incompleteTask.dueDate:
            print(f'Identical due dates for "{task.title}" after explode; '
                  f'undoing explode.')
            toodledo.DeleteTasks([completeTask])
            # The edit has already been made on the server so we update our
            # old task so we don't try to make the same edit again later when
            # we're propagating changes.
            oldTask.dueDate = task.dueDate
        else:
            toodledo.EditTasks([
                Task(id_=completeTask.id_, completedDate=None)])
            task.id_ = completeTask.id_
            oldTask.id_ = task.id_


def fix_due_time(task):
    if not task.dueDate:
        task.dueTime = None
        return
    if not task.dueTime:
        return
    try:
        if task.dueTime.date() == task.dueDate:
            return
    except AttributeError:
        return
    # datetime.replace() returns a new object rather than modifying in place,
    # so the result has to be assigned back for the fix to take effect.
    task.dueTime = task.dueTime.replace(year=task.dueDate.year,
                                        month=task.dueDate.month,
                                        day=task.dueDate.day)


def format_time(time):
    return f'{time.hour or "0"}:{time.minute:02}'


def unparse_task(args, task):
    title = task.title
    special = re.match(r'([\[\#\-=*!])', title)
    params = []
    if task.dueDate == yesterday:
        params.append('Yesterday')
    elif task.dueDate == today:
        params.append('Today')
    elif task.dueDate == tomorrow:
        params.append('Tomorrow')
    elif task.dueDate:
        params.append(str(task.dueDate))
    # Due times after 11pm are assumed to be fake times for prioritization.
    if getattr(task, 'dueTime', None) and task.dueTime.hour < 23:
        formattedTime = format_time(task.dueTime)
        params.append(formattedTime)
    if getattr(task, 'priority', None) is not None:
        params.append(task.priority.name.title())
    ddm = getattr(task, 'dueDateModifier', None)
    if ddm and ddm != DUE_BY:
        params.append(task.dueDateModifier.name.title())
    if args.edit_metadata:
        if task.meta:
            params.append(f'meta="{task.meta}"')
        if task.repeat:
            params.append(f'repeat="{task.repeat}"')
    params = "[" + " ".join(params) + "] " if params or special else ""
    return f'{params}{title}'


def add_missing_fields(t):
    for field in ('completedDate', 'meta', 'dueDate',
                  'dueDateModifier', 'dueTime', 'priority', 'id_'):
        try:
            getattr(t, field)
        except AttributeError:
            setattr(t, field, None)
    # Some fields are empty string, not None, when empty
    for field in ('repeat',):
        try:
            getattr(t, field)
        except AttributeError:
            setattr(t, field, '')
    return t


def parse_task(args, text, **kwargs):
    text = text.strip()
    task = Task(**kwargs)
    add_missing_fields(task)
    exploded = None
    completed = None
    preserved = None
    match = re.match(r'\s*([-*=!])\s*(.*)', text)
    if match:
        text = match[2].strip()
        if match[1] == '-':
            completed = True
        elif match[1] == '*':
            exploded = True
        elif match[1] == '!':
            if text.lower() == 'calendar':
                if config.get('calendar_skip', None) != str(today):
                    config['calendar_skip'] = str(today)
                    write_config(args)
                    print('Skipping calendars until tomorrow')
            elif text.lower() in ('reedit', 'edit'):
                return 'reedit'
            elif text.lower() == 'abort':
                return 'abort'
            else:
                raise ValueError(f'Unrecognized meta command !{text}')
            return None
        else:
            preserved = True
    match = re.match(r'\s*\[\s*(.*?)\s*\]\s*(.*)', text)
    if match:
        params = match[1]
        task.title = match[2]
        taskDate = None
        taskHour = None
        while params:
            match = re.match(r'(?i)\s*(Yesterday|Today|Tomorrow|'
                             r'\d\d\d\d-\d\d-\d\d)\s*(.*)', params)
            if match:
                if taskDate:
                    raise ValueError(f'Date specified twice in "{text}"')
                taskDate = match[1]
                params = match[2]
                continue
            match = re.match(r'\s*(\d+):(\d+)\s*(.*)', params)
            if match:
                if taskHour is not None:
                    raise ValueError(f'Time specified twice in "{text}"')
                taskHour = int(match[1])
                taskMinute = int(match[2])
                params = match[3]
                continue
            match = re.match(r'(?i)\s*repeat\s*=\s*"\s*([^\"]*?)\s*"\s*(.*)',
                             params)
            if match:
                if task.repeat:
                    raise ValueError(f'Repeat specified twice in "{text}"')
                task.repeat = match[1]
                params = match[2]
                continue
            match = re.match(r'(?i)\s*meta\s*=\s*"\s*([^\"]*?)\s*"\s*(.*)',
                             params)
            if match:
                if task.meta is not None:
                    raise ValueError(f'Meta specified twice in "{text}"')
                task.meta = match[1]
                params = match[2]
                continue
            match = re.match(r'(?i)\s*(top|high|medium|low|negative)\s*(.*)',
                             params)
            if match:
                if task.priority is not None:
                    raise ValueError(f'Priority specified twice in "{text}"')
                task.priority = getattr(Priority, match[1].upper())
                params = match[2]
                continue
            match = re.match(r'(?i)\s*(due_(?:by|on|after))\s*(.*)', params)
            if match:
                if task.dueDateModifier is not None:
                    raise ValueError(
                        f'Due date modifier specified twice in "{text}"')
                task.dueDateModifier = getattr(DueDateModifier,
                                               match[1].upper())
                params = match[2]
                continue
            raise ValueError(
                f'Malformed parameter spec "{params}" in "{text}"')
        if (taskDate or '').lower() == 'yesterday':
            taskDate = yesterday
        elif (taskDate or '').lower() == 'tomorrow':
            taskDate = tomorrow
        elif taskDate and taskDate.lower() == 'today':
            taskDate = today
        elif taskDate:
            taskDate = datetime.datetime.strptime(taskDate, '%Y-%m-%d').date()
        task.dueDate = taskDate
        if taskHour and taskHour < 23:
            if taskDate is None:
                raise ValueError(f'Can\'t have due time without due date '
                                 f'(for "{text}")')
            taskTime = datetime.datetime(
                taskDate.year, taskDate.month, taskDate.day, taskHour,
                taskMinute)
            task.dueTime = taskTime
    else:
        task.title = text
    if not task.dueDateModifier:
        task.dueDateModifier = DUE_BY
    cp(task, 'completed', completed)
    cp(task, 'exploded', exploded)
    cp(task, 'preserved', preserved)
    return add_missing_fields(task)


# pylint: disable=inconsistent-return-statements  # OK for a setter/getter
def cp(task, prop_name, value=None):
    """Get or set a custom property on a task.

    This is necessary because we can't just add custom properties to Task
    objects or Marshmallow will complain.

    The name of this function is very short not to make the code opaque,
    but rather to avoid cluttering it up with long function names.

    If getting, then returns None if the property isn't set.

    Keyword arguments:
    value -- Set property if specified, get property otherwise
    """
    if value is not None:
        customProperties[task.title][prop_name] = value
    else:
        return customProperties[task.title].get(prop_name, None)
# pylint: enable=inconsistent-return-statements


if __name__ == '__main__':
    main()