-
-
Save RobertKrajewski/5847ce49333062ea4be1a08f2913288c to your computer and use it in GitHub Desktop.
import os | |
import sqlite3 | |
from datetime import datetime, date | |
from typing import Tuple, Dict, List | |
import getpass | |
from mattermostdriver import Driver | |
import pathlib | |
import json | |
def connect(host: str, login_token: str = None, username: str = None, password: str = None) -> Driver:
    """Create a Mattermost driver for ``host`` (port 443) and log in.

    Args:
        host: Server address, passed to the driver as its ``url`` option.
        login_token: Session token for token based login, or None.
        username: Account name for password based login, or None.
        password: Password belonging to ``username``, or None.

    Returns:
        The driver instance after ``login()`` has been called on it.
    """
    d = Driver({
        "url": host,
        "port": 443,
        "token": login_token,
        # mattermostdriver expects the account name under "login_id"; a
        # "username" key is not one of its options, so password login would
        # be attempted without any user name.
        "login_id": username,
        "password": password,
    })
    # NOTE(review): presumably the driver prefers the token when one is given
    # and falls back to login_id/password otherwise - confirm in the
    # mattermostdriver documentation.
    d.login()
    return d
def get_users(d: Driver) -> Tuple[Dict[str, str], str]:
    """Fetch the logged-in user and a table of all users known to the server.

    Args:
        d: Driver that is already logged in.

    Returns:
        A tuple ``(names_by_id, own_id)``: a dict mapping user id to
        username, and the id of the logged-in user.
    """
    me = d.users.get_user("me")
    own_name = me["username"]
    own_id = me["id"]
    print(f"Successfully logged in as {own_name} ({own_id})")

    # The export shows usernames instead of raw user ids, so collect them all
    # up front. The listing is paged; an empty page marks the end.
    print("Downloading all user information... ", end="")
    names_by_id = {}
    page_no = 0
    batch = d.users.get_users(params={"per_page": 200, "page": page_no})
    while len(batch) != 0:
        names_by_id.update((entry["id"], entry["username"]) for entry in batch)
        page_no += 1
        batch = d.users.get_users(params={"per_page": 200, "page": page_no})

    print(f"Found {len(names_by_id)} users!")
    return names_by_id, own_id
def select_team(d: Driver, my_user_id: str) -> dict:
    """List the teams of a user and let the user pick one interactively.

    Args:
        d: Driver that is already logged in.
        my_user_id: Id of the user whose teams are listed.

    Returns:
        The selected team object as returned by the server (a dict; its
        ``name`` and ``id`` entries are used here).

    Raises:
        RuntimeError: If the user is not a member of any team, in which case
            there is nothing to choose from.
    """
    print("Downloading all team information... ", end="")
    teams = d.teams.get_user_teams(my_user_id)
    print(f"Found {len(teams)} teams!")
    if not teams:
        raise RuntimeError("No teams available for the logged-in user")

    for i_team, team in enumerate(teams):
        print(f"{i_team}\t{team['name']}\t({team['id']})")

    # Keep asking until a listed index is entered instead of crashing on a
    # typo or silently accepting a negative index.
    while True:
        raw_idx = input("Select team by idx: ").strip()
        try:
            team_idx = int(raw_idx)
        except ValueError:
            print(f"'{raw_idx}' is not a number")
            continue
        if 0 <= team_idx < len(teams):
            break
        print(f"Index must be between 0 and {len(teams) - 1}")

    team = teams[team_idx]
    print(f"Selected team {team['name']}")
    return team
def select_channel(d: Driver, team: dict, my_user_id: str, user_id_to_name: Dict[str, str],
                   verbose: bool = False) -> List[dict]:
    """List the user's channels in a team and let the user pick some of them.

    Direct-message channels (type ``"D"``) get the other participant's
    username as ``display_name``. Users missing from ``user_id_to_name`` are
    looked up on the server and cached in that dict; if the server does not
    know them either, the raw user id is used as the name.

    Args:
        d: Driver that is already logged in.
        team: Team object; only its ``id`` entry is read.
        my_user_id: Id of the logged-in user.
        user_id_to_name: Mapping of user id to username (may be extended).
        verbose: If True, channel ids are printed next to the names.

    Returns:
        The selected channel objects (dicts), in the order they were chosen.
    """
    # Imported here so this function does not depend on the file-level import
    # line listing ``exceptions`` as well.
    from mattermostdriver import exceptions

    print("Downloading all channel information... ", end="")
    channels = d.channels.get_channels_for_user(my_user_id, team["id"])

    # Add display name to direct messages
    for channel in channels:
        if channel["type"] != "D":
            continue
        # The channel name consists of two user ids connected by a double underscore
        user_ids = channel["name"].split("__")
        other_user_id = user_ids[1] if user_ids[0] == my_user_id else user_ids[0]
        if other_user_id not in user_id_to_name:
            # Conversation partner is not in the downloaded user list,
            # e.g. because the account no longer exists.
            try:
                user_id_to_name[other_user_id] = d.users.get_user(other_user_id)["username"]
            except exceptions.ResourceNotFound:
                user_id_to_name[other_user_id] = other_user_id
        channel["display_name"] = user_id_to_name[other_user_id]

    # Sort channels by name for easier search
    channels = sorted(channels, key=lambda x: x["display_name"].lower())
    print(f"Found {len(channels)} channels!")
    for i_channel, channel in enumerate(channels):
        channel_id = f"\t({channel['id']})" if verbose else ""
        print(f"{i_channel}\t{channel['display_name']}{channel_id}")

    # Ask again on malformed input instead of aborting the whole export.
    while True:
        channel_input = input(
            "Select channels by idx separated by comma or type 'all' for downloading all channels: ").strip()
        if channel_input == "all":
            selected_channels = list(channels)
            break
        # Empty pieces (trailing or doubled commas) are ignored.
        pieces = [piece for piece in channel_input.replace(" ", "").split(",") if piece]
        try:
            channel_idxs = [int(piece) for piece in pieces]
        except ValueError:
            print("Please enter only numbers separated by comma")
            continue
        if channel_idxs and all(0 <= idx < len(channels) for idx in channel_idxs):
            selected_channels = [channels[idx] for idx in channel_idxs]
            break
        print(f"Please enter at least one index between 0 and {len(channels) - 1}")

    print("Selected channel(s):", ", ".join([channel["display_name"] for channel in selected_channels]))
    return selected_channels
def _fetch_all_posts(d, channel_id: str) -> list:
    """Download all posts of a channel page by page, in the order the server lists them."""
    page = 0
    all_posts = []
    while True:
        print(f"Requesting channel page {page}")
        posts = d.posts.get_posts_for_channel(channel_id, params={"per_page": 200, "page": page})
        if not posts["posts"]:
            # If no posts are returned, we have reached the end
            break
        all_posts.extend(posts["posts"][post_id] for post_id in posts["order"])
        page += 1
    return all_posts


def _download_attachment(d, file_id: str, target, max_attempts: int = 3) -> bool:
    """Store one attachment at ``target``; return False if every attempt failed."""
    for attempt in range(1, max_attempts + 1):
        try:
            resp = d.files.get_file(file_id)
        except Exception as err:  # retry boundary: report and try again
            print(f"Downloading file failed (attempt {attempt}/{max_attempts}): {err}")
            continue
        break
    else:
        return False

    # The driver hands back already-parsed JSON (a dict or a list) for json
    # files instead of a response object, so those are re-serialised.
    if isinstance(resp, (dict, list)):
        with open(target, "w", encoding="utf8") as f:
            json.dump(resp, f)
    else:
        with open(target, "wb") as f:
            f.write(resp.content)
    return True


def export_channel(d: Driver, channel: dict, user_id_to_name: Dict[str, str], output_base: str,
                   download_files: bool = True, before: str = None, after: str = None):
    """Export the posts of one channel (and optionally its files) to disk.

    A directory named after the channel is created below ``output_base``.
    It receives a ``<channel>.json`` file with channel information and the
    simplified posts, one ``NNN_code.txt`` per post containing a fenced code
    block, and, if requested, the attached files as ``NNN_<name>``.

    Args:
        d: Driver that is already logged in.
        channel: Channel object; ``id``, ``name``, ``display_name``,
            ``header`` and ``team_id`` are read.
        user_id_to_name: Mapping of user id to username; unknown authors are
            looked up and added.
        output_base: Directory below which the channel directory is created.
        download_files: Whether attached files are downloaded.
        before: Only export posts created up to this date ("YYYY-MM-DD"),
            or None/empty for no limit.
        after: Only export posts created from this date ("YYYY-MM-DD") on,
            or None/empty for no limit.
    """
    # Local imports keep this function independent of the file-level imports.
    from datetime import timezone
    from mattermostdriver import exceptions

    # Sanitize channel name
    channel_name = channel["display_name"].replace("\\", "").replace("/", "")
    print("Exporting channel", channel_name)

    # Date limits are parsed as naive datetimes, i.e. midnight local time.
    after_ts = datetime.strptime(after, '%Y-%m-%d').timestamp() if after else None
    before_ts = datetime.strptime(before, '%Y-%m-%d').timestamp() if before else None

    # Get all posts for selected channel
    all_posts = _fetch_all_posts(d, channel["id"])
    print(f"Found {len(all_posts)} posts")

    # Create output directory
    output_base = pathlib.Path(output_base) / channel_name
    output_base.mkdir(parents=True, exist_ok=True)

    # Simplify all posts to contain only username, date, message and files in chronological order
    simple_posts = []
    for i_post, post in enumerate(reversed(all_posts)):
        # Filter posts by date range ("create_at" is in milliseconds)
        created_ts = post["create_at"] / 1000
        if before_ts is not None and created_ts > before_ts:
            continue
        if after_ts is not None and created_ts < after_ts:
            continue

        user_id = post["user_id"]
        if user_id not in user_id_to_name:
            try:
                user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
            except exceptions.ResourceNotFound:
                # Author account is gone; keep the id so the post is still exported.
                user_id_to_name[user_id] = user_id
        username = user_id_to_name[user_id]

        created = datetime.fromtimestamp(created_ts, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        message = post["message"]
        simple_post = dict(idx=i_post, id=post["id"], created=created, username=username, message=message)

        # If a code block is given in the message, dump it to file.
        # Everything between the first and the last fence is written.
        if message.count("```") > 1:
            start_pos = message.find("```") + 3
            end_pos = message.rfind("```")
            cut = message[start_pos:end_pos]
            if not cut:
                print("Code has no length")
            else:
                with open(output_base / f"{i_post:03d}_code.txt", "wb") as f:
                    f.write(cut.encode())

        # If any files are attached to the message, download each
        metadata = post.get("metadata") or {}
        if "files" in metadata:
            filenames = []
            for attachment in metadata["files"] or []:
                if download_files:
                    # Drop any directory part so a crafted attachment name
                    # cannot write outside of the channel directory.
                    safe_name = pathlib.PurePosixPath(attachment["name"].replace("\\", "/")).name
                    filename = f"{i_post:03d}_{safe_name or attachment['id']}"
                    print("Downloading", attachment["name"])
                    if not _download_attachment(d, attachment["id"], output_base / filename):
                        print(f"Giving up on '{attachment['name']}', continuing with next file")
                filenames.append(attachment["name"])
            simple_post["files"] = filenames

        simple_posts.append(simple_post)

    # Channels without a team have an empty team id, and a team may have
    # been removed; neither should abort the export.
    team_id = channel.get("team_id") or ""
    if not team_id:
        team_name = ""
    else:
        try:
            team_name = d.teams.get_team(team_id)["name"]
        except exceptions.ResourceNotFound:
            team_name = team_id

    output = {
        "channel": {
            "name": channel["name"],
            "display_name": channel["display_name"],
            "header": channel["header"],
            "id": channel["id"],
            "team": team_name,
            "team_id": team_id,
            # The trailing "Z" denotes UTC, so the timestamp has to be UTC too.
            "exported_at": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        },
        "posts": simple_posts
    }

    # Export posts to json file
    filtered_channel_name = ''.join(filter(lambda ch: ch not in "?!/\\.;:*\"<>|", channel_name))
    # Fall back to the channel id if nothing is left after filtering.
    output_filename = (filtered_channel_name or channel["id"]) + ".json"
    output_filepath = output_base / output_filename
    with open(output_filepath, "w", encoding='utf8') as f:
        json.dump(output, f, indent=2, ensure_ascii=False)
    print(f"Exported channel data to '{output_filepath}'")
def get_config_from_json(config_filename: str = "config.json") -> dict:
    """Load the stored configuration.

    Args:
        config_filename: Path of the JSON config file.

    Returns:
        The parsed content of the file, or an empty dict if the file does
        not exist.
    """
    config_path = pathlib.Path(config_filename)
    if not config_path.exists():
        return {}
    # Read explicitly as UTF-8 so the result does not depend on the
    # platform's default encoding.
    with config_path.open(encoding="utf-8") as f:
        return json.load(f)
def _ask_yes_no(prompt: str) -> bool:
    """Repeat ``prompt`` until the answer is exactly 'y' or 'n'; True means 'y'."""
    answer = ""
    while answer not in ("y", "n"):
        answer = input(prompt)
    return answer == "y"


def complete_config(config: dict, config_filename: str = "config.json") -> dict:
    """Interactively fill in every setting that is missing from ``config``.

    Host, login mode, username or token, and the download flag are asked for
    if absent. The password is always asked for in password mode and is only
    added to the returned dict after the optional save, so it is not written
    by this function.

    Args:
        config: Settings loaded so far; modified in place.
        config_filename: File the completed settings may be stored to.

    Returns:
        The same dict, completed, with an additional ``"password"`` entry
        (None when token login is used).
    """
    config_changed = False

    if config.get("host", False):
        print(f"Using host '{config['host']}' from config")
    else:
        config["host"] = input("Please input host/server address (without https://): ")
        config_changed = True

    if config.get("login_mode", False):
        print(f"Using login mode '{config['login_mode']}' from config")
    else:
        login_mode = ""
        while login_mode not in ["password", "token"]:
            login_mode = input("Please input login_mode 'password' or 'token' (=Gitlab Oauth): ")
        config["login_mode"] = login_mode
        config_changed = True

    password = None
    if config["login_mode"] == "password":
        if config.get("username", False):
            print(f"Using username '{config['username']}' from config")
        else:
            config["username"] = input("Please input your username: ")
            config_changed = True
        password = getpass.getpass("Enter your password (hidden): ")
    else:
        if config.get("token", False):
            print(f"Using token '{config['token']}' from config")
        else:
            print("Are you logged-in into Mattermost using the Firefox Browser? "
                  "If so, token may be automatically extracted")
            token = None
            if _ask_yes_no("Try to find token automatically? y/n: "):
                token = find_mmauthtoken_firefox(config["host"])
            # Ask manually when the user declined AND when the automatic
            # search came back empty; otherwise None would be stored as token.
            if not token:
                token = input("Please input your login token (MMAUTHTOKEN): ")
            config["token"] = token
            config_changed = True

    if "download_files" in config:
        print(f"Download files set to '{config['download_files']}' from config")
    else:
        config["download_files"] = _ask_yes_no("Should files be downloaded? y/n: ")
        config_changed = True

    if config_changed:
        if _ask_yes_no("Config changed! Would you like to store your config (without password) to file? y/n: "):
            with open(config_filename, "w") as f:
                json.dump(config, f, indent=2)
            print(f"Stored new config to {config_filename}")

    # Added only now so the password never reaches the file written above.
    config["password"] = password
    return config
def find_mmauthtoken_firefox(host):
    """Search local Firefox profiles for a Mattermost session cookie.

    Every ``cookies.sqlite`` below the known Firefox profile directories is
    queried for cookies named ``MMAUTHTOKEN``; those whose cookie host
    contains ``host`` are candidates.

    Args:
        host: Server address the cookie host has to contain.

    Returns:
        The value of the first matching cookie, or None if nothing matched.
    """
    import contextlib

    # Windows keeps profiles below %APPDATA%; the other two are the usual
    # per-user locations on Linux and macOS. Missing ones are skipped.
    profile_roots = []
    appdata = os.environ.get("APPDATA")
    if appdata:
        profile_roots.append(pathlib.Path(appdata) / "Mozilla/Firefox/Profiles")
    try:
        home = pathlib.Path.home()
    except (RuntimeError, KeyError):
        home = None
    if home is not None:
        profile_roots.append(home / ".mozilla/firefox")
        profile_roots.append(home / "Library/Application Support/Firefox/Profiles")

    all_tokens = []
    for profiles_dir in profile_roots:
        if not profiles_dir.is_dir():
            continue
        for cookie_file in profiles_dir.rglob("cookies.sqlite"):
            print(f"Opening {cookie_file}")
            try:
                # closing() guarantees the database handle is released again.
                with contextlib.closing(sqlite3.connect(str(cookie_file))) as connection:
                    rows = connection.execute(
                        "SELECT host, value FROM moz_cookies WHERE name = 'MMAUTHTOKEN'").fetchall()
            except sqlite3.Error as err:
                # E.g. a locked or damaged database; try the remaining profiles.
                print(f"Could not read {cookie_file}: {err}")
                continue
            all_tokens.extend(rows)

    all_tokens = [token for token in all_tokens if host in token[0]]
    print(f"Found {len(all_tokens)} token for {host}")
    for token in all_tokens:
        print(f"{token[0]}: {token[1]}")
    if len(all_tokens) > 1:
        print("Using first token!")
    if all_tokens:
        return all_tokens[0][1]
    return None
def main():
    """Run the interactive export: configure, log in, choose channels, export each."""
    config = complete_config(get_config_from_json())

    output_base = "results/" + date.today().strftime("%Y%m%d")
    print(f"Storing downloaded data in {output_base}")

    # Range of posts to be exported as string in format "YYYY-MM-DD". Use None if no filter should be applied
    after = config.get("after", None)
    before = config.get("before", None)

    d = connect(
        config["host"],
        config.get("token", None),
        config.get("username", None),
        config.get("password", None),
    )
    user_id_to_name, my_user_id = get_users(d)
    team = select_team(d, my_user_id)
    channels = select_channel(d, team, my_user_id, user_id_to_name)

    total = len(channels)
    for position, channel in enumerate(channels, start=1):
        print(f"Start export of channel {position}/{total}")
        export_channel(d, channel, user_id_to_name, output_base, config["download_files"],
                       before, after)
    print("Finished export")


if __name__ == '__main__':
    main()
mattermostdriver |
In the method select_channel there is the line channel["display_name"] = user_id_to_name[other_user_id]. This fails if there is a direct conversation with a user that does not exist anymore. An ugly hack around that is importing exceptions from mattermostdriver and doing this instead:
try:
user_id_to_name[other_user_id] = d.users.get_user(other_user_id)["username"]
except exceptions.ResourceNotFound:
user_id_to_name[other_user_id] = other_user_id
For the same reason, we will also have to determine the team name differently when generating output
in export_channel
try:
team = d.teams.get_team(channel["team_id"])["name"]
except exceptions.ResourceNotFound:
team = channel["team_id"]
…
"team": team,
The comment in line 162 is a lie. If the json file encodes a list instead of an object, it will be converted to list, not dict. Hence, there should be (dict, list) instead of dict in the next line.
if isinstance(resp, (dict, list)):
(otherwise the script crashed when trying to download json-files with a list as the outer element)
hello
When a channel is not related to a team, there is an exception in line 180.
Fixed with an additional test:
if channel["team_id"] == "":
team_name = ""
else:
team_name = d.teams.get_team(channel["team_id"])["name"]
output = {
"channel": {
"name": channel["name"],
"display_name": channel["display_name"],
"header": channel["header"],
"id": channel["id"],
"team": team_name,
"team_id": channel["team_id"],
"exported_at": datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
},
"posts": simple_posts
}
Thank you for providing this! Worked fine for me incl. using small fix from @GuillermoAndrade !
Can I use this in a project of mine, providing the link and recognition?
Thanks a lot!
Did someone manage to use this script with Server version 8
? (8.1.1
to be specific)
I always get a 403
response, regardless of whether I use token or password login.
8.1.2 - works with minor changes
basically https://gist.github.com/RobertKrajewski/5847ce49333062ea4be1a08f2913288c?permalink_comment_id=4542090#gistcomment-4542090 & https://gist.github.com/RobertKrajewski/5847ce49333062ea4be1a08f2913288c?permalink_comment_id=4194956#gistcomment-4194956
I addressed the team_id by adding it manually - as the channel is returned by querying a team.
diff --git a/mattermost-dl.py b/mattermost-dl.py
index ac03bb7..f45a408 100644
--- a/mattermost-dl.py
+++ b/mattermost-dl.py
@@ -4,7 +4,7 @@ from datetime import datetime, date
from typing import Tuple, Dict, List
import getpass
-from mattermostdriver import Driver
+from mattermostdriver import Driver, exceptions
import pathlib
import json
@@ -14,8 +14,9 @@ def connect(host: str, login_token: str = None, username: str = None, password:
"url": host,
"port": 443,
"token": login_token,
- "username": username,
- "password": password
+ "login_id": username,
+ "password": password,
+# "debug":True
})
d.login()
return d
@@ -61,6 +62,7 @@ def select_channel(d: Driver, team: str, my_user_id: str, user_id_to_name: Dict[
channels = d.channels.get_channels_for_user(my_user_id, team["id"])
# Add display name to direct messages
for channel in channels:
+ channel["team_id"] = team["id"]
if channel["type"] != "D":
continue
@@ -127,7 +129,11 @@ def export_channel(d: Driver, channel: str, user_id_to_name: Dict[str, str], out
user_id = post["user_id"]
if user_id not in user_id_to_name:
- user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
+ try:
+ user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
+ except exceptions.ResourceNotFound:
+ user_id_to_name[user_id] = user_id
+
username = user_id_to_name[user_id]
created = datetime.utcfromtimestamp(post["create_at"] / 1000).strftime('%Y-%m-%dT%H:%M:%SZ')
message = post["message"]
Thanks @commonism ! I could find out that my problem is not caused by this script. I have a strange issue on login. I can query the API perfectly fine on the CLI: curl -H "Authorization: Bearer $token" https://mattermost.mydomain.org:443/api/v4/users/me
. The driver library does exactly the same, but always returns a status code 403
.
Found it out: Somehow the reverse proxy (Apache) is configured to require the User-Agent
header! Painful experience …
I use VS Code to navigate through the json.
Instead of the token in line 12, you have to use the password key as shown here: https://pypi.org/project/mattermostdriver/