diff --git a/CHANGELOG.md b/CHANGELOG.md index d5a16f52..96af3b6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. ## [2.1.0] - 2026-06-09 +### Added + +- `couchbeam_replicator`: a `gen_statem` pull-replication endpoint that imports + documents (with full revision history) from a source CouchDB into a pluggable + local target behaviour, with resumable checkpoints. `couchbeam_replicator_ets` + is an ets/dets reference target. Pull only; attachment bodies are a follow-up. + See `guides/replication.md`. + ### Changed - Updated to hackney 4.2.2. Response bodies are now read eagerly: a normal diff --git a/guides/replication.md b/guides/replication.md new file mode 100644 index 00000000..f8ae8a35 --- /dev/null +++ b/guides/replication.md @@ -0,0 +1,110 @@ +# Pull replication endpoint + +`couchbeam_replicator` is a `gen_statem` that pulls documents from a source +CouchDB into a local store of your choice, preserving each document's full +revision history. It is *pull only* (CouchDB to the app) and can be stopped and +resumed. + +The local store is a module implementing the `couchbeam_replicator` behaviour. +`couchbeam_replicator_ets` is a reference target backed by an ets or dets table; +the same behaviour is how you would import into another store (for example +barrel_docdb). + +## Quick start + +```erlang +{ok, Server} = {ok, couchbeam:server_connection("http://127.0.0.1:5984", [])}, +{ok, Src} = couchbeam:open_db(Server, <<"mydb">>), + +%% a persistent (dets) target so the import and checkpoint survive a restart +{ok, Pid} = couchbeam_replicator:start_link( + Src, couchbeam_replicator_ets, + #{table => my_import, backend => dets, file => "my_import.dets"}, + [{repl_id, <<"mydb->my_import">>}, {notify, self()}]), + +receive {Pid, done} -> ok end. +``` + +Running the identical `start_link/4` again (after a restart, or to pick up new +changes) resumes from the stored checkpoint. + +## API + +``` +couchbeam_replicator:start_link(Source, TargetMod, TargetArgs, Opts) -> {ok, Pid} +couchbeam_replicator:status(Pid) -> {ok, #{state, repl_id, since, docs_written}} +couchbeam_replicator:pause(Pid) -> ok +couchbeam_replicator:resume(Pid) -> ok +couchbeam_replicator:stop(Pid) -> ok +``` + +Options (`Opts`): + +| Option | Default | Meaning | +| --- | --- | --- | +| `{mode, oneshot \| continuous}` | `oneshot` | stop when caught up, or keep polling | +| `{batch_size, N}` | `100` | changes per batch | +| `{interval, Ms}` | `5000` | continuous poll interval | +| `{repl_id, Bin}` | hash of source URL + target module | stable id used for resume | +| `{notify, Pid}` | none | receive progress messages | +| `{changes_options, L}` | `[]` | extra `_changes` query options (e.g. a filter) | + +Progress messages sent to `{notify, Pid}` are tagged with the replicator pid: +`{Pid, {checkpoint, Seq}}`, `{Pid, {docs_written, N}}`, `{Pid, done}`, +`{Pid, {error, Reason}}`. + +## How resume works + +Resume needs no `_local` document on the source; the target owns the checkpoint. + +1. Each replication has a stable `repl_id` (pass `{repl_id, Bin}`, or it defaults + to a hash of the source URL and the target module). +2. After each batch is written, the engine calls + `write_checkpoint(ReplId, LastSeq, State)`; `Seq` is CouchDB's opaque + `last_seq`. +3. On start, `read_checkpoint(ReplId, State)` returns the last seq (or `nil`), and + the changes feed continues from there. +4. Every batch goes through `revs_diff` before fetching, so replaying a change is + a no-op: a stale checkpoint costs a few redundant checks, never duplicates. + +Use the `dets` backend (or your own persistent store) for resume across restarts; +`ets` resumes within the node's lifetime. + +## Writing a target + +A target module implements: + +```erlang +-behaviour(couchbeam_replicator). + +init(Args) -> {ok, State} | {error, term()}. +revs_diff(IdRevs, State) -> {ok, Missing, State}. + %% IdRevs :: [{DocId, [Rev]}]; return the subset not yet stored. +write_docs(Entries, State) -> {ok, State} | {error, term()}. + %% Entries :: [{Doc, Atts}]; Doc is a map with _id/_rev/_revisions/ + %% (_deleted); Atts is #{} for now (attachment bodies come in a later phase). +read_checkpoint(ReplId, State) -> {ok, Seq | nil, State}. +write_checkpoint(ReplId, Seq, State) -> {ok, State}. +terminate(Reason, State) -> any(). %% optional +``` + +Each stored document keeps its `_revisions` field, so the full revision history +is available to reconstruct the document's revision tree. + +## ets/dets reference target + +`couchbeam_replicator_ets` stores, in a `set` table: + +``` +{{doc, DocId, Rev}, {Doc, Atts}} +{{leafs, DocId}, [Rev]} +{{checkpoint, ReplId}, Seq} +``` + +`init/1` args: `#{table => Name, backend => ets | dets, file => Path}` +(`backend` defaults to `ets`). Inspect a table with +`couchbeam_replicator_ets:get_doc/3`, `leaf_revs/2`, and `checkpoint/2`. + +An ets table created by the target is owned by the replicator process and is +deleted when a one-shot run finishes. To keep the result (or to resume later), +create the named table yourself first (it is reused) or use the `dets` backend. diff --git a/rebar.config b/rebar.config index c757ebf3..17da22ad 100644 --- a/rebar.config +++ b/rebar.config @@ -34,6 +34,7 @@ {"guides/MIGRATION.md", #{title => <<"Migration Guide">>}}, {"guides/changes.md", #{title => <<"Changes Feed">>}}, {"guides/views.md", #{title => <<"Views">>}}, + {"guides/replication.md", #{title => <<"Pull Replication">>}}, {"LICENSE", #{title => <<"License">>}} ]}, {main, "README.md"}, @@ -43,6 +44,7 @@ {groups_for_modules, [ {"Core", [couchbeam, couchbeam_doc, couchbeam_attachments]}, {"Views & Changes", [couchbeam_view, couchbeam_changes]}, + {"Replication", [couchbeam_replicator, couchbeam_replicator_ets]}, {"Utilities", [couchbeam_ejson, couchbeam_httpc, couchbeam_util, couchbeam_uuids]}, {"Application", [couchbeam_app, couchbeam_sup]} ]} diff --git a/src/couchbeam_replicator.erl b/src/couchbeam_replicator.erl new file mode 100644 index 00000000..d59898bf --- /dev/null +++ b/src/couchbeam_replicator.erl @@ -0,0 +1,285 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of couchbeam released under the MIT license. +%%% See the NOTICE for more information. +%%% +%%% A client-side CouchDB *pull* replicator: it reads the changes feed and +%%% document revisions from a source CouchDB and hands them to a pluggable +%%% local target (a module implementing the `couchbeam_replicator' behaviour, +%%% e.g. `couchbeam_replicator_ets'). Document revision history is preserved. +%%% Replication is resumable: the target persists the last processed source +%%% sequence under a stable replication id. +%%% +%%% Pull only (CouchDB -> app). Attachment bodies are not fetched yet; each +%%% target entry is `{Doc, #{}}'. + +-module(couchbeam_replicator). +-behaviour(gen_statem). + +%% public API +-export([start_link/4, + stop/1, + status/1, + pause/1, + resume/1]). + +%% gen_statem callbacks +-export([callback_mode/0, init/1, terminate/3, code_change/4]). +-export([starting/3, replicating/3, paused/3]). + +-include("couchbeam.hrl"). + +-type repl_opt() :: {mode, oneshot | continuous} + | {batch_size, pos_integer()} + | {interval, pos_integer()} + | {repl_id, binary()} + | {notify, pid()} + | {changes_options, list()}. +-export_type([repl_opt/0]). + +%% Target behaviour: the local store the source is replicated into. +-callback init(Args :: term()) -> + {ok, TState :: term()} | {error, term()}. +-callback revs_diff(IdRevs :: [{DocId :: binary(), [Rev :: binary()]}], TState :: term()) -> + {ok, Missing :: [{DocId :: binary(), [Rev :: binary()]}], TState :: term()}. +-callback write_docs(Entries :: [{Doc :: map(), Atts :: #{binary() => binary()}}], TState :: term()) -> + {ok, TState :: term()} | {error, term()}. +-callback read_checkpoint(ReplId :: binary(), TState :: term()) -> + {ok, Seq :: term() | nil, TState :: term()}. +-callback write_checkpoint(ReplId :: binary(), Seq :: term(), TState :: term()) -> + {ok, TState :: term()}. +-callback terminate(Reason :: term(), TState :: term()) -> any(). +-optional_callbacks([terminate/2]). + +-record(data, { + source :: db(), + target_mod :: module(), + target_state :: term(), + repl_id :: binary(), + mode = oneshot :: oneshot | continuous, + batch_size = 100 :: pos_integer(), + interval = 5000 :: pos_integer(), + changes_options = [] :: list(), + notify :: pid() | undefined, + since = 0 :: term(), + docs_written = 0 :: non_neg_integer() +}). + +%%==================================================================== +%% Public API +%%==================================================================== + +%% @doc Start a pull replication from Source into TargetMod. +-spec start_link(Source :: db(), TargetMod :: module(), + TargetArgs :: term(), Opts :: [repl_opt()]) -> + {ok, pid()} | {error, term()}. +start_link(Source, TargetMod, TargetArgs, Opts) -> + gen_statem:start_link(?MODULE, {Source, TargetMod, TargetArgs, Opts}, []). + +%% @doc Stop the replicator. +-spec stop(pid()) -> ok. +stop(Pid) -> + gen_statem:stop(Pid). + +%% @doc Current replication status. +-spec status(pid()) -> + {ok, #{state := atom(), repl_id := binary(), + since := term(), docs_written := non_neg_integer()}}. +status(Pid) -> + gen_statem:call(Pid, status). + +%% @doc Pause a running replication. +-spec pause(pid()) -> ok. +pause(Pid) -> + gen_statem:call(Pid, pause). + +%% @doc Resume a paused replication. +-spec resume(pid()) -> ok. +resume(Pid) -> + gen_statem:call(Pid, resume). + +%%==================================================================== +%% gen_statem callbacks +%%==================================================================== + +callback_mode() -> + state_functions. + +init({Source, TargetMod, TargetArgs, Opts}) -> + ReplId = case proplists:get_value(repl_id, Opts) of + undefined -> default_repl_id(Source, TargetMod); + Id when is_binary(Id) -> Id + end, + case TargetMod:init(TargetArgs) of + {ok, TState} -> + Data = #data{source = Source, + target_mod = TargetMod, + target_state = TState, + repl_id = ReplId, + mode = proplists:get_value(mode, Opts, oneshot), + batch_size = proplists:get_value(batch_size, Opts, 100), + interval = proplists:get_value(interval, Opts, 5000), + changes_options = proplists:get_value(changes_options, Opts, []), + notify = proplists:get_value(notify, Opts)}, + {ok, starting, Data, {next_event, internal, init}}; + {error, Reason} -> + {stop, Reason} + end. + +terminate(Reason, _State, #data{target_mod = TMod, target_state = TS}) -> + case erlang:function_exported(TMod, terminate, 2) of + true -> try TMod:terminate(Reason, TS) catch _:_ -> ok end; + false -> ok + end, + ok. + +code_change(_OldVsn, State, Data, _Extra) -> + {ok, State, Data}. + +%%==================================================================== +%% States +%%==================================================================== + +%% starting: read the source info and the target checkpoint, then replicate. +starting(internal, init, Data) -> + #data{source = Source, target_mod = TMod, target_state = TS, + repl_id = ReplId} = Data, + case couchbeam:db_info(Source) of + {ok, _Info} -> + {ok, Checkpoint, TS1} = TMod:read_checkpoint(ReplId, TS), + Since = case Checkpoint of nil -> 0; S -> S end, + Data1 = Data#data{target_state = TS1, since = Since}, + {next_state, replicating, Data1, {next_event, internal, pull}}; + {error, Reason} -> + notify(Data, {error, Reason}), + {stop, {source_unavailable, Reason}} + end; +starting(EventType, EventContent, Data) -> + handle_common(starting, EventType, EventContent, Data). + +%% replicating: pull one batch at a time until caught up. +replicating(internal, pull, Data) -> + case pull_batch(Data) of + {caught_up, Data1} -> + case Data1#data.mode of + oneshot -> + notify(Data1, done), + {stop, normal, Data1}; + continuous -> + {keep_state, Data1, {state_timeout, Data1#data.interval, pull}} + end; + {more, Data1} -> + {keep_state, Data1, {next_event, internal, pull}}; + {error, Reason, Data1} -> + notify(Data1, {error, Reason}), + {stop, {replication_error, Reason}, Data1} + end; +replicating(state_timeout, pull, _Data) -> + {keep_state_and_data, {next_event, internal, pull}}; +replicating({call, From}, pause, Data) -> + {next_state, paused, Data, {reply, From, ok}}; +replicating(EventType, EventContent, Data) -> + handle_common(replicating, EventType, EventContent, Data). + +%% paused: do nothing until resumed. +paused({call, From}, resume, Data) -> + {next_state, replicating, Data, + [{reply, From, ok}, {next_event, internal, pull}]}; +paused(EventType, EventContent, Data) -> + handle_common(paused, EventType, EventContent, Data). + +%%==================================================================== +%% Common event handling +%%==================================================================== + +handle_common(State, {call, From}, status, Data) -> + {keep_state_and_data, {reply, From, {ok, status_map(State, Data)}}}; +handle_common(_State, {call, From}, stop, Data) -> + {stop_and_reply, normal, {reply, From, ok}, Data}; +handle_common(_State, {call, From}, _Req, _Data) -> + {keep_state_and_data, {reply, From, {error, unsupported}}}; +handle_common(_State, _Type, _Content, _Data) -> + keep_state_and_data. + +%%==================================================================== +%% Replication batch +%%==================================================================== + +pull_batch(Data) -> + #data{source = Source, since = Since, batch_size = N, + changes_options = Extra, target_mod = TMod, + target_state = TS, repl_id = ReplId} = Data, + Opts = [{since, Since}, {style, all_docs}, {limit, N} | Extra], + case couchbeam_changes:follow_once(Source, Opts) of + {ok, LastSeq, []} -> + {ok, TS1} = TMod:write_checkpoint(ReplId, LastSeq, TS), + notify(Data, {checkpoint, LastSeq}), + {caught_up, Data#data{since = LastSeq, target_state = TS1}}; + {ok, LastSeq, Changes} -> + IdRevs = changes_to_idrevs(Changes), + {ok, Missing, TS1} = TMod:revs_diff(IdRevs, TS), + case fetch_missing(Source, Missing) of + {ok, Entries} -> + case TMod:write_docs(Entries, TS1) of + {ok, TS2} -> + {ok, TS3} = TMod:write_checkpoint(ReplId, LastSeq, TS2), + Written = Data#data.docs_written + length(Entries), + notify(Data, {docs_written, length(Entries)}), + notify(Data, {checkpoint, LastSeq}), + {more, Data#data{since = LastSeq, target_state = TS3, + docs_written = Written}}; + {error, Reason} -> + {error, Reason, Data#data{target_state = TS1}} + end; + {error, Reason} -> + {error, Reason, Data#data{target_state = TS1}} + end; + {error, Reason} -> + {error, Reason, Data} + end. + +changes_to_idrevs(Changes) -> + [{maps:get(<<"id">>, C), + [maps:get(<<"rev">>, R) || R <- maps:get(<<"changes">>, C, [])]} + || C <- Changes]. + +%% Fetch each missing revision with its revision history (Phase 1: no +%% attachment bodies, so every entry carries `#{}'). +fetch_missing(Source, Missing) -> + try + Entries = lists:flatmap( + fun({DocId, Revs}) -> + [fetch_rev(Source, DocId, Rev) || Rev <- Revs] + end, Missing), + {ok, Entries} + catch + throw:{fetch_error, Reason} -> {error, Reason} + end. + +fetch_rev(Source, DocId, Rev) -> + case couchbeam:open_doc(Source, DocId, [{rev, Rev}, {revs, true}]) of + {ok, Doc} when is_map(Doc) -> + {Doc, #{}}; + {error, Reason} -> + throw({fetch_error, {DocId, Rev, Reason}}) + end. + +%%==================================================================== +%% Helpers +%%==================================================================== + +default_repl_id(#db{server = Server} = Source, TargetMod) -> + ServerUrl = couchbeam_httpc:server_url(Server), + DbUrl = couchbeam_httpc:db_url(Source), + Base = iolist_to_binary([ServerUrl, "/", DbUrl, "|", + atom_to_binary(TargetMod, utf8)]), + binary:encode_hex(crypto:hash(sha256, Base), lowercase). + +status_map(State, #data{repl_id = ReplId, since = Since, docs_written = N}) -> + #{state => State, repl_id => ReplId, since => Since, docs_written => N}. + +notify(#data{notify = undefined}, _Msg) -> + ok; +notify(#data{notify = Pid}, Msg) -> + Pid ! {self(), Msg}, + ok. diff --git a/src/couchbeam_replicator_ets.erl b/src/couchbeam_replicator_ets.erl new file mode 100644 index 00000000..cc2f061d --- /dev/null +++ b/src/couchbeam_replicator_ets.erl @@ -0,0 +1,141 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of couchbeam released under the MIT license. +%%% See the NOTICE for more information. +%%% +%%% Reference `couchbeam_replicator' target backed by an ets or dets table. +%%% It stores every replicated revision (preserving `_revisions' history), +%%% the leaf revisions per document, and the replication checkpoint. +%%% +%%% Stored entries (a `set' table): +%%% {{doc, DocId, Rev}, {Doc, Atts}} +%%% {{leafs, DocId}, [Rev]} +%%% {{checkpoint, ReplId}, Seq} +%%% +%%% init/1 args is a map: +%%% #{table => atom(), %% table name (required) +%%% backend => ets | dets, %% default ets +%%% file => string()} %% dets file (default "