Last active
May 5, 2024 07:52
-
-
Save marcelrf/4e45ef73e1a9d350c0170e1f78e58651 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# | |
# Commons Impact Metrics Downloader | |
# https://wikitech.wikimedia.org/wiki/Commons_Impact_Metrics | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
# Python Requirements: | |
# docopt | |
# pendulum | |
# bz2 | |
# | |
# Examples of use: | |
# | |
# # Download a given dataset for a specified time range. | |
# cim_downloader --dataset pageviews_by_category --start 2024-01 --end 2024-04 | |
# | |
# # Download several datasets for a given month.
# cim_downloader --dataset pageviews_by_category --dataset edits --start 2024-01 | |
# | |
# # Download all datasets filtering by your primary categories. | |
# cim_downloader --category Some_Category --category Other_Category --start 2024-01 | |
# | |
# # Download all datasets for a given time range and merge the monthly files into one. | |
# cim_downloader --merge --start 2024-01 --end 2024-04 | |
""" | |
Download, uncompress, filter and merge Commons Impact Metrics dumps. | |
https://wikitech.wikimedia.org/wiki/Commons_Impact_Metrics | |
Usage: | |
cim_downloader -h | |
cim_downloader [-d=<dataset>]... [-c=<category>]... [-m] [-t=<path>] [-o=<path>] [-v] -s=<date> [-e=<date>] | |
Options: | |
-h --help Show this help message and exit. | |
-d --dataset=<dataset> Download only these datasets. | |
Use one or more of: category_metrics_snapshot, | |
media_file_metrics_snapshot, pageviews_by_category, | |
pageviews_by_media_file, or edits. | |
If not specified, download all datasets. | |
-c --category=<category> Filter data to include these primary categories. | |
Use category names (URL format with underscores). | |
If not specified, includes all categories. | |
-m --merge Merge all files of a same dataset into one. | |
-t --temp-folder=<path> Use this temporary folder for downloads. | |
Removes temporary files once finished. | |
Default: /tmp/cim_downloader | |
-o --output-folder=<path> Write the output to this folder. | |
If not specified, write to the current folder. | |
-v --verbose Print more detailed logs. | |
-s --start=<date> Download data starting at this date (YYYY-MM). | |
Mandatory. | |
-e --end=<date> Download data up to this date, inclusive (YYYY-MM). | |
If not specified, download only the start month. | |
""" | |
from docopt import docopt | |
from os import getcwd, mkdir, path, remove | |
from shutil import rmtree | |
from urllib.request import urlretrieve | |
import bz2 | |
import logging | |
import pendulum | |
import re | |
import sys | |
import uuid | |
# Logging layout: timestamp, padded level name, message.
LOG_FORMAT = "%(asctime)s %(levelname)-6s %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"

# Parent folder for per-run temporary download folders
# when --temp-folder is not given.
DEFAULT_TEMP_FOLDER = "/tmp/cim_downloader"

# Root URL under which the dump files of each dataset are published.
DUMPS_BASE_URL = "https://dumps.wikimedia.org/other/commons_impact_metrics"

# Available datasets. For each one, "primary_column" is the zero-based
# index of the tab-separated column that holds the primary categories
# used for filtering.
DATASETS = {
    "category_metrics_snapshot": {
        "primary_column": 2,
    },
    "edits": {
        "primary_column": 4,
    },
    "media_file_metrics_snapshot": {
        "primary_column": 3,
    },
    "pageviews_by_media_file": {
        "primary_column": 2,
    },
    "pageviews_by_category": {
        "primary_column": 2,
    },
}
def check_and_default_args(args):
    """
    Checks the correctness of the argument values
    and gives them defaults in case they are not specified.

    The `args` dictionary is modified in place:
      --dataset        defaults to all the keys of DATASETS.
      --temp-folder    defaults to DEFAULT_TEMP_FOLDER.
      --output-folder  defaults to the current working directory.
      --end            defaults to the value of --start.

    Raises Exception if a dataset name is unknown, if --start or --end
    are not valid YYYY-MM values, or if --start is later than --end.
    """
    for dataset in args["--dataset"]:
        if dataset not in DATASETS:
            raise Exception(f"Invalid --dataset argument {dataset}.")
    if not args["--dataset"]:
        args["--dataset"] = list(DATASETS)

    if not args["--temp-folder"]:
        args["--temp-folder"] = DEFAULT_TEMP_FOLDER
    if not args["--output-folder"]:
        args["--output-folder"] = getcwd()

    # The month part is restricted to 01-12 so that impossible values
    # such as 2024-00 or 2024-13 are rejected here with a clear message
    # instead of failing later when the dates are parsed.
    year_month_re = r"[0-9]{4}-(?:0[1-9]|1[0-2])"
    if not re.fullmatch(year_month_re, args["--start"]):
        raise Exception(f"Invalid --start argument {args['--start']}")
    if args["--end"]:
        if not re.fullmatch(year_month_re, args["--end"]):
            raise Exception(f"Invalid --end argument {args['--end']}")
    else:
        args["--end"] = args["--start"]

    # Zero-padded YYYY-MM strings sort chronologically, so a plain
    # string comparison is enough here.
    if args["--start"] > args["--end"]:
        raise Exception("Argument --start is greater than argument --end.")
def get_logger(verbose):
    """
    Sets up and returns a logger object.

    Configures the root logger to write to stdout using LOG_FORMAT
    and LOG_DATE_FORMAT. The level is DEBUG when `verbose` is truthy
    and INFO otherwise.
    """
    logger = logging.getLogger()
    # Use the parameter rather than a module-level `args` global, so the
    # function also works when this module is imported instead of executed.
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    formatter = logging.Formatter(fmt=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)
    logger.addHandler(stdout_handler)
    return logger
def get_target_months(start, end):
    """
    Lists every month between `start` and `end` as YYYY-MM strings.

    Both bounds are parsed with pendulum and both are included in the
    result. The start month is always present in the returned list.
    """
    cursor = pendulum.parse(start)
    last = pendulum.parse(end)
    labels = [cursor.format("YYYY-MM")]
    # Step one month at a time until the end month has been emitted.
    while cursor < last:
        cursor = cursor.add(months=1)
        labels.append(cursor.format("YYYY-MM"))
    return labels
def _write_filtered_lines(source, sink, primary_column, categories):
    """
    Copies lines from `source` to `sink`, keeping only the rows whose
    primary category column contains at least one of `categories`.
    If `categories` is empty, all lines are copied.
    """
    for line in source:
        if categories:
            # Strip only the line terminator: stripping all whitespace would
            # also remove leading/trailing tabs and shift the column indexes
            # whenever the first or last columns are empty.
            columns = line.rstrip("\r\n").split("\t")
            # Rows too short to have the primary column cannot match.
            if primary_column >= len(columns):
                continue
            if categories.isdisjoint(columns[primary_column].split("|")):
                continue
        sink.write(line)


def main(args):
    """
    Downloads, extracts, filters and optionally merges the requested dumps.

    `args` is the docopt argument dictionary. It is validated and given
    defaults by check_and_default_args, so it is modified in place.
    Downloads go to a unique sub-folder of --temp-folder, which is removed
    when the function finishes, whether it succeeds or fails.
    """
    # Imported locally because the module only imports `mkdir` from os, and
    # `mkdir` fails when the parent temporary folder does not exist yet.
    from os import makedirs

    # Prepare execution.
    check_and_default_args(args)
    logger = get_logger(args["--verbose"])
    temp_folder = f"{args['--temp-folder']}/{uuid.uuid4().hex}"
    makedirs(temp_folder)
    target_months = get_target_months(args["--start"], args["--end"])
    # Build the category set once; it does not change across files.
    categories = frozenset(args["--category"] or ())

    try:
        # Iterate over datasets and months to download the data.
        for dataset in args["--dataset"]:
            primary_column = DATASETS[dataset]["primary_column"]
            # If merge is set, use the same output file for all dump files
            # of the same dataset.
            if args["--merge"]:
                output_file_path = f"{args['--output-folder']}/commons_{dataset}.tsv"
            # Use "w" file mode for the first dump file.
            output_file_mode = "w"

            for month in target_months:
                logger.info(f"Downloading {dataset} for {month}...")

                # Download the dump file.
                dumps_url = f"{DUMPS_BASE_URL}/{dataset}/commons_{dataset}_{month}.tsv.bz2"
                dumps_file_name = path.basename(dumps_url)
                dumps_file_path = f"{temp_folder}/{dumps_file_name}"
                logger.debug(f"Downloading {dumps_url} into {dumps_file_path}")
                urlretrieve(dumps_url, dumps_file_path)

                # If merge is not set, use a specific output file for this
                # dump file (the dump file name without its ".bz2" suffix).
                if not args["--merge"]:
                    output_file_path = f"{args['--output-folder']}/{dumps_file_name[0:-4]}"
                    output_file_mode = "w"

                # Extract, filter and merge the downloaded file into the output file.
                logger.debug(f"Extracting and filtering {dumps_file_path} into {output_file_path}")
                # Use UTF-8 explicitly on both sides so the result does not
                # depend on the locale's default encoding.
                with bz2.open(dumps_file_path, "rt", encoding="utf-8") as extracted_file:
                    with open(output_file_path, output_file_mode, encoding="utf-8") as output_file:
                        _write_filtered_lines(
                            extracted_file, output_file, primary_column, categories
                        )

                # Remove the downloaded file asap to reduce disk usage.
                remove(dumps_file_path)

                # If merge is set, change the output file mode to "a"
                # so that subsequent files of the same dataset are merged to it.
                if args["--merge"]:
                    output_file_mode = "a"
    finally:
        # Cleanup, also when a download or extraction fails midway.
        rmtree(temp_folder, ignore_errors=True)
if __name__ == "__main__":
    # Parse the command line according to the module docstring, then run.
    args = docopt(__doc__)
    main(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment