Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Suggests:
dplyr,
magrittr,
readr,
testthat (>= 3.0.0),
testthat (>= 3.2.0),
knitr,
rmarkdown
Config/testthat/edition: 3
Expand Down
181 changes: 167 additions & 14 deletions R/duckdb.r
Original file line number Diff line number Diff line change
@@ -1,3 +1,86 @@
#' Build the argv vector for the get_dewey_urls.py subprocess
#'
#' Pure helper extracted so the R/Python boundary contract is testable
#' without spawning a subprocess. Order of positional args here MUST match
#' \code{inst/python/get_dewey_urls.py}.
#'
#' @keywords internal
#' @noRd
# Peek column names for a single URL via DuckDB. Used as a fallback when the
# Python script's schema discovery fails. Stays in lockstep with the URL set
# the caller is about to download.
#
# @keywords internal
# @noRd
peek_cols_from_url <- function(url, file_extension) {
read_fn <- ifelse(file_extension == ".snappy.parquet", "read_parquet", "read_csv")
con <- DBI::dbConnect(duckdb::duckdb())
on.exit(DBI::dbDisconnect(con), add = TRUE)
DBI::dbExecute(con, "INSTALL httpfs; LOAD httpfs;")
df <- DBI::dbGetQuery(con, glue::glue(
"SELECT * FROM {read_fn}(['{url}']) LIMIT 0"
))
colnames(df)
}

# Sentinel string for "no value provided". MUST match NONE_SENTINEL in
# inst/python/get_dewey_urls.py. Chosen so it cannot collide with any
# user-supplied value (api_key, partition_key, file_name).
DEWEYR_NULL_SENTINEL <- "__DEWEYR_NULL__"

build_get_dewey_urls_args <- function(script, api_key, data_id, file_name,
preview, partition_key_after,
partition_key_before,
python_version = "3.13") {
none <- DEWEYR_NULL_SENTINEL
c(
"run", "--python", python_version, script,
api_key,
data_id,
if (is.null(file_name)) none else as.character(file_name),
tolower(as.character(preview)),
if (is.null(partition_key_after)) none else as.character(partition_key_after),
if (is.null(partition_key_before)) none else as.character(partition_key_before)
)
}

# Reject an empty / NA / missing api_key BEFORE shelling out to uv.
# system2() drops empty-string args, which silently shifts every positional
# argv on the Python side and produces a confusing 401-on-the-wrong-URL.
validate_required_string <- function(value, arg_name) {
if (is.null(value) || !is.character(value) || length(value) != 1 ||
is.na(value) || !nzchar(value)) {
stop(arg_name, " must be a non-empty character string. ",
"Got: ", deparse(value),
if (identical(arg_name, "api_key"))
" (hint: Sys.getenv(\"DEWEY_API_KEY\") may be empty if .Renviron didn't load — try readRenviron(\"~/.Renviron\"))"
else "")
}
invisible(NULL)
}

#' Validate a partition_key argument
#'
#' Cheap sanity-check on user-supplied partition_key values before they go
#' across the R/Python boundary. Catches the common mistakes (empty string,
#' multi-element, embedded newline) while leaving date format up to the API.
#'
#' @keywords internal
#' @noRd
validate_partition_key <- function(value, arg_name) {
if (is.null(value)) return(invisible(NULL))
if (!is.character(value) || length(value) != 1 || is.na(value)) {
stop(arg_name, " must be a single character string or NULL.")
}
if (!nzchar(value)) {
stop(arg_name, " must not be an empty string. Pass NULL to skip.")
}
if (grepl("[\r\n]", value)) {
stop(arg_name, " must not contain newline characters.")
}
invisible(NULL)
}

#' Get Dewey dataset file metadata
#'
#' Calls the Dewey API via a Python script to retrieve download URLs and
Expand All @@ -10,6 +93,10 @@
#' @param preview If \code{TRUE}, returns only the first file URL instead of
#' paginating the full dataset manifest. Used internally by \code{preview_dewey()}.
#' Defaults to \code{FALSE}.
#' @param partition_key_after Optional partition key lower bound (inclusive).
#' Forwarded to deweypy. Ignored when \code{preview = TRUE}.
#' @param partition_key_before Optional partition key upper bound (inclusive).
#' Forwarded to deweypy. Ignored when \code{preview = TRUE}.
#'
#' @return A list with the following fields:
#' \describe{
Expand All @@ -23,23 +110,63 @@
#'
#' @keywords internal
#' @noRd
get_dewey_urls <- function(api_key, data_id, file_name = NULL, preview = FALSE) {
get_dewey_urls <- function(api_key, data_id, file_name = NULL, preview = FALSE,
partition_key_after = NULL, partition_key_before = NULL) {
validate_required_string(api_key, "api_key")
validate_required_string(data_id, "data_id")
if (!check_uv()) {
install_uv()
message("Restarting the terminal will increase speed of future runs")
}
data_id <- parse_url(data_id)
script <- system.file("python/get_dewey_urls.py", package = "deweyr")
args <- c(
"run", "--python", "3.13", script,
api_key,
data_id,
ifelse(is.null(file_name), "None", file_name),
tolower(as.character(preview))
args <- build_get_dewey_urls_args(
script = script,
api_key = api_key,
data_id = data_id,
file_name = file_name,
preview = preview,
partition_key_after = partition_key_after,
partition_key_before = partition_key_before
)
stderr_path <- tempfile("deweyr_stderr_", fileext = ".txt")
on.exit(unlink(stderr_path), add = TRUE)
result_raw <- system2("uv", args = args, stdout = TRUE, stderr = stderr_path)
exit_status <- attr(result_raw, "status")
err_lines <- if (file.exists(stderr_path)) readLines(stderr_path, warn = FALSE) else character()
stdout_str <- paste(result_raw, collapse = "")

# Treat empty stdout as failure — `system2(..., stdout = TRUE)` does not
# always set the `status` attribute when the command crashes early
# (e.g. uv not found on PATH after install_uv). The Python script always
# prints non-empty JSON on success.
failed <- (!is.null(exit_status) && exit_status != 0) || !nzchar(trimws(stdout_str))

if (failed) {
err_msg <- paste(err_lines, collapse = "\n")
parsed_err <- tryCatch(jsonlite::fromJSON(err_msg), error = function(e) NULL)
if (is.list(parsed_err) && !is.null(parsed_err$error)) {
stop("get_dewey_urls failed: ", parsed_err$error)
}
stop(
"get_dewey_urls failed",
if (!is.null(exit_status)) paste0(" (exit ", exit_status, ")") else "",
". Stderr:\n",
if (nzchar(err_msg)) err_msg else "(empty)"
)
}
if (length(err_lines) > 0) cat(err_lines, sep = "\n")

parsed <- tryCatch(
jsonlite::fromJSON(stdout_str),
error = function(e) {
stop(
"Failed to parse response from get_dewey_urls.py. ",
"Raw output: ", paste(result_raw, collapse = "\n")
)
}
)
result_raw <- system2("uv", args = args, stdout = TRUE, stderr = "stderr.txt")
cat(readLines("stderr.txt"), sep = "\n")
jsonlite::fromJSON(paste(result_raw, collapse = ""))
parsed
}

#' Preview a Dewey dataset
Expand Down Expand Up @@ -118,6 +245,11 @@ preview_dewey_duck <- function(api_key, data_id, limit = 10) {
#' @param select Optional vector of column indices, ranges, or names to download.
#' Accepts mixed input e.g. \code{c(1:3, 7, "CARRIER_NAME")}. The partition
#' column will always be added automatically if missing.
#' @param partition_key_after Optional character string (typically YYYY-MM-DD).
#' Pre-filters the file manifest to partitions on/after this key — drastically
#' reduces manifest size on large datasets like SafeGraph Visits.
#' @param partition_key_before Optional character string (typically YYYY-MM-DD).
#' Pre-filters the file manifest to partitions on/before this key.
#'
#' @return The path to the downloaded dataset folder, invisibly. Pipe into
#' \code{read_dewey()} to read immediately after downloading.
Expand Down Expand Up @@ -147,19 +279,40 @@ preview_dewey_duck <- function(api_key, data_id, limit = 10) {
#' select = c(1:3, "TOTAL")
#' )
#'
#' # Date-bounded download (huge speedup on Visits-scale datasets)
#' download_dewey_duck(api_key, data_id,
#' partition_key_after = "2024-01-01",
#' partition_key_before = "2024-02-01"
#' )
#'
#' # Download and read in one step
#' df <- download_dewey_duck(api_key, data_id, partition = "MONTH_DATE_PARSED") |>
#' read_dewey()
#' }
#'
#' @export
download_dewey_duck <- function(api_key, data_id, output_dir = get_download_dir(), partition, overwrite = FALSE, file_name = NULL, where = NULL, select = NULL) {
result <- get_dewey_urls(api_key, data_id, file_name = file_name)
download_dewey_duck <- function(api_key, data_id, output_dir = get_download_dir(),
partition, overwrite = FALSE, file_name = NULL,
where = NULL, select = NULL,
partition_key_after = NULL,
partition_key_before = NULL) {
validate_partition_key(partition_key_after, "partition_key_after")
validate_partition_key(partition_key_before, "partition_key_before")

result <- get_dewey_urls(
api_key, data_id,
file_name = file_name,
partition_key_after = partition_key_after,
partition_key_before = partition_key_before
)
cols <- result$cols # ✅ no second call needed

# fallback in case cols came back empty
# Fallback if Python's DuckDB schema-peek failed: query the FIRST URL we
# already have. This stays within the partition_key range — going through
# preview_dewey_duck() would peek at an unfiltered file and could mismatch
# the actual download set.
if (length(cols) == 0) {
cols <- colnames(preview_dewey_duck(api_key, data_id, limit = 0))
cols <- peek_cols_from_url(result$urls[[1]], result$file_extension)
}

if (missing(partition)) {
Expand Down
Loading