Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 135 additions & 50 deletions brigadier
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#!/usr/bin/env python

from __future__ import print_function

import os
import sys
import subprocess
import urllib2
import plistlib
import re
import tempfile
Expand All @@ -13,17 +14,43 @@ import datetime
import platform
import requests

from pprint import pprint
from urllib import urlretrieve
from xml.dom import minidom

try:
from urllib.request import urlopen, urlretrieve
except ImportError:
from urllib2 import urlopen
from urllib import urlretrieve

VERSION = '0.2.6'
SUCATALOG_URL = 'https://swscan.apple.com/content/catalogs/others/index-11-10.15-10.14-10.13-10.12-10.11-10.10-10.9-mountainlion-lion-snowleopard-leopard.merged-1.sucatalog'
# 7-Zip MSI (15.14)
SEVENZIP_URL = 'https://www.7-zip.org/a/7z2201-x64.msi'

def status(msg):
print "%s\n" % msg
print("%s\n" % msg)

def plistLoads(data):
if hasattr(plistlib, 'loads'):
return plistlib.loads(data)
return plistlib.readPlistFromString(data)

def plistLoad(path):
if hasattr(plistlib, 'load'):
with open(path, 'rb') as plistfd:
return plistlib.load(plistfd)
return plistlib.readPlist(path)

def firstKey(item):
return next(iter(item))

def firstValue(item):
return next(iter(item.values()))

def ensureText(data, encoding='utf-8'):
if isinstance(data, bytes):
return data.decode(encoding)
return data

def getCommandOutput(cmd):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
Expand All @@ -42,7 +69,7 @@ def getMachineModel():
model = nodes[0].data
elif platform.system() == 'Darwin':
plistxml = getCommandOutput(['system_profiler', 'SPHardwareDataType', '-xml'])
plist = plistlib.readPlistFromString(plistxml)
plist = plistLoads(plistxml)
model = plist[0]['_items'][0]['machine_model']
return model

Expand All @@ -61,19 +88,34 @@ def downloadFile(url, filename, use_requests=False):
else: # total size is unknown
sys.stderr.write("read %d\n" % (readsofar,))

if use_requests:
if use_requests or sys.version_info[0] >= 3:
resp = requests.get(url, stream=True)
resp.raise_for_status()
total_size = int(resp.headers.get('content-length', 0))
readsofar = 0
with open(filename, 'wb') as fd:
for chunk in resp.iter_content(chunk_size=1024):
for chunk in resp.iter_content(chunk_size=1024 * 1024):
if not chunk:
continue
fd.write(chunk)
readsofar += len(chunk)
if total_size > 0 and sys.stderr.isatty():
percent = readsofar * 1e2 / total_size
console_out = "\r%5.1f%% %*d / %d bytes" % (
percent, len(str(total_size)), readsofar, total_size)
sys.stderr.write(console_out)
if total_size > 0 and sys.stderr.isatty():
sys.stderr.write("\n")
return

# urlretrieve method will likely just go away soon, and we'll use only requests
# with some kind of report/progress output
urlretrieve(url, filename, reporthook=reporthook)

def sevenzipExtract(arcfile, command='e', out_dir=None):
cmd = [os.path.join(os.environ['SYSTEMDRIVE'] + "\\", "Program Files", "7-Zip", "7z.exe")]
def sevenzipExtract(arcfile, command='e', out_dir=None, sevenzip_binary=None):
if not sevenzip_binary:
sevenzip_binary = os.path.join(os.environ['SYSTEMDRIVE'] + "\\", "Program Files", "7-Zip", "7z.exe")
cmd = [sevenzip_binary]
cmd.append(command)
if not out_dir:
out_dir = os.path.dirname(arcfile)
Expand All @@ -85,6 +127,66 @@ def sevenzipExtract(arcfile, command='e', out_dir=None):
if retcode:
sys.exit("Command failure: %s exited %s." % (' '.join(cmd), retcode))

def findFile(search_dir, filename):
for root, dirs, files in os.walk(search_dir):
if filename in files:
return os.path.join(root, filename)
return None

def clearDirectoryContentsWindows(path):
cmd = [
'powershell', '-NoProfile', '-ExecutionPolicy', 'Bypass',
'-Command',
'if (Test-Path -LiteralPath $args[0]) { '
'Get-ChildItem -LiteralPath $args[0] -Force -ErrorAction SilentlyContinue | '
'Remove-Item -Recurse -Force -ErrorAction SilentlyContinue }',
path
]
return subprocess.call(cmd)

def prepareLandingDir(landing_dir):
if os.path.exists(landing_dir):
status("Final output path %s already exists, removing it..." % landing_dir)
if platform.system() == 'Windows':
# using rmdir /qs because shutil.rmtree dies on the Doc files with foreign language characters
subprocess.call(['cmd', '/c', 'rmdir', '/q', '/s', landing_dir])
if os.path.exists(landing_dir):
status("Final output path %s is still in use, clearing existing contents before reuse." % landing_dir)
clearDirectoryContentsWindows(landing_dir)
status("Final output path %s is still in use, reusing the existing directory." % landing_dir)
return
else:
shutil.rmtree(landing_dir)

status("Making directory %s.." % landing_dir)
os.mkdir(landing_dir)

def resolveSevenZipBinary():
sevenzip_binary = os.path.join(os.environ['SYSTEMDRIVE'] + "\\", 'Program Files', '7-Zip', '7z.exe')
if os.path.exists(sevenzip_binary):
return sevenzip_binary, False, None, None

tempdir = tempfile.mkdtemp()
sevenzip_msi_dl_path = os.path.join(tempdir, SEVENZIP_URL.split('/')[-1])
downloadFile(SEVENZIP_URL, sevenzip_msi_dl_path, use_requests=True)
status("Downloaded 7-zip to %s." % sevenzip_msi_dl_path)
status("We need to install 7-Zip..")
retcode = subprocess.call(['msiexec', '/qn', '/i', sevenzip_msi_dl_path])
status("7-Zip install returned exit code %s." % retcode)

if os.path.exists(sevenzip_binary):
return sevenzip_binary, True, tempdir, sevenzip_msi_dl_path

status("Falling back to a local 7-Zip extraction because the MSI install did not make 7z.exe available.")
portable_dir = os.path.join(tempdir, 'portable')
os.mkdir(portable_dir)
retcode = subprocess.call(['msiexec', '/a', sevenzip_msi_dl_path, '/qn', 'TARGETDIR=%s' % portable_dir])
status("7-Zip administrative extract returned exit code %s." % retcode)
portable_binary = findFile(portable_dir, '7z.exe')
if not portable_binary:
sys.exit("Unable to locate 7z.exe after extracting %s." % sevenzip_msi_dl_path)
return portable_binary, False, tempdir, sevenzip_msi_dl_path

def postInstallConfig():
regdata = """Windows Registry Editor Version 5.00

Expand Down Expand Up @@ -116,9 +218,9 @@ def installBootcamp(msipath):
status("Executing command: '%s'" % " ".join(cmd))
subprocess.call(cmd)
status("Install log output:")
with open(logpath, 'r') as logfd:
with open(logpath, 'rb') as logfd:
logdata = logfd.read()
print logdata.decode('utf-16')
print(logdata.decode('utf-16'))
postInstallConfig()

def main():
Expand Down Expand Up @@ -147,7 +249,7 @@ according to the post date.")

opts, args = o.parse_args()
if opts.version:
print VERSION
print(VERSION)
sys.exit(0)

if opts.install:
Expand Down Expand Up @@ -191,7 +293,7 @@ when running the installer out of 'system32'." % output_dir)
plist_path = os.path.join(scriptdir, 'brigadier.plist')
if os.path.isfile(plist_path):
try:
config_plist = plistlib.readPlist(plist_path)
config_plist = plistLoad(plist_path)
except:
status("Config plist was found at %s but it could not be read. \
Verify that it is readable and is an XML formatted plist." % plist_path)
Expand All @@ -200,9 +302,9 @@ when running the installer out of 'system32'." % output_dir)
sucatalog_url = config_plist['CatalogURL']


urlfd = urllib2.urlopen(sucatalog_url)
urlfd = urlopen(sucatalog_url)
data = urlfd.read()
p = plistlib.readPlistFromString(data)
p = plistLoads(data)
allprods = p['Products']

# Get all Boot Camp ESD products
Expand All @@ -218,8 +320,8 @@ when running the installer out of 'system32'." % output_dir)
for bc_prod in bc_prods:
if 'English' in bc_prod[1]['Distributions'].keys():
disturl = bc_prod[1]['Distributions']['English']
distfd = urllib2.urlopen(disturl)
dist_data = distfd.read()
distfd = urlopen(disturl)
dist_data = ensureText(distfd.read())
if re.search(model, dist_data):
supported_models = []
pkg_data.append({bc_prod[0]: bc_prod[1]})
Expand All @@ -241,48 +343,39 @@ when running the installer out of 'system32'." % output_dir)
# sys.exit("There is more than one ESD product available for this model: %s. "
# "Automically selecting the one with the most recent PostDate.."
# % ", ".join([p.keys()[0] for p in pkg_data]))
print "There is more than one ESD product available for this model:"
print("There is more than one ESD product available for this model:")
# Init latest to be epoch start
latest_date = datetime.datetime.fromtimestamp(0)
chosen_product = None
for i, p in enumerate(pkg_data):
product = p.keys()[0]
product = firstKey(p)
postdate = p[product].get('PostDate')
print "%s: PostDate %s" % (product, postdate)
print("%s: PostDate %s" % (product, postdate))
if postdate > latest_date:
latest_date = postdate
chosen_product = product

if opts.product_id:
if opts.product_id not in [k.keys()[0] for k in pkg_data]:
if opts.product_id not in [firstKey(k) for k in pkg_data]:
sys.exit("Product specified with '--product-id %s' either doesn't exist "
"or was not found applicable to models: %s"
% (opts.product_id, ", ".join(models)))
chosen_product = opts.product_id
print "Selecting manually-chosen product %s." % chosen_product
print("Selecting manually-chosen product %s." % chosen_product)
else:
print "Selecting %s as it's the most recently posted." % chosen_product
print("Selecting %s as it's the most recently posted." % chosen_product)

for p in pkg_data:
if p.keys()[0] == chosen_product:
if firstKey(p) == chosen_product:
selected_pkg = p
pkg_data = selected_pkg

pkg_id = pkg_data.keys()[0]
pkg_url = pkg_data.values()[0]['Packages'][0]['URL']
pkg_id = firstKey(pkg_data)
pkg_url = firstValue(pkg_data)['Packages'][0]['URL']

# make a sub-dir in the output_dir here, named by product
landing_dir = os.path.join(output_dir, 'BootCamp-' + pkg_id)
if os.path.exists(landing_dir):
status("Final output path %s already exists, removing it..." % landing_dir)
if platform.system() == 'Windows':
# using rmdir /qs because shutil.rmtree dies on the Doc files with foreign language characters
subprocess.call(['cmd', '/c', 'rmdir', '/q', '/s', landing_dir])
else:
shutil.rmtree(landing_dir)

status("Making directory %s.." % landing_dir)
os.mkdir(landing_dir)
prepareLandingDir(landing_dir)

arc_workdir = tempfile.mkdtemp(prefix="bootcamp-unpack_")
pkg_dl_path = os.path.join(arc_workdir, pkg_url.split('/')[-1])
Expand All @@ -291,33 +384,25 @@ when running the installer out of 'system32'." % output_dir)
downloadFile(pkg_url, pkg_dl_path)

if platform.system() == 'Windows':
we_installed_7zip = False
sevenzip_binary = os.path.join(os.environ['SYSTEMDRIVE'] + "\\", 'Program Files', '7-Zip', '7z.exe')
# fetch and install 7-Zip
if not os.path.exists(sevenzip_binary):
tempdir = tempfile.mkdtemp()
sevenzip_msi_dl_path = os.path.join(tempdir, SEVENZIP_URL.split('/')[-1])
downloadFile(SEVENZIP_URL, sevenzip_msi_dl_path, use_requests=True)
status("Downloaded 7-zip to %s." % sevenzip_msi_dl_path)
status("We need to install 7-Zip..")
retcode = subprocess.call(['msiexec', '/qn', '/i', sevenzip_msi_dl_path])
status("7-Zip install returned exit code %s." % retcode)
we_installed_7zip = True
sevenzip_binary, we_installed_7zip, sevenzip_tempdir, sevenzip_msi_dl_path = resolveSevenZipBinary()

status("Extracting...")
# BootCamp.pkg (xar) -> Payload (gzip) -> Payload~ (cpio) -> WindowsSupport.dmg
for arc in [pkg_dl_path,
os.path.join(arc_workdir, 'Payload'),
os.path.join(arc_workdir, 'Payload~')]:
if os.path.exists(arc):
sevenzipExtract(arc)
sevenzipExtract(arc, sevenzip_binary=sevenzip_binary)
# finally, 7-Zip also extracts the tree within the DMG to the output dir
sevenzipExtract(os.path.join(arc_workdir, 'WindowsSupport.dmg'),
command='x',
out_dir=landing_dir)
out_dir=landing_dir,
sevenzip_binary=sevenzip_binary)
if we_installed_7zip:
status("Cleaning up the 7-Zip install...")
subprocess.call(['cmd', '/c', 'msiexec', '/qn', '/x', sevenzip_msi_dl_path])
if sevenzip_tempdir:
subprocess.call(['cmd', '/c', 'rmdir', '/q', '/s', sevenzip_tempdir])
if opts.install:
status("Installing Boot Camp...")
installBootcamp(findBootcampMSI(landing_dir))
Expand Down
Loading