Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ All notable changes to this project will be documented in this file.
### Added

- `couchbeam_replicator`: a `gen_statem` pull-replication endpoint that imports
documents (with full revision history) from a source CouchDB into a pluggable
local target behaviour, with resumable checkpoints. `couchbeam_replicator_ets`
is an ets/dets reference target. Pull only; attachment bodies are a follow-up.
documents (with full revision history and attachment bodies) from a source
CouchDB into a pluggable local target behaviour, with resumable checkpoints.
`couchbeam_replicator_ets` is an ets/dets reference target. Pull only.
See `guides/replication.md`.

### Changed
Expand Down
5 changes: 3 additions & 2 deletions guides/replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ init(Args) -> {ok, State} | {error, term()}.
revs_diff(IdRevs, State) -> {ok, Missing, State}.
%% IdRevs :: [{DocId, [Rev]}]; return the subset not yet stored.
write_docs(Entries, State) -> {ok, State} | {error, term()}.
%% Entries :: [{Doc, Atts}]; Doc is a map with _id/_rev/_revisions/
%% (_deleted); Atts is #{} for now (attachment bodies come in a later phase).
%% Entries :: [{Doc, Atts}]; Doc is a map with _id/_rev/_revisions/(_deleted)/
%% _attachments; Atts :: #{AttName => Bin} holds the attachment bodies as
%% delivered (the doc's _attachments metadata records content type/encoding).
read_checkpoint(ReplId, State) -> {ok, Seq | nil, State}.
write_checkpoint(ReplId, Seq, State) -> {ok, State}.
terminate(Reason, State) -> any(). %% optional
Expand Down
5 changes: 3 additions & 2 deletions src/couchbeam.erl
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ open_doc(Db, DocId) ->

%% @doc open a document
%% Params is a list of query arguments; see the CouchDB API documentation.
-spec open_doc(db(), binary() | string(), list()) -> {ok, map()} | {error, term()}.
-spec open_doc(db(), binary() | string(), list()) ->
{ok, map()} | {ok, {multipart, doc_stream()}} | {error, term()}.
open_doc(#db{server=Server, options=Opts}=Db, DocId, Params) ->
DocId1 = couchbeam_util:encode_docid(DocId),

Expand Down Expand Up @@ -490,7 +491,7 @@ decode_stream_doc(Ref) ->
%% when you get `{ok, {multipart, State}}' from the function
%% `couchbeam:open_doc/3'.
-spec stream_doc(doc_stream()) ->
{doc, doc()}
{doc, doc(), doc_stream()}
| {att, Name :: binary(), doc_stream()}
| {att_body, Name :: binary(), Chunk :: binary(), doc_stream()}
| {att_eof, Name :: binary(), doc_stream()}
Expand Down
33 changes: 28 additions & 5 deletions src/couchbeam_replicator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
%%% Replication is resumable: the target persists the last processed source
%%% sequence under a stable replication id.
%%%
%%% Pull only (CouchDB -> app). Attachment bodies are not fetched yet; each
%%% target entry is `{Doc, #{}}'.
%%% Pull only (CouchDB -> app). Each target entry is `{Doc, Atts}' where Atts
%%% maps attachment name to body bytes as delivered by CouchDB (the doc's
%%% `_attachments' metadata records content type and any encoding, e.g. gzip).

-module(couchbeam_replicator).
-behaviour(gen_statem).
Expand Down Expand Up @@ -243,8 +244,7 @@ changes_to_idrevs(Changes) ->
[maps:get(<<"rev">>, R) || R <- maps:get(<<"changes">>, C, [])]}
|| C <- Changes].

%% Fetch each missing revision with its revision history (Phase 1: no
%% attachment bodies, so every entry carries `#{}').
%% Fetch each missing revision with its revision history and attachment bodies.
fetch_missing(Source, Missing) ->
try
Entries = lists:flatmap(
Expand All @@ -256,14 +256,37 @@ fetch_missing(Source, Missing) ->
throw:{fetch_error, Reason} -> {error, Reason}
end.

%% A revision with attachments comes back as a multipart stream; one without
%% attachments (or a deletion) comes back as a plain JSON document.
fetch_rev(Source, DocId, Rev) ->
case couchbeam:open_doc(Source, DocId, [{rev, Rev}, {revs, true}]) of
case couchbeam:open_doc(Source, DocId,
[{rev, Rev}, {revs, true}, {"attachments", true}]) of
{ok, {multipart, State}} ->
collect_multipart(State, undefined, <<>>, #{});
{ok, Doc} when is_map(Doc) ->
{Doc, #{}};
{error, Reason} ->
throw({fetch_error, {DocId, Rev, Reason}})
end.

%% Drive couchbeam:stream_doc/1 until `eof', reassembling the document and its
%% attachment bodies (kept as delivered; encoding is recorded in the doc's
%% _attachments).
%%
%%   State - the current doc_stream() continuation
%%   Doc   - the document seen so far, or `undefined' until the `{doc, ...}'
%%           part has been delivered
%%   Buf   - bytes accumulated for the attachment currently being streamed
%%   Atts  - finished attachments, #{Name => Body}
%%
%% Returns `{Doc, Atts}'. Throws `{fetch_error, {multipart, Reason}}' (the
%% shape caught in fetch_missing/2) when the stream reports an error, or with
%% Reason = `missing_doc' when the stream ends without ever delivering the
%% document part, so that `undefined' is never handed on as a document.
collect_multipart(State, Doc, Buf, Atts) ->
    case couchbeam:stream_doc(State) of
        {doc, NewDoc, NState} ->
            collect_multipart(NState, NewDoc, Buf, Atts);
        {att, _Name, NState} ->
            %% a new attachment starts: begin with an empty buffer
            collect_multipart(NState, Doc, <<>>, Atts);
        {att_body, _Name, Chunk, NState} ->
            %% appending to the single accumulator keeps the binary growable
            collect_multipart(NState, Doc, <<Buf/binary, Chunk/binary>>, Atts);
        {att_eof, Name, NState} ->
            collect_multipart(NState, Doc, <<>>, Atts#{Name => Buf});
        eof when Doc =/= undefined ->
            {Doc, Atts};
        eof ->
            %% stream finished before any `{doc, ...}' part was seen
            throw({fetch_error, {multipart, missing_doc}});
        {error, Reason} ->
            throw({fetch_error, {multipart, Reason}})
    end.

%%====================================================================
%% Helpers
%%====================================================================
Expand Down
113 changes: 111 additions & 2 deletions test/couchbeam_integration_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@

%% Test cases - Pull replication endpoint (couchbeam_replicator)
-export([pull_replication_basic/1,
pull_replication_resume/1]).
pull_replication_resume/1,
pull_replication_attachments/1,
pull_replication_conflicts/1,
pull_replication_continuous/1]).

%% Test cases - DB management operations
-export([compact_database/1,
Expand Down Expand Up @@ -222,7 +225,10 @@ groups() ->
]},
{pull_replication_ops, [sequence], [
pull_replication_basic,
pull_replication_resume
pull_replication_resume,
pull_replication_attachments,
pull_replication_conflicts,
pull_replication_continuous
]},
{db_management_ops, [sequence], [
compact_database,
Expand Down Expand Up @@ -1770,6 +1776,109 @@ pull_replication_resume(Config) ->
couchbeam:delete_db(Db)
end.

%% Replicating a document that carries an attachment copies the attachment
%% body into the target and keeps the attachment stub on the document.
pull_replication_attachments(Config) ->
    Server = ?config(server, Config),
    Stamp = integer_to_binary(erlang:system_time(millisecond)),
    DbName = iolist_to_binary([<<"couchbeam_pullatts_">>, Stamp]),
    {ok, Db} = couchbeam:create_db(Server, DbName),
    try
        DocId = <<"att1">>,
        AttName = <<"blob.bin">>,
        MimeType = <<"application/octet-stream">>,
        Payload = <<"attachment-body-payload-0123456789">>,

        {ok, Saved} = couchbeam:save_doc(Db, #{<<"_id">> => DocId, <<"k">> => 1}),
        %% octet-stream so CouchDB returns the bytes verbatim (text/* is gzipped)
        PutOpts = [{rev, couchbeam_doc:get_rev(Saved)}, {content_type, MimeType}],
        {ok, _} = couchbeam:put_attachment(Db, DocId, AttName, Payload, PutOpts),

        Tab = new_repl_table(pull_atts),
        {ok, _} = run_pull_oneshot(Db, Tab, <<"test-atts">>),

        %% exactly one leaf revision is expected on the target
        [LeafRev] = couchbeam_replicator_ets:leaf_revs(Tab, DocId),
        {TargetDoc, TargetAtts} = couchbeam_replicator_ets:get_doc(Tab, DocId, LeafRev),

        %% the body must arrive byte-for-byte (assertive match on Payload)
        Payload = maps:get(AttName, TargetAtts),

        %% the stub stays on the document and still carries the content type
        Stubs = maps:get(<<"_attachments">>, TargetDoc),
        AttStub = maps:get(AttName, Stubs),
        MimeType = maps:get(<<"content_type">>, AttStub),

        ct:pal("Pull replication attachments passed"),
        ok
    after
        couchbeam:delete_db(Db)
    end.

%% A conflicted document (multiple leaf revisions) replicates all leaves.
%%
%% Builds a revision tree with one root and two conflicting generation-2
%% leaves on the source (saved with new_edits=false so the given revisions are
%% stored as-is), runs a one-shot pull, then checks that the target holds both
%% leaves, that a leaf keeps its two-entry revision history, and that it
%% carries no attachment bodies.
pull_replication_conflicts(Config) ->
    Server = ?config(server, Config),
    DbName = iolist_to_binary([<<"couchbeam_pullconf_">>,
                               integer_to_binary(erlang:system_time(millisecond))]),
    {ok, Db} = couchbeam:create_db(Server, DbName),
    try
        %% build a rev tree with two conflicting gen-2 leaves sharing one root
        Root = #{<<"_id">> => <<"conf">>, <<"_rev">> => <<"1-aaa">>,
                 <<"_revisions">> => #{<<"start">> => 1, <<"ids">> => [<<"aaa">>]},
                 <<"v">> => 0},
        LeafA = #{<<"_id">> => <<"conf">>, <<"_rev">> => <<"2-bbb">>,
                  <<"_revisions">> => #{<<"start">> => 2,
                                        <<"ids">> => [<<"bbb">>, <<"aaa">>]},
                  <<"v">> => 1},
        LeafB = #{<<"_id">> => <<"conf">>, <<"_rev">> => <<"2-ccc">>,
                  <<"_revisions">> => #{<<"start">> => 2,
                                        <<"ids">> => [<<"ccc">>, <<"aaa">>]},
                  <<"v">> => 2},
        {ok, _} = couchbeam:save_docs(Db, [Root, LeafA, LeafB], [{new_edits, false}]),

        Tab = new_repl_table(pull_conf),
        {ok, _} = run_pull_oneshot(Db, Tab, <<"test-conf">>),

        [<<"2-bbb">>, <<"2-ccc">>] =
            lists:sort(couchbeam_replicator_ets:leaf_revs(Tab, <<"conf">>)),
        {DA, AttsA} = couchbeam_replicator_ets:get_doc(Tab, <<"conf">>, <<"2-bbb">>),
        %% `#{}' used as a PATTERN matches any map, so emptiness has to be
        %% asserted explicitly rather than by matching against `#{}'.
        0 = map_size(AttsA),
        2 = length(maps:get(<<"ids">>, maps:get(<<"_revisions">>, DA))),

        ct:pal("Pull replication conflicts passed"),
        ok
    after
        couchbeam:delete_db(Db)
    end.

%% A replicator started in continuous mode picks up the document that existed
%% at start and also one saved to the source while it is still running.
pull_replication_continuous(Config) ->
    Server = ?config(server, Config),
    Stamp = integer_to_binary(erlang:system_time(millisecond)),
    DbName = iolist_to_binary([<<"couchbeam_pullcont_">>, Stamp]),
    {ok, Db} = couchbeam:create_db(Server, DbName),
    try
        {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"c1">>, <<"v">> => 1}),
        Tab = new_repl_table(pull_cont),
        %% builds a predicate: does the target hold at least one leaf for DocId?
        Replicated = fun(DocId) ->
            fun() -> couchbeam_replicator_ets:leaf_revs(Tab, DocId) =/= [] end
        end,
        TargetArgs = #{table => Tab, backend => ets},
        ReplOpts = [{mode, continuous}, {interval, 300}, {repl_id, <<"test-cont">>}],
        {ok, Pid} = couchbeam_replicator:start_link(
                      Db, couchbeam_replicator_ets, TargetArgs, ReplOpts),
        try
            ok = wait_until(Replicated(<<"c1">>), 15000),
            %% add a doc while the replication is running
            {ok, _} = couchbeam:save_doc(Db, #{<<"_id">> => <<"c2">>, <<"v">> => 1}),
            ok = wait_until(Replicated(<<"c2">>), 15000)
        after
            %% always stop the replicator, even when an assertion above fails
            couchbeam_replicator:stop(Pid)
        end,
        ct:pal("Pull replication continuous passed"),
        ok
    after
        couchbeam:delete_db(Db)
    end.

%% Poll Fun (a zero-arity fun) until it returns `true' or Timeout milliseconds
%% have elapsed.
%%
%% Returns `ok' as soon as Fun() yields `true', otherwise `{error, timeout}'.
%% Fun is always evaluated at least once, even when Timeout =< 0, and once
%% more after the final sleep, so a condition that becomes true right at the
%% deadline is still observed.
%%
%% The budget is tracked against a monotonic-clock deadline, so time spent
%% inside Fun counts towards Timeout instead of being added on top of it.
wait_until(Fun, Timeout) ->
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    wait_until_deadline(Fun, Deadline).

%% Polling loop for wait_until/2: sleeps at most 100 ms between attempts and
%% never past Deadline (monotonic milliseconds).
wait_until_deadline(Fun, Deadline) ->
    case Fun() of
        true ->
            ok;
        _ ->
            case Deadline - erlang:monotonic_time(millisecond) of
                Remaining when Remaining > 0 ->
                    timer:sleep(min(100, Remaining)),
                    wait_until_deadline(Fun, Deadline);
                _ ->
                    {error, timeout}
            end
    end.

%% Create a fresh named ets table owned by the test process so it outlives the
%% one-shot replicator.
new_repl_table(Prefix) ->
Expand Down
Loading