2 %% Copyright (C) 2006 Damien Katz
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20 -include("couch_db.hrl").
22 -export([replicate
/2, test
/0]).
%% @doc Replicate two databases against each other, in both directions.
%%
%% Both databases are opened by name through couch_server:open/1. The first
%% pull_rep/2 call uses A as target and B as source; the second call swaps
%% the roles, using B as target and A as source.
%%
%% DbNameA, DbNameB: names accepted by couch_server:open/1.
%% Returns the result of the second pull_rep/2 call (target B, source A).
%% Fails with badmatch if either open does not return {ok, Handle}.
%%
%% NOTE(review): other functions in this module accept a URL string in place
%% of a database handle (clauses guarded by is_list/1). This function always
%% goes through couch_server:open/1 -- confirm whether URL arguments are
%% meant to bypass it.
replicate(DbNameA, DbNameB) ->
    %% Bind the handles under the names the pull_rep/2 calls actually use;
    %% previously they were bound as Db/Db2, leaving DbA/DbB unbound.
    {ok, DbA} = couch_server:open(DbNameA),
    {ok, DbB} = couch_server:open(DbNameB),
    %% The result of the first pass is not used by anything below.
    _NewSeqNumB = pull_rep(DbA, DbB),
    pull_rep(DbB, DbA).
50 pull_rep(DbTarget
, DbSource
) ->
51 enum_docs_since(DbSource
, 0,
52 fun(#doc_info
{update_seq
=Seq
}=SrcDocInfo
, _
) ->
53 maybe_save_doc(DbTarget
, DbSource
, SrcDocInfo
, 0),
58 %io:format(_Fmt, _Args),
62 %io:format("~p~n", [_Msg]),
65 do_http_request(Url
, Action
, XmlBody
) ->
71 {Url
, [], "application/xml; charset=utf-8", XmlBody
}
73 {ok
, {{_
, ResponseCode
,_
},_Headers
, ResponseBody
}} = http:request(Action
, Request
, [], []),
75 ResponseCode
>= 200, ResponseCode
< 500 ->
76 mod_couch:parse_response(ResponseBody
)
80 enum_docs_since(DbUrl
, StartSeq
, InFun
, InAcc
) when is_list(DbUrl
) ->
81 Url
= DbUrl
++ "$all_docs_by_update_seq?startkey=" ++ integer_to_list(StartSeq
),
82 case do_http_request(Url
, get, []) of
83 {docinfo_by_update_seq
, DocInfos
} ->
85 (catch lists:foldl(fun(DocInfo
, LocalAcc
) ->
86 case InFun(DocInfo
, LocalAcc
) of
90 throw({stop
, LocalAcc2
})
%% Fallback clause: the first argument did not satisfy the is_list/1 guard of
%% the clause above, so the enumeration is handed to couch_db unchanged.
enum_docs_since(Db, SinceSeq, FoldFun, Acc0) ->
    couch_db:enum_docs_since(Db, SinceSeq, FoldFun, Acc0).
106 get_doc_info(DbUrl
, DocId
) when is_list(DbUrl
) ->
107 Url
= DbUrl
++ DocId
++ "?info=true",
108 case do_http_request(Url
, get, []) of
109 {doc_info
, DocInfo
} ->
111 {error
, {not_found
, missing
}} ->
%% Fallback clause: the database argument is not a list (so not a URL string);
%% the lookup is delegated to couch_db and its result returned as-is.
get_doc_info(DbHandle, Id) ->
    couch_db:get_doc_info(DbHandle, Id).
119 save_doc(DbUrl
, #doc
{uuid
=DocId
}=Doc
, Options
) when is_list(DbUrl
) ->
121 lists:map(fun(suppress_new_rev
) ->
122 "suppress_new_rev=true";
123 ({overwrite_rev
, Rev
}) ->
124 "overwrite_rev=" ++ couch_util:revid_to_list(Rev
)
126 Xml
= mod_couch:doc_to_xml(Doc
),
127 Url
= DbUrl
++ DocId
++ "?" ++ couch_util:implode(QueryOptionStrs
, "&"),
128 case do_http_request(Url
, put, Xml
) of
%% Fallback clause: anything that did not match the URL clause above is saved
%% through couch_db, with the document and options passed on untouched.
save_doc(DbHandle, Document, SaveOpts) ->
    couch_db:save_doc(DbHandle, Document, SaveOpts).
%% URL database plus a #doc_info{} record: take the uuid out of the record
%% and re-dispatch with the bare id, keeping the options unchanged.
open_doc(Url, #doc_info{uuid = Uuid}, OpenOpts) when is_list(Url) ->
    open_doc(Url, Uuid, OpenOpts);
139 open_doc(DbUrl
, DocId
, Options
) when is_list(DbUrl
) ->
141 lists:map(fun(allow_stub
) ->
145 Url
= DbUrl
++ DocId
++ "?" ++ couch_util:implode(["full=true" | QueryOptionStrs
], "&"),
146 case do_http_request(Url
, get, []) of
%% Fallback clause: the database argument failed the is_list/1 guards of the
%% clauses above, so the open is delegated to couch_db with every argument
%% forwarded as received.
open_doc(DbHandle, IdOrInfo, OpenOpts) ->
    couch_db:open_doc(DbHandle, IdOrInfo, OpenOpts).
155 maybe_save_doc(DbTarget
, DbSource
, #doc_info
{uuid
=Uuid
, revision
=SrcRev
, deleted
=SrcDel
}=SrcDocInfo
, RetryCount
) ->
157 case get_doc_info(DbTarget
, Uuid
) of
158 {ok
, #doc_info
{revision
=SrcRev
, deleted
=SrcDel
}} ->
159 % we already have the same revision, nothing to do.
160 debug("Same revision, nothing to do."),
163 case doc_to_write_to_target(DbTarget
, TarDocInfo
, DbSource
, SrcDocInfo
) of
165 debug("loser conflict or same revision"),
170 {DocToWrite
, BasedOnTargetRev
} ->
171 debug("updating document Target rev: ~p~n", [BasedOnTargetRev
]),
172 case save_doc(DbTarget
, DocToWrite
, [{overwrite_rev
, BasedOnTargetRev
}, suppress_new_rev
]) of
176 debug("updated conflict writing ~p, trying again~n", [DocToWrite
]),
177 % someone updated underneath us, try again
184 case open_doc(DbSource
, SrcDocInfo
, [allow_stub
]) of
186 debug("Saving new document to target."),
187 case save_doc(DbTarget
, SrcDoc
, [suppress_new_rev
]) of
191 % someone updated underneath us, try again
194 couch_log:info("Error saving document ~s.", [Uuid
]),
200 % log error and abort
201 couch_log:info("Error opening document ~s.", [Uuid
]),
206 retry
when RetryCount
< 3 ->
207 maybe_save_doc(DbTarget
, DbSource
, SrcDocInfo
, RetryCount
+ 1);
213 doc_to_write_to_target(DbTarget
, TarDocInfo
, DbSource
, SrcDocInfo
) ->
214 case open_doc(DbTarget
, TarDocInfo
, [allow_stub
]) of
215 {ok
, #doc
{revisions
=TarRevisions
}=TarDoc
} ->
216 case open_doc(DbSource
, SrcDocInfo#doc_info
.uuid
, [allow_stub
]) of
217 {ok
, #doc
{revisions
=SrcRevisions
}=SrcDoc
} ->
218 [TarRev
|_
] = TarRevisions
,
219 [SrcRev
|_
] = SrcRevisions
,
220 case SrcRev
== TarRev
of
222 % the loaded docs are the same revision
225 case lists:member(TarRev
, SrcRevisions
) of
227 % src doc is a later revision of target, src wins
230 case lists:member(SrcRev
, TarRevisions
) of
232 % target doc is a later revision of src, target wins
237 case conflict_winner(TarDoc
, SrcDoc
) of
239 % source is loser, do nothing
242 ConflictDoc
= embed_conflict(SrcDoc
, TarDoc
),
243 {ConflictDoc
, TarRev
}
252 debug("updated error: ~p~s", [Error
]),
257 conflict_winner(#doc
{deleted
=false
}=DocA
, #doc
{deleted
=true
}) ->
259 conflict_winner(#doc
{deleted
=true
}, #doc
{deleted
=false
}=DocB
) ->
261 conflict_winner(#doc
{revisions
=RevsA
}=DocA
, #doc
{revisions
=RevsB
}) when length(RevsA
) > length(RevsB
) ->
263 conflict_winner(#doc
{revisions
=RevsA
}, #doc
{revisions
=RevsB
}=DocB
) when length(RevsA
) < length(RevsB
)->
265 conflict_winner(#doc
{revisions
=RevsA
}=DocA
, #doc
{revisions
=RevsB
}) when RevsA
< RevsB
->
267 conflict_winner(_DocA
, DocB
) ->
271 embed_conflict(Winner
, _ConflictDoc
) ->
278 couch_server:start(),
279 {ok
, LocalA
} = couch_server:create("replica_a", [overwrite
]),
280 {ok
, _
} = couch_server:create("replica_b", [overwrite
]),
283 DbB
= "http://localhost:8888/replica_b/",
284 DocUnids
= test_write_docs(10, LocalA
, []),
286 io:format("about to delete"),
287 {ok
, Rev
} = couch_db:delete_doc(LocalA
, lists:nth(1, DocUnids
), any
),
%% Test helper: writes N generated documents through save_doc/3 and returns
%% the collected uuids.
%%
%% Counts the first argument down to 0. Each step builds a fresh document
%% with the fields "foo" (the counter as a string), "num" (the counter) and
%% "bar" ("blah"), saves it, and pushes its uuid onto the accumulator. The
%% terminating clause reverses the accumulator before returning it.
%% The value returned by save_doc/3 is not inspected.
test_write_docs(0, _Db, Acc) ->
    lists:reverse(Acc);
test_write_docs(Remaining, Db, Acc) ->
    Numbered = couch_doc:set_field(
        couch_doc:set_field(couch_doc:new(), "foo", [integer_to_list(Remaining)]),
        "num",
        [Remaining]
    ),
    save_doc(Db, couch_doc:set_field(Numbered, "bar", ["blah"]), []),
    %% The uuid is read from the document as it was before "bar" was set.
    test_write_docs(Remaining - 1, Db, [Numbered#doc.uuid | Acc]).