From 5eead5f159128485891660706f478f13d51a1541 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 10 Jun 2026 12:16:32 +0200 Subject: [PATCH] add pull replication endpoint (gen_statem) - phase 1 couchbeam_replicator is a gen_statem that pulls documents and their full revision history from a source CouchDB into a pluggable local target behaviour. Pull only, resumable via a target-stored checkpoint. Attachment bodies are deferred to phase 2 (target entries are {Doc, #{}} for now). - couchbeam_replicator: engine + target behaviour (revs_diff, write_docs, read/write_checkpoint); oneshot and continuous modes; status/pause/resume - couchbeam_replicator_ets: ets/dets reference target preserving _revisions - guides/replication.md, ex_doc Replication group - e2e group pull_replication_ops (docs + history + deletion + resume), wired into run-e2e.sh and the CI CouchDB matrix - CHANGELOG --- CHANGELOG.md | 8 + guides/replication.md | 110 +++++++++++ rebar.config | 2 + src/couchbeam_replicator.erl | 285 +++++++++++++++++++++++++++ src/couchbeam_replicator_ets.erl | 141 +++++++++++++ support/run-e2e.sh | 2 +- test/couchbeam_integration_SUITE.erl | 114 +++++++++++ 7 files changed, 661 insertions(+), 1 deletion(-) create mode 100644 guides/replication.md create mode 100644 src/couchbeam_replicator.erl create mode 100644 src/couchbeam_replicator_ets.erl diff --git a/CHANGELOG.md b/CHANGELOG.md index d5a16f52..96af3b6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. ## [2.1.0] - 2026-06-09 +### Added + +- `couchbeam_replicator`: a `gen_statem` pull-replication endpoint that imports + documents (with full revision history) from a source CouchDB into a pluggable + local target behaviour, with resumable checkpoints. `couchbeam_replicator_ets` + is an ets/dets reference target. Pull only; attachment bodies are a follow-up. + See `guides/replication.md`. + ### Changed - Updated to hackney 4.2.2. 
Response bodies are now read eagerly: a normal diff --git a/guides/replication.md b/guides/replication.md new file mode 100644 index 00000000..f8ae8a35 --- /dev/null +++ b/guides/replication.md @@ -0,0 +1,110 @@ +# Pull replication endpoint + +`couchbeam_replicator` is a `gen_statem` that pulls documents from a source +CouchDB into a local store of your choice, preserving each document's full +revision history. It is *pull only* (CouchDB to the app) and can be stopped and +resumed. + +The local store is a module implementing the `couchbeam_replicator` behaviour. +`couchbeam_replicator_ets` is a reference target backed by an ets or dets table; +the same behaviour is how you would import into another store (for example +barrel_docdb). + +## Quick start + +```erlang +{ok, Server} = {ok, couchbeam:server_connection("http://127.0.0.1:5984", [])}, +{ok, Src} = couchbeam:open_db(Server, <<"mydb">>), + +%% a persistent (dets) target so the import and checkpoint survive a restart +{ok, Pid} = couchbeam_replicator:start_link( + Src, couchbeam_replicator_ets, + #{table => my_import, backend => dets, file => "my_import.dets"}, + [{repl_id, <<"mydb->my_import">>}, {notify, self()}]), + +receive {Pid, done} -> ok end. +``` + +Running the identical `start_link/4` again (after a restart, or to pick up new +changes) resumes from the stored checkpoint. 
+ +## API + +``` +couchbeam_replicator:start_link(Source, TargetMod, TargetArgs, Opts) -> {ok, Pid} +couchbeam_replicator:status(Pid) -> {ok, #{state, repl_id, since, docs_written}} +couchbeam_replicator:pause(Pid) -> ok +couchbeam_replicator:resume(Pid) -> ok +couchbeam_replicator:stop(Pid) -> ok +``` + +Options (`Opts`): + +| Option | Default | Meaning | +| --- | --- | --- | +| `{mode, oneshot \| continuous}` | `oneshot` | stop when caught up, or keep polling | +| `{batch_size, N}` | `100` | changes per batch | +| `{interval, Ms}` | `5000` | continuous poll interval | +| `{repl_id, Bin}` | hash of source URL + target module | stable id used for resume | +| `{notify, Pid}` | none | receive progress messages | +| `{changes_options, L}` | `[]` | extra `_changes` query options (e.g. a filter) | + +Progress messages sent to `{notify, Pid}` are tagged with the replicator pid: +`{Pid, {checkpoint, Seq}}`, `{Pid, {docs_written, N}}`, `{Pid, done}`, +`{Pid, {error, Reason}}`. + +## How resume works + +Resume needs no `_local` document on the source; the target owns the checkpoint. + +1. Each replication has a stable `repl_id` (pass `{repl_id, Bin}`, or it defaults + to a hash of the source URL and the target module). +2. After each batch is written, the engine calls + `write_checkpoint(ReplId, LastSeq, State)`; `LastSeq` is CouchDB's opaque + `last_seq`. +3. On start, `read_checkpoint(ReplId, State)` returns the last seq (or `nil`), and + the changes feed continues from there. +4. Every batch goes through `revs_diff` before fetching, so replaying a change is + a no-op: a stale checkpoint costs a few redundant checks, never duplicates. + +Use the `dets` backend (or your own persistent store) for resume across restarts; +`ets` resumes within the node's lifetime. + +## Writing a target + +A target module implements: + +```erlang +-behaviour(couchbeam_replicator). + +init(Args) -> {ok, State} | {error, term()}. +revs_diff(IdRevs, State) -> {ok, Missing, State}. 
+ %% IdRevs :: [{DocId, [Rev]}]; return the subset not yet stored. +write_docs(Entries, State) -> {ok, State} | {error, term()}. + %% Entries :: [{Doc, Atts}]; Doc is a map with _id/_rev/_revisions/ + %% (_deleted); Atts is #{} for now (attachment bodies come in a later phase). +read_checkpoint(ReplId, State) -> {ok, Seq | nil, State}. +write_checkpoint(ReplId, Seq, State) -> {ok, State}. +terminate(Reason, State) -> any(). %% optional +``` + +Each stored document keeps its `_revisions` field, so the full revision history +is available to reconstruct the document's revision tree. + +## ets/dets reference target + +`couchbeam_replicator_ets` stores, in a `set` table: + +``` +{{doc, DocId, Rev}, {Doc, Atts}} +{{leafs, DocId}, [Rev]} +{{checkpoint, ReplId}, Seq} +``` + +`init/1` args: `#{table => Name, backend => ets | dets, file => Path}` +(`backend` defaults to `ets`). Inspect a table with +`couchbeam_replicator_ets:get_doc/3`, `leaf_revs/2`, and `checkpoint/2`. + +An ets table created by the target is owned by the replicator process and is +deleted when a one-shot run finishes. To keep the result (or to resume later), +create the named table yourself first (it is reused) or use the `dets` backend. 
diff --git a/rebar.config b/rebar.config index c757ebf3..17da22ad 100644 --- a/rebar.config +++ b/rebar.config @@ -34,6 +34,7 @@ {"guides/MIGRATION.md", #{title => <<"Migration Guide">>}}, {"guides/changes.md", #{title => <<"Changes Feed">>}}, {"guides/views.md", #{title => <<"Views">>}}, + {"guides/replication.md", #{title => <<"Pull Replication">>}}, {"LICENSE", #{title => <<"License">>}} ]}, {main, "README.md"}, @@ -43,6 +44,7 @@ {groups_for_modules, [ {"Core", [couchbeam, couchbeam_doc, couchbeam_attachments]}, {"Views & Changes", [couchbeam_view, couchbeam_changes]}, + {"Replication", [couchbeam_replicator, couchbeam_replicator_ets]}, {"Utilities", [couchbeam_ejson, couchbeam_httpc, couchbeam_util, couchbeam_uuids]}, {"Application", [couchbeam_app, couchbeam_sup]} ]} diff --git a/src/couchbeam_replicator.erl b/src/couchbeam_replicator.erl new file mode 100644 index 00000000..d59898bf --- /dev/null +++ b/src/couchbeam_replicator.erl @@ -0,0 +1,285 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of couchbeam released under the MIT license. +%%% See the NOTICE for more information. +%%% +%%% A client-side CouchDB *pull* replicator: it reads the changes feed and +%%% document revisions from a source CouchDB and hands them to a pluggable +%%% local target (a module implementing the `couchbeam_replicator' behaviour, +%%% e.g. `couchbeam_replicator_ets'). Document revision history is preserved. +%%% Replication is resumable: the target persists the last processed source +%%% sequence under a stable replication id. +%%% +%%% Pull only (CouchDB -> app). Attachment bodies are not fetched yet; each +%%% target entry is `{Doc, #{}}'. + +-module(couchbeam_replicator). +-behaviour(gen_statem). + +%% public API +-export([start_link/4, + stop/1, + status/1, + pause/1, + resume/1]). + +%% gen_statem callbacks +-export([callback_mode/0, init/1, terminate/3, code_change/4]). +-export([starting/3, replicating/3, paused/3]). + +-include("couchbeam.hrl"). 
+
+%% Options accepted by start_link/4. Options that are not given fall back to
+%% the defaults applied in init/1: mode oneshot, batch_size 100, interval 5000,
+%% changes_options [], no notify pid, and a repl_id derived from the source and
+%% the target module.
+-type repl_opt() :: {mode, oneshot | continuous}
+                  | {batch_size, pos_integer()}
+                  | {interval, pos_integer()}
+                  | {repl_id, binary()}
+                  | {notify, pid()}
+                  | {changes_options, list()}.
+-export_type([repl_opt/0]).
+
+%% Target behaviour: the local store the source is replicated into.
+%% Every callback receives the target state and returns the (possibly updated)
+%% state, which the replicator threads through to the next call.
+-callback init(Args :: term()) ->
+    {ok, TState :: term()} | {error, term()}.
+-callback revs_diff(IdRevs :: [{DocId :: binary(), [Rev :: binary()]}], TState :: term()) ->
+    {ok, Missing :: [{DocId :: binary(), [Rev :: binary()]}], TState :: term()}.
+-callback write_docs(Entries :: [{Doc :: map(), Atts :: #{binary() => binary()}}], TState :: term()) ->
+    {ok, TState :: term()} | {error, term()}.
+-callback read_checkpoint(ReplId :: binary(), TState :: term()) ->
+    {ok, Seq :: term() | nil, TState :: term()}.
+-callback write_checkpoint(ReplId :: binary(), Seq :: term(), TState :: term()) ->
+    {ok, TState :: term()}.
+-callback terminate(Reason :: term(), TState :: term()) -> any().
+-optional_callbacks([terminate/2]).
+
+%% gen_statem data carried through every state.
+-record(data, {
+    source :: db(),                          % source database handle
+    target_mod :: module(),                  % module implementing the target behaviour
+    target_state :: term(),                  % opaque state returned by the target callbacks
+    repl_id :: binary(),                     % key under which the target keeps the checkpoint
+    mode = oneshot :: oneshot | continuous,  % stop when caught up, or keep polling
+    batch_size = 100 :: pos_integer(),       % sent as `limit' to the changes feed
+    interval = 5000 :: pos_integer(),        % poll delay once caught up (continuous mode)
+    changes_options = [] :: list(),          % appended to the changes feed options
+    notify :: pid() | undefined,             % receives {ReplicatorPid, Msg} progress messages
+    since = 0 :: term(),                     % last source sequence processed
+    docs_written = 0 :: non_neg_integer()    % running total reported by status/1
+}).
+
+%%====================================================================
+%% Public API
+%%====================================================================
+
+%% @doc Start a pull replication from Source into TargetMod.
+%% TargetArgs is handed unchanged to `TargetMod:init/1'; the started process is
+%% linked to the caller and is not registered under a name.
+-spec start_link(Source :: db(), TargetMod :: module(),
+                 TargetArgs :: term(), Opts :: [repl_opt()]) ->
+    {ok, pid()} | {error, term()}.
+start_link(Source, TargetMod, TargetArgs, Opts) ->
+    gen_statem:start_link(?MODULE, {Source, TargetMod, TargetArgs, Opts}, []).
+
+%% @doc Stop the replicator.
+-spec stop(pid()) -> ok.
+stop(Pid) ->
+    gen_statem:stop(Pid).
+
+%% @doc Current replication status.
+%% The map holds the current state name, the replication id, the last source
+%% sequence processed and the number of documents written so far.
+-spec status(pid()) ->
+    {ok, #{state := atom(), repl_id := binary(),
+           since := term(), docs_written := non_neg_integer()}}.
+status(Pid) ->
+    gen_statem:call(Pid, status).
+
+%% @doc Pause a running replication.
+%% Only the `replicating' state handles this request. In any other state the
+%% common handler answers `{error, unsupported}', so the spec includes it.
+-spec pause(pid()) -> ok | {error, unsupported}.
+pause(Pid) ->
+    gen_statem:call(Pid, pause).
+
+%% @doc Resume a paused replication.
+%% Only the `paused' state handles this request; otherwise the reply is
+%% `{error, unsupported}'.
+-spec resume(pid()) -> ok | {error, unsupported}.
+resume(Pid) ->
+    gen_statem:call(Pid, resume).
+
+%%====================================================================
+%% gen_statem callbacks
+%%====================================================================
+
+callback_mode() ->
+    state_functions.
+
+%% Initialise the target and build the state data. No request is made to the
+%% source here: the `starting' state is entered with an internal `init' event
+%% and does that work.
+init({Source, TargetMod, TargetArgs, Opts}) ->
+    ReplId = case proplists:get_value(repl_id, Opts) of
+                 undefined -> default_repl_id(Source, TargetMod);
+                 Id when is_binary(Id) -> Id
+             end,
+    case TargetMod:init(TargetArgs) of
+        {ok, TState} ->
+            Data = #data{source = Source,
+                         target_mod = TargetMod,
+                         target_state = TState,
+                         repl_id = ReplId,
+                         mode = proplists:get_value(mode, Opts, oneshot),
+                         batch_size = proplists:get_value(batch_size, Opts, 100),
+                         interval = proplists:get_value(interval, Opts, 5000),
+                         changes_options = proplists:get_value(changes_options, Opts, []),
+                         notify = proplists:get_value(notify, Opts)},
+            {ok, starting, Data, {next_event, internal, init}};
+        {error, Reason} ->
+            {stop, Reason}
+    end.
+
+%% Give the target a chance to release its resources. terminate/2 is an
+%% optional callback, and a failure inside it is deliberately ignored so it
+%% cannot mask the original stop reason.
+terminate(Reason, _State, #data{target_mod = TMod, target_state = TS}) ->
+    case erlang:function_exported(TMod, terminate, 2) of
+        true -> try TMod:terminate(Reason, TS) catch _:_ -> ok end;
+        false -> ok
+    end,
+    ok.
+
+code_change(_OldVsn, State, Data, _Extra) ->
+    {ok, State, Data}.
+
+%%====================================================================
+%% States
+%%====================================================================
+
+%% starting: read the source info and the target checkpoint, then replicate.
+starting(internal, init, Data) ->
+    #data{source = Source, target_mod = TMod, target_state = TS,
+          repl_id = ReplId} = Data,
+    %% Probe the source first so an unreachable database is reported before
+    %% the target checkpoint is read.
+    case couchbeam:db_info(Source) of
+        {ok, _Info} ->
+            {ok, Checkpoint, TS1} = TMod:read_checkpoint(ReplId, TS),
+            %% `nil' means no checkpoint is stored yet: start from sequence 0.
+            Since = case Checkpoint of nil -> 0; S -> S end,
+            Data1 = Data#data{target_state = TS1, since = Since},
+            {next_state, replicating, Data1, {next_event, internal, pull}};
+        {error, Reason} ->
+            notify(Data, {error, Reason}),
+            {stop, {source_unavailable, Reason}}
+    end;
+starting(EventType, EventContent, Data) ->
+    handle_common(starting, EventType, EventContent, Data).
+
+%% replicating: pull one batch at a time until caught up.
+%%
+%% While more batches remain, the next pull is scheduled with a 1 ms
+%% state_timeout instead of a `next_event' action. Inserted events are
+%% processed before the process mailbox is read again, so chaining batches
+%% with `next_event' would leave status/pause/stop requests unanswered until
+%% the whole source had been drained (and, in oneshot mode, until the process
+%% had already stopped). A real timer sends the state machine back through
+%% `receive' first, and it is cancelled automatically when a pause changes
+%% the state.
+replicating(internal, pull, Data) ->
+    case pull_batch(Data) of
+        {caught_up, Data1} ->
+            case Data1#data.mode of
+                oneshot ->
+                    notify(Data1, done),
+                    {stop, normal, Data1};
+                continuous ->
+                    {keep_state, Data1, {state_timeout, Data1#data.interval, pull}}
+            end;
+        {more, Data1} ->
+            {keep_state, Data1, {state_timeout, 1, pull}};
+        {error, Reason, Data1} ->
+            notify(Data1, {error, Reason}),
+            {stop, {replication_error, Reason}, Data1}
+    end;
+replicating(state_timeout, pull, _Data) ->
+    {keep_state_and_data, {next_event, internal, pull}};
+replicating({call, From}, pause, Data) ->
+    %% Leaving the state cancels any pending state_timeout, so no pull fires
+    %% while paused.
+    {next_state, paused, Data, {reply, From, ok}};
+replicating(EventType, EventContent, Data) ->
+    handle_common(replicating, EventType, EventContent, Data).
+
+%% paused: do nothing until resumed; resuming pulls immediately.
+paused({call, From}, resume, Data) ->
+    {next_state, replicating, Data,
+     [{reply, From, ok}, {next_event, internal, pull}]};
+paused(EventType, EventContent, Data) ->
+    handle_common(paused, EventType, EventContent, Data).
+
+%%====================================================================
+%% Common event handling
+%%====================================================================
+
+%% Events every state answers the same way: `status' reports the current
+%% state, `stop' stops the machine, any other call gets
+%% `{error, unsupported}', and non-call events are dropped.
+handle_common(State, {call, Caller}, Request, Data) ->
+    case Request of
+        status ->
+            Reply = {ok, status_map(State, Data)},
+            {keep_state_and_data, {reply, Caller, Reply}};
+        stop ->
+            {stop_and_reply, normal, {reply, Caller, ok}, Data};
+        _Unsupported ->
+            {keep_state_and_data, {reply, Caller, {error, unsupported}}}
+    end;
+handle_common(_State, _EventType, _EventContent, _Data) ->
+    keep_state_and_data.
+
+%%====================================================================
+%% Replication batch
+%%====================================================================
+
+%% Read one batch from the changes feed and import it into the target.
+%% Returns `{caught_up, Data}' when the feed had nothing new, `{more, Data}'
+%% after a batch was imported, or `{error, Reason, Data}'.
+pull_batch(Data) ->
+    #data{source = Source, since = Since, batch_size = Limit,
+          changes_options = Extra} = Data,
+    FeedOpts = [{since, Since}, {style, all_docs}, {limit, Limit} | Extra],
+    case couchbeam_changes:follow_once(Source, FeedOpts) of
+        {ok, LastSeq, []} ->
+            checkpoint_caught_up(LastSeq, Data);
+        {ok, LastSeq, Changes} ->
+            import_changes(LastSeq, Changes, Data);
+        {error, Reason} ->
+            {error, Reason, Data}
+    end.
+
+%% Nothing new on the feed: record the sequence it reported and say so.
+checkpoint_caught_up(LastSeq, Data) ->
+    #data{target_mod = TMod, target_state = TS0, repl_id = ReplId} = Data,
+    {ok, TS1} = TMod:write_checkpoint(ReplId, LastSeq, TS0),
+    notify(Data, {checkpoint, LastSeq}),
+    {caught_up, Data#data{since = LastSeq, target_state = TS1}}.
+
+%% Ask the target which revisions it lacks, fetch them, then store them.
+import_changes(LastSeq, Changes, Data) ->
+    #data{source = Source, target_mod = TMod, target_state = TS0} = Data,
+    {ok, Missing, TS1} = TMod:revs_diff(changes_to_idrevs(Changes), TS0),
+    Diffed = Data#data{target_state = TS1},
+    case fetch_missing(Source, Missing) of
+        {ok, Entries} ->
+            store_entries(LastSeq, Entries, Diffed);
+        {error, Reason} ->
+            {error, Reason, Diffed}
+    end.
+
+%% Write the fetched entries, then the checkpoint, and report progress.
+store_entries(LastSeq, Entries, Data) ->
+    #data{target_mod = TMod, target_state = TS1, repl_id = ReplId,
+          docs_written = Before} = Data,
+    case TMod:write_docs(Entries, TS1) of
+        {ok, TS2} ->
+            {ok, TS3} = TMod:write_checkpoint(ReplId, LastSeq, TS2),
+            Count = length(Entries),
+            notify(Data, {docs_written, Count}),
+            notify(Data, {checkpoint, LastSeq}),
+            {more, Data#data{since = LastSeq, target_state = TS3,
+                             docs_written = Before + Count}};
+        {error, Reason} ->
+            {error, Reason, Data}
+    end.
+
+%% Reshape changes-feed rows into the `[{DocId, [Rev]}]' list handed to the
+%% target's revs_diff/2.
+changes_to_idrevs(Changes) ->
+    [change_to_idrevs(Change) || Change <- Changes].
+
+change_to_idrevs(Change) ->
+    DocId = maps:get(<<"id">>, Change),
+    Rows = maps:get(<<"changes">>, Change, []),
+    {DocId, [maps:get(<<"rev">>, Row) || Row <- Rows]}.
+
+%% Fetch every missing revision together with its revision history. Phase 1
+%% carries no attachment bodies, so each entry is paired with `#{}'. The first
+%% failed fetch aborts the whole batch with `{error, {DocId, Rev, Reason}}'.
+fetch_missing(Source, Missing) ->
+    FetchDoc = fun({DocId, Revs}) ->
+                   [fetch_rev(Source, DocId, Rev) || Rev <- Revs]
+               end,
+    try lists:flatmap(FetchDoc, Missing) of
+        Entries -> {ok, Entries}
+    catch
+        throw:{fetch_error, Reason} -> {error, Reason}
+    end.
+
+%% Open one revision with `revs' enabled; throws `{fetch_error, _}' on error so
+%% fetch_missing/2 can abandon the batch.
+fetch_rev(Source, DocId, Rev) ->
+    OpenOpts = [{rev, Rev}, {revs, true}],
+    case couchbeam:open_doc(Source, DocId, OpenOpts) of
+        {error, Reason} ->
+            throw({fetch_error, {DocId, Rev, Reason}});
+        {ok, Doc} when is_map(Doc) ->
+            {Doc, #{}}
+    end.
+
+%%====================================================================
+%% Helpers
+%%====================================================================
+
+%% Default replication id: lowercase hex SHA-256 of the server URL, the
+%% database URL and the target module name.
+default_repl_id(#db{server = Server} = Source, TargetMod) ->
+    Parts = [couchbeam_httpc:server_url(Server), "/",
+             couchbeam_httpc:db_url(Source), "|",
+             atom_to_binary(TargetMod, utf8)],
+    Digest = crypto:hash(sha256, iolist_to_binary(Parts)),
+    binary:encode_hex(Digest, lowercase).
+
+%% Map returned (wrapped in `{ok, _}') by status/1.
+status_map(State, #data{} = Data) ->
+    #{state => State,
+      repl_id => Data#data.repl_id,
+      since => Data#data.since,
+      docs_written => Data#data.docs_written}.
+
+%% Send a progress message tagged with the replicator pid, if a notify pid
+%% was configured.
+notify(#data{notify = Dest}, Msg) ->
+    case Dest of
+        undefined ->
+            ok;
+        _ ->
+            Dest ! {self(), Msg},
+            ok
+    end.
diff --git a/src/couchbeam_replicator_ets.erl b/src/couchbeam_replicator_ets.erl
new file mode 100644
index 00000000..cc2f061d
--- /dev/null
+++ b/src/couchbeam_replicator_ets.erl
@@ -0,0 +1,141 @@
+%%% -*- erlang -*-
+%%%
+%%% This file is part of couchbeam released under the MIT license.
+%%% See the NOTICE for more information.
+%%%
+%%% Reference `couchbeam_replicator' target backed by an ets or dets table.
+%%% It stores every replicated revision (preserving `_revisions' history),
+%%% the leaf revisions per document, and the replication checkpoint.
+%%%
+%%% Stored entries (a `set' table):
+%%%   {{doc, DocId, Rev}, {Doc, Atts}}
+%%%   {{leafs, DocId}, [Rev]}
+%%%   {{checkpoint, ReplId}, Seq}
+%%%
+%%% init/1 args is a map:
+%%%   #{table => atom(),          %% table name (required)
+%%%     backend => ets | dets,    %% default ets
+%%%     file => string()}         %% dets file (default "<table>.dets")
+%%%
+%%% Note: an ets table created here is owned by the replicator process and is
+%%% deleted when it stops. For a one-shot run whose results must outlive the
+%%% replicator (or to resume later), create the named table yourself first
+%%% (init/1 reuses an existing table) or use the dets backend.
+
+-module(couchbeam_replicator_ets).
+-behaviour(couchbeam_replicator).
+
+%% couchbeam_replicator callbacks
+-export([init/1, revs_diff/2, write_docs/2,
+         read_checkpoint/2, write_checkpoint/3, terminate/2]).
+
+%% inspection helpers (operate on a table name/ref directly)
+-export([get_doc/3, leaf_revs/2, checkpoint/2]).
+
+-record(st, {backend = ets :: ets | dets, tab :: atom()}).
+
+%%====================================================================
+%% Behaviour callbacks
+%%====================================================================
+
+%% Open (or reuse) the table named by `table'. The backend defaults to ets;
+%% for dets the file defaults to the table name with a ".dets" suffix.
+init(Args) when is_map(Args) ->
+    Backend = maps:get(backend, Args, ets),
+    Name = maps:get(table, Args),
+    case Backend of
+        ets ->
+            {ok, #st{backend = ets, tab = ensure_ets_table(Name)}};
+        dets ->
+            open_dets_table(Name, Args)
+    end.
+
+%% Reuse an existing named ets table, or create a public `set' table.
+ensure_ets_table(Name) ->
+    case ets:info(Name) of
+        undefined -> ets:new(Name, [named_table, public, set]);
+        _Existing -> Name
+    end.
+
+%% Open the dets file backing the table and pass any open error through.
+open_dets_table(Name, Args) ->
+    File = maps:get(file, Args, atom_to_list(Name) ++ ".dets"),
+    case dets:open_file(Name, [{file, File}, {type, set}]) of
+        {ok, Tab} -> {ok, #st{backend = dets, tab = Tab}};
+        {error, _} = Error -> Error
+    end.
+
+%% Return, per document, the revisions that are not stored yet. Documents with
+%% nothing missing are left out of the result.
+revs_diff(IdRevs, #st{} = St) ->
+    Missing = lists:filtermap(
+                fun({DocId, Revs}) ->
+                    Miss = [R || R <- Revs, not has_key(St, {doc, DocId, R})],
+                    case Miss of
+                        [] -> false;
+                        _ -> {true, {DocId, Miss}}
+                    end
+                end, IdRevs),
+    {ok, Missing, St}.
+
+%% Store each `{Doc, Atts}' entry under `{doc, DocId, Rev}' and update the
+%% document's leaf set.
+write_docs(Entries, #st{} = St) ->
+    lists:foreach(fun(Entry) -> write_entry(Entry, St) end, Entries),
+    {ok, St}.
+
+%% The new revision becomes a leaf, and every ancestor named in its
+%% `_revisions' history stops being one. Without that removal a parent
+%% imported by an earlier run would still be reported by leaf_revs/2 after
+%% its child arrives.
+write_entry({Doc, Atts}, St) ->
+    DocId = maps:get(<<"_id">>, Doc),
+    Rev = maps:get(<<"_rev">>, Doc),
+    insert(St, {doc, DocId, Rev}, {Doc, Atts}),
+    Leafs = lookup(St, {leafs, DocId}, []),
+    insert(St, {leafs, DocId}, lists:usort([Rev | Leafs]) -- ancestor_revs(Doc)).
+
+%% Rebuild the "Pos-Hash" ids of a revision's ancestors from `_revisions'.
+%% `ids' lists hashes newest first and `start' is the position of the first
+%% one, so the Nth following hash sits at position `start - N'. A document
+%% without a usable `_revisions' field has no known ancestors.
+ancestor_revs(#{<<"_revisions">> := #{<<"start">> := Start,
+                                      <<"ids">> := [_Current | Parents]}})
+  when is_integer(Start) ->
+    {Revs, _} = lists:mapfoldl(
+                  fun(Hash, Pos) ->
+                      {iolist_to_binary([integer_to_binary(Pos), $-, Hash]), Pos - 1}
+                  end, Start - 1, Parents),
+    Revs;
+ancestor_revs(_Doc) ->
+    [].
+
+%% Return the sequence stored for ReplId, or `nil' when there is none.
+read_checkpoint(ReplId, #st{} = St) ->
+    {ok, lookup(St, {checkpoint, ReplId}, nil), St}.
+
+%% Store Seq as the checkpoint for ReplId, replacing any previous value.
+write_checkpoint(ReplId, Seq, #st{} = St) ->
+    insert(St, {checkpoint, ReplId}, Seq),
+    {ok, St}.
+
+%% Close the dets file; an ets table needs no explicit cleanup here.
+terminate(_Reason, #st{backend = dets, tab = Tab}) ->
+    dets:close(Tab);
+terminate(_Reason, _St) ->
+    ok.
+
+%%====================================================================
+%% Inspection helpers (for users/tests). `Tab' is the table name.
+%%====================================================================
+
+%% @doc Return the stored {Doc, Atts} for a revision, or undefined.
+get_doc(Tab, DocId, Rev) ->
+    case ets_or_dets_lookup(Tab, {doc, DocId, Rev}) of
+        [{_, V}] -> V;
+        [] -> undefined
+    end.
+
+%% @doc Return the leaf revisions stored for a document.
+leaf_revs(Tab, DocId) ->
+    case ets_or_dets_lookup(Tab, {leafs, DocId}) of
+        [{_, Revs}] -> Revs;
+        [] -> []
+    end.
+
+%% @doc Return the stored checkpoint sequence for a replication id, or nil.
+checkpoint(Tab, ReplId) ->
+    case ets_or_dets_lookup(Tab, {checkpoint, ReplId}) of
+        [{_, Seq}] -> Seq;
+        [] -> nil
+    end.
+
+%%====================================================================
+%% Internal
+%%====================================================================
+
+%% Store `{K, V}' in the backend's table; a dets write failure crashes here.
+insert(#st{backend = ets, tab = T}, K, V) -> ets:insert(T, {K, V});
+insert(#st{backend = dets, tab = T}, K, V) -> ok = dets:insert(T, {K, V}).
+
+%% Value stored under Key, or Default when the key is absent.
+lookup(St, Key, Default) ->
+    case do_lookup(St, Key) of
+        [] -> Default;
+        [{_, Value}] -> Value
+    end.
+
+%% Raw lookup against whichever backend the state was opened with.
+do_lookup(#st{backend = ets, tab = Table}, Key) ->
+    ets:lookup(Table, Key);
+do_lookup(#st{backend = dets, tab = Table}, Key) ->
+    dets:lookup(Table, Key).
+
+%% True when something is stored under Key.
+has_key(St, Key) ->
+    case do_lookup(St, Key) of
+        [] -> false;
+        _ -> true
+    end.
+
+%% The inspection helpers only get a table name: use ets when a table of that
+%% name exists, and fall back to dets otherwise.
+ets_or_dets_lookup(Tab, Key) ->
+    case ets:info(Tab) =:= undefined of
+        true -> dets:lookup(Tab, Key);
+        false -> ets:lookup(Tab, Key)
+    end.
diff --git a/support/run-e2e.sh b/support/run-e2e.sh
index 426ae6ca..d4d271da 100755
--- a/support/run-e2e.sh
+++ b/support/run-e2e.sh
@@ -19,7 +19,7 @@ export COUCHDB_URL COUCHDB_USER COUCHDB_PASS
 e2e_groups="server_ops database_ops document_ops bulk_ops attachment_ops \
   view_ops design_ops changes_ops error_handling \
   view_streaming_ops changes_streaming_ops replication_ops \
-  db_management_ops mango_ops uuid_ops"
+  pull_replication_ops db_management_ops mango_ops uuid_ops"
 
 for group in $e2e_groups; do
     echo "=== e2e group: ${group} ==="
diff --git a/test/couchbeam_integration_SUITE.erl b/test/couchbeam_integration_SUITE.erl
index 2a11ab38..803fa168 100644
--- a/test/couchbeam_integration_SUITE.erl
+++ b/test/couchbeam_integration_SUITE.erl
@@ -98,6 +98,10 @@
          replicate_continuous/1,
          replicate_filtered/1]).
 
+%% Test cases - Pull replication endpoint (couchbeam_replicator)
+-export([pull_replication_basic/1,
+         pull_replication_resume/1]).
+
 %% Test cases - DB management operations
 -export([compact_database/1,
          compact_design_view/1,
@@ -133,6 +137,7 @@ all() ->
      {group, view_streaming_ops},
      {group, changes_streaming_ops},
      {group, replication_ops},
+     {group, pull_replication_ops},
      {group, db_management_ops},
      {group, mango_ops},
      {group, uuid_ops}].
@@ -214,6 +219,10 @@ groups() ->
         replicate_map_object,
         replicate_continuous,
         replicate_filtered
+    ]},
+    {pull_replication_ops, [sequence], [
+        pull_replication_basic,
+        pull_replication_resume
     ]},
     {db_management_ops, [sequence], [
         compact_database,
@@ -1682,6 +1691,111 @@ replicate_filtered(Config) ->
     ct:pal("Filtered replication created"),
     ok.
 
+%%====================================================================
+%% Pull replication endpoint tests (couchbeam_replicator)
+%%====================================================================
+
+%% Pull docs (with revision history) and a deletion from a real CouchDB into
+%% an ets-backed couchbeam_replicator target.
+%% Uses the database found under `db' in Config. Note that a `#{}' pattern
+%% matches any map, so the `{_, #{}}' matches below only check that the
+%% attachment slot holds a map.
+pull_replication_basic(Config) ->
+    Db = ?config(db, Config),
+
+    %% docA: two revisions
+    {ok, A1} = couchbeam:save_doc(Db, #{<<"_id">> => <<"docA">>, <<"n">> => 1}),
+    {ok, A2} = couchbeam:save_doc(Db, A1#{<<"n">> => 2}),
+    SrcRevA = couchbeam_doc:get_rev(A2),
+    %% docC: single revision
+    {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"docC">>, <<"v">> => true}),
+    %% docB: created then deleted
+    {ok, B1} = couchbeam:save_doc(Db, #{<<"_id">> => <<"docB">>, <<"x">> => 1}),
+    {ok, _} = couchbeam:delete_doc(Db, B1),
+
+    %% the table is created by the test process, so it survives the replicator
+    Tab = new_repl_table(pull_basic),
+    {ok, _} = run_pull_oneshot(Db, Tab, <<"test-basic">>),
+
+    %% docA: one leaf carrying its 2-revision history
+    [SrcRevA] = couchbeam_replicator_ets:leaf_revs(Tab, <<"docA">>),
+    {DocA, #{}} = couchbeam_replicator_ets:get_doc(Tab, <<"docA">>, SrcRevA),
+    #{<<"start">> := 2, <<"ids">> := IdsA} = maps:get(<<"_revisions">>, DocA),
+    2 = length(IdsA),
+    2 = maps:get(<<"n">>, DocA),
+
+    %% docC present
+    [RevC] = couchbeam_replicator_ets:leaf_revs(Tab, <<"docC">>),
+    {_DocC, #{}} = couchbeam_replicator_ets:get_doc(Tab, <<"docC">>, RevC),
+
+    %% docB: deletion replicated as a tombstone
+    [RevB] = couchbeam_replicator_ets:leaf_revs(Tab, <<"docB">>),
+    {DocB, #{}} = couchbeam_replicator_ets:get_doc(Tab, <<"docB">>, RevB),
+    true = maps:get(<<"_deleted">>, DocB),
+
+    ct:pal("Pull replication basic passed"),
+    ok.
+
+%% A second run with the same repl_id resumes from the stored checkpoint and
+%% imports only the changes added since. Uses its own db for an exact count.
+%% The database name carries a millisecond timestamp and the database is
+%% deleted in the `after' clause whether or not the assertions pass.
+pull_replication_resume(Config) ->
+    Server = ?config(server, Config),
+    DbName = iolist_to_binary([<<"couchbeam_pullresume_">>,
+                               integer_to_binary(erlang:system_time(millisecond))]),
+    {ok, Db} = couchbeam:create_db(Server, DbName),
+    try
+        Tab = new_repl_table(pull_resume),
+        ReplId = <<"test-resume">>,
+
+        {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"r1">>, <<"v">> => 1}),
+        {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"r2">>, <<"v">> => 1}),
+
+        %% first run: both documents are imported and a checkpoint is stored
+        {ok, Written1} = run_pull_oneshot(Db, Tab, ReplId),
+        2 = Written1,
+        Seq1 = couchbeam_replicator_ets:checkpoint(Tab, ReplId),
+        true = (Seq1 =/= nil),
+        [_] = couchbeam_replicator_ets:leaf_revs(Tab, <<"r1">>),
+        [_] = couchbeam_replicator_ets:leaf_revs(Tab, <<"r2">>),
+
+        %% add two more docs and resume
+        {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"r3">>, <<"v">> => 1}),
+        {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"r4">>, <<"v">> => 1}),
+
+        {ok, Written2} = run_pull_oneshot(Db, Tab, ReplId),
+        2 = Written2, %% only the two new docs
+        Seq2 = couchbeam_replicator_ets:checkpoint(Tab, ReplId),
+        true = (Seq2 =/= Seq1),
+        [_] = couchbeam_replicator_ets:leaf_revs(Tab, <<"r3">>),
+        [_] = couchbeam_replicator_ets:leaf_revs(Tab, <<"r4">>),
+
+        ct:pal("Pull replication resume passed"),
+        ok
+    after
+        couchbeam:delete_db(Db)
+    end.
+
+%% Create a fresh named ets table owned by the test process so it outlives the
+%% one-shot replicator.
+%% The name is Prefix plus a unique positive integer, so every call creates a
+%% new atom.
+new_repl_table(Prefix) ->
+    Tab = list_to_atom(atom_to_list(Prefix) ++ "_" ++
+                       integer_to_list(erlang:unique_integer([positive]))),
+    ets:new(Tab, [named_table, public, set]),
+    Tab.
+
+%% Run a one-shot pull replication into Tab and return {ok, DocsWritten}.
+run_pull_oneshot(Source, Tab, ReplId) ->
+    %% The replicator reports progress to this process; collect_pull/2 sums it.
+    TargetArgs = #{table => Tab, backend => ets},
+    ReplOpts = [{repl_id, ReplId}, {notify, self()}],
+    {ok, Replicator} = couchbeam_replicator:start_link(
+                         Source, couchbeam_replicator_ets, TargetArgs, ReplOpts),
+    collect_pull(Replicator, 0).
+
+%% Consume the progress messages tagged with Pid, adding up the `docs_written'
+%% counts until `done' arrives. An `error' message, or 30 seconds without any
+%% matching message, fails the test case.
+collect_pull(Pid, Written) ->
+    receive
+        {Pid, done} ->
+            {ok, Written};
+        {Pid, {error, Reason}} ->
+            ct:fail("pull replication error: ~p", [Reason]);
+        {Pid, {checkpoint, _Seq}} ->
+            collect_pull(Pid, Written);
+        {Pid, {docs_written, Count}} ->
+            collect_pull(Pid, Written + Count)
+    after 30000 ->
+        ct:fail("pull replication timeout")
+    end.
+
 %%====================================================================
 %% Database management tests
 %%====================================================================