diff --git a/default.nix b/default.nix index 93f41c0..b8ce3c0 100644 --- a/default.nix +++ b/default.nix @@ -16,6 +16,9 @@ let propagatedBuildInputs = [pythonPackages.pyyaml]; }; isPy3 = pythonPackages.isPy3k or false; + + # Command to get the owner of a folder; different on linux vs darwin. + getOwner = if pkgs.stdenv.isLinux then "stat -c '%U'" else "stat -f '%Su'"; in pythonPackages.buildPythonPackage rec { @@ -28,22 +31,23 @@ pythonPackages.buildPythonPackage rec { pkgs.pv pkgs.which flask - requests2 + requests ipdb six datadiff rtyaml + python_magic ] ++ (if isPy3 then [] else [ pythonPackages.futures pythonPackages.backports_lzma pythonPackages.repoze_lru ]); checkPhase = '' - # HACK: try to detect this failure case at runtime - if ! nix-store -q --hash ${pkgs.nix} >/dev/null 2>&1; then - export NIX_REMOTE=daemon + if ${getOwner} ${pkgs.nix} >/dev/null 2>&1; then + echo "Skipping tests due to not working on root-owned nix store" + else + nosetests tests fi - nosetests tests ''; src = ./.; makeWrapperArgs = [ diff --git a/src/pynix/binary_cache/client.py b/src/pynix/binary_cache/client.py index a718b1b..9be80c2 100644 --- a/src/pynix/binary_cache/client.py +++ b/src/pynix/binary_cache/client.py @@ -6,17 +6,17 @@ import json import logging import os -from os.path import (join, exists, isdir, isfile, expanduser, basename, - getmtime) +from os.path import (join, isdir, isfile, expanduser, basename, + getmtime, splitext) import re import shutil from subprocess import (Popen, PIPE, check_output, CalledProcessError, - check_call) + check_call, call) import sys import tempfile from threading import Thread, RLock, BoundedSemaphore from six.moves.urllib_parse import urlparse -from concurrent.futures import ThreadPoolExecutor, Future, wait, as_completed +from concurrent.futures import ThreadPoolExecutor from multiprocessing import cpu_count import yaml if sys.version_info >= (3, 0): @@ -42,12 +42,13 @@ import six from pynix import __version__ -from pynix.utils import (strip_output, decode_str, NIX_STORE_PATH, +from pynix.utils import (strip_output, decode_str, NIX_STORE_PATH, Streamer, NIX_STATE_PATH, NIX_DB_PATH, nix_cmd, query_store, instantiate, tell_size, - is_path_in_store) + is_path_in_store, is_tarball) from pynix.exceptions import (CouldNotConnect, NixImportFailed, CliError, - ObjectNotBuilt, NixBuildError, NoSuchObject) + ObjectNotBuilt, NixBuildError, NoSuchObject, + NixExportFailed) from pynix.binary_cache.nix_info_caches import PathReferenceCache from pynix.narinfo import NarInfo from pynix.build import needed_to_build_multi, parse_deriv_paths @@ -61,11 +62,12 @@ # Limit of how many paths to show, so the screen doesn't flood. SHOW_PATHS_LIMIT = int(os.environ.get("SHOW_PATHS_LIMIT", 25)) + class NixCacheClient(object): """Wraps some state for sending store objects.""" def __init__(self, endpoint, dry_run=False, username=None, password=None, cache_location=None, cache_enabled=True, - max_jobs=cpu_count()): + max_jobs=cpu_count(), delete_invalid_paths=False): #: Server running servenix (string). self._endpoint = endpoint #: Base name of server (for caching). @@ -81,6 +83,8 @@ def __init__(self, endpoint, dry_run=False, username=None, self._username = None #: Ignored if username is None. self._password = password + #: If an invalid path is encountered, attempt to delete it. + self._delete_invalid_paths = delete_invalid_paths #: Set at a later point, if username is not None. self._auth = None #: Used to avoid unnecessary overhead in handshakes etc. @@ -309,7 +313,7 @@ def recur(_paths): to_send.add(path) return to_send - def _connect(self, first_time=True): + def _connect(self, first_time=True, attempts=5): """Connect to a binary cache. Serves two purposes: verifying that the client can @@ -324,6 +328,9 @@ def _connect(self, first_time=True): called, so that we can tailor the error messaging. :type first_time: ``bool`` + :param attempts: How many more times to try connecting + :type attempts: ``int`` + :return: Either None or a Session object. :rtype: ``NoneType`` or :py:class:`requests.sessions.Session` @@ -377,26 +384,34 @@ def _connect(self, first_time=True): self._auth = session.auth = auth self._session = session return self._session - elif resp.status_code == 401 and sys.stdin.isatty(): - # Authorization failed. Give the user a chance to set new auth. - msg = "\033[31mAuthorization failed!\033[0m\n" \ - if not first_time else "" - msg += "Please enter \033[1musername\033[0m" - msg += " for {}".format(self._endpoint) if first_time else "" - if self._username is not None: - msg += " (default '{}'): ".format(self._username) + elif resp.status_code == 401: + if attempts > 0: + time.sleep(2) + logging.info("Invalid response. Retrying...") + return self._connect(first_time=False, attempts=attempts-1) + elif sys.stdin.isatty(): + # Authorization failed. Give the user a chance to set new auth. + msg = "\033[31mAuthorization failed!\033[0m\n" \ + if not first_time else "" + msg += "Please enter \033[1musername\033[0m" + msg += " for {}".format(self._endpoint) if first_time else "" + if self._username is not None: + msg += " (default '{}'): ".format(self._username) + else: + msg += ": " + try: + username = six.moves.input(msg).strip() + if username != "": + self._username = username + os.environ.pop("NIX_BINARY_CACHE_PASSWORD", None) + self._password = None + except (KeyboardInterrupt, EOFError): + logging.info("\nBye!") + sys.exit() + return self._connect(first_time=False) else: - msg += ": " - try: - username = six.moves.input(msg) - if username != "": - self._username = username - os.environ.pop("NIX_BINARY_CACHE_PASSWORD", None) - self._password = None - except (KeyboardInterrupt, EOFError): - logging.info("\nBye!") - sys.exit() - return self._connect(first_time=False) + raise CouldNotConnect(self._endpoint, resp.status_code, + resp.content) else: raise CouldNotConnect(self._endpoint, resp.status_code, resp.content) @@ -424,21 +439,36 @@ def send_object(self, path, remaining_objects=None): # possible with current requests, or indeed possible in # general without knowing the file size. session = self._connect() - export = check_output(nix_cmd("nix-store", ["--export", path])) + proc = Popen(nix_cmd("nix-store", ["--export", path]), + stdout=PIPE, stderr=PIPE) + export, err = proc.communicate() + if proc.returncode != 0: + logging.error("Export of path {} failed.".format(path)) + deleted = False + if self._delete_invalid_paths is True: + logging.warn("Deleting {}. You can retry send afterwards." + .format(path)) + nix_store_args = ["--delete", path, "--ignore-liveness"] + result = call(nix_cmd("nix-store", nix_store_args)) + deleted = result == 0 + raise NixExportFailed(path, decode_str(err), deleted=deleted) # For large files, show progress when compressing if len(export) > 1000000: logging.info("Compressing {}".format(basename(path))) cmd = "pv -ptef -s {} | gzip".format(len(export)) proc = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE) data = proc.communicate(input=export)[0] + log_func = logging.info else: data = gzip.compress(export) + log_func = logging.debug url = "{}/import-path".format(self._endpoint) headers = {"Content-Type": "application/x-gzip"} + streamer = Streamer(path, data, log_func) try: logging.info("Sending {} ({} remaining)" .format(basename(path), len(remaining_objects))) - response = session.post(url, data=data, headers=headers) + response = session.post(url, data=streamer, headers=headers) response.raise_for_status() except requests.exceptions.HTTPError as err: try: @@ -495,7 +525,7 @@ def _have_fetched(self, path): """ if path in self._paths_fetched: return True - elif exists(path): + elif is_path_in_store(path): self._paths_fetched.add(path) return True else: @@ -562,11 +592,14 @@ def _fetch_ordered_paths(self, store_paths): logging.info("Finished fetching {}".format( tell_size(store_paths, "path"))) - def _fetch_single(self, path): + def _fetch_single(self, path, retries_remaining=3): """Fetch a single path.""" # Return if the path has already been fetched, or already exists. if self._have_fetched(path): return + elif retries_remaining < 0: + logging.error("Too many retries for path {}!".format(path)) + raise ObjectNotBuilt(path) # First ensure that all referenced paths have been fetched. for ref in self.get_references(path): self._finish_fetching(ref) @@ -592,7 +625,12 @@ def _fetch_single(self, path): .format(narinfo.compression)) # Once extracted, convert it into a nix export object and import. export = narinfo.nar_to_export(data) - imported_path = export.import_to_store() + try: + imported_path = export.import_to_store() + except NixImportFailed: + logging.warn("Couldn't import fetched object for " + path) + return self._fetch_single( + path, retries_remaining=(retries_remaining - 1)) self._register_as_fetched(path) def _register_as_fetched(self, path): @@ -623,11 +661,15 @@ def _finish_fetching(self, path): # Now that we have the future, wait for it to finish before returning. future.result() - def watch_store(self, ignore): + def watch_store(self, ignore, include_drvs=False, include_tarballs=False): """Watch the nix store's timestamp and sync whenever it changes. :param ignore: A list of regexes of objects to ignore when syncing. :type ignore: ``list`` of (``str`` or ``regex``) + :param include_drvs: Send derivation files to the repo. + :type include_drvs: ``bool`` + :param include_tarballs: Include tarballs. + :type include_tarballs: ``bool`` """ prev_stamp = None num_syncs = 0 @@ -645,7 +687,7 @@ def watch_store(self, ignore): logging.info("Store was modified at {}, syncing" .format(stamp.strftime("%H:%M:%S"))) try: - self.sync_store(ignore) + self.sync_store(ignore, include_drvs, include_tarballs) prev_stamp = stamp num_syncs += 1 except requests.exceptions.HTTPError as err: @@ -655,7 +697,7 @@ def watch_store(self, ignore): exit("Successfully syncronized with {} {} times." .format(self._endpoint, num_syncs)) - def sync_store(self, ignore): + def sync_store(self, ignore, include_drvs=False, include_tarballs=False): """Syncronize the local nix store to the endpoint. Reads all of the known paths in the nix SQLite database which @@ -664,23 +706,33 @@ def sync_store(self, ignore): :param ignore: A list of regexes of objects to ignore. :type ignore: ``list`` of (``str`` or ``regex``) + :param include_drvs: Include derivation files, normally not necessary. + :type include_drvs: ``bool`` + :param include_tarballs: Include tarballs. + :type include_tarballs: ``bool`` """ ignore = [re.compile(r) for r in ignore] paths = [] - with self._db_con: + with self._db_con as con: query = con.execute("SELECT path FROM ValidPaths") for result in query.fetchall(): path = result[0] - if any(ig.match(path) for ig in ignore): + if splitext(path)[1] == ".drv" and include_drvs is not True: + logging.debug("Skipping derivation {}".format(path)) + elif include_tarballs is not True and is_tarball(path): + logging.debug("Path {} appears to be a tarball, skipping" + .format(path)) + elif any(ig.match(path) for ig in ignore): logging.debug("Path {} matches an ignore regex, skipping" .format(path)) - continue - paths.append(path) + else: + paths.append(path) logging.info("Found {} paths in the store.".format(len(paths))) self.send_objects(paths) - def build_fetch(self, nix_file, attributes, verbose=False, show_trace=True, - keep_going=True, create_links=False, use_deriv_name=True): + def build_fetch(self, nix_file=None, attributes=None, nix_expr=None, + verbose=False, show_trace=True, keep_going=True, + create_links=False, use_deriv_name=True): """Given a nix file, instantiate the given attributes within the file, query the server for which files can be fetched, and then build/fetch everything. @@ -688,11 +740,8 @@ def build_fetch(self, nix_file, attributes, verbose=False, show_trace=True, :return: A dictionary mapping derivations to outputs that were built. :rtype: ``dict`` """ - logging.info("Instantiating attribute{} {} from path {}" - .format("s" if len(attributes) > 1 else "", - ", ".join(attributes), nix_file)) - deriv_paths = instantiate(nix_file, attributes=attributes, - show_trace=show_trace) + deriv_paths = instantiate(nix_file=nix_file, attributes=attributes, + nix_expr=nix_expr, show_trace=show_trace) derivs_to_outputs = parse_deriv_paths(deriv_paths) need_to_build, need_to_fetch = self.preview_build(deriv_paths) if self._dry_run is True: @@ -709,7 +758,6 @@ def build_fetch(self, nix_file, attributes, verbose=False, show_trace=True, fetch_order = self._compute_fetch_order(paths_to_fetch) # Perform the fetches. self._fetch_ordered_paths(fetch_order) - self._verify(need_to_fetch) # Build up the command for nix store to build the remaining paths. if len(need_to_build) > 0: args = ["--max-jobs", str(self._max_jobs), "--no-gc-warning", @@ -737,17 +785,6 @@ def _handle_build_failure(self, derivs_to_outputs): # TODO: report exactly which derivations succeeded/failed. raise NixBuildError() - def _verify(self, derivs_to_outputs): - """Given a derivation-output mapping, verify all paths.""" - logging.info("Verifying that we successfully created {}" - .format(tell_size(derivs_to_outputs, "store path"))) - for deriv, outputs in derivs_to_outputs.items(): - for output in outputs: - path = deriv.output_path(output) - logging.debug("Verifying path {}".format(basename(path))) - if not is_path_in_store(path, db_con=self._db_con): - raise ObjectNotBuilt(path) - def _create_symlinks(self, derivs_to_outputs, use_deriv_name): """Create symlinks to all built derivations. @@ -875,7 +912,9 @@ def _get_args(): build.add_argument("-P", "--path", default=os.getcwd(), help="Base path to evaluate.") build.add_argument("attributes", nargs="*", - help="Expressions to evaluate.") + help="Attributes to evaluate.") + build.add_argument("--nix-expr", "-E", + help="Nix expression to evaluate.") build.add_argument("-v", "--verbose", action="store_true", default=False, help="Show verbose output.") build.add_argument("--no-trace", action="store_false", dest="show_trace", @@ -911,9 +950,21 @@ def _get_args(): default=False, help="If true, reports which paths would " "be sent/fetched/built.") + subparser.add_argument("--log-to", default=None, + help="Log to this file (else stdout)") + subparser.add_argument("--log-format", help="Logging format.") + subparser.add_argument("--delete-invalid-paths", action="store_true", + default=False, + help="Attempt to delete invalid paths") for subparser in (sync, daemon): subparser.add_argument("--ignore", nargs="*", default=[], help="Regexes of store paths to ignore.") + subparser.add_argument("--include-drvs", action="store_true", + default=False, + help="Send .drv files to repo.") + subparser.add_argument("--include-tarballs", action="store_true", + default=False, + help="Send tarball files to repo.") # It doesn't make sense to have the daemon run in dry-run mode. subparser.set_defaults(dry_run=False) return parser.parse_args() @@ -926,29 +977,41 @@ def main(): elif ENDPOINT_REGEX.match(args.endpoint) is None: exit("Invalid endpoint: '{}' does not match '{}'." .format(args.endpoint, ENDPOINT_REGEX.pattern)) - log_level = getattr(logging, args.log_level.upper()) - logging.basicConfig(level=log_level, format="%(message)s") + logging.basicConfig( + level=getattr(logging, args.log_level.upper()), + format=args.log_format, + filename=args.log_to + ) # Hide noisy logging of some external libs for name in ("requests", "urllib", "urllib2", "urllib3"): logging.getLogger(name).setLevel(logging.WARNING) - max_jobs = 1 if args.one else args.max_jobs + max_jobs = 1 if getattr(args, "one", False) else args.max_jobs client = NixCacheClient(endpoint=args.endpoint, dry_run=args.dry_run, - username=args.username, max_jobs=max_jobs) + username=args.username, max_jobs=max_jobs, + delete_invalid_paths=args.delete_invalid_paths) try: if args.command == "send": + for path in args.paths: + if not is_path_in_store(path): + raise CliError("Path {} is not registered as a valid " + "path in the nix database.".format(path)) client.send_objects(args.paths) elif args.command == "sync": - client.sync_store(args.ignore) + client.sync_store(args.ignore, args.include_drvs, + args.include_tarballs) elif args.command == "daemon": - client.watch_store(args.ignore) + client.watch_store(args.ignore, args.include_drvs, + args.include_tarballs) elif args.command == "fetch": - wait(list(client.fetch_objects(args.paths).values())) + fetch_order = client._compute_fetch_order(args.paths) + client._fetch_ordered_paths(fetch_order) elif args.command == "build": keep_going = False if args.one else args.keep_going result_derivs = client.build_fetch( nix_file=args.path, attributes=args.attributes, - verbose=args.verbose, show_trace=args.show_trace, - keep_going=keep_going, create_links=args.create_links, + nix_expr=args.nix_expr, verbose=args.verbose, + show_trace=args.show_trace, keep_going=keep_going, + create_links=args.create_links, use_deriv_name=not args.generic_link_name) if args.dry_run is False and args.print_paths is True: for deriv, outputs in result_derivs.items(): diff --git a/src/pynix/build.py b/src/pynix/build.py index f9baff9..2f23cc6 100644 --- a/src/pynix/build.py +++ b/src/pynix/build.py @@ -1,10 +1,11 @@ """Build nix derivations.""" import json import sys -from os.path import exists, basename +from os.path import basename import requests from pynix.derivation import Derivation +from pynix.utils import is_path_in_store def needed_to_build(deriv, outputs=None, needed=None, need_fetch=None, existing=None, on_server=None): @@ -61,7 +62,7 @@ def needed_to_build(deriv, outputs=None, needed=None, need_fetch=None, # So then, we don't know if we need to build this derivation. # We can see by checking the outputs. for output in outputs: - if exists(deriv.output_mapping[output]): + if is_path_in_store(deriv.output_mapping[output]): if deriv not in existing: existing[deriv] = set() existing[deriv].add(output) diff --git a/src/pynix/exceptions.py b/src/pynix/exceptions.py index def71de..9cd0452 100644 --- a/src/pynix/exceptions.py +++ b/src/pynix/exceptions.py @@ -28,6 +28,9 @@ class CliError(Exception): """ EXIT_MESSAGE = None RETURN_CODE = 1 + def __init__(self, message=None, return_code=1): + self.EXIT_MESSAGE = message + self.RETURN_CODE = return_code def exit(self): _name = type(self).__name__ if self.EXIT_MESSAGE is not None: @@ -87,13 +90,26 @@ def __init__(self, err_message): BaseHTTPError.__init__(self, message=message) self.EXIT_MESSAGE = message +class NixExportFailed(NixOperationError, CliError): + """Raised when the nix-store --export command fails.""" + OPERATION = "nix-store --export" + def __init__(self, path, stderr, deleted=False): + message = "Couldn't export {}. Stderr:\n{}".format(path, stderr) + if deleted is True: + message += "\nThe path was deleted; you may retry." + self.EXIT_MESSAGE = message + + class NixInstantiationError(NixOperationError, CliError): """Raised when nix-instantiate fails.""" OPERATION = "nix-instantiate" - def __init__(self, nix_file, attributes): + def __init__(self, nix_file=None, attributes=None, nix_expr=None): self.nix_file = nix_file - self.attributes = attributes - if len(attributes) == 0: + self.nix_expr = nix_expr + self.attributes = attributes or [] + if nix_expr is not None: + message = "Couldn't evaluate expression {}".format(repr(nix_expr)) + elif len(attributes) == 0: message = "Couldn't evaluate file {}".format(nix_file) elif len(attributes) == 1: message = ("Couldn't evaluate attribute {} from file {}" diff --git a/src/pynix/narinfo.py b/src/pynix/narinfo.py index ac02a4b..ade857e 100644 --- a/src/pynix/narinfo.py +++ b/src/pynix/narinfo.py @@ -4,7 +4,7 @@ import os from os.path import join, basename, dirname import yaml -from subprocess import check_output, CalledProcessError +from subprocess import call, check_output, CalledProcessError from pynix.utils import decode_str, strip_output, nix_cmd, query_store from pynix.exceptions import NoNarGenerated, NixImportFailed @@ -291,6 +291,7 @@ def import_to_store(self): return strip_output(nix_cmd("nix-store", ["--import"]), input=self.to_bytes()) except CalledProcessError: + call(nix_cmd("nix-store", ["--delete", self.store_path])) raise NixImportFailed("See above stderr") def to_bytes(self): diff --git a/src/pynix/utils.py b/src/pynix/utils.py index a0c74c7..1185947 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -1,11 +1,20 @@ """Some utility functions to support store operations.""" import base64 +import sys +if sys.version_info >= (3, 0): + from io import BytesIO +else: + from StringIO import StringIO as BytesIO +import logging import os from os import getenv -from os.path import exists, join, dirname, isdir, realpath -from subprocess import check_output, PIPE, Popen, CalledProcessError +from os.path import exists, join, dirname, isdir, realpath, isfile, basename +import sqlite3 +from subprocess import call, check_output, PIPE, Popen, CalledProcessError +import time import six +import magic from pynix.exceptions import NixInstantiationError @@ -56,10 +65,16 @@ def strip_output(command, input=None, hide_stderr=False): # Load nix paths from environment if "NIX_BIN_PATH" in os.environ: NIX_BIN_PATH = os.environ["NIX_BIN_PATH"] + assert exists(join(NIX_BIN_PATH, "nix-build")), \ + "Couldn't determine a valid nix binary path. Set NIX_BIN_PATH" else: - NIX_BIN_PATH = dirname(realpath(strip_output("type -p nix-env"))) -assert exists(join(NIX_BIN_PATH, "nix-build")), \ - "Couldn't determine a valid nix binary path. Set NIX_BIN_PATH" + for bin_path in os.environ["PATH"].split(os.pathsep): + if isdir(bin_path) and "nix-env" in os.listdir(bin_path): + NIX_BIN_PATH = realpath(bin_path) + break + else: + raise RuntimeError("nix-env isn't in the PATH") + # The store path can be given explicitly, or else it will be # inferred to be 2 levels up from the bin path. E.g., if the # bin path is /foo/bar/123-nix/bin, the store directory will @@ -69,12 +84,48 @@ def strip_output(command, input=None, hide_stderr=False): "Nix store directory {} doesn't exist".format(NIX_STORE_PATH) # The state path can be given explicitly, or else it will be # inferred to be sibling to the store directory. -NIX_STATE_PATH = getenv("NIX_STATE_PATH", join(dirname(NIX_STORE_PATH), "var")) +NIX_STATE_PATH = getenv("NIX_STATE_PATH", + join(dirname(NIX_STORE_PATH), "var", "nix")) assert isdir(NIX_STATE_PATH), \ "Nix state directory {} doesn't exist".format(NIX_STATE_PATH) +# Nix reads this env variable; set it here +os.environ["NIX_STATE_DIR"] = NIX_STATE_PATH NIX_DB_PATH = getenv("NIX_DB_PATH", join(NIX_STATE_PATH, "nix/db/db.sqlite")) +# Nix also reads this variable... +os.environ["NIX_DB_DIR"] = dirname(NIX_DB_PATH) + +# This variable is true when we detect we're on a nixos linux. +if os.getenv("IS_NIXOS", "") != "": + IS_NIXOS = True +else: + result = call("nixos-version", shell=True, stdout=PIPE, stderr=PIPE) + IS_NIXOS = result == 0 or isdir("/etc/nixos") + +NIX_DB_ACCESSIBLE = None + +def connect_nix_db(): + """Attempt to connect to the nix DB, otherwise return None.""" + global NIX_DB_ACCESSIBLE + if NIX_DB_ACCESSIBLE is False: + return None + try: + connection = sqlite3.connect(NIX_DB_PATH) + if NIX_DB_ACCESSIBLE is None: + # Case: we don't know if the DB is accessible. Test it. + with connection: + query = connection.execute("select * from ValidPaths limit 1") + query.fetchall() + # Set to True so that we don't test unnecessarily later. + NIX_DB_ACCESSIBLE = True + if NIX_DB_ACCESSIBLE is True: + return connection + except Exception as e: + # An exception was raised trying to connect to the DB. + NIX_DB_ACCESSIBLE = False + return None + def nix_cmd(command_name, args=None): """Build a nix command, using the absolute path to the given nix binary. @@ -106,18 +157,32 @@ def query_store(store_path, query, hide_stderr=False): result = strip_output(command, hide_stderr=hide_stderr) return result -def instantiate(nix_file, attributes=None, show_trace=True): +def instantiate(nix_file=None, attributes=None, nix_expr=None, + show_trace=True): """Wraps a call to nix-instantiate.""" - attributes = [] if attributes is None else attributes - command = nix_cmd("nix-instantiate", [nix_file, "--no-gc-warning"]) + if nix_expr is not None: + command = nix_cmd("nix-instantiate", ["-E", nix_expr]) + logging.info("Instantiating nix expression {}" + .format(repr(nix_expr))) + elif nix_file is not None: + logging.info("Instantiating attribute{} {} from path {}" + .format("s" if len(attributes) > 1 else "", + ", ".join(attributes), nix_file)) + attributes = [] if attributes is None else attributes + command = nix_cmd("nix-instantiate", [nix_file]) + for attr in attributes: + command.extend(["-A", attr]) + else: + raise ValueError("Either an expression or a nix file must be given.") + command.append("--no-gc-warning") if show_trace is True: command.append("--show-trace") - for attr in attributes: - command.extend(["-A", attr]) try: return strip_output(command).split() except CalledProcessError as err: - six.raise_from(NixInstantiationError(nix_file, attributes), err) + six.raise_from(NixInstantiationError(nix_file=nix_file, + nix_expr=nix_expr, + attributes=attributes), err) def tell_size(obj, word, suffix="s"): """Useful when you want to write a message to the user. @@ -137,26 +202,103 @@ def tell_size(obj, word, suffix="s"): else: return "{} {}{}".format(len(obj), word, suffix) -def is_path_in_store(store_path, db_con=None): +def is_path_in_store(store_path, db_con=None, hide_stderr=True, + ignore_db_con=False): """Check if a path is in the nix store. Optionally provide a database connection which speeds things up. """ + db_con = db_con or connect_nix_db() + # Ensure path is absolute + store_path = join(NIX_STORE_PATH, store_path) # If we have a connection to the database, all we have to # do is look in the database. - if db_con is not None: + if db_con is not None and ignore_db_con is False: query = "select path from ValidPaths where path = ?" - with db_con: - results = db_con.execute(query, (store_path,)).fetchall() + try: + with db_con: + results = db_con.execute(query, (store_path,)).fetchall() + except sqlite3.OperationalError as err: + # This can happen under heavy disk load; if so fall back + # to querying with the nix-store executable. + logging.exception(err) + return is_path_in_store(store_path, + db_con=None, + hide_stderr=hide_stderr, + ignore_db_con=True) if len(results) > 0: return True else: + logging.debug("Tried to look up {} in the nix DB, not there." + .format(store_path)) return False else: # Otherwise we have to use the slower method :( Subprocess # into nix-store and execute a query. try: - query_store(store_path, "--hash", hide_stderr=True) + query_store(store_path, "--hash", hide_stderr=hide_stderr) return True except CalledProcessError: + logging.debug("Tried to use nix-store to query path {}, but " + "got an error".format(store_path)) return False + +# Mimetypes of tarball files +TARBALL_MIMETYPES = set(['application/x-gzip', 'application/x-xz', + 'application/x-bzip2', 'application/zip']) + + +def is_tarball(store_path): + """Return true if the path is a tarball, or a directory which only + contains a tarball. + :param store_path: A nix store path. + :type store_path: ``str`` + + :return: True if the store path appears to be a tarball. + :rtype: ``bool`` + """ + if isfile(store_path): + path = store_path + elif isdir(store_path) and len(os.listdir(store_path)) == 1: + path = join(store_path, os.listdir(store_path)[0]) + else: + return False + mimetype = decode_str(magic.from_file(path, mime=True)) + return mimetype in TARBALL_MIMETYPES + + +class Streamer(BytesIO): + """Wrapper around BytesIO which show progress of reads.""" + def __init__(self, path, data, log_func): + BytesIO.__init__(self, data) + self._streamed = 0 + self._len = len(data) + self._len_mb = len(data) / 1048576.0 + self._path = basename(path) + self._log_func = log_func + self._start_time = time.time() + self._last_percent_ten = None + self._last_print_time = self._start_time + + def read(self, *args, **kwargs): + """Read from the source, printing progress. + + Only prints if at least a half-second has elapsed. + """ + result = BytesIO.read(self, *args, **kwargs) + self._streamed += len(result) + bytes_per_sec = self._streamed / (time.time() - self._start_time) + percent = 100.0 * (float(self._streamed) / self._len) + percent_ten = int(percent) // 10 + if len(result) > 0: + if time.time() - self._last_print_time > 0.5: + streamed = self._streamed / 1048576.0 + self._log_func( + "{}: {:.2f}/{:.2f}MB ({:.2f}%), {:.2f} bytes/sec" + .format(self._path, streamed, self._len_mb, + percent, bytes_per_sec)) + self._last_print_time = time.time() + else: + self._log_func("{}: completed in {:2f} seconds" + .format(self._path, time.time() - self._start_time)) + return result