diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index db916cb8..99f4a19b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -70,7 +70,7 @@ jobs:
         uses: prefix-dev/setup-pixi@v0.8.3
 
       - name: Run tests via pixi tasks
-        run: pixi run test
+        run: pixi run -e default test
 
       - name: Build conda package
         run: pixi build
@@ -78,3 +78,73 @@ jobs:
       - name: Test R package
         working-directory: R/package
         run: pixi run test
+
+  # ─── S3 integration tests (MinIO) ──────────────────────────────────────────
+  # Uses MinIO to emulate S3 — no real AWS account or license required.
+  # Runs on every push/PR automatically.
+  test-s3:
+    name: S3 integration tests (MinIO)
+    runs-on: ubuntu-latest
+
+    env:
+      AWS_ENDPOINT_URL: http://localhost:9000
+      AWS_DEFAULT_REGION: us-east-1
+      AWS_ACCESS_KEY_ID: minioadmin
+      AWS_SECRET_ACCESS_KEY: minioadmin
+      # example.16bits.bgen has 199 variants, 500 samples
+      BGEN_S3_EXPECTED_VARIANTS: "199"
+      BGEN_S3_EXPECTED_SAMPLES: "500"
+      BGEN_S3_TEST_BUCKET: bgen-test
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Start MinIO
+        run: |
+          docker run -d --name minio \
+            -p 9000:9000 \
+            -e MINIO_ROOT_USER=minioadmin \
+            -e MINIO_ROOT_PASSWORD=minioadmin \
+            quay.io/minio/minio:latest server /data
+          # Wait up to 60s for MinIO to become healthy
+          for i in $(seq 1 30); do
+            curl -sf http://localhost:9000/minio/health/live && echo "MinIO ready" && break
+            sleep 2
+          done
+          # Fail this step if MinIO never became healthy, instead of failing later in an unrelated step
+          curl -sf http://localhost:9000/minio/health/live > /dev/null
+
+      - name: Set up pixi
+        uses: prefix-dev/setup-pixi@v0.8.3
+
+      # Create two buckets:
+      #   bgen-test   — authenticated access (default credentials)
+      #   bgen-public — anonymous access (bucket policy allows public reads)
+      - name: Create bucket and upload example BGEN files
+        run: |
+          # awscli is pre-installed on ubuntu-latest runners
+          aws s3 mb "s3://${BGEN_S3_TEST_BUCKET}" --region us-east-1
+
+          KEY="ci-test/example.16bits.bgen"
+          aws s3 cp example/example.16bits.bgen "s3://${BGEN_S3_TEST_BUCKET}/${KEY}"
+
+          # Public-access bucket: set a bucket policy allowing anonymous GetObject/HeadObject.
+          # MinIO (and S3) honour this without requiring per-object ACLs.
+          aws s3 mb "s3://bgen-public" --region us-east-1
+          PUBLIC_KEY="example.16bits.bgen"
+          aws s3 cp example/example.16bits.bgen "s3://bgen-public/${PUBLIC_KEY}"
+          aws s3api put-bucket-policy --bucket bgen-public --policy '{
+            "Version": "2012-10-17",
+            "Statement": [{
+              "Effect": "Allow",
+              "Principal": "*",
+              "Action": ["s3:GetObject"],
+              "Resource": ["arn:aws:s3:::bgen-public/*"]
+            }]
+          }'
+
+          echo "BGEN_S3_TEST_URI=s3://${BGEN_S3_TEST_BUCKET}/${KEY}" >> "$GITHUB_ENV"
+          echo "BGEN_S3_PUBLIC_TEST_URI=s3://bgen-public/${PUBLIC_KEY}" >> "$GITHUB_ENV"
+
+      - name: Build S3-enabled bgen and run integration tests
+        run: pixi run -e s3 --frozen test-s3
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 68bbfaf3..e06cc929 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -5,6 +5,7 @@ project(bgen VERSION 1.1.7 LANGUAGES C CXX)
 option(BGEN_BUILD_EXAMPLES "Build example programs" OFF)
 option(BGEN_BUILD_TESTS "Build tests" ON)
 option(BGEN_BUILD_TOOLS "Build command-line tools (bgenix, cat-bgen, edit-bgen)" ON)
+option(BGEN_WITH_S3 "Enable AWS S3 remote file support" OFF)
 option(BUILD_SHARED_LIBS "Build shared libraries" OFF)
 
 # ─── Language standards ──────────────────────────────────────────────────────
@@ -34,6 +35,11 @@ find_package(Boost REQUIRED COMPONENTS
     chrono
 )
 
+# ─── Optional: AWS S3 support ────────────────────────────────────────────────
+if(BGEN_WITH_S3)
+  find_package(AWSSDK REQUIRED COMPONENTS s3)
+endif()
+
 # ─── Generated version header ───────────────────────────────────────────────
 configure_file(
   "${CMAKE_CURRENT_SOURCE_DIR}/cmake/bgen_revision.hpp.in"
@@ -46,13 +52,18 @@ add_subdirectory(db)
 add_subdirectory(appcontext)
 
 # ─── Core bgen library ───────────────────────────────────────────────────────
-add_library(bgen
+set(BGEN_SOURCES
   src/bgen.cpp
   src/IndexQuery.cpp
   src/MissingValue.cpp
   src/View.cpp
   src/zlib.cpp
 )
+if(BGEN_WITH_S3)
+  list(APPEND BGEN_SOURCES src/S3StreamBuf.cpp)
+endif()
+
+add_library(bgen ${BGEN_SOURCES})
 add_library(bgen::bgen ALIAS bgen)
 
 target_include_directories(bgen
@@ -63,6 +74,9 @@ target_include_directories(bgen
     ${CMAKE_CURRENT_BINARY_DIR}
 )
 target_compile_features(bgen PUBLIC cxx_std_17)
+if(BGEN_WITH_S3)
+  target_compile_definitions(bgen PUBLIC BGEN_WITH_S3=1)
+endif()
 target_link_libraries(bgen
   PUBLIC
     ZLIB::ZLIB
@@ -72,6 +86,7 @@ target_link_libraries(bgen
     Boost::filesystem
   PRIVATE
     bgen::db
+    "$<$<BOOL:${BGEN_WITH_S3}>:${AWSSDK_LINK_LIBRARIES}>"
 )
 
 # ─── Command-line tools ──────────────────────────────────────────────────────
diff --git a/README.md b/README.md
index 7f55d7d3..2d0217d2 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,7 @@
 | `std::random_shuffle` | used in tests | replaced with `std::shuffle` (C++17) |
 | CMake package config | none | `find_package(bgen)` works for downstream projects |
 | R package | bundled in build dir | self-contained in `R/package/` with its own `pixi.toml` |
+| Remote files | local filesystem only | **AWS S3** support via range requests (`s3://…`) |
 
 ---
 
@@ -31,6 +32,7 @@
 - **[edit-bgen](https://bitbucket.org/gavinband/bgen/wiki/edit-bgen)** — edit BGEN file metadata
 - **[rbgen](R/package/)** — R package (separate pixi environment, not bundled in the conda package)
 - **[Example programs](example/)** — `bgen_to_vcf`, `count_alleles`, etc.
+- **[AWS S3 support](#aws-s3-support)** — read BGEN files directly from S3 using `s3://bucket/key` URIs
 
 ---
 
@@ -62,6 +64,12 @@ cmake --build build --parallel
 ctest --test-dir build --output-on-failure
 ```
 
+To also enable S3 support, add `-DBGEN_WITH_S3=ON` and ensure the AWS SDK for C++ is findable:
+
+```bash
+cmake -S . -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DBGEN_WITH_S3=ON
+```
+
 Install to a prefix:
 
 ```bash
@@ -78,6 +86,77 @@ target_link_libraries(my_target PRIVATE bgen::bgen)
 
 ---
 
+## AWS S3 support
+
+The library can read BGEN files directly from AWS S3 without downloading them first.
+It uses [HTTP range requests](https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests) via the
+[AWS SDK for C++](https://github.com/aws/aws-sdk-cpp), so only the blocks actually needed are fetched.
+
+### Enabling
+
+S3 support is opt-in. Pass `-DBGEN_WITH_S3=ON` to CMake and make sure the AWS SDK is on your `CMAKE_PREFIX_PATH`:
+
+```bash
+# With pixi (installs aws-sdk-cpp automatically):
+pixi run -e s3 configure
+pixi run -e s3 build
+
+# Or with CMake directly (requires aws-sdk-cpp on the prefix path):
+cmake -S . -B build -G Ninja \
+  -DCMAKE_BUILD_TYPE=Release \
+  -DBGEN_WITH_S3=ON
+cmake --build build --parallel
+```
+
+### Usage
+
+Pass an `s3://bucket/key` URI anywhere a filename is accepted:
+
+```cpp
+// C++ API
+auto view = genfile::bgen::View::create("s3://my-bucket/cohort.bgen");
+while (view->read_variant(&snpid, &rsid, &chr, &pos, &alleles)) {
+    view->read_genotype_data_block(setter);
+}
+```
+
+```bash
+# Command-line tools
+bgenix -g s3://my-bucket/cohort.bgen -list
+cat-bgen -g s3://my-bucket/part1.bgen s3://my-bucket/part2.bgen -og merged.bgen
+```
+
+### Authentication
+
+Credentials are resolved by the AWS SDK's default provider chain in this order:
+
+1. Environment variables — `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` / `AWS_SESSION_TOKEN`
+2. `~/.aws/credentials` and `~/.aws/config`
+3. EC2/ECS/EKS instance metadata
+
+The AWS region is picked up from `AWS_DEFAULT_REGION` or `~/.aws/config`.
+You can also set it programmatically when constructing a stream directly:
+
+```cpp
+#include "genfile/bgen/S3StreamBuf.hpp"
+auto stream = genfile::bgen::make_s3_istream("s3://my-bucket/cohort.bgen", "eu-west-1");
+```
+
+### Tuning
+
+The default read block size is **1 MB**. For high-latency connections or very large genotype blocks,
+construct an `S3StreamBuf` directly with a larger block size:
+
+```cpp
+auto buf = std::make_unique<genfile::bgen::S3StreamBuf>(
+    "my-bucket", "cohort.bgen",
+    /* region     = */ "us-east-1",
+    /* block_size = */ 8 * 1024 * 1024  // 8 MB
+);
+```
+
+---
+
 ## R package
 
 The R package lives in [`R/package/`](R/package/) and has its own pixi environment:
@@ -103,5 +182,6 @@ If you use this library, its tools, or example programs, please cite the origina
 Released under the [Boost Software License v1.0](LICENSE_1_0.txt) — a permissive open-source license compatible with many others.
 
 This repository also uses [SQLite](https://www.sqlite.org/copyright.html) (public domain),
-[Boost](https://www.boost.org/users/license.html) (Boost Software License), and
-[zstandard](https://github.com/facebook/zstd/blob/dev/LICENSE) (BSD).
+[Boost](https://www.boost.org/users/license.html) (Boost Software License),
+[zstandard](https://github.com/facebook/zstd/blob/dev/LICENSE) (BSD), and optionally the
+[AWS SDK for C++](https://github.com/aws/aws-sdk-cpp/blob/main/LICENSE) (Apache 2.0, only when built with `-DBGEN_WITH_S3=ON`).
diff --git a/cmake/bgenConfig.cmake.in b/cmake/bgenConfig.cmake.in
index d6f64553..eb52ba1b 100644
--- a/cmake/bgenConfig.cmake.in
+++ b/cmake/bgenConfig.cmake.in
@@ -14,6 +14,10 @@ endif()
 cmake_policy(SET CMP0167 NEW)
 find_dependency(Boost COMPONENTS filesystem thread date_time timer chrono)
 
+if(@BGEN_WITH_S3@)
+  find_dependency(AWSSDK COMPONENTS s3)
+endif()
+
 include("${CMAKE_CURRENT_LIST_DIR}/bgenTargets.cmake")
 
 check_required_components(bgen)
diff --git a/genfile/include/genfile/bgen/S3StreamBuf.hpp b/genfile/include/genfile/bgen/S3StreamBuf.hpp
new file mode 100644
index 00000000..7d5e5aa5
--- /dev/null
+++ b/genfile/include/genfile/bgen/S3StreamBuf.hpp
@@ -0,0 +1,78 @@
+// Copyright Julianus Pfeuffer 2024.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef GENFILE_BGEN_S3STREAMBUF_HPP
+#define GENFILE_BGEN_S3STREAMBUF_HPP
+
+#include <cstdint>
+#include <istream>
+#include <memory>
+#include <streambuf>
+#include <string>
+#include <vector>
+
+namespace genfile {
+namespace bgen {
+
+/// Parse an S3 URI of the form s3://bucket/key into its components.
+/// Returns true if the URI was successfully parsed.
+bool parse_s3_uri(std::string const& uri, std::string& bucket, std::string& key);
+
+/// Returns true if the given filename looks like an S3 URI (starts with "s3://").
+bool is_s3_uri(std::string const& filename);
+
+/// A std::streambuf implementation that reads from AWS S3 using range requests.
+/// Supports seeking (seekg) and buffered block-based reads for efficiency.
+class S3StreamBuf : public std::streambuf {
+public:
+    /// Construct an S3StreamBuf for the given bucket and key.
+    /// block_size is the size in bytes of each range GET (default 1 MB); it must be non-zero.
+    S3StreamBuf(std::string const& bucket, std::string const& key,
+                std::string const& region = "",
+                std::size_t block_size = 1024 * 1024);
+
+    ~S3StreamBuf() override;
+
+    /// Get the total size of the S3 object in bytes.
+    std::int64_t object_size() const;
+
+protected:
+    // std::streambuf overrides
+    int_type underflow() override;
+    pos_type seekoff(off_type off, std::ios_base::seekdir dir,
+                     std::ios_base::openmode which = std::ios_base::in) override;
+    pos_type seekpos(pos_type pos,
+                     std::ios_base::openmode which = std::ios_base::in) override;
+    std::streamsize showmanyc() override;
+
+private:
+    void fetch_block(std::int64_t block_index);
+    std::int64_t current_position() const;
+
+    struct Impl;
+    std::unique_ptr<Impl> m_impl;
+
+    std::string m_bucket;
+    std::string m_key;
+    std::size_t m_block_size;
+    std::int64_t m_object_size;
+    std::int64_t m_position;  // stream offset of eback(), i.e. of the start of the current get area
+
+    // Buffer for the currently loaded block
+    std::vector<char> m_buffer;
+    std::int64_t m_buffer_block_index;  // which block is loaded (-1 = none)
+};
+
+/// Create an std::istream that reads from an S3 URI.
+/// The URI must be of the form s3://bucket/key.
+/// Optionally specify the AWS region (otherwise uses SDK defaults/env).
+std::unique_ptr<std::istream> make_s3_istream(
+    std::string const& s3_uri,
+    std::string const& region = "");
+
+} // namespace bgen
+} // namespace genfile
+
+#endif
diff --git a/genfile/include/genfile/zlib.hpp b/genfile/include/genfile/zlib.hpp
index f1082fdc..6f1f934a 100644
--- a/genfile/include/genfile/zlib.hpp
+++ b/genfile/include/genfile/zlib.hpp
@@ -77,7 +77,7 @@ namespace genfile {
 	void zstd_uncompress( byte_t const* begin, byte_t const* const end, std::vector< T >* dest ) {
 		std::size_t const source_size = ( end - begin ) ;
 		std::size_t const dest_size = dest->size() * sizeof( T ) ;
-		std::size_t const uncompressed_size = ZSTD_getDecompressedSize( reinterpret_cast< void const* >( begin ), source_size ) ;
+		std::size_t const uncompressed_size = ZSTD_getFrameContentSize( reinterpret_cast< void const* >( begin ), source_size ) ;
 		std::size_t const result = ZSTD_decompress(
 			reinterpret_cast< void* >( &dest->operator[]( 0 ) ),
 			dest_size,
diff --git a/pixi.toml b/pixi.toml
index 82b48c1c..d159a748 100644
--- a/pixi.toml
+++ b/pixi.toml
@@ -17,6 +17,17 @@ configure = "cmake -S . -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DCMAKE_PRE
 build = { cmd = "cmake --build build --parallel", depends-on = ["configure"] }
 test = { cmd = "ctest --test-dir build --output-on-failure -V", depends-on = ["build"] }
 
+[feature.s3.dependencies]
+aws-sdk-cpp = "*"
+
+[feature.s3.tasks]
+configure = "cmake -S . -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DCMAKE_PREFIX_PATH=$CONDA_PREFIX -DBGEN_BUILD_TESTS=ON -DBGEN_WITH_S3=ON"
+test-s3 = { cmd = "ctest --test-dir build --output-on-failure -V -R test_s3_bgen", depends-on = ["build"] }
+
+[environments]
+default = { solve-group = "default" }
+s3 = { features = ["s3"], solve-group = "default" }
+
 [package]
 name = "bgen"
 version = "1.1.7"
diff --git a/src/S3StreamBuf.cpp b/src/S3StreamBuf.cpp
new file mode 100644
index 00000000..8deecdb7
--- /dev/null
+++ b/src/S3StreamBuf.cpp
@@ -0,0 +1,306 @@
+// Copyright Julianus Pfeuffer 2024.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include "genfile/bgen/S3StreamBuf.hpp"
+
+#include <aws/core/Aws.h>
+#include <aws/core/auth/AWSAuthSigner.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/core/client/ClientConfiguration.h>
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/GetObjectRequest.h>
+#include <aws/s3/model/HeadObjectRequest.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <sstream>
+#include <stdexcept>
+
+namespace genfile {
+namespace bgen {
+
+// ─── S3 URI parsing ─────────────────────────────────────────────────────────
+
+bool is_s3_uri(std::string const& filename) {
+    return filename.size() > 5 && filename.substr(0, 5) == "s3://";
+}
+
+bool parse_s3_uri(std::string const& uri, std::string& bucket, std::string& key) {
+    if (!is_s3_uri(uri)) {
+        return false;
+    }
+    // s3://bucket/key
+    std::string remainder = uri.substr(5);
+    auto slash_pos = remainder.find('/');
+    if (slash_pos == std::string::npos || slash_pos == 0) {
+        return false;
+    }
+    bucket = remainder.substr(0, slash_pos);
+    key = remainder.substr(slash_pos + 1);
+    if (key.empty()) {
+        return false;
+    }
+    return true;
+}
+
+// ─── AWS SDK lifecycle (init once) ──────────────────────────────────────────
+
+namespace {
+    struct AwsInitGuard {
+        AwsInitGuard() {
+            Aws::SDKOptions options;
+            Aws::InitAPI(options);
+        }
+        ~AwsInitGuard() {
+            Aws::SDKOptions options;
+            Aws::ShutdownAPI(options);
+        }
+    };
+
+    AwsInitGuard& ensure_aws_initialized() {
+        static AwsInitGuard guard;
+        return guard;
+    }
+} // anonymous namespace
+
+// ─── S3StreamBuf implementation ─────────────────────────────────────────────
+
+struct S3StreamBuf::Impl {
+    std::shared_ptr<Aws::S3::S3Client> client;
+};
+
+S3StreamBuf::S3StreamBuf(
+    std::string const& bucket,
+    std::string const& key,
+    std::string const& region,
+    std::size_t block_size
+)
+    : m_impl(std::make_unique<Impl>())
+    , m_bucket(bucket)
+    , m_key(key)
+    , m_block_size(block_size)
+    , m_object_size(-1)
+    , m_position(0)
+    , m_buffer_block_index(-1)
+{
+    // Stream positions are mapped to blocks by dividing by the block size.
+    if (m_block_size == 0) {
+        throw std::invalid_argument("S3StreamBuf: block_size must be non-zero");
+    }
+
+    ensure_aws_initialized();
+
+    Aws::Client::ClientConfiguration config;
+    if (!region.empty()) {
+        config.region = region;
+    } else {
+        // Respect AWS_DEFAULT_REGION / AWS_REGION env vars
+        const char* env_region = std::getenv("AWS_DEFAULT_REGION");
+        if (!env_region) env_region = std::getenv("AWS_REGION");
+        if (env_region && env_region[0] != '\0') {
+            config.region = env_region;
+        }
+    }
+
+    // Support AWS_ENDPOINT_URL (e.g. for LocalStack or compatible services)
+    const char* endpoint_url = std::getenv("AWS_ENDPOINT_URL");
+    bool path_style = false;
+    if (endpoint_url && endpoint_url[0] != '\0') {
+        config.endpointOverride = endpoint_url;
+        path_style = true;  // custom endpoints need path-style (not virtual-hosted) addressing
+    }
+
+    // Support AWS_NO_SIGN_REQUEST=1 for public / anonymous buckets.
+    // Wrap empty AWSCredentials in a SimpleAWSCredentialsProvider (the S3Client
+    // constructor takes a CredentialsProvider, not a raw AWSCredentials object).
+    const char* no_sign = std::getenv("AWS_NO_SIGN_REQUEST");
+    if (no_sign && (std::string(no_sign) == "1" || std::string(no_sign) == "true")) {
+        auto anon_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
+            Aws::Auth::AWSCredentials("", ""));
+        m_impl->client = std::make_shared<Aws::S3::S3Client>(
+            anon_provider, config,
+            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent,
+            !path_style);
+    } else {
+        m_impl->client = std::make_shared<Aws::S3::S3Client>(
+            config,
+            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent,
+            !path_style);
+    }
+
+    // HEAD request to get object size
+    Aws::S3::Model::HeadObjectRequest head_request;
+    head_request.SetBucket(m_bucket);
+    head_request.SetKey(m_key);
+
+    auto outcome = m_impl->client->HeadObject(head_request);
+    if (!outcome.IsSuccess()) {
+        throw std::runtime_error(
+            "S3StreamBuf: failed to HEAD s3://" + m_bucket + "/" + m_key
+            + " — " + outcome.GetError().GetMessage());
+    }
+    m_object_size = outcome.GetResult().GetContentLength();
+}
+
+S3StreamBuf::~S3StreamBuf() = default;
+
+std::int64_t S3StreamBuf::object_size() const {
+    return m_object_size;
+}
+
+std::int64_t S3StreamBuf::current_position() const {
+    return m_position;
+}
+
+void S3StreamBuf::fetch_block(std::int64_t block_index) {
+    if (block_index == m_buffer_block_index) {
+        return;  // already loaded
+    }
+
+    std::int64_t start = block_index * static_cast<std::int64_t>(m_block_size);
+    std::int64_t end = std::min(start + static_cast<std::int64_t>(m_block_size) - 1,
+                                m_object_size - 1);
+
+    // Build range string: "bytes=start-end"
+    std::ostringstream range_ss;
+    range_ss << "bytes=" << start << "-" << end;
+
+    Aws::S3::Model::GetObjectRequest request;
+    request.SetBucket(m_bucket);
+    request.SetKey(m_key);
+    request.SetRange(range_ss.str());
+
+    auto outcome = m_impl->client->GetObject(request);
+    if (!outcome.IsSuccess()) {
+        throw std::runtime_error(
+            "S3StreamBuf: failed to GET range " + range_ss.str()
+            + " from s3://" + m_bucket + "/" + m_key
+            + " — " + outcome.GetError().GetMessage());
+    }
+
+    auto& body = outcome.GetResult().GetBody();
+    std::int64_t bytes_expected = end - start + 1;
+    // The buffer is about to be overwritten: mark it as holding no block until
+    // the new block has been read completely.
+    m_buffer_block_index = -1;
+    m_buffer.resize(static_cast<std::size_t>(bytes_expected));
+    body.read(m_buffer.data(), bytes_expected);
+    std::streamsize bytes_read = body.gcount();
+    m_buffer.resize(static_cast<std::size_t>(bytes_read));
+    if (bytes_read != bytes_expected) {
+        throw std::runtime_error(
+            "S3StreamBuf: short read for range " + range_ss.str()
+            + " from s3://" + m_bucket + "/" + m_key);
+    }
+
+    m_buffer_block_index = block_index;
+}
+
+S3StreamBuf::int_type S3StreamBuf::underflow() {
+    // m_position is the stream offset of eback(). Fold in whatever was consumed
+    // from the previous get area before refilling; otherwise every refill would
+    // serve the same bytes again and the stream would never reach EOF.
+    if (gptr() != nullptr) {
+        m_position += static_cast<std::int64_t>(gptr() - eback());
+        setg(nullptr, nullptr, nullptr);
+    }
+
+    if (m_position >= m_object_size) {
+        return traits_type::eof();
+    }
+
+    std::int64_t block_index = m_position / static_cast<std::int64_t>(m_block_size);
+    fetch_block(block_index);
+
+    std::int64_t offset_in_block = m_position - block_index * static_cast<std::int64_t>(m_block_size);
+    std::int64_t available = static_cast<std::int64_t>(m_buffer.size()) - offset_in_block;
+
+    if (available <= 0) {
+        return traits_type::eof();
+    }
+
+    // Set up the get area
+    char* begin = m_buffer.data() + offset_in_block;
+    char* end = m_buffer.data() + m_buffer.size();
+    setg(begin, begin, end);
+
+    return traits_type::to_int_type(*gptr());
+}
+
+S3StreamBuf::pos_type S3StreamBuf::seekoff(
+    off_type off, std::ios_base::seekdir dir,
+    std::ios_base::openmode which)
+{
+    if (!(which & std::ios_base::in)) {
+        return pos_type(off_type(-1));
+    }
+
+    std::int64_t new_pos;
+    switch (dir) {
+    case std::ios_base::beg:
+        new_pos = off;
+        break;
+    case std::ios_base::cur:
+        // Current position = position + (gptr - eback) offset within buffer
+        new_pos = m_position + (gptr() - eback()) + off;
+        break;
+    case std::ios_base::end:
+        new_pos = m_object_size + off;
+        break;
+    default:
+        return pos_type(off_type(-1));
+    }
+
+    if (new_pos < 0 || new_pos > m_object_size) {
+        return pos_type(off_type(-1));
+    }
+
+    m_position = new_pos;
+    // Invalidate the get area so next read triggers underflow
+    setg(nullptr, nullptr, nullptr);
+
+    return pos_type(new_pos);
+}
+
+S3StreamBuf::pos_type S3StreamBuf::seekpos(
+    pos_type pos, std::ios_base::openmode which)
+{
+    return seekoff(off_type(pos), std::ios_base::beg, which);
+}
+
+std::streamsize S3StreamBuf::showmanyc() {
+    std::int64_t remaining = m_object_size - m_position;
+    if (gptr() && gptr() < egptr()) {
+        remaining = m_object_size - (m_position + (gptr() - eback()));
+    }
+    return static_cast<std::streamsize>(std::max<std::int64_t>(0, remaining));
+}
+
+// ─── Factory function ───────────────────────────────────────────────────────
+
+std::unique_ptr<std::istream> make_s3_istream(
+    std::string const& s3_uri,
+    std::string const& region)
+{
+    std::string bucket, key;
+    if (!parse_s3_uri(s3_uri, bucket, key)) {
+        throw std::invalid_argument(
+            "make_s3_istream: invalid S3 URI: " + s3_uri);
+    }
+
+    // std::istream does not own its streambuf, so wrap both in a small subclass
+    // that keeps the S3StreamBuf alive for as long as the stream exists.
+    struct S3IStream : public std::istream {
+        std::unique_ptr<S3StreamBuf> buf;
+        explicit S3IStream(std::unique_ptr<S3StreamBuf> b)
+            : std::istream(b.get()), buf(std::move(b)) {}
+    };
+
+    return std::make_unique<S3IStream>(
+        std::make_unique<S3StreamBuf>(bucket, key, region));
+}
+
+} // namespace bgen
+} // namespace genfile
diff --git a/src/View.cpp b/src/View.cpp
index 5176ea41..796fac70 100644
--- a/src/View.cpp
+++ b/src/View.cpp
@@ -16,6 +16,9 @@
 #include "genfile/bgen/bgen.hpp"
 #include "genfile/bgen/IndexQuery.hpp"
 #include "genfile/bgen/View.hpp"
+#if BGEN_WITH_S3
+#include "genfile/bgen/S3StreamBuf.hpp"
+#endif
 
 namespace genfile {
 	namespace bgen {
@@ -165,18 +168,34 @@ namespace genfile {
 		// Open the bgen file, read header data and gather metadata.
 		void View::setup( std::string const& filename ) {
 			m_file_metadata.filename = filename ;
-			m_file_metadata.last_write_time = boost::filesystem::last_write_time( filename ) ;
-
-			// Open the stream
-			m_stream.reset(
-				new std::ifstream( filename.c_str(), std::ifstream::binary )
-			) ;
-			if( !*m_stream ) {
-				throw std::invalid_argument( filename ) ;
-			}
 
-			// get file size
+#if BGEN_WITH_S3
+			if( genfile::bgen::is_s3_uri( filename ) ) {
+				// S3 path: use S3 streaming
+				m_stream = genfile::bgen::make_s3_istream( filename ) ;
+				if( !*m_stream ) {
+					throw std::invalid_argument( filename ) ;
+				}
+				// Get file size via seek
+				m_stream->seekg( 0, std::ios::end ) ;
+				m_file_metadata.size = m_stream->tellg() ;
+				m_stream->seekg( 0, std::ios::beg ) ;
+				m_file_metadata.last_write_time = 0 ; // not available for S3
+			} else
+#endif
 			{
+				// Local file path
+				m_file_metadata.last_write_time = boost::filesystem::last_write_time( filename ) ;
+
+				// Open the stream
+				m_stream.reset(
+					new std::ifstream( filename.c_str(), std::ifstream::binary )
+				) ;
+				if( !*m_stream ) {
+					throw std::invalid_argument( filename ) ;
+				}
+
+				// get file size
 				std::streampos origin = m_stream->tellg() ;
 				m_stream->seekg( 0, std::ios::end ) ;
 				m_file_metadata.size = m_stream->tellg() - origin ;
diff --git a/test/CMakeLists.txt
b/test/CMakeLists.txt
index f1dd91b3..33700c3e 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -19,3 +19,22 @@ target_link_libraries(test_bgen PRIVATE
 )
 
 add_test(NAME test_bgen COMMAND test_bgen)
+
+# ─── S3 integration tests (only when BGEN_WITH_S3=ON) ───────────────────────
+if(BGEN_WITH_S3)
+  add_executable(test_s3_bgen
+    integration/test_s3_bgen.cpp
+  )
+  target_include_directories(test_s3_bgen PRIVATE
+    unit   # reuse catch.hpp from the unit test directory
+    ${CMAKE_BINARY_DIR}
+  )
+  target_link_libraries(test_s3_bgen PRIVATE
+    bgen::bgen
+    bgen::db
+    bgen::appcontext
+    Boost::filesystem
+  )
+  # The test passes even when env vars are not set (it skips with WARN).
+  add_test(NAME test_s3_bgen COMMAND test_s3_bgen)
+endif()
diff --git a/test/integration/test_s3_bgen.cpp b/test/integration/test_s3_bgen.cpp
new file mode 100644
index 00000000..a38ecd84
--- /dev/null
+++ b/test/integration/test_s3_bgen.cpp
@@ -0,0 +1,216 @@
+// S3 integration test for the bgen S3 streaming layer.
+//
+// Environment variables (all optional; test is skipped when absent):
+//
+//   BGEN_S3_TEST_URI
+//       s3://bucket/path/to/file.bgen — accessed with default credential chain
+//       (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / instance profile / etc.)
+//
+//   BGEN_S3_PUBLIC_TEST_URI
+//       s3://bucket/path/to/file.bgen — accessed with AWS_NO_SIGN_REQUEST=1
+//       Must be in a bucket / with an object ACL that allows anonymous reads.
+//
+// Expected BGEN properties (can be overridden via env):
+//   BGEN_S3_EXPECTED_VARIANTS   number of variants (default: 199)
+//   BGEN_S3_EXPECTED_SAMPLES    number of samples  (default: 500)
+//
+// To run:
+//   BGEN_S3_TEST_URI=s3://mybucket/example.16bits.bgen \
+//   ctest --test-dir build -R test_s3_bgen -V
+//
+// For public-bucket anonymous access:
+//   BGEN_S3_PUBLIC_TEST_URI=s3://mybucket/example.16bits.bgen \
+//   ctest --test-dir build -R test_s3_bgen -V
+
+#define CATCH_CONFIG_NO_POSIX_SIGNALS
+#define CATCH_CONFIG_MAIN
+#include "catch.hpp"
+
+#include "genfile/bgen/S3StreamBuf.hpp"
+#include "genfile/bgen/bgen.hpp"
+
+#include <cstdint>
+#include <cstdlib>
+#include <istream>
+#include <memory>
+#include <string>
+
+// ─── Helpers ─────────────────────────────────────────────────────────────────
+
+namespace {
+
+// Holds what check_bgen_header returns.
+struct BgenHeader {
+    genfile::bgen::Context ctx;
+    std::streamoff data_start;  // = 4 + file_offset, byte position of first variant
+};
+
+// Read and verify a BGEN header from the given stream.
+BgenHeader check_bgen_header(std::istream& stream,
+                             uint32_t expected_variants,
+                             uint32_t expected_samples)
+{
+    uint32_t offset = 0;
+    genfile::bgen::read_offset(stream, &offset);
+    REQUIRE(offset > 0);
+
+    genfile::bgen::Context ctx;
+    genfile::bgen::read_header_block(stream, &ctx);
+
+    REQUIRE(ctx.number_of_variants == expected_variants);
+    REQUIRE(ctx.number_of_samples == expected_samples);
+    REQUIRE((ctx.flags & 0x3Cu) != 0);  // layout lives in flag bits 2-5 (bits 0-1 are compression)
+
+    return { ctx, static_cast<std::streamoff>(4 + offset) };
+}
+
+// Helper: read one variant's identifying data + skip genotype block.
+void skip_one_variant(std::istream& stream, genfile::bgen::Context const& ctx)
+{
+    std::string snpid, rsid, chrom, allele0, allele1;
+    uint32_t position = 0;
+
+    bool ok = genfile::bgen::read_snp_identifying_data(
+        stream, ctx,
+        &snpid, &rsid, &chrom, &position,
+        &allele0, &allele1
+    );
+    REQUIRE(ok);
+    REQUIRE(!chrom.empty());
+    REQUIRE(position > 0);
+
+    genfile::bgen::ignore_genotype_data_block(stream, ctx);
+}
+
+// Parse uint32 from an env var (returns default_val if absent/invalid).
+uint32_t env_uint32(const char* name, uint32_t default_val)
+{
+    const char* v = std::getenv(name);
+    if (!v || v[0] == '\0') return default_val;
+    try { return static_cast<uint32_t>(std::stoul(v)); }
+    catch (...) { return default_val; }
+}
+
+// Sets an environment variable for the lifetime of the object and removes it
+// again on destruction, so the variable does not leak into later test cases
+// even when a failing REQUIRE unwinds out of the test body.
+struct ScopedEnv {
+    ScopedEnv(const char* name, const char* value) : m_name(name) {
+#ifdef _WIN32
+        ::_putenv_s(name, value);
+#else
+        ::setenv(name, value, 1);
+#endif
+    }
+    ~ScopedEnv() {
+#ifdef _WIN32
+        ::_putenv_s(m_name, "");
+#else
+        ::unsetenv(m_name);
+#endif
+    }
+    ScopedEnv(ScopedEnv const&) = delete;
+    ScopedEnv& operator=(ScopedEnv const&) = delete;
+
+private:
+    const char* m_name;
+};
+
+} // anonymous namespace
+
+// ─────────────────────────────────────────────────────────────────────────────
+// Test 1 — authenticated (default credential chain)
+// ─────────────────────────────────────────────────────────────────────────────
+TEST_CASE("S3 authenticated read: BGEN header and first variant", "[s3][auth]")
+{
+    const char* uri_cstr = std::getenv("BGEN_S3_TEST_URI");
+    if (!uri_cstr || uri_cstr[0] == '\0') {
+        WARN("Skipped — set BGEN_S3_TEST_URI to enable this test");
+        return;
+    }
+    std::string uri(uri_cstr);
+    uint32_t exp_variants = env_uint32("BGEN_S3_EXPECTED_VARIANTS", 199);
+    uint32_t exp_samples  = env_uint32("BGEN_S3_EXPECTED_SAMPLES", 500);
+
+    INFO("URI: " << uri);
+
+    std::unique_ptr<std::istream> stream;
+    REQUIRE_NOTHROW(stream = genfile::bgen::make_s3_istream(uri));
+    REQUIRE(stream != nullptr);
+    REQUIRE(stream->good());
+
+    auto hdr = check_bgen_header(*stream, exp_variants, exp_samples);
+
+    // Seek to first variant and read it
+    stream->seekg(hdr.data_start);
+    skip_one_variant(*stream, hdr.ctx);
+}
+
+// ─────────────────────────────────────────────────────────────────────────────
+// Test 2 — anonymous / public bucket (AWS_NO_SIGN_REQUEST=1)
+// ─────────────────────────────────────────────────────────────────────────────
+TEST_CASE("S3 anonymous read: BGEN header and first variant", "[s3][anon]")
+{
+    const char* uri_cstr = std::getenv("BGEN_S3_PUBLIC_TEST_URI");
+    if (!uri_cstr || uri_cstr[0] == '\0') {
+        WARN("Skipped — set BGEN_S3_PUBLIC_TEST_URI to enable this test");
+        return;
+    }
+    std::string uri(uri_cstr);
+    uint32_t exp_variants = env_uint32("BGEN_S3_EXPECTED_VARIANTS", 199);
+    uint32_t exp_samples  = env_uint32("BGEN_S3_EXPECTED_SAMPLES", 500);
+
+    INFO("URI: " << uri);
+
+    // Enable anonymous (unsigned) requests for the duration of this test case
+    ScopedEnv anon_env("AWS_NO_SIGN_REQUEST", "1");
+
+    std::unique_ptr<std::istream> stream;
+    REQUIRE_NOTHROW(stream = genfile::bgen::make_s3_istream(uri));
+    REQUIRE(stream != nullptr);
+    REQUIRE(stream->good());
+
+    auto hdr = check_bgen_header(*stream, exp_variants, exp_samples);
+
+    stream->seekg(hdr.data_start);
+    skip_one_variant(*stream, hdr.ctx);
+}
+
+// ─────────────────────────────────────────────────────────────────────────────
+// Test 3 — seeking / random access
+// ─────────────────────────────────────────────────────────────────────────────
+TEST_CASE("S3 authenticated read: seeking works correctly", "[s3][auth][seek]")
+{
+    const char* uri_cstr = std::getenv("BGEN_S3_TEST_URI");
+    if (!uri_cstr || uri_cstr[0] == '\0') {
+        WARN("Skipped — set BGEN_S3_TEST_URI to enable this test");
+        return;
+    }
+
+    std::unique_ptr<std::istream> stream;
+    REQUIRE_NOTHROW(stream = genfile::bgen::make_s3_istream(std::string(uri_cstr)));
+
+    REQUIRE(stream->tellg() == std::streampos(0));
+
+    // Read 4 bytes (the offset field)
+    char buf[4] = {};
+    stream->read(buf, 4);
+    REQUIRE(stream->gcount() == 4);
+    REQUIRE(stream->tellg() == std::streampos(4));
+
+    // Seek back to 0 and re-read — must get same bytes
+    stream->seekg(0, std::ios_base::beg);
+    REQUIRE(stream->tellg() == std::streampos(0));
+
+    char buf2[4] = {};
+    stream->read(buf2, 4);
+    REQUIRE(stream->gcount() == 4);
+    for (int i = 0; i < 4; ++i) {
+        REQUIRE(static_cast<unsigned char>(buf[i]) ==
+                static_cast<unsigned char>(buf2[i]));
+    }
+
+    // Seek to end and verify size is non-zero
+    stream->seekg(0, std::ios_base::end);
+    REQUIRE(stream->tellg() > std::streampos(0));
+}