Created
January 5, 2024 22:56
-
-
Save Terrance/f7f75d4161f60cdc6bb0cb7282006240 to your computer and use it in GitHub Desktop.
Script to migrate calendars and address books from a Radicale user collection to a Baikal principal (assuming a PostgreSQL database backend).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Migrate Radicale calendars and contacts to Baikal. | |
""" | |
import datetime | |
import json | |
import pathlib | |
import time | |
import uuid | |
import icalendar | |
import psycopg | |
import recurring_ical_events | |
USAGE = """ | |
Usage: | |
{} <root> <dsn> <user> | |
Parameters: | |
root path to user subdirectory of Radicale's collection-root | |
dsn PostgreSQL connection string (e.g. "user=http dbname=baikal") | |
user Baikal principal username to receive data | |
""".strip() | |
def etag(): | |
return str(uuid.uuid4()).replace("-", "") | |
def timestamp(component, *fields): | |
for field in fields: | |
try: | |
value = component[field].dt | |
except KeyError: | |
continue | |
else: | |
break | |
else: | |
return None | |
if not isinstance(value, datetime.datetime): | |
value = datetime.datetime.combine(value, datetime.time()) | |
return int(value.timestamp()) | |
def main(root: str, dsn: str, user: str): | |
with psycopg.connect(dsn) as conn: | |
with conn.cursor() as cur: | |
principal = "principals/{}".format(user) | |
cur.execute("SELECT id FROM principals WHERE uri = %s", (principal,)) | |
uid = cur.fetchone()[0] | |
print("Using principal: {} ({})".format(uid, user)) | |
for res in pathlib.Path(root).iterdir(): | |
if not res.is_dir(): | |
continue | |
props = res / ".Radicale.props" | |
if not props.is_file(): | |
continue | |
with open(props) as fd: | |
meta = json.load(fd) | |
tag = meta["tag"] | |
name = meta["D:displayname"] | |
uri = res.name | |
print("# {}: {} ({})".format(uri, name, tag)) | |
if tag == "VCALENDAR": | |
types = meta["C:supported-calendar-component-set"] | |
colour = meta.get("ICAL:calendar-color") | |
cur.execute("SELECT id, calendarid FROM calendarinstances WHERE principaluri = %s AND uri = %s", (principal, uri,)) | |
if cur.rowcount: | |
cid, ciid = cur.fetchone() | |
print("Using existing calendar ({}, instance {})".format(cid, ciid)) | |
cur.execute("SELECT COUNT(id) FROM calendarobjects WHERE calendarid = %s", (cid,)) | |
count = cur.fetchone()[0] | |
if count: | |
print("! Calendar not empty") | |
continue | |
else: | |
cur.execute("INSERT INTO calendars (components) VALUES (%s) RETURNING id", (types,)) | |
cid = cur.fetchone()[0] | |
cur.execute( | |
"""INSERT INTO calendarinstances | |
(calendarid, principaluri, displayname, uri, calendarcolor, timezone) | |
VALUES (%s, %s, %s, %s, %s, 'Europe/London') | |
RETURNING id""", | |
(cid, principal, name, uri, colour), | |
) | |
ciid = cur.fetchone()[0] | |
print("Created new calendar ({}, instance {})".format(cid, ciid)) | |
with cur.copy("COPY calendarobjects (calendardata, uri, calendarid, lastmodified, etag, size, componenttype, firstoccurence, lastoccurence, uid) FROM STDIN") as copy: | |
for item in res.iterdir(): | |
if not item.is_file() or not item.name.endswith(".ics"): | |
continue | |
data = item.read_text() | |
cal = icalendar.Calendar.from_ical(data) | |
for comp in cal.subcomponents: | |
if comp.name in ("VEVENT", "VTODO", "VJOURNAL"): | |
break | |
else: | |
raise Exception("{}: no valid component?".format(item.name)) | |
sid = comp["UID"] | |
if comp.name == "VEVENT": | |
events = recurring_ical_events.of(cal).all() | |
try: | |
start = min(timestamp(event, "DTSTART") for event in events) | |
end = max(timestamp(event, "DTEND") for event in events) | |
except ValueError: | |
print("{}: no occurences?".format(item.name)) | |
continue | |
else: | |
start = timestamp(comp, "DTSTART") | |
end = timestamp(comp, "DTEND", "DUE") or start | |
copy.write_row((data, item.name, cid, int(time.time()), etag(), len(data), comp.name, start, end, sid)) | |
elif tag == "VADDRESSBOOK": | |
colour = meta.get("{http://inf-it.com/ns/ab/}addressbook-color") | |
cur.execute("SELECT id FROM addressbooks WHERE principaluri = %s AND uri = %s", (principal, uri,)) | |
if cur.rowcount: | |
aid = cur.fetchone()[0] | |
print("Using existing address book ({})".format(aid)) | |
cur.execute("SELECT COUNT(id) FROM cards WHERE addressbookid = %s", (aid,)) | |
count = cur.fetchone()[0] | |
if count: | |
print("! Address book not empty") | |
continue | |
else: | |
cur.execute( | |
"""INSERT INTO addressbooks (principaluri, displayname, uri) | |
VALUES (%s, %s, %s) RETURNING id""", | |
(principal, name, uri), | |
) | |
aid = cur.fetchone()[0] | |
print("Created new address book ({})".format(aid)) | |
with cur.copy("COPY cards (addressbookid, carddata, uri, lastmodified, etag, size) FROM STDIN") as copy: | |
for item in res.iterdir(): | |
if not item.is_file() or not item.name.endswith(".vcf"): | |
continue | |
data = item.read_text() | |
copy.write_row((aid, data, item.name, int(time.time()), etag(), len(data))) | |
else: | |
continue | |
if __name__ == "__main__": | |
import sys | |
if len(sys.argv) != 4: | |
print(USAGE.format(sys.argv[0]), file=sys.stderr) | |
exit(1) | |
main(*sys.argv[1:]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment