From 5ebc1bf1d5e886f09023fa61109574bb0acab28b Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 10 Jun 2026 14:54:37 +0200 Subject: [PATCH 1/2] pull replication: attachment bodies (phase 2) Fetch each missing revision via the multipart open_doc path and reassemble the document together with its attachment bodies, handed to the target as {Doc, #{AttName => Bin}}. Bytes are kept as delivered (encoding recorded in the doc's _attachments). The target behaviour contract is unchanged from phase 1. - couchbeam_replicator: fetch_rev uses {revs,true} + attachments via stream_doc - couchbeam: fix open_doc/3 and stream_doc/1 specs to include the multipart returns ({ok,{multipart,_}} and {doc, Doc, doc_stream()}) so dialyzer passes - e2e: pull_replication_attachments asserts the body is replicated verbatim and the _attachments stub is preserved - guide + CHANGELOG updated --- CHANGELOG.md | 6 ++--- guides/replication.md | 5 ++-- src/couchbeam.erl | 5 ++-- src/couchbeam_replicator.erl | 33 +++++++++++++++++++++---- test/couchbeam_integration_SUITE.erl | 37 ++++++++++++++++++++++++++-- 5 files changed, 72 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96af3b6..6cf0a87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,9 @@ All notable changes to this project will be documented in this file. ### Added - `couchbeam_replicator`: a `gen_statem` pull-replication endpoint that imports - documents (with full revision history) from a source CouchDB into a pluggable - local target behaviour, with resumable checkpoints. `couchbeam_replicator_ets` - is an ets/dets reference target. Pull only; attachment bodies are a follow-up. + documents (with full revision history and attachment bodies) from a source + CouchDB into a pluggable local target behaviour, with resumable checkpoints. + `couchbeam_replicator_ets` is an ets/dets reference target. Pull only. See `guides/replication.md`. ### Changed diff --git a/guides/replication.md b/guides/replication.md index f8ae8a3..727509a 100644 --- a/guides/replication.md +++ b/guides/replication.md @@ -81,8 +81,9 @@ init(Args) -> {ok, State} | {error, term()}. revs_diff(IdRevs, State) -> {ok, Missing, State}. %% IdRevs :: [{DocId, [Rev]}]; return the subset not yet stored. write_docs(Entries, State) -> {ok, State} | {error, term()}. - %% Entries :: [{Doc, Atts}]; Doc is a map with _id/_rev/_revisions/ - %% (_deleted); Atts is #{} for now (attachment bodies come in a later phase). + %% Entries :: [{Doc, Atts}]; Doc is a map with _id/_rev/_revisions/(_deleted)/ + %% _attachments; Atts :: #{AttName => Bin} holds the attachment bodies as + %% delivered (the doc's _attachments metadata records content type/encoding). read_checkpoint(ReplId, State) -> {ok, Seq | nil, State}. write_checkpoint(ReplId, Seq, State) -> {ok, State}. terminate(Reason, State) -> any(). %% optional diff --git a/src/couchbeam.erl b/src/couchbeam.erl index 16593e7..420c51b 100644 --- a/src/couchbeam.erl +++ b/src/couchbeam.erl @@ -406,7 +406,8 @@ open_doc(Db, DocId) -> %% @doc open a document %% Params is a list of query argument. Have a look in CouchDb API --spec open_doc(db(), binary() | string(), list()) -> {ok, map()} | {error, term()}. +-spec open_doc(db(), binary() | string(), list()) -> + {ok, map()} | {ok, {multipart, doc_stream()}} | {error, term()}. open_doc(#db{server=Server, options=Opts}=Db, DocId, Params) -> DocId1 = couchbeam_util:encode_docid(DocId), @@ -490,7 +491,7 @@ decode_stream_doc(Ref) -> %% when you get `{ok, {multipart, State}}' from the function %% `couchbeam:open_doc/3'. -spec stream_doc(doc_stream()) -> - {doc, doc()} + {doc, doc(), doc_stream()} | {att, Name :: binary(), doc_stream()} | {att_body, Name :: binary(), Chunk :: binary(), doc_stream()} | {att_eof, Name :: binary(), doc_stream()} diff --git a/src/couchbeam_replicator.erl b/src/couchbeam_replicator.erl index d59898b..f76f739 100644 --- a/src/couchbeam_replicator.erl +++ b/src/couchbeam_replicator.erl @@ -10,8 +10,9 @@ %%% Replication is resumable: the target persists the last processed source %%% sequence under a stable replication id. %%% -%%% Pull only (CouchDB -> app). Attachment bodies are not fetched yet; each -%%% target entry is `{Doc, #{}}'. +%%% Pull only (CouchDB -> app). Each target entry is `{Doc, Atts}' where Atts +%%% maps attachment name to body bytes as delivered by CouchDB (the doc's +%%% `_attachments' metadata records content type and any encoding, e.g. gzip). -module(couchbeam_replicator). -behaviour(gen_statem). @@ -243,8 +244,7 @@ changes_to_idrevs(Changes) -> [maps:get(<<"rev">>, R) || R <- maps:get(<<"changes">>, C, [])]} || C <- Changes]. -%% Fetch each missing revision with its revision history (Phase 1: no -%% attachment bodies, so every entry carries `#{}'). +%% Fetch each missing revision with its revision history and attachment bodies. fetch_missing(Source, Missing) -> try Entries = lists:flatmap( @@ -256,14 +256,37 @@ fetch_missing(Source, Missing) -> throw:{fetch_error, Reason} -> {error, Reason} end. +%% A revision with attachments comes back as a multipart stream; one without +%% attachments (or a deletion) comes back as a plain JSON document. fetch_rev(Source, DocId, Rev) -> - case couchbeam:open_doc(Source, DocId, [{rev, Rev}, {revs, true}]) of + case couchbeam:open_doc(Source, DocId, + [{rev, Rev}, {revs, true}, {"attachments", true}]) of + {ok, {multipart, State}} -> + collect_multipart(State, undefined, <<>>, #{}); {ok, Doc} when is_map(Doc) -> {Doc, #{}}; {error, Reason} -> throw({fetch_error, {DocId, Rev, Reason}}) end. +%% Drive couchbeam:stream_doc/1 to reassemble the document and its attachment +%% bodies (kept as delivered; encoding is recorded in the doc's _attachments). +collect_multipart(State, Doc, Buf, Atts) -> + case couchbeam:stream_doc(State) of + {doc, NewDoc, NState} -> + collect_multipart(NState, NewDoc, Buf, Atts); + {att, _Name, NState} -> + collect_multipart(NState, Doc, <<>>, Atts); + {att_body, _Name, Chunk, NState} -> + collect_multipart(NState, Doc, <>, Atts); + {att_eof, Name, NState} -> + collect_multipart(NState, Doc, <<>>, Atts#{Name => Buf}); + eof -> + {Doc, Atts}; + {error, Reason} -> + throw({fetch_error, {multipart, Reason}}) + end. + %%==================================================================== %% Helpers %%==================================================================== diff --git a/test/couchbeam_integration_SUITE.erl b/test/couchbeam_integration_SUITE.erl index 803fa16..eeaa78f 100644 --- a/test/couchbeam_integration_SUITE.erl +++ b/test/couchbeam_integration_SUITE.erl @@ -100,7 +100,8 @@ %% Test cases - Pull replication endpoint (couchbeam_replicator) -export([pull_replication_basic/1, - pull_replication_resume/1]). + pull_replication_resume/1, + pull_replication_attachments/1]). %% Test cases - DB management operations -export([compact_database/1, @@ -222,7 +223,8 @@ groups() -> ]}, {pull_replication_ops, [sequence], [ pull_replication_basic, - pull_replication_resume + pull_replication_resume, + pull_replication_attachments ]}, {db_management_ops, [sequence], [ compact_database, @@ -1770,6 +1772,37 @@ pull_replication_resume(Config) -> couchbeam:delete_db(Db) end. +%% Attachment bodies are replicated alongside the document. +pull_replication_attachments(Config) -> + Server = ?config(server, Config), + DbName = iolist_to_binary([<<"couchbeam_pullatts_">>, + integer_to_binary(erlang:system_time(millisecond))]), + {ok, Db} = couchbeam:create_db(Server, DbName), + try + Content = <<"attachment-body-payload-0123456789">>, + {ok, D1} = couchbeam:save_doc(Db, #{<<"_id">> => <<"att1">>, <<"k">> => 1}), + Rev0 = couchbeam_doc:get_rev(D1), + %% octet-stream so CouchDB returns the bytes verbatim (text/* is gzipped) + {ok, _} = couchbeam:put_attachment(Db, <<"att1">>, <<"blob.bin">>, Content, + [{rev, Rev0}, {content_type, <<"application/octet-stream">>}]), + + Tab = new_repl_table(pull_atts), + {ok, _} = run_pull_oneshot(Db, Tab, <<"test-atts">>), + + [Rev] = couchbeam_replicator_ets:leaf_revs(Tab, <<"att1">>), + {Doc, Atts} = couchbeam_replicator_ets:get_doc(Tab, <<"att1">>, Rev), + %% body replicated verbatim + Content = maps:get(<<"blob.bin">>, Atts), + %% attachment metadata kept on the document + Stub = maps:get(<<"blob.bin">>, maps:get(<<"_attachments">>, Doc)), + <<"application/octet-stream">> = maps:get(<<"content_type">>, Stub), + + ct:pal("Pull replication attachments passed"), + ok + after + couchbeam:delete_db(Db) + end. + %% Create a fresh named ets table owned by the test process so it outlives the %% one-shot replicator. new_repl_table(Prefix) -> From a1a6a2a32759bec1543548212f28eb80c683d6c3 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 10 Jun 2026 15:08:00 +0200 Subject: [PATCH 2/2] test: e2e coverage for pull replication conflicts and continuous mode - pull_replication_conflicts: a doc with two conflicting leaf revisions (built via bulk_docs new_edits=false) replicates all leaves with history - pull_replication_continuous: continuous mode imports a doc added while the replicator is running, then stops cleanly --- test/couchbeam_integration_SUITE.erl | 80 +++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 2 deletions(-) diff --git a/test/couchbeam_integration_SUITE.erl b/test/couchbeam_integration_SUITE.erl index eeaa78f..33997cb 100644 --- a/test/couchbeam_integration_SUITE.erl +++ b/test/couchbeam_integration_SUITE.erl @@ -101,7 +101,9 @@ %% Test cases - Pull replication endpoint (couchbeam_replicator) -export([pull_replication_basic/1, pull_replication_resume/1, - pull_replication_attachments/1]). + pull_replication_attachments/1, + pull_replication_conflicts/1, + pull_replication_continuous/1]). %% Test cases - DB management operations -export([compact_database/1, @@ -224,7 +226,9 @@ groups() -> {pull_replication_ops, [sequence], [ pull_replication_basic, pull_replication_resume, - pull_replication_attachments + pull_replication_attachments, + pull_replication_conflicts, + pull_replication_continuous ]}, {db_management_ops, [sequence], [ compact_database, @@ -1803,6 +1807,78 @@ pull_replication_attachments(Config) -> couchbeam:delete_db(Db) end. +%% A conflicted document (multiple leaf revisions) replicates all leaves. +pull_replication_conflicts(Config) -> + Server = ?config(server, Config), + DbName = iolist_to_binary([<<"couchbeam_pullconf_">>, + integer_to_binary(erlang:system_time(millisecond))]), + {ok, Db} = couchbeam:create_db(Server, DbName), + try + %% build a rev tree with two conflicting gen-2 leaves sharing one root + Root = #{<<"_id">> => <<"conf">>, <<"_rev">> => <<"1-aaa">>, + <<"_revisions">> => #{<<"start">> => 1, <<"ids">> => [<<"aaa">>]}, + <<"v">> => 0}, + LeafA = #{<<"_id">> => <<"conf">>, <<"_rev">> => <<"2-bbb">>, + <<"_revisions">> => #{<<"start">> => 2, <<"ids">> => [<<"bbb">>, <<"aaa">>]}, + <<"v">> => 1}, + LeafB = #{<<"_id">> => <<"conf">>, <<"_rev">> => <<"2-ccc">>, + <<"_revisions">> => #{<<"start">> => 2, <<"ids">> => [<<"ccc">>, <<"aaa">>]}, + <<"v">> => 2}, + {ok, _} = couchbeam:save_docs(Db, [Root, LeafA, LeafB], [{new_edits, false}]), + + Tab = new_repl_table(pull_conf), + {ok, _} = run_pull_oneshot(Db, Tab, <<"test-conf">>), + + [<<"2-bbb">>, <<"2-ccc">>] = + lists:sort(couchbeam_replicator_ets:leaf_revs(Tab, <<"conf">>)), + {DA, #{}} = couchbeam_replicator_ets:get_doc(Tab, <<"conf">>, <<"2-bbb">>), + 2 = length(maps:get(<<"ids">>, maps:get(<<"_revisions">>, DA))), + + ct:pal("Pull replication conflicts passed"), + ok + after + couchbeam:delete_db(Db) + end. + +%% Continuous mode keeps importing changes added while it runs. +pull_replication_continuous(Config) -> + Server = ?config(server, Config), + DbName = iolist_to_binary([<<"couchbeam_pullcont_">>, + integer_to_binary(erlang:system_time(millisecond))]), + {ok, Db} = couchbeam:create_db(Server, DbName), + try + {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"c1">>, <<"v">> => 1}), + Tab = new_repl_table(pull_cont), + {ok, Pid} = couchbeam_replicator:start_link( + Db, couchbeam_replicator_ets, + #{table => Tab, backend => ets}, + [{mode, continuous}, {interval, 300}, {repl_id, <<"test-cont">>}]), + try + ok = wait_until(fun() -> + couchbeam_replicator_ets:leaf_revs(Tab, <<"c1">>) =/= [] + end, 15000), + %% add a doc while the replication is running + {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"c2">>, <<"v">> => 1}), + ok = wait_until(fun() -> + couchbeam_replicator_ets:leaf_revs(Tab, <<"c2">>) =/= [] + end, 15000) + after + couchbeam_replicator:stop(Pid) + end, + ct:pal("Pull replication continuous passed"), + ok + after + couchbeam:delete_db(Db) + end. + +wait_until(Fun, Timeout) when Timeout =< 0 -> + case Fun() of true -> ok; _ -> {error, timeout} end; +wait_until(Fun, Timeout) -> + case Fun() of + true -> ok; + _ -> timer:sleep(100), wait_until(Fun, Timeout - 100) + end. + %% Create a fresh named ets table owned by the test process so it outlives the %% one-shot replicator. new_repl_table(Prefix) ->