-
-
Save RobertKrajewski/5847ce49333062ea4be1a08f2913288c to your computer and use it in GitHub Desktop.
import os | |
import sqlite3 | |
from datetime import datetime, date | |
from typing import Tuple, Dict, List | |
import getpass | |
from mattermostdriver import Driver | |
import pathlib | |
import json | |
def connect(host: str, login_token: str = None, username: str = None, password: str = None) -> Driver: | |
d = Driver({ | |
"url": host, | |
"port": 443, | |
"token": login_token, | |
"username": username, | |
"password": password | |
}) | |
d.login() | |
return d | |
def get_users(d: Driver) -> Tuple[Dict[str, str], str]: | |
my_user = d.users.get_user("me") | |
my_username = my_user["username"] | |
my_user_id = my_user["id"] | |
print(f"Successfully logged in as {my_username} ({my_user_id})") | |
# Get all usernames as we want to use those instead of the user ids | |
user_id_to_name = {} | |
page = 0 | |
print("Downloading all user information... ", end="") | |
while True: | |
users_resp = d.users.get_users(params={"per_page": 200, "page": page}) | |
if len(users_resp) == 0: | |
break | |
for user in users_resp: | |
user_id_to_name[user["id"]] = user["username"] | |
page += 1 | |
print(f"Found {len(user_id_to_name)} users!") | |
return user_id_to_name, my_user_id | |
def select_team(d: Driver, my_user_id: str) -> str: | |
print("Downloading all team information... ", end="") | |
teams = d.teams.get_user_teams(my_user_id) | |
print(f"Found {len(teams)} teams!") | |
for i_team, team in enumerate(teams): | |
print(f"{i_team}\t{team['name']}\t({team['id']})") | |
team_idx = int(input("Select team by idx: ")) | |
team = teams[team_idx] | |
print(f"Selected team {team['name']}") | |
return team | |
def select_channel(d: Driver, team: str, my_user_id: str, user_id_to_name: Dict[str, str], | |
verbose: bool = False) -> List[str]: | |
print("Downloading all channel information... ", end="") | |
channels = d.channels.get_channels_for_user(my_user_id, team["id"]) | |
# Add display name to direct messages | |
for channel in channels: | |
if channel["type"] != "D": | |
continue | |
# The channel name consists of two user ids connected by a double underscore | |
user_ids = channel["name"].split("__") | |
other_user_id = user_ids[1] if user_ids[0] == my_user_id else user_ids[0] | |
channel["display_name"] = user_id_to_name[other_user_id] | |
# Sort channels by name for easier search | |
channels = sorted(channels, key=lambda x: x["display_name"].lower()) | |
print(f"Found {len(channels)} channels!") | |
for i_channel, channel in enumerate(channels): | |
if verbose: | |
channel_id = f"\t({channel['id']})" | |
else: | |
channel_id = "" | |
print(f"{i_channel}\t{channel['display_name']}{channel_id}") | |
channel_input = input("Select channels by idx separated by comma or type 'all' for downloading all channels: ") | |
if channel_input == "all": | |
channel_idxs = list(range(len(channels))) | |
else: | |
channel_idxs = channel_input.replace(" ", "").split(",") | |
selected_channels = [channels[int(idx)] for idx in channel_idxs] | |
print("Selected channel(s):", ", ".join([channel["display_name"] for channel in selected_channels])) | |
return selected_channels | |
def export_channel(d: Driver, channel: str, user_id_to_name: Dict[str, str], output_base: str, | |
download_files: bool = True, before: str = None, after: str = None): | |
# Sanitize channel name | |
channel_name = channel["display_name"].replace("\\", "").replace("/", "") | |
print("Exporting channel", channel_name) | |
if after: | |
after = datetime.strptime(after, '%Y-%m-%d').timestamp() | |
if before: | |
before = datetime.strptime(before, '%Y-%m-%d').timestamp() | |
# Get all posts for selected channel | |
page = 0 | |
all_posts = [] | |
while True: | |
print(f"Requesting channel page {page}") | |
posts = d.posts.get_posts_for_channel(channel["id"], params={"per_page": 200, "page": page}) | |
if len(posts["posts"]) == 0: | |
# If no posts are returned, we have reached the end | |
break | |
all_posts.extend([posts["posts"][post] for post in posts["order"]]) | |
page += 1 | |
print(f"Found {len(all_posts)} posts") | |
# Create output directory | |
output_base = pathlib.Path(output_base) / channel_name | |
output_base.mkdir(parents=True, exist_ok=True) | |
# Simplify all posts to contain only username, date, message and files in chronological order | |
simple_posts = [] | |
for i_post, post in enumerate(reversed(all_posts)): | |
# Filter posts by date range | |
created = post["create_at"] / 1000 | |
if (before and created > before) or (after and created < after): | |
continue | |
user_id = post["user_id"] | |
if user_id not in user_id_to_name: | |
user_id_to_name[user_id] = d.users.get_user(user_id)["username"] | |
username = user_id_to_name[user_id] | |
created = datetime.utcfromtimestamp(post["create_at"] / 1000).strftime('%Y-%m-%dT%H:%M:%SZ') | |
message = post["message"] | |
simple_post = dict(idx=i_post, id=post["id"], created=created, username=username, message=message) | |
# If a code block is given in the message, dump it to file | |
if message.count("```") > 1: | |
start_pos = message.find("```") + 3 | |
end_pos = message.rfind("```") | |
cut = message[start_pos:end_pos] | |
if not len(cut): | |
print("Code has no length") | |
else: | |
filename = "%03d" % i_post + "_code.txt" | |
with open(output_base / filename, "wb") as f: | |
f.write(cut.encode()) | |
# If any files are attached to the message, download each | |
if "files" in post["metadata"]: | |
filenames = [] | |
for file in post["metadata"]["files"]: | |
if download_files: | |
filename = "%03d" % i_post + "_" + file["name"] | |
print("Downloading", file["name"]) | |
while True: | |
try: | |
resp = d.files.get_file(file["id"]) | |
break | |
except: | |
print("Downloading file failed") | |
# Mattermost Driver unfortunately parses json files to dicts | |
if isinstance(resp, dict): | |
with open(output_base / filename, "w") as f: | |
json.dump(resp, f) | |
else: | |
with open(output_base / filename, "wb") as f: | |
f.write(resp.content) | |
filenames.append(file["name"]) | |
simple_post["files"] = filenames | |
simple_posts.append(simple_post) | |
output = { | |
"channel": { | |
"name": channel["name"], | |
"display_name": channel["display_name"], | |
"header": channel["header"], | |
"id": channel["id"], | |
"team": d.teams.get_team(channel["team_id"])["name"], | |
"team_id": channel["team_id"], | |
"exported_at": datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ') | |
}, | |
"posts": simple_posts | |
} | |
# Export posts to json file | |
filtered_channel_name = ''.join(filter(lambda ch: ch not in "?!/\\.;:*\"<>|", channel_name)) | |
output_filename = filtered_channel_name + ".json" | |
output_filepath = output_base / output_filename | |
with open(output_filepath, "w", encoding='utf8') as f: | |
json.dump(output, f, indent=2, ensure_ascii=False) | |
print(f"Exported channel data to '{output_filepath}'") | |
def get_config_from_json(config_filename: str = "config.json") -> dict: | |
config_path = pathlib.Path(config_filename) | |
if not config_path.exists(): | |
return {} | |
with config_path.open() as f: | |
config = json.load(f) | |
return config | |
def complete_config(config: dict, config_filename: str = "config.json") -> dict: | |
config_changed = False | |
if config.get("host", False): | |
print(f"Using host '{config['host']}' from config") | |
else: | |
config["host"] = input("Please input host/server address (without https://): ") | |
config_changed = True | |
if config.get("login_mode", False): | |
print(f"Using login mode '{config['login_mode']}' from config") | |
else: | |
login_mode = "" | |
while login_mode not in ["password", "token"]: | |
login_mode = input("Please input login_mode 'password' or 'token' (=Gitlab Oauth): ") | |
config["login_mode"] = login_mode | |
config_changed = True | |
password = None | |
if config["login_mode"] == "password": | |
if config.get("username", False): | |
print(f"Using username '{config['username']}' from config") | |
else: | |
config["username"] = input("Please input your username: ") | |
config_changed = True | |
password = getpass.getpass("Enter your password (hidden): ") | |
else: | |
if config.get("token", False): | |
print(f"Using token '{config['token']}' from config") | |
else: | |
print("Are you logged-in into Mattermost using the Firefox Browser? " | |
"If so, token may be automatically extracted") | |
dec = "" | |
while not (dec == "y" or dec == "n"): | |
dec = input("Try to find token automatically? y/n: ") | |
token = None | |
if dec == "y": | |
token = find_mmauthtoken_firefox(config["host"]) | |
elif not token: | |
token = input("Please input your login token (MMAUTHTOKEN): ") | |
config["token"] = token | |
config_changed = True | |
if "download_files" in config: | |
print(f"Download files set to '{config['download_files']}' from config") | |
else: | |
dec = "" | |
while not (dec == "y" or dec == "n"): | |
dec = input("Should files be downloaded? y/n: ") | |
config["download_files"] = dec == "y" | |
config_changed = True | |
if config_changed: | |
dec = "" | |
while not (dec == "y" or dec == "n"): | |
dec = input("Config changed! Would you like to store your config (without password) to file? y/n: ") | |
if dec == "y": | |
with open(config_filename, "w") as f: | |
json.dump(config, f, indent=2) | |
print(f"Stored new config to {config_filename}") | |
config["password"] = password | |
return config | |
def find_mmauthtoken_firefox(host): | |
appdata_dir = pathlib.Path(os.environ["APPDATA"]) | |
profiles_dir = appdata_dir / "Mozilla/Firefox/Profiles" | |
cookie_files = profiles_dir.rglob("cookies.sqlite") | |
all_tokens = [] | |
for cookie_file in cookie_files: | |
print(f"Opening {cookie_file}") | |
connection = sqlite3.connect(str(cookie_file)) | |
cursor = connection.cursor() | |
rows = cursor.execute("SELECT host, value FROM moz_cookies WHERE name = 'MMAUTHTOKEN'").fetchall() | |
all_tokens.extend(rows) | |
all_tokens = [token for token in all_tokens if host in token[0]] | |
print(f"Found {len(all_tokens)} token for {host}") | |
for token in all_tokens: | |
print(f"{token[0]}: {token[1]}") | |
if len(all_tokens) > 1: | |
print("Using first token!") | |
if len(all_tokens): | |
return all_tokens[0][1] | |
else: | |
return None | |
if __name__ == '__main__': | |
config = get_config_from_json() | |
config = complete_config(config) | |
output_base = "results/" + date.today().strftime("%Y%m%d") | |
print(f"Storing downloaded data in {output_base}") | |
# Range of posts to be exported as string in format "YYYY-MM-DD". Use None if no filter should be applied | |
after = config.get("after", None) | |
before = config.get("before", None) | |
d = connect(config["host"], config.get("token", None), | |
config.get("username", None), config.get("password", None)) | |
user_id_to_name, my_user_id = get_users(d) | |
team = select_team(d, my_user_id) | |
channels = select_channel(d, team, my_user_id, user_id_to_name) | |
for i_channel, channel in enumerate(channels): | |
print(f"Start export of channel {i_channel + 1}/{len(channels)}") | |
export_channel(d, channel, user_id_to_name, output_base, config["download_files"], | |
before, after) | |
print("Finished export") |
mattermostdriver |
Hi Robert, thanks for this cool tool! Do you have any idea how to make it work without gitlab?
Hi Robert - this is awesome! Do you have a recommended tool for working with the downloaded json and files?
Thanks!
Tom.
I use use VS Code to navigate through the json.
Hi Robert, thanks for this cool tool! Do you have any idea how to make it work without gitlab?
Instead of the token
in line 12, you have to use the password
key as shown here: https://pypi.org/project/mattermostdriver/
In the method select_channel
there is the linechannel["display_name"] = user_id_to_name[other_user_id]
. This fails if there is a direct conversation with a user that does not exist anymore. An ugly hack around that is importing exceptions
from mattermostdriver
and doing this instead:
try:
user_id_to_name[other_user_id] = d.users.get_user(other_user_id)["username"]
except exceptions.ResourceNotFound:
user_id_to_name[other_user_id] = other_user_id
For the same reason, we will also have to determine the team name differently when generating output
in export_channel
try:
team = d.teams.get_team(channel["team_id"])["name"]
except exceptions.ResourceNotFound:
team = channel["team_id"]
…
"team": team,
The comment in line 162 is a lie. If the json file encodes a list instead of an object, it will be converted to list
not dict
. Hence, there should be (dict, list)
instead of list
in the next line.
if isinstance(resp, (dict, list)):
(otherwise the script crashed when trying to download json-files with a list as the outer element)
hello
when channel is not related to a team, there is exception in this line 180
fixed with a additional test :
if channel["team_id"] == "":
team_name = ""
else:
team_name = d.teams.get_team(channel["team_id"])["name"]
output = {
"channel": {
"name": channel["name"],
"display_name": channel["display_name"],
"header": channel["header"],
"id": channel["id"],
"team": team_name,
"team_id": channel["team_id"],
"exported_at": datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
},
"posts": simple_posts
}
Thank you for providing this! Worked fine for me incl. using small fix from @GuillermoAndrade !
Can I use this in a project of mine, providing the link and recognition?
Thanks a lot!
Did someone manage to use this script with Server version 8
? (8.1.1
to be specific)
I always get a 403
response, regardless of whether I use token or password login.
8.1.2 - works with minor changes
basically https://gist.github.com/RobertKrajewski/5847ce49333062ea4be1a08f2913288c?permalink_comment_id=4542090#gistcomment-4542090 & https://gist.github.com/RobertKrajewski/5847ce49333062ea4be1a08f2913288c?permalink_comment_id=4194956#gistcomment-4194956
I addressed the team_id by adding it manually - as the channel is returned by querying a team.
diff --git a/mattermost-dl.py b/mattermost-dl.py
index ac03bb7..f45a408 100644
--- a/mattermost-dl.py
+++ b/mattermost-dl.py
@@ -4,7 +4,7 @@ from datetime import datetime, date
from typing import Tuple, Dict, List
import getpass
-from mattermostdriver import Driver
+from mattermostdriver import Driver, exceptions
import pathlib
import json
@@ -14,8 +14,9 @@ def connect(host: str, login_token: str = None, username: str = None, password:
"url": host,
"port": 443,
"token": login_token,
- "username": username,
- "password": password
+ "login_id": username,
+ "password": password,
+# "debug":True
})
d.login()
return d
@@ -61,6 +62,7 @@ def select_channel(d: Driver, team: str, my_user_id: str, user_id_to_name: Dict[
channels = d.channels.get_channels_for_user(my_user_id, team["id"])
# Add display name to direct messages
for channel in channels:
+ channel["team_id"] = team["id"]
if channel["type"] != "D":
continue
@@ -127,7 +129,11 @@ def export_channel(d: Driver, channel: str, user_id_to_name: Dict[str, str], out
user_id = post["user_id"]
if user_id not in user_id_to_name:
- user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
+ try:
+ user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
+ except exceptions.ResourceNotFound:
+ user_id_to_name[user_id] = user_id
+
username = user_id_to_name[user_id]
created = datetime.utcfromtimestamp(post["create_at"] / 1000).strftime('%Y-%m-%dT%H:%M:%SZ')
message = post["message"]
Thanks @commonism ! I could find out that my problem is not caused by this script. I have a strange issue on login. I can query the API perfectly fine on the CLI: curl -H "Authorization: Bearer $token" https://mattermost.mydomain.org:443/api/v4/users/me
. The driver library does exactly the same, but always returns a status code 403
.
Found it out: Somehow the reverse proxy (Apache) is configured to require the User-Agent
header! Painful experience …
Hi Robert - this is awesome! Do you have a recommended tool for working with the downloaded json and files?
Thanks!
Tom.