From a8f88dd4cf57c36f80f2ddcfcd22f86de941f77d Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Thu, 20 Apr 2017 18:07:11 +0000 Subject: [PATCH 01/16] create global db connection, use DB existence rather than just path existence --- src/pynix/build.py | 5 +++-- src/pynix/utils.py | 22 ++++++++++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/pynix/build.py b/src/pynix/build.py index f9baff9..2f23cc6 100644 --- a/src/pynix/build.py +++ b/src/pynix/build.py @@ -1,10 +1,11 @@ """Build nix derivations.""" import json import sys -from os.path import exists, basename +from os.path import basename import requests from pynix.derivation import Derivation +from pynix.utils import is_path_in_store def needed_to_build(deriv, outputs=None, needed=None, need_fetch=None, existing=None, on_server=None): @@ -61,7 +62,7 @@ def needed_to_build(deriv, outputs=None, needed=None, need_fetch=None, # So then, we don't know if we need to build this derivation. # We can see by checking the outputs. for output in outputs: - if exists(deriv.output_mapping[output]): + if is_path_in_store(deriv.output_mapping[output]): if deriv not in existing: existing[deriv] = set() existing[deriv].add(output) diff --git a/src/pynix/utils.py b/src/pynix/utils.py index a0c74c7..ae139a7 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -3,7 +3,8 @@ import os from os import getenv from os.path import exists, join, dirname, isdir, realpath -from subprocess import check_output, PIPE, Popen, CalledProcessError +import sqlite3 +from subprocess import call, check_output, PIPE, Popen, CalledProcessError import six @@ -75,6 +76,21 @@ def strip_output(command, input=None, hide_stderr=False): NIX_DB_PATH = getenv("NIX_DB_PATH", join(NIX_STATE_PATH, "nix/db/db.sqlite")) +# This variable is true when we detect we're on a nixos linux. +if os.getenv("IS_NIXOS", "") != "": + IS_NIXOS = True +else: + IS_NIXOS = (call("nixos-version", shell=True, stderr=PIPE) == 0 or + isdir("/etc/nixos")) + +try: + NIX_DB_CON = sqlite3.connect(NIX_DB_PATH) + with NIX_DB_CON: + query = NIX_DB_CON.execute("select * from ValidPaths limit 1") + query.fetchall() +except Exception as e: + NIX_DB_CON = None + def nix_cmd(command_name, args=None): """Build a nix command, using the absolute path to the given nix binary. @@ -137,11 +153,13 @@ def tell_size(obj, word, suffix="s"): else: return "{} {}{}".format(len(obj), word, suffix) -def is_path_in_store(store_path, db_con=None): +def is_path_in_store(store_path, db_con=NIX_DB_CON): """Check if a path is in the nix store. Optionally provide a database connection which speeds things up. """ + # Ensure path is absolute + store_path = join(NIX_STORE_PATH, store_path) # If we have a connection to the database, all we have to # do is look in the database. if db_con is not None: From 4497f2d12f6366e08eebbbdfaa56810fcff01345 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Fri, 28 Apr 2017 19:45:52 +0000 Subject: [PATCH 02/16] validate paths after fetching with retry logic --- src/pynix/binary_cache/client.py | 25 +++++++++------------ src/pynix/utils.py | 38 ++++++++++++++++++++++++-------- 2 files changed, 39 insertions(+), 24 deletions(-) diff --git a/src/pynix/binary_cache/client.py b/src/pynix/binary_cache/client.py index a718b1b..beef861 100644 --- a/src/pynix/binary_cache/client.py +++ b/src/pynix/binary_cache/client.py @@ -562,11 +562,14 @@ def _fetch_ordered_paths(self, store_paths): logging.info("Finished fetching {}".format( tell_size(store_paths, "path"))) - def _fetch_single(self, path): + def _fetch_single(self, path, retries_remaining=3): """Fetch a single path.""" # Return if the path has already been fetched, or already exists. if self._have_fetched(path): return + elif retries_remaining < 0: + logging.error("Too many retries for path {}!".format(path)) + raise ObjectNotBuilt(path) # First ensure that all referenced paths have been fetched. for ref in self.get_references(path): self._finish_fetching(ref) @@ -593,6 +596,10 @@ def _fetch_single(self, path): # Once extracted, convert it into a nix export object and import. export = narinfo.nar_to_export(data) imported_path = export.import_to_store() + if not is_path_in_store(imported_path): + logging.warn("Couldn't import fetched object for " + path) + return self._fetch_single( + path, retries_remaining=(retries_remaining - 1)) self._register_as_fetched(path) def _register_as_fetched(self, path): @@ -667,7 +674,7 @@ def sync_store(self, ignore): """ ignore = [re.compile(r) for r in ignore] paths = [] - with self._db_con: + with self._db_con as con: query = con.execute("SELECT path FROM ValidPaths") for result in query.fetchall(): path = result[0] @@ -709,7 +716,6 @@ def build_fetch(self, nix_file, attributes, verbose=False, show_trace=True, fetch_order = self._compute_fetch_order(paths_to_fetch) # Perform the fetches. self._fetch_ordered_paths(fetch_order) - self._verify(need_to_fetch) # Build up the command for nix store to build the remaining paths. if len(need_to_build) > 0: args = ["--max-jobs", str(self._max_jobs), "--no-gc-warning", @@ -737,17 +743,6 @@ def _handle_build_failure(self, derivs_to_outputs): # TODO: report exactly which derivations succeeded/failed. raise NixBuildError() - def _verify(self, derivs_to_outputs): - """Given a derivation-output mapping, verify all paths.""" - logging.info("Verifying that we successfully created {}" - .format(tell_size(derivs_to_outputs, "store path"))) - for deriv, outputs in derivs_to_outputs.items(): - for output in outputs: - path = deriv.output_path(output) - logging.debug("Verifying path {}".format(basename(path))) - if not is_path_in_store(path, db_con=self._db_con): - raise ObjectNotBuilt(path) - def _create_symlinks(self, derivs_to_outputs, use_deriv_name): """Create symlinks to all built derivations. @@ -931,7 +926,7 @@ def main(): # Hide noisy logging of some external libs for name in ("requests", "urllib", "urllib2", "urllib3"): logging.getLogger(name).setLevel(logging.WARNING) - max_jobs = 1 if args.one else args.max_jobs + max_jobs = 1 if getattr(args, "one", False) else args.max_jobs client = NixCacheClient(endpoint=args.endpoint, dry_run=args.dry_run, username=args.username, max_jobs=max_jobs) try: diff --git a/src/pynix/utils.py b/src/pynix/utils.py index ae139a7..5a1d149 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -1,5 +1,6 @@ """Some utility functions to support store operations.""" import base64 +import logging import os from os import getenv from os.path import exists, join, dirname, isdir, realpath @@ -83,13 +84,27 @@ def strip_output(command, input=None, hide_stderr=False): IS_NIXOS = (call("nixos-version", shell=True, stderr=PIPE) == 0 or isdir("/etc/nixos")) -try: - NIX_DB_CON = sqlite3.connect(NIX_DB_PATH) - with NIX_DB_CON: - query = NIX_DB_CON.execute("select * from ValidPaths limit 1") - query.fetchall() -except Exception as e: - NIX_DB_CON = None +NIX_DB_ACCESSIBLE = None + +def connect_nix_db(): + """Attempt to connect to the nix DB, otherwise return None.""" + global NIX_DB_ACCESSIBLE + if NIX_DB_ACCESSIBLE is False: + return None + try: + connection = sqlite3.connect(NIX_DB_PATH) + if NIX_DB_ACCESSIBLE is None: + # Case: we don't know if the DB is accessible. Test it. + with connection: + query = connection.execute("select * from ValidPaths limit 1") + query.fetchall() + # Set to True so that we don't test unnecessarily later. + NIX_DB_ACCESSIBLE = True + return connection + except Exception as e: + # An exception was raised trying to connect to the DB. + NIX_DB_ACCESSIBLE = False + return None def nix_cmd(command_name, args=None): """Build a nix command, using the absolute path to the given nix binary. @@ -153,11 +168,12 @@ def tell_size(obj, word, suffix="s"): else: return "{} {}{}".format(len(obj), word, suffix) -def is_path_in_store(store_path, db_con=NIX_DB_CON): +def is_path_in_store(store_path, db_con=None, hide_stderr=True): """Check if a path is in the nix store. Optionally provide a database connection which speeds things up. """ + db_con = db_con or connect_nix_db() # Ensure path is absolute store_path = join(NIX_STORE_PATH, store_path) # If we have a connection to the database, all we have to @@ -169,12 +185,16 @@ def is_path_in_store(store_path, db_con=NIX_DB_CON): if len(results) > 0: return True else: + logging.debug("Tried to look up {} in the nix DB, not there." + .format(store_path)) return False else: # Otherwise we have to use the slower method :( Subprocess # into nix-store and execute a query. try: - query_store(store_path, "--hash", hide_stderr=True) + query_store(store_path, "--hash", hide_stderr=hide_stderr) return True except CalledProcessError: + logging.debug("Tried to use nix-store to query path {}, but " + "got an error".format(store_path)) return False From 2b778e5002215ea1bb24f5de867ba95dd3ebc053 Mon Sep 17 00:00:00 2001 From: Gregory Berns-Leone Date: Fri, 28 Apr 2017 16:27:58 -0500 Subject: [PATCH 03/16] Add retry logic for auth --- src/pynix/binary_cache/client.py | 51 +++++++++++++++++++------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/src/pynix/binary_cache/client.py b/src/pynix/binary_cache/client.py index beef861..4e8297e 100644 --- a/src/pynix/binary_cache/client.py +++ b/src/pynix/binary_cache/client.py @@ -309,7 +309,7 @@ def recur(_paths): to_send.add(path) return to_send - def _connect(self, first_time=True): + def _connect(self, first_time=True, attempts=5): """Connect to a binary cache. Serves two purposes: verifying that the client can @@ -324,6 +324,9 @@ def _connect(self, first_time=True): called, so that we can tailor the error messaging. :type first_time: ``bool`` + :param attempts: How many more times to try connecting + :type attempts: ``int`` + :return: Either None or a Session object. :rtype: ``NoneType`` or :py:class:`requests.sessions.Session` @@ -377,26 +380,34 @@ def _connect(self, first_time=True): self._auth = session.auth = auth self._session = session return self._session - elif resp.status_code == 401 and sys.stdin.isatty(): - # Authorization failed. Give the user a chance to set new auth. - msg = "\033[31mAuthorization failed!\033[0m\n" \ - if not first_time else "" - msg += "Please enter \033[1musername\033[0m" - msg += " for {}".format(self._endpoint) if first_time else "" - if self._username is not None: - msg += " (default '{}'): ".format(self._username) + elif resp.status_code == 401: + if attempts > 0: + time.sleep(2) + logging.info("Invalid response. Retrying...") + return self._connect(first_time=False, attempts=attempts-1) + elif sys.stdin.isatty(): + # Authorization failed. Give the user a chance to set new auth. + msg = "\033[31mAuthorization failed!\033[0m\n" \ + if not first_time else "" + msg += "Please enter \033[1musername\033[0m" + msg += " for {}".format(self._endpoint) if first_time else "" + if self._username is not None: + msg += " (default '{}'): ".format(self._username) + else: + msg += ": " + try: + username = six.moves.input(msg).strip() + if username != "": + self._username = username + os.environ.pop("NIX_BINARY_CACHE_PASSWORD", None) + self._password = None + except (KeyboardInterrupt, EOFError): + logging.info("\nBye!") + sys.exit() + return self._connect(first_time=False) else: - msg += ": " - try: - username = six.moves.input(msg) - if username != "": - self._username = username - os.environ.pop("NIX_BINARY_CACHE_PASSWORD", None) - self._password = None - except (KeyboardInterrupt, EOFError): - logging.info("\nBye!") - sys.exit() - return self._connect(first_time=False) + raise CouldNotConnect(self._endpoint, resp.status_code, + resp.content) else: raise CouldNotConnect(self._endpoint, resp.status_code, resp.content) From 72f937368064ac80ce42b0321c0ac35177722654 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Wed, 21 Jun 2017 21:20:53 +0000 Subject: [PATCH 04/16] speed up instantiation by defaulting to nix db connection --- src/pynix/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pynix/utils.py b/src/pynix/utils.py index 5a1d149..104bde5 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -100,6 +100,7 @@ def connect_nix_db(): query.fetchall() # Set to True so that we don't test unnecessarily later. NIX_DB_ACCESSIBLE = True + if NIX_DB_ACCESSIBLE is True: return connection except Exception as e: # An exception was raised trying to connect to the DB. From a294ed57cba09fae5098b928e8fa32f21714b70c Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Mon, 17 Jul 2017 18:59:00 +0000 Subject: [PATCH 05/16] use PATH variable to find nix bin path --- src/pynix/utils.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/pynix/utils.py b/src/pynix/utils.py index 104bde5..7a6301b 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -58,10 +58,16 @@ def strip_output(command, input=None, hide_stderr=False): # Load nix paths from environment if "NIX_BIN_PATH" in os.environ: NIX_BIN_PATH = os.environ["NIX_BIN_PATH"] + assert exists(join(NIX_BIN_PATH, "nix-build")), \ + "Couldn't determine a valid nix binary path. Set NIX_BIN_PATH" else: - NIX_BIN_PATH = dirname(realpath(strip_output("type -p nix-env"))) -assert exists(join(NIX_BIN_PATH, "nix-build")), \ - "Couldn't determine a valid nix binary path. Set NIX_BIN_PATH" + for bin_path in os.environ["PATH"].split(os.pathsep): + if isdir(bin_path) and "nix-env" in os.listdir(bin_path): + NIX_BIN_PATH = realpath(bin_path) + break + else: + raise RuntimeError("nix-env isn't in the PATH") + # The store path can be given explicitly, or else it will be # inferred to be 2 levels up from the bin path. E.g., if the # bin path is /foo/bar/123-nix/bin, the store directory will From 21a349d212c73d1e5740c6803a1ff29eb423a9e1 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Tue, 18 Jul 2017 16:45:53 +0000 Subject: [PATCH 06/16] fall back to using query_store when sqlite db fails --- src/pynix/utils.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/pynix/utils.py b/src/pynix/utils.py index 104bde5..4731f84 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -169,7 +169,8 @@ def tell_size(obj, word, suffix="s"): else: return "{} {}{}".format(len(obj), word, suffix) -def is_path_in_store(store_path, db_con=None, hide_stderr=True): +def is_path_in_store(store_path, db_con=None, hide_stderr=True, + ignore_db_con=False): """Check if a path is in the nix store. Optionally provide a database connection which speeds things up. @@ -179,10 +180,19 @@ def is_path_in_store(store_path, db_con=None, hide_stderr=True): store_path = join(NIX_STORE_PATH, store_path) # If we have a connection to the database, all we have to # do is look in the database. - if db_con is not None: + if db_con is not None and ignore_db_con is False: query = "select path from ValidPaths where path = ?" - with db_con: - results = db_con.execute(query, (store_path,)).fetchall() + try: + with db_con: + results = db_con.execute(query, (store_path,)).fetchall() + except sqlite3.OperationalError as err: + # This can happen under heavy disk load; if so fall back + # to querying with the nix-store executable. + logging.exception(err) + return is_path_in_store(store_path, + db_con=None, + hide_stderr=hide_stderr, + ignore_db_con=True) if len(results) > 0: return True else: From 9d5f063db93147eafb96e05998d278ccd9e22c77 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Thu, 20 Jul 2017 11:21:04 -0500 Subject: [PATCH 07/16] rm a path that is not valid --- src/pynix/narinfo.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pynix/narinfo.py b/src/pynix/narinfo.py index ac02a4b..ade857e 100644 --- a/src/pynix/narinfo.py +++ b/src/pynix/narinfo.py @@ -4,7 +4,7 @@ import os from os.path import join, basename, dirname import yaml -from subprocess import check_output, CalledProcessError +from subprocess import call, check_output, CalledProcessError from pynix.utils import decode_str, strip_output, nix_cmd, query_store from pynix.exceptions import NoNarGenerated, NixImportFailed @@ -291,6 +291,7 @@ def import_to_store(self): return strip_output(nix_cmd("nix-store", ["--import"]), input=self.to_bytes()) except CalledProcessError: + call(nix_cmd("nix-store", ["--delete", self.store_path])) raise NixImportFailed("See above stderr") def to_bytes(self): From 703fac4e508574a824de9447f4eebe3da0bc59ef Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Tue, 1 Aug 2017 18:56:39 +0000 Subject: [PATCH 08/16] implement fetch command, catch NixImportFailed --- src/pynix/binary_cache/client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/pynix/binary_cache/client.py b/src/pynix/binary_cache/client.py index 4e8297e..b3bb3e6 100644 --- a/src/pynix/binary_cache/client.py +++ b/src/pynix/binary_cache/client.py @@ -16,7 +16,7 @@ import tempfile from threading import Thread, RLock, BoundedSemaphore from six.moves.urllib_parse import urlparse -from concurrent.futures import ThreadPoolExecutor, Future, wait, as_completed +from concurrent.futures import ThreadPoolExecutor from multiprocessing import cpu_count import yaml if sys.version_info >= (3, 0): @@ -606,8 +606,9 @@ def _fetch_single(self, path, retries_remaining=3): .format(narinfo.compression)) # Once extracted, convert it into a nix export object and import. export = narinfo.nar_to_export(data) - imported_path = export.import_to_store() - if not is_path_in_store(imported_path): + try: + imported_path = export.import_to_store() + except NixImportFailed: logging.warn("Couldn't import fetched object for " + path) return self._fetch_single( path, retries_remaining=(retries_remaining - 1)) @@ -948,7 +949,8 @@ def main(): elif args.command == "daemon": client.watch_store(args.ignore) elif args.command == "fetch": - wait(list(client.fetch_objects(args.paths).values())) + fetch_order = client._compute_fetch_order(args.paths) + client._fetch_ordered_paths(fetch_order) elif args.command == "build": keep_going = False if args.one else args.keep_going result_derivs = client.build_fetch( From b29bb6e4840d2ecc18ba4af976cbd565c936eb48 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Wed, 16 Aug 2017 21:17:15 +0000 Subject: [PATCH 09/16] ignore drvs and tarballs, set some env variables for nix --- default.nix | 1 + src/pynix/binary_cache/client.py | 42 ++++++++++++++++++++++++-------- src/pynix/utils.py | 31 ++++++++++++++++++++++- 3 files changed, 63 insertions(+), 11 deletions(-) diff --git a/default.nix b/default.nix index 93f41c0..1930a52 100644 --- a/default.nix +++ b/default.nix @@ -33,6 +33,7 @@ pythonPackages.buildPythonPackage rec { six datadiff rtyaml + python_magic ] ++ (if isPy3 then [] else [ pythonPackages.futures pythonPackages.backports_lzma diff --git a/src/pynix/binary_cache/client.py b/src/pynix/binary_cache/client.py index b3bb3e6..a5b4499 100644 --- a/src/pynix/binary_cache/client.py +++ b/src/pynix/binary_cache/client.py @@ -7,7 +7,7 @@ import logging import os from os.path import (join, exists, isdir, isfile, expanduser, basename, - getmtime) + getmtime, splitext) import re import shutil from subprocess import (Popen, PIPE, check_output, CalledProcessError, @@ -45,7 +45,7 @@ from pynix.utils import (strip_output, decode_str, NIX_STORE_PATH, NIX_STATE_PATH, NIX_DB_PATH, nix_cmd, query_store, instantiate, tell_size, - is_path_in_store) + is_path_in_store, is_tarball) from pynix.exceptions import (CouldNotConnect, NixImportFailed, CliError, ObjectNotBuilt, NixBuildError, NoSuchObject) from pynix.binary_cache.nix_info_caches import PathReferenceCache @@ -61,6 +61,7 @@ # Limit of how many paths to show, so the screen doesn't flood. SHOW_PATHS_LIMIT = int(os.environ.get("SHOW_PATHS_LIMIT", 25)) + class NixCacheClient(object): """Wraps some state for sending store objects.""" def __init__(self, endpoint, dry_run=False, username=None, @@ -642,11 +643,15 @@ def _finish_fetching(self, path): # Now that we have the future, wait for it to finish before returning. future.result() - def watch_store(self, ignore): + def watch_store(self, ignore, include_drvs=False, include_tarballs=False): """Watch the nix store's timestamp and sync whenever it changes. :param ignore: A list of regexes of objects to ignore when syncing. :type ignore: ``list`` of (``str`` or ``regex``) + :param include_drvs: Send derivation files to the repo. + :type include_drvs: ``bool`` + :param include_tarballs: Include tarballs. + :type include_tarballs: ``bool`` """ prev_stamp = None num_syncs = 0 @@ -664,7 +669,7 @@ def watch_store(self, ignore): logging.info("Store was modified at {}, syncing" .format(stamp.strftime("%H:%M:%S"))) try: - self.sync_store(ignore) + self.sync_store(ignore, include_drvs, include_tarballs) prev_stamp = stamp num_syncs += 1 except requests.exceptions.HTTPError as err: @@ -674,7 +679,7 @@ def watch_store(self, ignore): exit("Successfully syncronized with {} {} times." .format(self._endpoint, num_syncs)) - def sync_store(self, ignore): + def sync_store(self, ignore, include_drvs=False, include_tarballs=False): """Syncronize the local nix store to the endpoint. Reads all of the known paths in the nix SQLite database which @@ -683,6 +688,10 @@ def sync_store(self, ignore): :param ignore: A list of regexes of objects to ignore. :type ignore: ``list`` of (``str`` or ``regex``) + :param include_drvs: Include derivation files, normally not necessary. + :type include_drvs: ``bool`` + :param include_tarballs: Include tarballs. + :type include_tarballs: ``bool`` """ ignore = [re.compile(r) for r in ignore] paths = [] @@ -690,11 +699,16 @@ def sync_store(self, ignore): query = con.execute("SELECT path FROM ValidPaths") for result in query.fetchall(): path = result[0] - if any(ig.match(path) for ig in ignore): + if splitext(path)[1] == ".drv" and include_drvs is not True: + logging.debug("Skipping derivation {}".format(path)) + elif include_tarballs is not True and is_tarball(path): + logging.debug("Path {} appears to be a tarball, skipping" + .format(path)) + elif any(ig.match(path) for ig in ignore): logging.debug("Path {} matches an ignore regex, skipping" .format(path)) - continue - paths.append(path) + else: + paths.append(path) logging.info("Found {} paths in the store.".format(len(paths))) self.send_objects(paths) @@ -921,6 +935,12 @@ def _get_args(): for subparser in (sync, daemon): subparser.add_argument("--ignore", nargs="*", default=[], help="Regexes of store paths to ignore.") + subparser.add_argument("--include-drvs", action="store_true", + default=False, + help="Send .drv files to repo.") + subparser.add_argument("--include-tarballs", action="store_true", + default=False, + help="Send tarball files to repo.") # It doesn't make sense to have the daemon run in dry-run mode. subparser.set_defaults(dry_run=False) return parser.parse_args() @@ -945,9 +965,11 @@ def main(): if args.command == "send": client.send_objects(args.paths) elif args.command == "sync": - client.sync_store(args.ignore) + client.sync_store(args.ignore, args.include_drvs, + args.include_tarballs) elif args.command == "daemon": - client.watch_store(args.ignore) + client.watch_store(args.ignore, args.include_drvs, + args.include_tarballs) elif args.command == "fetch": fetch_order = client._compute_fetch_order(args.paths) client._fetch_ordered_paths(fetch_order) diff --git a/src/pynix/utils.py b/src/pynix/utils.py index dfe7e73..65c51c6 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -3,11 +3,12 @@ import logging import os from os import getenv -from os.path import exists, join, dirname, isdir, realpath +from os.path import exists, join, dirname, isdir, realpath, isfile import sqlite3 from subprocess import call, check_output, PIPE, Popen, CalledProcessError import six +import magic from pynix.exceptions import NixInstantiationError @@ -80,9 +81,14 @@ def strip_output(command, input=None, hide_stderr=False): NIX_STATE_PATH = getenv("NIX_STATE_PATH", join(dirname(NIX_STORE_PATH), "var")) assert isdir(NIX_STATE_PATH), \ "Nix state directory {} doesn't exist".format(NIX_STATE_PATH) +# Nix reads this env variable; set it here +os.environ["NIX_STATE_DIR"] = NIX_STATE_PATH NIX_DB_PATH = getenv("NIX_DB_PATH", join(NIX_STATE_PATH, "nix/db/db.sqlite")) +# Nix also reads this variable... +os.environ["NIX_DB_DIR"] = dirname(NIX_DB_PATH) + # This variable is true when we detect we're on a nixos linux. if os.getenv("IS_NIXOS", "") != "": IS_NIXOS = True @@ -215,3 +221,26 @@ def is_path_in_store(store_path, db_con=None, hide_stderr=True, logging.debug("Tried to use nix-store to query path {}, but " "got an error".format(store_path)) return False + +# Mimetypes of tarball files +TARBALL_MIMETYPES = set(['application/x-gzip', 'application/x-xz', + 'application/x-bzip2', 'application/zip']) + + +def is_tarball(store_path): + """Return true if the path is a tarball, or a directory which only + contains a tarball. + :param store_path: A nix store path. + :type store_path: ``str`` + + :return: True if the store path appears to be a tarball. + :rtype: ``bool`` + """ + if isfile(store_path): + path = store_path + elif isdir(store_path) and len(os.listdir(store_path)) == 1: + path = join(store_path, os.listdir(store_path)[0]) + else: + return False + mimetype = decode_str(magic.from_file(path, mime=True)) + return mimetype in TARBALL_MIMETYPES From 47105f04fd9b52fa118ceeaab8c7f9fbee3820b3 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Thu, 17 Aug 2017 22:37:06 +0000 Subject: [PATCH 10/16] add ability to build expressions --- src/pynix/binary_cache/client.py | 21 +++++++++++---------- src/pynix/exceptions.py | 9 ++++++--- src/pynix/utils.py | 26 ++++++++++++++++++++------ 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/src/pynix/binary_cache/client.py b/src/pynix/binary_cache/client.py index b3bb3e6..b2eab98 100644 --- a/src/pynix/binary_cache/client.py +++ b/src/pynix/binary_cache/client.py @@ -698,8 +698,9 @@ def sync_store(self, ignore): logging.info("Found {} paths in the store.".format(len(paths))) self.send_objects(paths) - def build_fetch(self, nix_file, attributes, verbose=False, show_trace=True, - keep_going=True, create_links=False, use_deriv_name=True): + def build_fetch(self, nix_file=None, attributes=None, nix_expr=None, + verbose=False, show_trace=True, keep_going=True, + create_links=False, use_deriv_name=True): """Given a nix file, instantiate the given attributes within the file, query the server for which files can be fetched, and then build/fetch everything. @@ -707,11 +708,8 @@ def build_fetch(self, nix_file, attributes, verbose=False, show_trace=True, :return: A dictionary mapping derivations to outputs that were built. :rtype: ``dict`` """ - logging.info("Instantiating attribute{} {} from path {}" - .format("s" if len(attributes) > 1 else "", - ", ".join(attributes), nix_file)) - deriv_paths = instantiate(nix_file, attributes=attributes, - show_trace=show_trace) + deriv_paths = instantiate(nix_file=nix_file, attributes=attributes, + nix_expr=nix_expr, show_trace=show_trace) derivs_to_outputs = parse_deriv_paths(deriv_paths) need_to_build, need_to_fetch = self.preview_build(deriv_paths) if self._dry_run is True: @@ -882,7 +880,9 @@ def _get_args(): build.add_argument("-P", "--path", default=os.getcwd(), help="Base path to evaluate.") build.add_argument("attributes", nargs="*", - help="Expressions to evaluate.") + help="Attributes to evaluate.") + build.add_argument("--nix-expr", "-E", + help="Nix expression to evaluate.") build.add_argument("-v", "--verbose", action="store_true", default=False, help="Show verbose output.") build.add_argument("--no-trace", action="store_false", dest="show_trace", @@ -955,8 +955,9 @@ def main(): keep_going = False if args.one else args.keep_going result_derivs = client.build_fetch( nix_file=args.path, attributes=args.attributes, - verbose=args.verbose, show_trace=args.show_trace, - keep_going=keep_going, create_links=args.create_links, + nix_expr=args.nix_expr, verbose=args.verbose, + show_trace=args.show_trace, keep_going=keep_going, + create_links=args.create_links, use_deriv_name=not args.generic_link_name) if args.dry_run is False and args.print_paths is True: for deriv, outputs in result_derivs.items(): diff --git a/src/pynix/exceptions.py b/src/pynix/exceptions.py index def71de..1764360 100644 --- a/src/pynix/exceptions.py +++ b/src/pynix/exceptions.py @@ -90,10 +90,13 @@ def __init__(self, err_message): class NixInstantiationError(NixOperationError, CliError): """Raised when nix-instantiate fails.""" OPERATION = "nix-instantiate" - def __init__(self, nix_file, attributes): + def __init__(self, nix_file=None, attributes=None, nix_expr=None): self.nix_file = nix_file - self.attributes = attributes - if len(attributes) == 0: + self.nix_expr = nix_expr + self.attributes = attributes or [] + if nix_expr is not None: + message = "Couldn't evaluate expression {}".format(repr(nix_expr)) + elif len(attributes) == 0: message = "Couldn't evaluate file {}".format(nix_file) elif len(attributes) == 1: message = ("Couldn't evaluate attribute {} from file {}" diff --git a/src/pynix/utils.py b/src/pynix/utils.py index dfe7e73..71baa0e 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -144,18 +144,32 @@ def query_store(store_path, query, hide_stderr=False): result = strip_output(command, hide_stderr=hide_stderr) return result -def instantiate(nix_file, attributes=None, show_trace=True): +def instantiate(nix_file=None, attributes=None, nix_expr=None, + show_trace=True): """Wraps a call to nix-instantiate.""" - attributes = [] if attributes is None else attributes - command = nix_cmd("nix-instantiate", [nix_file, "--no-gc-warning"]) + if nix_expr is not None: + command = nix_cmd("nix-instantiate", ["-E", nix_expr]) + logging.info("Instantiating nix expression {}" + .format(repr(nix_expr))) + elif nix_file is not None: + logging.info("Instantiating attribute{} {} from path {}" + .format("s" if len(attributes) > 1 else "", + ", ".join(attributes), nix_file)) + attributes = [] if attributes is None else attributes + command = nix_cmd("nix-instantiate", [nix_file]) + for attr in attributes: + command.extend(["-A", attr]) + else: + raise ValueError("Either an expression or a nix file must be given.") + command.append("--no-gc-warning") if show_trace is True: command.append("--show-trace") - for attr in attributes: - command.extend(["-A", attr]) try: return strip_output(command).split() except CalledProcessError as err: - six.raise_from(NixInstantiationError(nix_file, attributes), err) + six.raise_from(NixInstantiationError(nix_file=nix_file, + nix_expr=nix_expr, + attributes=attributes), err) def tell_size(obj, word, suffix="s"): """Useful when you want to write a message to the user. From 12a4e13f0b0e489d12410b992ff15b5b5930d115 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Thu, 31 Aug 2017 20:59:48 +0000 Subject: [PATCH 11/16] stuff happening --- src/pynix/binary_cache/client.py | 50 ++++++++++++++++++++++++++------ src/pynix/exceptions.py | 13 +++++++++ src/pynix/utils.py | 49 +++++++++++++++++++++++++++++-- 3 files changed, 100 insertions(+), 12 deletions(-) diff --git a/src/pynix/binary_cache/client.py b/src/pynix/binary_cache/client.py index c946a6f..c265b15 100644 --- a/src/pynix/binary_cache/client.py +++ b/src/pynix/binary_cache/client.py @@ -11,7 +11,7 @@ import re import shutil from subprocess import (Popen, PIPE, check_output, CalledProcessError, - check_call) + check_call, call) import sys import tempfile from threading import Thread, RLock, BoundedSemaphore @@ -42,12 +42,13 @@ import six from pynix import __version__ -from pynix.utils import (strip_output, decode_str, NIX_STORE_PATH, +from pynix.utils import (strip_output, decode_str, NIX_STORE_PATH, Streamer, NIX_STATE_PATH, NIX_DB_PATH, nix_cmd, query_store, instantiate, tell_size, is_path_in_store, is_tarball) from pynix.exceptions import (CouldNotConnect, NixImportFailed, CliError, - ObjectNotBuilt, NixBuildError, NoSuchObject) + ObjectNotBuilt, NixBuildError, NoSuchObject, + NixExportFailed) from pynix.binary_cache.nix_info_caches import PathReferenceCache from pynix.narinfo import NarInfo from pynix.build import needed_to_build_multi, parse_deriv_paths @@ -66,7 +67,7 @@ class NixCacheClient(object): """Wraps some state for sending store objects.""" def __init__(self, endpoint, dry_run=False, username=None, password=None, cache_location=None, cache_enabled=True, - max_jobs=cpu_count()): + max_jobs=cpu_count(), delete_invalid_paths=False): #: Server running servenix (string). self._endpoint = endpoint #: Base name of server (for caching). @@ -82,6 +83,8 @@ def __init__(self, endpoint, dry_run=False, username=None, self._username = None #: Ignored if username is None. self._password = password + #: If an invalid path is encountered, attempt to delete it. + self._delete_invalid_paths = delete_invalid_paths #: Set at a later point, if username is not None. self._auth = None #: Used to avoid unnecessary overhead in handshakes etc. @@ -436,21 +439,36 @@ def send_object(self, path, remaining_objects=None): # possible with current requests, or indeed possible in # general without knowing the file size. session = self._connect() - export = check_output(nix_cmd("nix-store", ["--export", path])) + proc = Popen(nix_cmd("nix-store", ["--export", path]), + stdout=PIPE, stderr=PIPE) + export, err = proc.communicate() + if proc.returncode != 0: + logging.error("Export of path {} failed.".format(path)) + deleted = False + if self._delete_invalid_paths is True: + logging.warn("Deleting {}. You can retry send afterwards." + .format(path)) + nix_store_args = ["--delete", path, "--ignore-liveness"] + result = call(nix_cmd("nix-store", nix_store_args)) + deleted = result == 0 + raise NixExportFailed(path, decode_str(err), deleted=deleted) # For large files, show progress when compressing if len(export) > 1000000: logging.info("Compressing {}".format(basename(path))) cmd = "pv -ptef -s {} | gzip".format(len(export)) proc = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE) data = proc.communicate(input=export)[0] + log_func = logging.info else: data = gzip.compress(export) + log_func = logging.debug url = "{}/import-path".format(self._endpoint) headers = {"Content-Type": "application/x-gzip"} + streamer = Streamer(path, data, log_func) try: logging.info("Sending {} ({} remaining)" .format(basename(path), len(remaining_objects))) - response = session.post(url, data=data, headers=headers) + response = session.post(url, data=streamer, headers=headers) response.raise_for_status() except requests.exceptions.HTTPError as err: try: @@ -932,6 +950,12 @@ def _get_args(): default=False, help="If true, reports which paths would " "be sent/fetched/built.") + subparser.add_argument("--log-to", default=None, + help="Log to this file (else stdout)") + subparser.add_argument("--log-format", help="Logging format.") + subparser.add_argument("--delete-invalid-paths", action="store_true", + default=False, + help="Attempt to delete invalid paths") for subparser in (sync, daemon): subparser.add_argument("--ignore", nargs="*", default=[], help="Regexes of store paths to ignore.") @@ -953,16 +977,24 @@ def main(): elif ENDPOINT_REGEX.match(args.endpoint) is None: exit("Invalid endpoint: '{}' does not match '{}'." .format(args.endpoint, ENDPOINT_REGEX.pattern)) - log_level = getattr(logging, args.log_level.upper()) - logging.basicConfig(level=log_level, format="%(message)s") + logging.basicConfig( + level=getattr(logging, args.log_level.upper()), + format=args.log_format, + filename=args.log_to + ) # Hide noisy logging of some external libs for name in ("requests", "urllib", "urllib2", "urllib3"): logging.getLogger(name).setLevel(logging.WARNING) max_jobs = 1 if getattr(args, "one", False) else args.max_jobs client = NixCacheClient(endpoint=args.endpoint, dry_run=args.dry_run, - username=args.username, max_jobs=max_jobs) + username=args.username, max_jobs=max_jobs, + delete_invalid_paths=args.delete_invalid_paths) try: if args.command == "send": + for path in args.paths: + if not is_path_in_store(path): + raise CliError("Path {} is not registered as a valid " + "path in the nix database.".format(path)) client.send_objects(args.paths) elif args.command == "sync": client.sync_store(args.ignore, args.include_drvs, diff --git a/src/pynix/exceptions.py b/src/pynix/exceptions.py index 1764360..9cd0452 100644 --- a/src/pynix/exceptions.py +++ b/src/pynix/exceptions.py @@ -28,6 +28,9 @@ class CliError(Exception): """ EXIT_MESSAGE = None RETURN_CODE = 1 + def __init__(self, message=None, return_code=1): + self.EXIT_MESSAGE = message + self.RETURN_CODE = return_code def exit(self): _name = type(self).__name__ if self.EXIT_MESSAGE is not None: @@ -87,6 +90,16 @@ def __init__(self, err_message): BaseHTTPError.__init__(self, message=message) self.EXIT_MESSAGE = message +class NixExportFailed(NixOperationError, CliError): + """Raised when the nix-store --export command fails.""" + OPERATION = "nix-store --export" + def __init__(self, path, stderr, deleted=False): + message = "Couldn't export {}. Stderr:\n{}".format(path, stderr) + if deleted is True: + message += "\nThe path was deleted; you may retry." + self.EXIT_MESSAGE = message + + class NixInstantiationError(NixOperationError, CliError): """Raised when nix-instantiate fails.""" OPERATION = "nix-instantiate" diff --git a/src/pynix/utils.py b/src/pynix/utils.py index 1191ad5..ff30224 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -1,11 +1,17 @@ """Some utility functions to support store operations.""" import base64 +import sys +if sys.version_info >= (3, 0): + from io import BytesIO +else: + from StringIO import StringIO as BytesIO import logging import os from os import getenv -from os.path import exists, join, dirname, isdir, realpath, isfile +from os.path import exists, join, dirname, isdir, realpath, isfile, basename import sqlite3 from subprocess import call, check_output, PIPE, Popen, CalledProcessError +import time import six import magic @@ -93,8 +99,8 @@ def strip_output(command, input=None, hide_stderr=False): if os.getenv("IS_NIXOS", "") != "": IS_NIXOS = True else: - IS_NIXOS = (call("nixos-version", shell=True, stderr=PIPE) == 0 or - isdir("/etc/nixos")) + result = call("nixos-version", shell=True, stdout=PIPE, stderr=PIPE) + IS_NIXOS = result == 0 or isdir("/etc/nixos") NIX_DB_ACCESSIBLE = None @@ -258,3 +264,40 @@ def is_tarball(store_path): return False mimetype = decode_str(magic.from_file(path, mime=True)) return mimetype in TARBALL_MIMETYPES + + +class Streamer(BytesIO): + """Wrapper around BytesIO which show progress of reads.""" + def __init__(self, path, data, log_func): + BytesIO.__init__(self, data) + self._streamed = 0 + self._len = len(data) + self._len_mb = len(data) / 1048576.0 + self._path = basename(path) + self._log_func = log_func + self._start_time = time.time() + self._last_percent_ten = None + self._last_print_time = self._start_time + + def read(self, *args, **kwargs): + """Read from the source, printing progress. + + Only prints if at least a half-second has elapsed. + """ + result = BytesIO.read(self, *args, **kwargs) + self._streamed += len(result) + bytes_per_sec = self._streamed / (time.time() - self._start_time) + percent = 100.0 * (float(self._streamed) / self._len) + percent_ten = int(percent) // 10 + if len(result) > 0: + if time.time() - self._last_print_time > 0.5: + streamed = self._streamed / 1048576.0 + self._log_func( + "{}: {:.2f}/{:.2f}MB ({:.2f}%), {:.2f} bytes/sec" + .format(self._path, streamed, self._len_mb, + percent, bytes_per_sec)) + self._last_print_time = time.time() + else: + self._log_func("{}: completed in {:2f} seconds" + .format(self._path, time.time() - self._start_time)) + return result From 2ba641f76ae6e88ce6eb8c4cd89ce16e9399c5cc Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Tue, 5 Sep 2017 13:58:42 -0500 Subject: [PATCH 12/16] skip tests on nixos --- default.nix | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/default.nix b/default.nix index 1930a52..34a3ed2 100644 --- a/default.nix +++ b/default.nix @@ -16,6 +16,9 @@ let propagatedBuildInputs = [pythonPackages.pyyaml]; }; isPy3 = pythonPackages.isPy3k or false; + + # Command to get the owner of a folder; different on linux vs darwin. + getOwner = if pkgs.stdenv.isLinux then "stat -c '%U'" else "stat -f '%Su'"; in pythonPackages.buildPythonPackage rec { @@ -40,11 +43,11 @@ pythonPackages.buildPythonPackage rec { pythonPackages.repoze_lru ]); checkPhase = '' - # HACK: try to detect this failure case at runtime - if ! nix-store -q --hash ${pkgs.nix} >/dev/null 2>&1; then - export NIX_REMOTE=daemon + if ${getOwner} ${pkgs.nix} >/dev/null 2>&1; then + echo "Skipping tests due to not working on root-owned nix store" + else + nosetests tests fi - nosetests tests ''; src = ./.; makeWrapperArgs = [ From 0b62fc41855d992c33773818d0e9c1459f6c313f Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Thu, 7 Sep 2017 12:34:41 -0500 Subject: [PATCH 13/16] fix indentation --- default.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/default.nix b/default.nix index 34a3ed2..3d8d377 100644 --- a/default.nix +++ b/default.nix @@ -43,7 +43,7 @@ pythonPackages.buildPythonPackage rec { pythonPackages.repoze_lru ]); checkPhase = '' - if ${getOwner} ${pkgs.nix} >/dev/null 2>&1; then + if ${getOwner} ${pkgs.nix} >/dev/null 2>&1; then echo "Skipping tests due to not working on root-owned nix store" else nosetests tests From a25fa6e79ad108a3a6ce7b9b81856ee57082e2d1 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Fri, 22 Sep 2017 11:01:58 -0500 Subject: [PATCH 14/16] bug fix, use is_path_in_store instead of exists --- src/pynix/binary_cache/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pynix/binary_cache/client.py b/src/pynix/binary_cache/client.py index c265b15..9be80c2 100644 --- a/src/pynix/binary_cache/client.py +++ b/src/pynix/binary_cache/client.py @@ -6,7 +6,7 @@ import json import logging import os -from os.path import (join, exists, isdir, isfile, expanduser, basename, +from os.path import (join, isdir, isfile, expanduser, basename, getmtime, splitext) import re import shutil @@ -525,7 +525,7 @@ def _have_fetched(self, path): """ if path in self._paths_fetched: return True - elif exists(path): + elif is_path_in_store(path): self._paths_fetched.add(path) return True else: From c1a4460244124d5471fb17ae1c6c7bfc415c5da9 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Fri, 29 Dec 2017 10:46:33 -0600 Subject: [PATCH 15/16] include the /nix directory in the NIX_STATE_PATH variable --- src/pynix/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pynix/utils.py b/src/pynix/utils.py index ff30224..1185947 100644 --- a/src/pynix/utils.py +++ b/src/pynix/utils.py @@ -84,7 +84,8 @@ def strip_output(command, input=None, hide_stderr=False): "Nix store directory {} doesn't exist".format(NIX_STORE_PATH) # The state path can be given explicitly, or else it will be # inferred to be sibling to the store directory. -NIX_STATE_PATH = getenv("NIX_STATE_PATH", join(dirname(NIX_STORE_PATH), "var")) +NIX_STATE_PATH = getenv("NIX_STATE_PATH", + join(dirname(NIX_STORE_PATH), "var", "nix")) assert isdir(NIX_STATE_PATH), \ "Nix state directory {} doesn't exist".format(NIX_STATE_PATH) # Nix reads this env variable; set it here From 8e8a7234ffa0596ed175d07a5b3a02c94ef5b1e6 Mon Sep 17 00:00:00 2001 From: Allen Nelson Date: Wed, 10 Jan 2018 15:16:49 -0600 Subject: [PATCH 16/16] use requests instead of requests2 --- default.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/default.nix b/default.nix index 3d8d377..b8ce3c0 100644 --- a/default.nix +++ b/default.nix @@ -31,7 +31,7 @@ pythonPackages.buildPythonPackage rec { pkgs.pv pkgs.which flask - requests2 + requests ipdb six datadiff