Skip to content

Instantly share code, notes, and snippets.

@dukechem
Last active August 20, 2024 06:19
Show Gist options
  • Save dukechem/f8f535d279b3d496444abaeb2b11e3e4 to your computer and use it in GitHub Desktop.
Save dukechem/f8f535d279b3d496444abaeb2b11e3e4 to your computer and use it in GitHub Desktop.
Download full macos installers with curl --limit-rate and can resume partial downloads
#!/usr/bin/python
# encoding: utf-8
#
# Copyright 2017 Greg Neagle.
# https://github.com/munki/macadmin-scripts/blob/main/installinstallmacos.py
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Thanks to Tim Sutton for ideas, suggestions, and sample code.
#
'''installinstallmacos.py
A tool to download the parts for an Install macOS app from Apple's
softwareupdate servers and install a functioning Install macOS app onto an
empty disk image'''
# Python 3 compatibility shims
from __future__ import (
absolute_import, division, print_function, unicode_literals)
import argparse
import gzip
import os
import plistlib
import subprocess
import sys
try:
    # Python 3: urlsplit lives in urllib.parse
    from urllib.parse import urlsplit
except ImportError:
    # Python 2: fall back to the urlparse module
    from urlparse import urlsplit
from xml.dom import minidom
from xml.parsers.expat import ExpatError
import xattr
# Software Update catalog URLs known to this script.
# The numeric keys match the Darwin major version lookup that is commented
# out in get_default_catalog(); the 'something' key is the entry that
# get_default_catalog() currently returns regardless of the running OS.
DEFAULT_SUCATALOGS = {
    '17': 'https://swscan.apple.com/content/catalogs/others/'
          'index-10.13-10.12-10.11-10.10-10.9'
          '-mountainlion-lion-snowleopard-leopard.merged-1.sucatalog',
    '18': 'https://swscan.apple.com/content/catalogs/others/'
          'index-10.14-10.13-10.12-10.11-10.10-10.9'
          '-mountainlion-lion-snowleopard-leopard.merged-1.sucatalog',
    '19': 'https://swscan.apple.com/content/catalogs/others/'
          'index-10.15-10.14-10.13-10.12-10.11-10.10-10.9'
          '-mountainlion-lion-snowleopard-leopard.merged-1.sucatalog',
    'something': 'https://swscan.apple.com/content/catalogs/others/index-10.15seed-10.15-10.14-10.13-10.12-10.11-10.10-10.9-mountainlion-lion-snowleopard-leopard.merged-1.sucatalog'
}
# Path of the plist read by get_seeding_program(), get_seed_catalog() and
# get_seeding_programs(). Those helpers treat it as a mapping of seeding
# program name -> catalog URL.
SEED_CATALOGS_PLIST = (
    '/System/Library/PrivateFrameworks/Seeding.framework/Versions/Current/'
    'Resources/SeedCatalogs.plist'
)
def get_input(prompt=None):
    '''Prompt the user and return the line they typed.

    Uses raw_input where it exists (Python 2) and input otherwise
    (Python 3).'''
    try:
        reader = raw_input
    except NameError:
        # Python 3 has no raw_input; its input() already returns a string
        reader = input
    return reader(prompt)
def read_plist(filepath):
    '''Read the plist file at filepath and return its parsed contents.

    Hides the API difference between Python 3's plistlib.load and
    Python 2's plistlib.readPlist.'''
    try:
        load = plistlib.load
    except AttributeError:
        # Python 2's plistlib has no load(); it reads directly from a path
        return plistlib.readPlist(filepath)
    with open(filepath, "rb") as plist_file:
        return load(plist_file)
def read_plist_from_string(bytestring):
    '''Parse plist data held in memory and return the resulting object.

    Hides the API difference between Python 3's plistlib.loads and
    Python 2's plistlib.readPlistFromString.'''
    try:
        loads = plistlib.loads
    except AttributeError:
        # Python 2's plistlib has no loads()
        return plistlib.readPlistFromString(bytestring)
    return loads(bytestring)
def get_seeding_program(sucatalog_url):
    '''Return the name of the seeding program whose catalog URL equals
    sucatalog_url, or an empty string when none matches or the seed
    catalogs plist cannot be read.'''
    try:
        seed_catalogs = read_plist(SEED_CATALOGS_PLIST)
        matching_names = [name for name, url in seed_catalogs.items()
                          if sucatalog_url == url]
    except (OSError, IOError, ExpatError, AttributeError, KeyError) as err:
        print(err, file=sys.stderr)
        return ''
    # the first match in iteration order wins
    return matching_names[0] if matching_names else ''
def get_seed_catalog(seedname='DeveloperSeed'):
    '''Return the catalog URL recorded for the seeding program seedname.

    Returns None when seedname is not in the seed catalogs plist and an
    empty string when the plist cannot be read.'''
    try:
        catalog_url = read_plist(SEED_CATALOGS_PLIST).get(seedname)
    except (OSError, IOError, ExpatError, AttributeError, KeyError) as err:
        print(err, file=sys.stderr)
        catalog_url = ''
    return catalog_url
def get_seeding_programs():
    '''Return the list of seeding program names found in the seed catalogs
    plist.

    On any read/parse failure the error is printed to stderr and an empty
    list is returned, so callers always receive a list (the previous
    empty-string fallback contradicted the documented return type).'''
    try:
        seed_catalogs = read_plist(SEED_CATALOGS_PLIST)
        return list(seed_catalogs.keys())
    except (OSError, IOError, ExpatError, AttributeError, KeyError) as err:
        print(err, file=sys.stderr)
        return []
def get_default_catalog():
    '''Return the softwareupdate catalog URL used when neither a catalog
    URL nor a seed program is given (None if the entry is missing).'''
    # The lookup by Darwin major version (os.uname()[2].split('.')[0]) is
    # disabled; the fixed 'something' entry is used instead, presumably so
    # the script also works on OS versions that have no entry in
    # DEFAULT_SUCATALOGS.
    catalog_key = 'something'
    return DEFAULT_SUCATALOGS.get(catalog_key)
def make_sparse_image(volume_name, output_path):
    '''Create a 16 GB HFS+ sparse disk image that a product can be
    installed to.

    volume_name -- name given to the volume inside the image
    output_path -- where hdiutil should create the image

    Returns the first entry of the plist that hdiutil prints (the code
    expects this to be the path of the created image). Exits the process
    with status -1 if hdiutil fails or its output cannot be used.'''
    cmd = ['/usr/bin/hdiutil', 'create', '-size', '16g', '-fs', 'HFS+',
           '-volname', volume_name, '-type', 'SPARSE', '-plist', output_path]
    try:
        output = subprocess.check_output(cmd)
    except subprocess.CalledProcessError as err:
        print(err, file=sys.stderr)
        # sys.exit rather than the site-provided exit(), which is meant for
        # interactive use and is not guaranteed to be defined
        sys.exit(-1)
    try:
        return read_plist_from_string(output)[0]
    except IndexError:
        print('Unexpected output from hdiutil: %s' % output, file=sys.stderr)
        sys.exit(-1)
    except ExpatError as err:
        print('Malformed output from hdiutil: %s' % output, file=sys.stderr)
        print(err, file=sys.stderr)
        sys.exit(-1)
def make_compressed_dmg(app_path, diskimagepath):
    """Create a disk image at diskimagepath containing the app at app_path.

    Returns diskimagepath when hdiutil succeeds. When hdiutil fails or
    cannot be started, the error is printed to stderr and None is
    returned (previously nothing was returned even on success, despite
    the docstring promising a path)."""
    print('Making read-only compressed disk image containing %s...'
          % os.path.basename(app_path))
    cmd = ['/usr/bin/hdiutil', 'create', '-fs', 'HFS+',
           '-srcfolder', app_path, diskimagepath]
    try:
        subprocess.check_call(cmd)
    except (subprocess.CalledProcessError, OSError) as err:
        # OSError covers hdiutil being missing or not executable
        print(err, file=sys.stderr)
        return None
    print('Disk image created at: %s' % diskimagepath)
    return diskimagepath
def mountdmg(dmgpath):
    """
    Attempts to mount the dmg at dmgpath and returns the first mountpoint.

    Returns None (after printing an error to stderr) when hdiutil fails
    or when its output does not report any mount point; previously the
    latter case raised IndexError or KeyError.
    """
    dmgname = os.path.basename(dmgpath)
    cmd = ['/usr/bin/hdiutil', 'attach', dmgpath,
           '-mountRandom', '/tmp', '-nobrowse', '-plist',
           '-owners', 'on']
    proc = subprocess.Popen(cmd, bufsize=-1,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (pliststr, err) = proc.communicate()
    if proc.returncode:
        print('Error: "%s" while mounting %s.' % (err, dmgname),
              file=sys.stderr)
        return None
    if pliststr:
        plist = read_plist_from_string(pliststr)
        for entity in plist.get('system-entities', []):
            if 'mount-point' in entity:
                # the first reported mount point is the one callers use
                return entity['mount-point']
    print('Error: no mount point found after mounting %s.' % dmgname,
          file=sys.stderr)
    return None
def unmountdmg(mountpoint):
    """
    Detaches the disk image mounted at mountpoint, retrying with -force
    if the normal detach fails. Failures are reported on stderr.
    """
    detach_cmd = ['/usr/bin/hdiutil', 'detach', mountpoint]
    proc = subprocess.Popen(detach_cmd, bufsize=-1,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    err = proc.communicate()[1]
    if not proc.returncode:
        return
    print('Polite unmount failed: %s' % err, file=sys.stderr)
    print('Attempting to force unmount %s' % mountpoint, file=sys.stderr)
    # second attempt: same command with -force appended
    if subprocess.call(detach_cmd + ['-force']):
        print('Failed to unmount %s' % mountpoint, file=sys.stderr)
def install_product(dist_path, target_vol):
    '''Run /usr/sbin/installer to install the package at dist_path onto
    the volume target_vol.

    Returns True when installer succeeds and False when it exits with an
    error.'''
    # CM_BUILD makes Installer bypass eligibility checks when installing
    # packages (for machine-specific OS builds)
    os.environ["CM_BUILD"] = "CM_BUILD"
    try:
        subprocess.check_call(
            ['/usr/sbin/installer', '-pkg', dist_path, '-target', target_vol])
    except subprocess.CalledProcessError as err:
        print(err, file=sys.stderr)
        return False

    # An Apple postinstall script bug can leave the app in a path like
    # /tmp/dmg.T9ak1HApplications (volume path and 'Applications' joined
    # without a separator)
    misplaced_dir = target_vol + 'Applications'
    if not os.path.exists(misplaced_dir):
        return True

    banner = (
        '*********************************************************',
        '*** Working around a very dumb Apple bug in a package ***',
        '*** postinstall script that fails to correctly target ***',
        '*** the Install macOS.app when installed to a volume ***',
        '*** other than the current boot volume. ***',
        '*** Please file feedback with Apple! ***',
        '*********************************************************',
    )
    for banner_line in banner:
        print(banner_line)
    # copy the misplaced files to where they belong, then remove the stray
    # directory
    subprocess.check_call(
        ['/usr/bin/ditto', misplaced_dir,
         os.path.join(target_vol, 'Applications')])
    subprocess.check_call(['/bin/rm', '-r', misplaced_dir])
    return True
class ReplicationError(Exception):
    '''Raised when a URL cannot be downloaded (replicated) locally.'''
def replicate_url(full_url,
                  root_dir='/tmp',
                  show_progress=False,
                  ignore_cache=False,
                  attempt_resume=False,
                  limit_rate='377k'):
    '''Downloads a URL with curl and stores it in the same relative path
    on our filesystem. Returns a path to the replicated file.

    full_url       -- URL to download
    root_dir       -- local directory the URL's path is recreated under
    show_progress  -- accepted for compatibility; both settings currently
                      run curl with '-fL', so it has no effect
    ignore_cache   -- when False and the file already exists, curl is
                      passed '-z <file>' with the existing file
    attempt_resume -- when True, curl is passed '-C -' to resume a
                      partial download
    limit_rate     -- value for curl's --limit-rate (e.g. '377k', '3M');
                      pass None or '' to download unthrottled

    Raises ReplicationError if curl exits with an error.'''
    path = urlsplit(full_url)[2]
    relative_url = os.path.normpath(path.lstrip('/'))
    local_file_path = os.path.join(root_dir, relative_url)

    curl_cmd = ['/usr/bin/curl', '-fL']
    if limit_rate:
        # strip stray whitespace so curl receives a clean number+suffix
        curl_cmd.extend(['--limit-rate', ('%s' % limit_rate).strip()])
    curl_cmd.extend(['--create-dirs', '-o', local_file_path])
    if not ignore_cache and os.path.exists(local_file_path):
        curl_cmd.extend(['-z', local_file_path])
        if attempt_resume:
            curl_cmd.extend(['-C', '-'])
    curl_cmd.append(full_url)
    print("Downloading %s..." % full_url)
    try:
        subprocess.check_call(curl_cmd)
    except subprocess.CalledProcessError as err:
        raise ReplicationError(err)
    return local_file_path
def parse_server_metadata(filename):
    '''Read a softwareupdate server metadata plist and pull out the fields
    of interest.

    Returns a dictionary with 'title' (from the English/en localization)
    and 'version' (CFBundleShortVersionString), each defaulting to an
    empty string. Returns an empty dictionary if the file cannot be
    read.'''
    try:
        md_plist = read_plist(filename)
    except (OSError, IOError, ExpatError) as err:
        print('Error reading %s: %s' % (filename, err), file=sys.stderr)
        return {}

    localizations = md_plist.get('localization', {})
    preferred = localizations.get('English') or localizations.get('en')
    localized_title = preferred.get('title', '') if preferred else ''
    return {
        'title': localized_title,
        'version': md_plist.get('CFBundleShortVersionString', ''),
    }
def get_server_metadata(catalog, product_key, workdir, ignore_cache=False):
    '''Download the ServerMetadataURL file of the product product_key
    into workdir.

    Returns the local path of the downloaded file, or None when the
    catalog lacks the expected keys or the download fails.'''
    try:
        url = catalog['Products'][product_key]['ServerMetadataURL']
        return replicate_url(
            url, root_dir=workdir, ignore_cache=ignore_cache)
    except KeyError:
        print('Malformed catalog.', file=sys.stderr)
    except ReplicationError as err:
        # url is always bound here: replicate_url is only called after the
        # lookup above succeeded
        print('Could not replicate %s: %s' % (url, err), file=sys.stderr)
    return None
def parse_dist(filename):
    '''Parses a softwareupdate dist file, returning a dict of info of
    interest.

    The dict holds the key/string pairs found in the first <auxinfo>
    element (or in a <dict> nested directly inside it). An empty dict is
    returned when the file cannot be read, is not valid XML, or has no
    <auxinfo> element. Empty <key> or <string> elements are skipped
    instead of raising AttributeError.'''
    dist_info = {}
    try:
        dom = minidom.parse(filename)
    except ExpatError:
        print('Invalid XML in %s' % filename, file=sys.stderr)
        return dist_info
    except IOError as err:
        print('Error reading %s: %s' % (filename, err), file=sys.stderr)
        return dist_info

    auxinfos = dom.getElementsByTagName('auxinfo')
    if not auxinfos:
        return dist_info
    auxinfo = auxinfos[0]
    children = auxinfo.childNodes
    # handle the possibility that keys from auxinfo may be nested
    # within a 'dict' element
    dict_nodes = [n for n in auxinfo.childNodes
                  if n.nodeType == n.ELEMENT_NODE and
                  n.tagName == 'dict']
    if dict_nodes:
        children = dict_nodes[0].childNodes

    def element_text(element):
        '''Text at the start of element, or '' if it has none.'''
        first = element.firstChild
        if first is None or first.nodeType not in (
                first.TEXT_NODE, first.CDATA_SECTION_NODE):
            return ''
        return first.wholeText

    key = None
    value = None
    for node in children:
        if node.nodeType != node.ELEMENT_NODE:
            continue
        if node.tagName == 'key':
            key = element_text(node)
        elif node.tagName == 'string':
            value = element_text(node)
        # store a pair only once both halves have been seen and are non-empty
        if key and value:
            dist_info[key] = value
            key = None
            value = None
    return dist_info
def download_and_parse_sucatalog(sucatalog, workdir, ignore_cache=False):
    '''Downloads and returns a parsed softwareupdate catalog.

    sucatalog    -- catalog URL
    workdir      -- directory the catalog is downloaded under
    ignore_cache -- passed through to replicate_url

    A local file ending in .gz is decompressed before parsing. Exits the
    process with status -1 if the catalog cannot be downloaded, read or
    parsed.'''
    try:
        localcatalogpath = replicate_url(
            sucatalog, root_dir=workdir, ignore_cache=ignore_cache)
    except ReplicationError as err:
        print('Could not replicate %s: %s' % (sucatalog, err), file=sys.stderr)
        sys.exit(-1)
    try:
        if os.path.splitext(localcatalogpath)[1] == '.gz':
            with gzip.open(localcatalogpath) as the_file:
                content = the_file.read()
            return read_plist_from_string(content)
        return read_plist(localcatalogpath)
    except (OSError, IOError, ExpatError) as err:
        # read errors (including a corrupt .gz) and parse errors are
        # reported the same way for both file types
        print('Error reading %s: %s' % (localcatalogpath, err),
              file=sys.stderr)
        sys.exit(-1)
def find_mac_os_installers(catalog):
    '''Return the product identifiers in catalog whose
    ExtendedMetaInfo/InstallAssistantPackageIdentifiers/OSInstall value
    is com.apple.mpkg.OSInstall, i.e. what appear to be macOS
    installers.'''
    installer_ids = []
    if 'Products' not in catalog:
        return installer_ids
    for product_id, product in catalog['Products'].items():
        try:
            identifiers = product['ExtendedMetaInfo'][
                'InstallAssistantPackageIdentifiers']
            os_install = identifiers['OSInstall']
        except KeyError:
            # products without this metadata are not installers
            continue
        if os_install == 'com.apple.mpkg.OSInstall':
            installer_ids.append(product_id)
    return installer_ids
def os_installer_product_info(catalog, workdir, ignore_cache=False):
    '''Returns a dict of info about products that look like macOS
    installers.

    catalog      -- parsed softwareupdate catalog
    workdir      -- directory metadata and dist files are downloaded under
    ignore_cache -- passed to every download, including the server
                    metadata download (which previously ignored it)

    The result maps product id -> dict holding 'PostDate', plus 'title'
    and 'version' when server metadata could be read, plus
    'DistributionPath' and the values parsed from the dist file when an
    English/en distribution could be downloaded.'''
    product_info = {}
    for product_key in find_mac_os_installers(catalog):
        info = {}
        filename = get_server_metadata(
            catalog, product_key, workdir, ignore_cache=ignore_cache)
        if filename:
            info = parse_server_metadata(filename)
        product = catalog['Products'][product_key]
        info['PostDate'] = product['PostDate']
        distributions = product.get('Distributions', {})
        dist_url = distributions.get('English') or distributions.get('en')
        if not dist_url:
            # nothing to download; passing None on would crash in urlsplit
            print('No English distribution found for product %s'
                  % product_key, file=sys.stderr)
        else:
            try:
                dist_path = replicate_url(
                    dist_url, root_dir=workdir, ignore_cache=ignore_cache)
            except ReplicationError as err:
                print('Could not replicate %s: %s' % (dist_url, err),
                      file=sys.stderr)
            else:
                info['DistributionPath'] = dist_path
                info.update(parse_dist(dist_path))
        product_info[product_key] = info
    return product_info
def replicate_product(catalog, product_id, workdir, ignore_cache=False):
    '''Downloads all the packages for a product.

    For every entry in the product's 'Packages' list, downloads the
    package 'URL' (resuming a partial download unless ignore_cache is
    set) and its 'MetadataURL' when present. Exits the process with
    status -1 on the first download that fails.'''
    product = catalog['Products'][product_id]
    for package in product.get('Packages', []):
        # TO-DO: Check 'Size' attribute and make sure
        # we have enough space on the target
        # filesystem before attempting to download
        if 'URL' in package:
            try:
                replicate_url(
                    package['URL'], root_dir=workdir,
                    show_progress=True, ignore_cache=ignore_cache,
                    attempt_resume=(not ignore_cache))
            except ReplicationError as err:
                print('Could not replicate %s: %s' % (package['URL'], err),
                      file=sys.stderr)
                # sys.exit rather than the site-provided exit(), which is
                # not guaranteed to be defined
                sys.exit(-1)
        if 'MetadataURL' in package:
            try:
                replicate_url(package['MetadataURL'], root_dir=workdir,
                              ignore_cache=ignore_cache)
            except ReplicationError as err:
                print('Could not replicate %s: %s'
                      % (package['MetadataURL'], err), file=sys.stderr)
                sys.exit(-1)
def find_installer_app(mountpoint):
    '''Returns the path to the first item ending in .app inside
    <mountpoint>/Applications.

    Returns None when there is no such item or when the Applications
    directory is missing or unreadable (previously that raised
    OSError).'''
    applications_dir = os.path.join(mountpoint, 'Applications')
    try:
        items = os.listdir(applications_dir)
    except OSError:
        return None
    for item in items:
        if item.endswith('.app'):
            return os.path.join(applications_dir, item)
    return None
def main():
    '''Command-line entry point.

    Picks a catalog (explicit URL, seed program, or the default), lists
    the macOS installer products it contains, downloads the product the
    user chooses, installs it onto a new sparse disk image and, unless
    --raw is given, converts the result into a disk image containing the
    Install macOS app.

    Exits with status -1 on errors and 0 when the user makes no valid
    choice.'''
    parser = argparse.ArgumentParser()
    parser.add_argument('--seedprogram', default='',
                        help='Which Seed Program catalog to use. Valid values '
                        'are %s.' % ', '.join(get_seeding_programs()))
    parser.add_argument('--catalogurl', default='',
                        help='Software Update catalog URL. This option '
                        'overrides any seedprogram option.')
    parser.add_argument('--workdir', metavar='path_to_working_dir',
                        default='.',
                        help='Path to working directory on a volume with over '
                        '10G of available space. Defaults to current working '
                        'directory.')
    # --compress is accepted but never read below: compressing is the
    # default behavior whenever --raw is absent
    parser.add_argument('--compress', action='store_true',
                        help='Output a read-only compressed disk image with '
                        'the Install macOS app at the root. This is now the '
                        'default. Use --raw to get a read-write sparse image '
                        'with the app in the Applications directory.')
    parser.add_argument('--raw', action='store_true',
                        help='Output a read-write sparse image '
                        'with the app in the Applications directory. Requires '
                        'less available disk space and is faster.')
    parser.add_argument('--ignore-cache', action='store_true',
                        help='Ignore any previously cached files.')
    args = parser.parse_args()

    # NOTE: the check that the script runs as root is deliberately left
    # disabled here.
    # if os.getuid() != 0:
    #     sys.exit('This command requires root (to install packages), so '
    #              'please run again with sudo or as root.')

    if args.catalogurl:
        su_catalog_url = args.catalogurl
    elif args.seedprogram:
        su_catalog_url = get_seed_catalog(args.seedprogram)
        if not su_catalog_url:
            print('Could not find a catalog url for seed program %s'
                  % args.seedprogram, file=sys.stderr)
            print('Valid seeding programs are: %s'
                  % ', '.join(get_seeding_programs()), file=sys.stderr)
            sys.exit(-1)
    else:
        su_catalog_url = get_default_catalog()
        if not su_catalog_url:
            print('Could not find a default catalog url for this OS version.',
                  file=sys.stderr)
            sys.exit(-1)

    # download sucatalog and look for products that are for macOS installers
    catalog = download_and_parse_sucatalog(
        su_catalog_url, args.workdir, ignore_cache=args.ignore_cache)
    product_info = os_installer_product_info(
        catalog, args.workdir, ignore_cache=args.ignore_cache)
    if not product_info:
        print('No macOS installer products found in the sucatalog.',
              file=sys.stderr)
        sys.exit(-1)

    # display a menu of choices (some seed catalogs have multiple installers)
    product_ids = list(product_info)
    print('%2s %12s %10s %8s %11s %s'
          % ('#', 'ProductID', 'Version', 'Build', 'Post Date', 'Title'))
    for index, product_id in enumerate(product_ids):
        info = product_info[product_id]
        # version/BUILD/title are absent when the metadata or dist download
        # failed; show blanks instead of crashing with KeyError
        print('%2s %12s %10s %8s %11s %s' % (
            index + 1,
            product_id,
            info.get('version', ''),
            info.get('BUILD', ''),
            info['PostDate'].strftime('%Y-%m-%d'),
            info.get('title', '')
        ))
    answer = get_input(
        '\nChoose a product to download (1-%s): ' % len(product_info))
    try:
        index = int(answer) - 1
        if index < 0:
            raise ValueError
        product_id = product_ids[index]
    except (ValueError, IndexError):
        print('Exiting.')
        sys.exit(0)

    selected = product_info[product_id]
    if 'DistributionPath' not in selected:
        # without the dist file there is nothing to hand to the installer,
        # so stop before downloading gigabytes of packages
        print('No distribution file is available for product %s.'
              % product_id, file=sys.stderr)
        sys.exit(-1)

    # download all the packages for the selected product
    replicate_product(
        catalog, product_id, args.workdir, ignore_cache=args.ignore_cache)

    # generate a name for the sparseimage
    volname = ('Install_macOS_%s-%s'
               % (selected.get('version') or 'unknown',
                  selected.get('BUILD') or 'unknown'))
    sparse_diskimage_path = os.path.join(args.workdir, volname + '.sparseimage')
    if os.path.exists(sparse_diskimage_path):
        os.unlink(sparse_diskimage_path)

    # make an empty sparseimage and mount it
    print('Making empty sparseimage...')
    sparse_diskimage_path = make_sparse_image(volname, sparse_diskimage_path)
    mountpoint = mountdmg(sparse_diskimage_path)
    if not mountpoint:
        # previously a failed mount ended the script silently with success
        print('Could not mount %s' % sparse_diskimage_path, file=sys.stderr)
        sys.exit(-1)

    # install the product to the mounted sparseimage volume
    success = install_product(selected['DistributionPath'], mountpoint)
    if not success:
        print('Product installation failed.', file=sys.stderr)
        unmountdmg(mountpoint)
        sys.exit(-1)

    # add the seeding program xattr to the app if applicable
    seeding_program = get_seeding_program(su_catalog_url)
    if seeding_program:
        installer_app = find_installer_app(mountpoint)
        if installer_app:
            print("Adding seeding program %s extended attribute to app"
                  % seeding_program)
            # xattr values are bytes; encode so this also works on Python 3
            # (assumption about the xattr module's API - TODO confirm)
            xattr.setxattr(installer_app, 'SeedProgram',
                           seeding_program.encode('UTF-8'))
    print('Product downloaded and installed to %s' % sparse_diskimage_path)

    if args.raw:
        unmountdmg(mountpoint)
        return

    # if --raw option not given, create a r/o compressed diskimage
    # containing the Install macOS app
    compressed_diskimagepath = os.path.join(
        args.workdir, volname + '.dmg')
    if os.path.exists(compressed_diskimagepath):
        os.unlink(compressed_diskimagepath)
    app_path = find_installer_app(mountpoint)
    if app_path:
        make_compressed_dmg(app_path, compressed_diskimagepath)
    # unmount sparseimage
    unmountdmg(mountpoint)
    if os.path.exists(compressed_diskimagepath):
        # delete sparseimage since we don't need it any longer
        os.unlink(sparse_diskimage_path)
    else:
        # keep the only copy of the installed product rather than deleting it
        print('Compressed disk image was not created; keeping %s'
              % sparse_diskimage_path, file=sys.stderr)
if __name__ == '__main__':
main()
@dukechem
Copy link
Author

dukechem commented Jun 19, 2020

Intended for home users to get huge installers very slowly without hogging bandwidth. Supposedly softwareupdate also throttles downloads based on machine usage, but this script always forces throttling.

This script can download, for example on a 10.11 laptop, the entire 6 GB+ full Mojave installer very slowly in the background, using curl throttled with --limit-rate 377k. Edit the value to suit your needs; for example, "3M" should finish a 6 GB download in about 6 hours. At 377k/sec as is, the entire 6 GB might take 1 or 2 days per https://www.download-time.com/. Re-run the command if need be to resume downloads after the laptop wakes up. It should probably be run from /tmp so that a reboot deletes any partial/incomplete downloads. Just remember to move completed downloads out of /tmp before a restart, because macOS clears /private/tmp on every startup.

This is a hack of munki's installinstallmacos.py script.
My Python skills are horrible, or I would fork the munki script, add a proper "--limit-rate" command-line argument to pass to
curl, and fix the URL issue. I may try making a feature request in the issues...

The munki script on a 10.11 MacBook Air died saying "Could not find a default catalog url for this OS version." unless a complicated --catalogurl was passed.

This fetch-macOS.py would get 10.14 but had neither a rate limit nor the ability to resume, so I used the "something" URL from it. The only other tweaks: I commented out the check for running as root, and added --limit-rate 377k to the curl command it uses.

Quickstart

Save to fetch-macos-throttled-resume.py in your Downloads folder.
Optionally edit the --limit-rate value which is very slow in this example 377k

$   chmod a+x  ~/Downloads/fetch-macos-throttled-resume.py
$   cd /tmp;   ~/Downloads/fetch-macos-throttled-resume.py
...
 #    ProductID    Version    Build   Post Date  Title
 1    061-77704    10.15.4  19E242d  2020-02-26  macOS Catalina Beta
 2    001-15219    10.15.5  19F2200  2020-06-15  macOS Catalina
 3    001-04366    10.15.4  19E2269  2020-05-04  macOS Catalina
 4    041-91758    10.13.6    17G66  2019-10-19  macOS High Sierra
 5    041-88800    10.14.4  18E2034  2019-10-23  macOS Mojave
 6    061-26589    10.14.6   18G103  2019-10-14  macOS Mojave
 7    061-86291    10.15.3  19D2064  2020-03-23  macOS Catalina
 8    041-90855    10.13.5   17F66a  2019-10-23  Install macOS High Sierra Beta
 9    061-26578    10.14.5  18F2059  2019-10-14  macOS Mojave
10    061-44345    10.15.2   19C39d  2019-11-15  macOS Catalina Beta

Choose a product to download (1-10): 

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment