Useful for database backend migrations and/or backup and restore operations.
Requires Python 3.6+ and aiohttp 3.7.4+.
Run it on your Mailman server. You may need to edit the API credentials in `API_URL` if you changed the defaults.
Useful for database backend migrations and/or backup and restore operations.
Requires Python 3.6+ and aiohttp 3.7.4+.
Run it on your Mailman server. You may need to edit the API credentials in `API_URL` if you changed the defaults.
| import asyncio | |
| import json | |
| from pathlib import Path | |
| from typing import Any, Awaitable | |
| from aiohttp.client import ClientSession | |
# Base URL of the Mailman REST API to export from, with the credentials
# embedded in the URL. Edit this if you changed the default credentials.
API_URL = "http://restadmin:restpass@localhost:8001/3.1"
# Directory the exported JSON cache files are written to.
OUT_DIR = Path("./mailman_dump_prod")
async def load_url(sess: ClientSession, url: str) -> Any:
    """Fetch *url* as JSON, caching the decoded document on disk.

    The cache file lives in ``OUT_DIR`` and is named after the part of *url*
    that follows ``API_URL``, with ``/`` and ``@`` replaced by ``.``. When the
    cache file already exists its content is returned and no request is made.

    Args:
        sess: Open aiohttp session used for the GET request.
        url: Absolute URL to fetch; expected to start with ``API_URL``.

    Returns:
        The decoded JSON document.

    Raises:
        ValueError: If the server answers with a status other than 200.
    """
    cache_name = url[len(API_URL):].replace("/", ".").replace("@", ".")
    cache_file = OUT_DIR / ("cache." + cache_name + ".json")
    if cache_file.exists():
        with cache_file.open("r") as cache_fd:
            return json.load(cache_fd)

    # ``async with`` releases the connection on every path, including the
    # error path below where the response body is never read.
    async with sess.get(url) as res:
        if res.status != 200:
            raise ValueError(f"Response code {res.status}")
        data = await res.json()

    with cache_file.open("w") as cache_fd:
        json.dump(data, cache_fd)
    return data
async def main() -> None:
    """Export lists, members and per-list configs from the Mailman REST API.

    Every response is stored as a JSON cache file in ``OUT_DIR`` by
    ``load_url``; the data is not processed any further here.
    """
    # exist_ok avoids the check-then-create race of exists() followed by mkdir().
    OUT_DIR.mkdir(exist_ok=True)

    async with ClientSession() as sess:
        print("Loading lists")
        lists: dict = await load_url(sess, f"{API_URL}/lists")

        print("Loading members")
        await load_url(sess, f"{API_URL}/members")

        print("Loading list configs")
        # Mailman leaves "entries" out of an empty collection, so fall back to
        # an empty list instead of raising KeyError when no lists exist.
        list_config_queries: list[Awaitable[dict]] = [
            load_url(sess, f"{API_URL}/lists/{lst['fqdn_listname']}/config")
            for lst in lists.get("entries", [])
        ]
        # Fetch all list configs concurrently.
        await asyncio.gather(*list_config_queries)

    print("Done")
if __name__ == "__main__":
    # Run the export coroutine to completion on the loop returned by
    # asyncio.get_event_loop().
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
| import asyncio | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| from aiohttp.client import ClientSession | |
# Base URL of the Mailman REST API to import into, with the credentials
# embedded in the URL. Edit this if you changed the default credentials.
API_URL = "http://restadmin:restpass@localhost:8002/3.1"
# Directory holding the JSON cache files that load_url reads.
OUT_DIR = Path("./mailman_dump_prod")
# Fields to ignore when setting the config on a list.
# All were tested to either be read-only or cause explicit errors.
IGNORED_KEYS = [
    "bounces_address",
    "fqdn_listname",
    "join_address",
    "last_post_at",
    "leave_address",
    "list_name",
    "mail_host",
    "next_digest_number",
    "no_reply_address",
    "owner_address",
    "post_id",
    "posting_address",
    "request_address",
    "usenet_watermark",
    "volume",
    "http_etag",
    "created_at",
    "digest_last_sent_at",
]
# Keys copied unchanged from an exported member entry into the subscribe request.
SUBSCRIPTION_KEYS = ["delivery_mode", "list_id", "role", "display_name"]
# Lets at most 5 add_subscription requests be in flight at the same time.
semaphore = asyncio.Semaphore(value=5)
def load_url(sess: ClientSession, url: str) -> Any:
    """Return the cached JSON document for *url*.

    Nothing is fetched: *sess* is accepted but not used. The cache file is
    looked up in ``OUT_DIR`` under a name derived from the part of *url* that
    follows ``API_URL``, with ``/`` and ``@`` replaced by ``.``.

    Raises:
        ValueError: If no cache file exists for *url*.
    """
    url_suffix = url[len(API_URL) :]
    file_name = "cache." + url_suffix.replace("/", ".").replace("@", ".") + ".json"
    cache_path = OUT_DIR / file_name

    # Guard clause: without a cache file there is nothing to import.
    if not cache_path.exists():
        raise ValueError("Failed to load cache data for " + url)

    with cache_path.open("r") as cache_fd:
        return json.load(cache_fd)
async def add_subscription(sess: ClientSession, subscription: dict) -> None:
    """Subscribe one exported member entry via ``POST {API_URL}/members``.

    Only the keys listed in ``SUBSCRIPTION_KEYS`` are copied from
    *subscription*. The address is sent as ``subscriber`` and the request is
    flagged as pre-verified, pre-confirmed and pre-approved, with the
    invitation and welcome message switched off. The status code is printed
    for every request; for a status above 210 the response body is printed as
    well. Failed requests are reported, not raised.

    Args:
        sess: Open aiohttp session used for the request.
        subscription: Member entry; must contain ``email`` and ``list_id``.
    """
    payload = {
        **{k: v for k, v in subscription.items() if k in SUBSCRIPTION_KEYS},
        "subscriber": subscription["email"],
        "pre_verified": "True",
        "pre_confirmed": "True",
        "pre_approved": "True",
        "invitation": "False",
        "send_welcome_message": "False",
    }
    # The module-level semaphore caps how many requests run concurrently.
    async with semaphore:
        # ``async with`` releases the connection even when the response body
        # is never read (the success path).
        async with sess.post(f"{API_URL}/members", json=payload) as res:
            print(res.status, "adding", subscription["email"], "to", subscription["list_id"])
            if res.status > 210:
                print(await res.text())
def _writable_config(list_config: dict) -> dict:
    """Return *list_config* without ``IGNORED_KEYS`` and with bools as strings."""
    # Mailman crashes if bool types are not string.
    return {
        key: str(value) if isinstance(value, bool) else value
        for key, value in list_config.items()
        if key not in IGNORED_KEYS
    }


async def main() -> None:
    """Re-create lists, their configs and subscriptions from the cached dump.

    Lists, members and per-list configs are read from the cache files in
    ``OUT_DIR`` through ``load_url``. Each list is created with a POST and,
    when that succeeds, its config is applied with a PUT. Afterwards all
    member entries are subscribed concurrently through ``add_subscription``.
    Status codes, and response bodies for statuses above 210, are printed.
    """
    # exist_ok avoids the check-then-create race of exists() followed by mkdir().
    OUT_DIR.mkdir(exist_ok=True)

    async with ClientSession() as sess:
        print("Loading lists")
        lists: dict = load_url(sess, f"{API_URL}/lists")
        print("Loading members")
        members: dict = load_url(sess, f"{API_URL}/members")

        # Mailman leaves "entries" out of an empty collection, so fall back to
        # empty lists instead of raising KeyError on an empty dump.
        list_entries = lists.get("entries", [])
        member_entries = members.get("entries", [])

        print("Loading list configs")
        list_configs: list[dict] = [
            load_url(sess, f"{API_URL}/lists/{mlist['fqdn_listname']}/config")
            for mlist in list_entries
        ]

        print("Creating lists")
        for mlist, list_config in zip(list_entries, list_configs):
            fqdn_listname = mlist["fqdn_listname"]

            create_payload = {"fqdn_listname": fqdn_listname}
            async with sess.post(f"{API_URL}/lists", json=create_payload) as res:
                print(res.status, "for list", fqdn_listname)
                if res.status > 210:
                    print(await res.text())
                    # NOTE: any failure skips the config step, including the
                    # case where the list already exists on the target.
                    continue

            config_url = f"{API_URL}/lists/{fqdn_listname}/config"
            async with sess.put(config_url, json=_writable_config(list_config)) as res:
                print(res.status, "setting config for list", fqdn_listname)
                if res.status > 210:
                    print(await res.text())

        print("Creating subscriptions")
        # Concurrency is limited inside add_subscription by the semaphore.
        await asyncio.gather(
            *(add_subscription(sess, subscription) for subscription in member_entries)
        )
if __name__ == "__main__":
    # Run the import coroutine to completion on the loop returned by
    # asyncio.get_event_loop().
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
| aiohttp >= 3.7.4, < 4 |