Useful for database backend migrations and/or backup and restore operations.
Requires Python 3.6+ and AIOHTTP 3.7.4+
Run it on your Mailman server. You may need to edit the API credentials embedded in `API_URL` if you changed the defaults.
Useful for database backend migrations and/or backup and restore operations.
Requires Python 3.6+ and AIOHTTP 3.7.4+
Run it on your Mailman server. You may need to edit the API credentials embedded in `API_URL` if you changed the defaults.
import asyncio | |
import json | |
from pathlib import Path | |
from typing import Any, Awaitable | |
from aiohttp.client import ClientSession | |
# Base URL of the REST API to dump from; the basic-auth credentials are part of the URL.
API_URL = "http://restadmin:restpass@localhost:8001/3.1" | |
# Directory that receives one cached JSON file per fetched URL.
OUT_DIR = Path("./mailman_dump_prod") | |
async def load_url(sess: ClientSession, url: str) -> Any:
    """Fetch ``url`` as JSON, going through an on-disk cache in ``OUT_DIR``.

    The cache file name is built from the part of ``url`` that follows
    ``API_URL``, with ``/`` and ``@`` replaced by ``.``. When that file
    already exists its parsed content is returned and no request is made.

    Args:
        sess: Open aiohttp session used for the GET request.
        url: Absolute URL to fetch; expected to start with ``API_URL``.

    Returns:
        The decoded JSON document.

    Raises:
        ValueError: If the server answers with a status other than 200.
    """
    cache_name = "cache." + url[len(API_URL) :].replace("/", ".").replace("@", ".") + ".json"
    cache_file = OUT_DIR / cache_name
    if cache_file.exists():
        with cache_file.open("r") as cache_fd:
            return json.load(cache_fd)

    # ``async with`` releases the connection on every path, including the
    # error path where the response body is never read.
    async with sess.get(url) as res:
        if res.status != 200:
            raise ValueError(f"Response code {res.status}")
        data = await res.json()

    # Write to a sibling temp file and rename it into place, so an interrupted
    # run cannot leave a truncated cache file that would break the next run.
    tmp_file = cache_file.with_name(cache_file.name + ".tmp")
    with tmp_file.open("w") as cache_fd:
        json.dump(data, cache_fd)
    tmp_file.replace(cache_file)
    return data
async def main() -> None:
    """Dump lists, members and per-list configs from the API into ``OUT_DIR``.

    Every response is persisted by ``load_url``; nothing is returned.
    """
    # Idempotent and race-free, unlike an exists() check followed by mkdir().
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    async with ClientSession() as sess:
        print("Loading lists")
        lists: dict = await load_url(sess, f"{API_URL}/lists")

        print("Loading members")
        await load_url(sess, f"{API_URL}/members")

        print("Loading list configs")
        # NOTE(review): tolerate a response without "entries" (presumably what
        # the API returns for an empty collection - confirm against the server).
        list_config_queries: list[Awaitable[dict]] = [
            load_url(sess, f"{API_URL}/lists/{lst['fqdn_listname']}/config")
            for lst in lists.get("entries", [])
        ]
        # The results are discarded; the calls are made to fill the cache.
        await asyncio.gather(*list_config_queries)

    print("Done")
if __name__ == "__main__":
    # asyncio.get_event_loop() without a running loop is deprecated and raises
    # RuntimeError on Python 3.14+, so the loop is created explicitly. This
    # form also works on Python 3.6, where asyncio.run() is not available.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
import asyncio | |
import json | |
from pathlib import Path | |
from typing import Any | |
from aiohttp.client import ClientSession | |
# Base URL of the REST API to restore into; the basic-auth credentials are part of the URL.
# The same prefix is stripped from URLs when cache file names are derived.
API_URL = "http://restadmin:restpass@localhost:8002/3.1" | |
# Directory holding the cached JSON files that are read back during the restore.
OUT_DIR = Path("./mailman_dump_prod") | |
# Fields to ignore when setting config on a list.
# All tested to either be read-only or cause explicit errors.
IGNORED_KEYS = [ | |
"bounces_address", | |
"fqdn_listname", | |
"join_address", | |
"last_post_at", | |
"leave_address", | |
"list_name", | |
"mail_host", | |
"next_digest_number", | |
"no_reply_address", | |
"owner_address", | |
"post_id", | |
"posting_address", | |
"request_address", | |
"usenet_watermark", | |
"volume", | |
"http_etag", | |
"created_at", | |
"digest_last_sent_at", | |
] | |
# Keys of a dumped member entry that are copied as-is into the subscription request.
SUBSCRIPTION_KEYS = ["delivery_mode", "list_id", "role", "display_name"] | |
# Caps the number of subscription requests that are in flight at the same time.
semaphore = asyncio.Semaphore(value=5) | |
def load_url(sess: ClientSession, url: str) -> Any:
    """Return the cached JSON document for ``url`` from ``OUT_DIR``.

    Nothing is fetched over the network; ``sess`` is accepted but not used.
    The cache file name is the part of ``url`` after ``API_URL`` with ``/``
    and ``@`` replaced by ``.``, wrapped in ``cache.`` and ``.json``.

    Raises:
        ValueError: If no cache file exists for ``url``.
    """
    suffix = url[len(API_URL) :]
    for separator in ("/", "@"):
        suffix = suffix.replace(separator, ".")
    cache_path = OUT_DIR / ("cache." + suffix + ".json")

    if not cache_path.exists():
        raise ValueError("Failed to load cache data for " + url)

    with cache_path.open("r") as handle:
        return json.load(handle)
async def add_subscription(sess: ClientSession, subscription: dict) -> None:
    """Re-create one subscription by POSTing it to ``{API_URL}/members``.

    Only the keys listed in ``SUBSCRIPTION_KEYS`` are copied from the dumped
    entry; the subscriber address comes from its ``email`` field. Failures
    are reported on stdout and never raised, so one bad entry does not abort
    the rest of the restore.

    Args:
        sess: Open aiohttp session used for the request.
        subscription: Dumped member entry; must contain ``email`` and ``list_id``.
    """
    payload = {
        **{k: v for k, v in subscription.items() if k in SUBSCRIPTION_KEYS},
        "subscriber": subscription["email"],
        # Flags are sent as strings, matching how booleans are sent elsewhere
        # in this script.
        "pre_verified": "True",
        "pre_confirmed": "True",
        "pre_approved": "True",
        "invitation": "False",
        "send_welcome_message": "False",
    }
    async with semaphore:
        # ``async with`` releases the connection even when the body is never
        # read, which is the case for every successful request.
        async with sess.post(f"{API_URL}/members", json=payload) as res:
            print(res.status, "adding", subscription["email"], "to", subscription["list_id"])
            if res.status > 210:
                print(await res.text())
async def main() -> None:
    """Restore lists, list configs and subscriptions from the dump in ``OUT_DIR``.

    Steps, in order:
      1. Load the cached lists, members and per-list configs via ``load_url``.
      2. Create every list, then PUT its config minus ``IGNORED_KEYS``.
      3. Create all subscriptions concurrently via ``add_subscription``.

    Failed HTTP calls are printed and skipped rather than raised.

    Raises:
        ValueError: Propagated from ``load_url`` when a cache file is missing.
    """
    # Idempotent and race-free, unlike an exists() check followed by mkdir().
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    async with ClientSession() as sess:
        print("Loading lists")
        lists: dict = load_url(sess, f"{API_URL}/lists")
        print("Loading members")
        members: dict = load_url(sess, f"{API_URL}/members")

        print("Loading list configs")
        # NOTE(review): tolerate dumps without "entries" (presumably what the
        # API returns for an empty collection - confirm against the server).
        list_entries = lists.get("entries", [])
        list_configs: list[dict] = [
            load_url(sess, f"{API_URL}/lists/{mlist['fqdn_listname']}/config")
            for mlist in list_entries
        ]

        print("Creating lists")
        for mlist, raw_config in zip(list_entries, list_configs):
            fqdn_listname = mlist["fqdn_listname"]

            # ``async with`` releases each response's connection on every path.
            async with sess.post(
                f"{API_URL}/lists", json={"fqdn_listname": fqdn_listname}
            ) as res:
                print(res.status, "for list", fqdn_listname)
                if res.status > 210:
                    print(await res.text())
                    # NOTE: any failure, including "list exists", skips the
                    # config step. To re-apply the config to existing lists,
                    # skip only when the error text lacks "list exists".
                    continue

            config = {
                # Mailman crashes if bool types are not string.
                key: str(value) if isinstance(value, bool) else value
                for key, value in raw_config.items()
                if key not in IGNORED_KEYS
            }
            async with sess.put(
                f"{API_URL}/lists/{fqdn_listname}/config", json=config
            ) as res:
                print(res.status, "setting config for list", fqdn_listname)
                if res.status > 210:
                    print(await res.text())

        print("Creating subscriptions")
        sub_calls = [
            add_subscription(sess, subscription)
            for subscription in members.get("entries", [])
        ]
        await asyncio.gather(*sub_calls, return_exceptions=False)
if __name__ == "__main__":
    # On Python < 3.10 the module-level ``semaphore`` is bound to the current
    # event loop when it is created, so that same loop must run ``main``.
    # On Python 3.14+ get_event_loop() raises RuntimeError when no loop is
    # running; asyncio primitives bind lazily there, so a new loop is safe.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
aiohttp >= 3.7.4, < 4 |