various changes made on laptop
[couchdbimport.git] / CouchProjects / CouchDb / couch_rep.erl
blob823afb986464c3f92f3ac076c194d16e188a8015
1 %% CouchDb
2 %% Copyright (C) 2006 Damien Katz
3 %%
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
8 %%
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
13 %%
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 -module(couch_rep).
20 -include("couch_db.hrl").
22 -export([replicate/2, test/0]).
%% Replication history record. Holds a single field, last_seq, which
%% defaults to 0.
%% NOTE(review): nothing in the visible part of this module reads or writes
%% this record - presumably reserved for incremental replication; confirm.
-record(rep_hist, {last_seq = 0}).
%% @doc Replicates two databases in both directions.
%%
%% Each argument is either a string starting with "http://", which is used
%% as-is, or a database name that is opened with couch_server:open/1.
%% Documents are first pulled from B into A, then from A into B.
%%
%% Returns whatever the second pull_rep/2 call returns. Crashes with a
%% badmatch if a database cannot be opened.
replicate(DbNameA, DbNameB) ->
    DbA = open_rep_endpoint(DbNameA),
    DbB = open_rep_endpoint(DbNameB),
    pull_rep(DbA, DbB),
    pull_rep(DbB, DbA).

%% Resolves one replication endpoint: "http://" strings pass through
%% unchanged, anything else is opened as a database name.
open_rep_endpoint("http://" ++ _ = Url) ->
    Url;
open_rep_endpoint(DbName) ->
    {ok, Db} = couch_server:open(DbName),
    Db.
%% Walks every document of DbSource, starting from update sequence 0, and
%% hands each one to maybe_save_doc/4 so it can be written to DbTarget.
%% The accumulator threaded through the enumeration is the update sequence
%% of the most recently visited document (initially 0).
%% Returns the result of enum_docs_since/4.
pull_rep(DbTarget, DbSource) ->
    CopyDoc =
        fun(#doc_info{update_seq=UpdateSeq}=DocInfo, _PrevSeq) ->
            maybe_save_doc(DbTarget, DbSource, DocInfo, 0),
            {ok, UpdateSeq}
        end,
    enum_docs_since(DbSource, 0, CopyDoc, 0).
%% Trace hook taking a format string and an argument list.
%% Output is currently switched off: both arguments are ignored and the
%% function just returns ok. To trace, print with io:format(Format, Data).
debug(_Format, _Data) ->
    ok.
%% Trace hook taking a single term.
%% Output is currently switched off: the argument is ignored and the
%% function just returns ok. To trace, print with io:format("~p~n", [Term]).
debug(_Term) ->
    ok.
%% Performs one HTTP request and returns the parsed response.
%%
%% Url     - full request URL (string).
%% Action  - HTTP method atom handed to http:request/4 (this module uses
%%           get and put).
%% XmlBody - request body; [] means "no body", anything else is sent as
%%           "application/xml; charset=utf-8".
%%
%% Returns mod_couch:parse_response(ResponseBody) for any status code in
%% the range 200..499. Crashes with a badmatch if http:request/4 does not
%% return {ok, ...}, and with {unexpected_http_status, Code} for any other
%% status code (previously an uninformative if_clause).
do_http_request(Url, Action, XmlBody) ->
    Request =
        case XmlBody of
        [] ->
            {Url, []};
        _ ->
            {Url, [], "application/xml; charset=utf-8", XmlBody}
        end,
    {ok, {{_, ResponseCode, _}, _Headers, ResponseBody}} =
        http:request(Action, Request, [], []),
    if
    ResponseCode >= 200, ResponseCode < 500 ->
        mod_couch:parse_response(ResponseBody);
    true ->
        erlang:error({unexpected_http_status, ResponseCode})
    end.
%% Enumerates document infos with an update sequence from StartSeq onwards,
%% folding InFun over them.
%%
%% InFun(DocInfo, Acc) must return {ok, Acc2} to continue or {stop, Acc2}
%% to end the enumeration early.
%%
%% When the database is a URL (a list), the infos are fetched over HTTP
%% from "$all_docs_by_update_seq?startkey=StartSeq" and folded locally;
%% the result is {ok, FinalAcc}. An {error, Error} response is rethrown as
%% throw(Error). Any other database value is delegated unchanged to
%% couch_db:enum_docs_since/4.
%%
%% Only the internal early-stop signal is caught here. Crashes and throws
%% raised by InFun now propagate to the caller instead of being returned
%% disguised as {ok, {'EXIT', ...}} or {ok, ThrownTerm}.
enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
    Url = DbUrl ++ "$all_docs_by_update_seq?startkey=" ++ integer_to_list(StartSeq),
    case do_http_request(Url, get, []) of
    {docinfo_by_update_seq, DocInfos} ->
        % A unique tag keeps our early-exit throw distinguishable from
        % anything InFun itself might throw.
        StopTag = make_ref(),
        Step =
            fun(DocInfo, LocalAcc) ->
                case InFun(DocInfo, LocalAcc) of
                {ok, LocalAcc2} ->
                    LocalAcc2;
                {stop, LocalAcc2} ->
                    throw({StopTag, LocalAcc2})
                end
            end,
        try lists:foldl(Step, InAcc, DocInfos) of
            FinalAcc ->
                {ok, FinalAcc}
        catch
            throw:{StopTag, StoppedAcc} ->
                {ok, StoppedAcc}
        end;
    {error, Error} ->
        throw(Error)
    end;
enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
    couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).
%% Looks up the doc info for DocId.
%%
%% For a URL database (a list) the info is requested over HTTP from
%% DbUrl ++ DocId ++ "?info=true" and the reply is translated:
%%   {doc_info, Info}              -> {ok, Info}
%%   {error, {not_found, missing}} -> not_found
%%   {error, Other}                -> Other (returned bare, not wrapped)
%% Any other database value is delegated to couch_db:get_doc_info/2.
get_doc_info(DbUrl, DocId) when is_list(DbUrl) ->
    InfoUrl = DbUrl ++ DocId ++ "?info=true",
    Reply = do_http_request(InfoUrl, get, []),
    case Reply of
    {doc_info, Info} ->
        {ok, Info};
    {error, {not_found, missing}} ->
        not_found;
    {error, Reason} ->
        Reason
    end;
get_doc_info(Db, DocId) ->
    couch_db:get_doc_info(Db, DocId).
%% Writes Doc to the database.
%%
%% For a URL database (a list) the document is converted to XML and PUT to
%% DbUrl ++ DocId ++ "?" ++ Query, where Query is built from Options:
%%   suppress_new_rev     -> "suppress_new_rev=true"
%%   {overwrite_rev, Rev} -> "overwrite_rev=" ++ revision as a string
%% Any other option crashes with function_clause. The "?" is appended even
%% when Options is empty.
%% Returns {ok, RevId} on a {rev, RevId} reply; an {error, Reason} reply is
%% returned as the bare Reason.
%% Any other database value is delegated to couch_db:save_doc/3.
save_doc(DbUrl, #doc{uuid=DocId}=Doc, Options) when is_list(DbUrl) ->
    ToQueryParam =
        fun(suppress_new_rev) ->
            "suppress_new_rev=true";
        ({overwrite_rev, Rev}) ->
            "overwrite_rev=" ++ couch_util:revid_to_list(Rev)
        end,
    QueryParams = lists:map(ToQueryParam, Options),
    XmlBody = mod_couch:doc_to_xml(Doc),
    PutUrl = DbUrl ++ DocId ++ "?" ++ couch_util:implode(QueryParams, "&"),
    Reply = do_http_request(PutUrl, put, XmlBody),
    case Reply of
    {rev, RevId} ->
        {ok, RevId};
    {error, Reason} ->
        Reason
    end;
save_doc(Db, Doc, Options) ->
    couch_db:save_doc(Db, Doc, Options).
%% Opens a document.
%%
%% For a URL database (a list) the document may be identified either by a
%% #doc_info{} record (its uuid is used) or directly by its id. The
%% document is fetched with "full=true" plus one query parameter per entry
%% in Options; the only option understood is allow_stub, anything else
%% crashes with function_clause.
%% Returns {ok, Doc} on a {doc, Doc} reply; an {error, Reason} reply is
%% returned as the bare Reason.
%% Any other database value is delegated to couch_db:open_doc/3.
open_doc(DbUrl, #doc_info{uuid=DocId}, Options) when is_list(DbUrl) ->
    open_doc(DbUrl, DocId, Options);
open_doc(DbUrl, DocId, Options) when is_list(DbUrl) ->
    ToQueryParam =
        fun(allow_stub) ->
            "allow_stub=true"
        end,
    ExtraParams = lists:map(ToQueryParam, Options),
    GetUrl = DbUrl ++ DocId ++ "?" ++ couch_util:implode(["full=true" | ExtraParams], "&"),
    Reply = do_http_request(GetUrl, get, []),
    case Reply of
    {doc, Doc} ->
        {ok, Doc};
    {error, Reason} ->
        Reason
    end;
open_doc(Db, DocId, Options) ->
    couch_db:open_doc(Db, DocId, Options).
%% Brings one source document over to the target database if the target
%% does not already hold the same revision.
%%
%% DbTarget, DbSource - databases (URL strings or local handles).
%% SrcDocInfo         - #doc_info{} of the document in the source.
%% RetryCount         - number of retries already made; callers start at 0.
%%
%% Returns ok. A retry result (write conflict, or the target document
%% could not be opened) re-runs the whole step while RetryCount < 3.
%% NOTE(review): once the retries are used up, or when the update path
%% returns an error term, the final case has no matching clause and the
%% call crashes with case_clause. This is the pre-existing behaviour and is
%% deliberately left unchanged here.
maybe_save_doc(DbTarget, DbSource,
        #doc_info{uuid=Uuid, revision=SrcRev, deleted=SrcDel}=SrcDocInfo,
        RetryCount) ->
    Result =
        case get_doc_info(DbTarget, Uuid) of
        {ok, #doc_info{revision=SrcRev, deleted=SrcDel}} ->
            % we already have the same revision, nothing to do.
            debug("Same revision, nothing to do."),
            ok;
        {ok, TarDocInfo} ->
            update_existing_target_doc(DbTarget, TarDocInfo, DbSource, SrcDocInfo);
        not_found ->
            copy_missing_target_doc(DbTarget, DbSource, SrcDocInfo)
        end,
    case Result of
    retry when RetryCount < 3 ->
        maybe_save_doc(DbTarget, DbSource, SrcDocInfo, RetryCount + 1);
    ok ->
        ok
    end.

%% The target already has some revision of the document: decide what, if
%% anything, has to be written and write it over the target revision.
%% Returns ok, retry, or the error term returned by save_doc/3.
update_existing_target_doc(DbTarget, TarDocInfo, DbSource, SrcDocInfo) ->
    case doc_to_write_to_target(DbTarget, TarDocInfo, DbSource, SrcDocInfo) of
    none ->
        debug("loser conflict or same revision"),
        ok;
    retry ->
        debug("retrying"),
        retry;
    {DocToWrite, BasedOnTargetRev} ->
        debug("updating document Target rev: ~p~n", [BasedOnTargetRev]),
        SaveOptions = [{overwrite_rev, BasedOnTargetRev}, suppress_new_rev],
        case save_doc(DbTarget, DocToWrite, SaveOptions) of
        {ok, _NewRev} ->
            ok;
        conflict ->
            debug("updated conflict writing ~p, trying again~n", [DocToWrite]),
            % someone updated underneath us, try again
            retry;
        Error ->
            Error
        end
    end.

%% The target has no such document: load it from the source and save it.
%% Returns ok or retry; logs and throws on any other failure.
copy_missing_target_doc(DbTarget, DbSource, #doc_info{uuid=Uuid}=SrcDocInfo) ->
    case open_doc(DbSource, SrcDocInfo, [allow_stub]) of
    {ok, SrcDoc} ->
        debug("Saving new document to target."),
        case save_doc(DbTarget, SrcDoc, [suppress_new_rev]) of
        {ok, _} ->
            ok;
        conflict ->
            % someone updated underneath us, try again
            retry;
        SaveError ->
            couch_log:info("Error saving document ~s.", [Uuid]),
            throw(SaveError)
        end;
    {not_found, _} ->
        % NOTE(review): the body of this clause was missing from the
        % reviewed text. Skipping the document (ok) is assumed, since the
        % source no longer has it - confirm against the upstream file.
        ok;
    OpenError ->
        % log error and abort
        couch_log:info("Error opening document ~s.", [Uuid]),
        throw(OpenError)
    end.
%% Decides which document, if any, must be written to the target for a
%% document that exists on both sides.
%%
%% Returns:
%%   none                 - nothing to write (same revision, target is
%%                          newer, source lost the conflict, or the source
%%                          document is no longer found);
%%   {DocToWrite, TarRev} - write DocToWrite over target revision TarRev;
%%   retry                - the target document could not be opened.
%% Crashes with case_clause if opening the source document returns anything
%% other than {ok, #doc{}} or {not_found, _}.
doc_to_write_to_target(DbTarget, TarDocInfo, DbSource, SrcDocInfo) ->
    case open_doc(DbTarget, TarDocInfo, [allow_stub]) of
    {ok, #doc{}=TarDoc} ->
        case open_doc(DbSource, SrcDocInfo#doc_info.uuid, [allow_stub]) of
        {ok, #doc{}=SrcDoc} ->
            compare_doc_revisions(TarDoc, SrcDoc);
        {not_found, _} ->
            none
        end;
    Error ->
        % The format string used to be "~p~s" with a single argument, which
        % raises badarg as soon as debug output is switched on.
        debug("updated error: ~p~n", [Error]),
        retry
    end.

%% Compares the newest revision of each document (the head of its
%% revisions list) against the other document's revision list.
compare_doc_revisions(#doc{revisions=TarRevisions}=TarDoc,
        #doc{revisions=SrcRevisions}=SrcDoc) ->
    [TarRev|_] = TarRevisions,
    [SrcRev|_] = SrcRevisions,
    case SrcRev == TarRev of
    true ->
        % the loaded docs are the same revision
        none;
    false ->
        case lists:member(TarRev, SrcRevisions) of
        true ->
            % src doc is a later revision of target, src wins
            {SrcDoc, TarRev};
        false ->
            resolve_diverged_docs(TarDoc, SrcDoc, TarRev, SrcRev)
        end
    end.

%% The source does not descend from the target revision: either the target
%% is ahead of the source, or the two documents are in conflict.
resolve_diverged_docs(#doc{revisions=TarRevisions}=TarDoc, SrcDoc, TarRev, SrcRev) ->
    case lists:member(SrcRev, TarRevisions) of
    true ->
        % target doc is a later revision of src, target wins
        none;
    false ->
        % Conflict!!!!
        debug("conflict"),
        case conflict_winner(TarDoc, SrcDoc) of
        TarDoc ->
            % source is loser, do nothing
            none;
        SrcDoc ->
            ConflictDoc = embed_conflict(SrcDoc, TarDoc),
            {ConflictDoc, TarRev}
        end
    end.
%% Picks the winner of two conflicting documents. The first rule that
%% applies decides:
%%   1. a non-deleted document beats a deleted one;
%%   2. the document with the longer revisions list wins;
%%   3. with equally long lists, the first document wins if its revisions
%%      list compares smaller in Erlang term order;
%%   4. otherwise the second document wins.
conflict_winner(#doc{deleted=false}=First, #doc{deleted=true}) ->
    First;
conflict_winner(#doc{deleted=true}, #doc{deleted=false}=Second) ->
    Second;
conflict_winner(#doc{revisions=HistA}=First, #doc{revisions=HistB})
        when length(HistA) > length(HistB) ->
    First;
conflict_winner(#doc{revisions=HistA}, #doc{revisions=HistB}=Second)
        when length(HistA) < length(HistB) ->
    Second;
conflict_winner(#doc{revisions=HistA}=First, #doc{revisions=HistB})
        when HistA < HistB ->
    First;
conflict_winner(_First, Second) ->
    Second.
%% Placeholder for recording the losing document inside the winning one.
%% Currently returns the winning document untouched and ignores the loser.
embed_conflict(WinningDoc, _LosingDoc) ->
    % do nothing for now
    WinningDoc.
%% Manual smoke test. Starts the server, (re)creates the databases
%% "replica_a" and "replica_b", writes 10 documents into replica_a and
%% replicates it with replica_b addressed over HTTP at localhost:8888.
%% It then deletes the first written document locally and replicates again.
%% Returns the result of the second replicate/2 call.
test() ->
    couch_server:start(),
    {ok, LocalDb} = couch_server:create("replica_a", [overwrite]),
    {ok, _} = couch_server:create("replica_b", [overwrite]),
    LocalName = "replica_a",
    % the peer is reached by URL; the local name would be "replica_b"
    RemoteUrl = "http://localhost:8888/replica_b/",
    WrittenIds = test_write_docs(10, LocalDb, []),
    replicate(LocalName, RemoteUrl),
    io:format("about to delete"),
    FirstId = hd(WrittenIds),
    {ok, _Rev} = couch_db:delete_doc(LocalDb, FirstId, any),
    replicate(LocalName, RemoteUrl).
%% Test helper: creates and saves N documents in Db and returns their
%% uuids in creation order (Output is the accumulator; callers pass []).
%%
%% Each document gets the fields "foo" (N as a string), "num" (N) and
%% "bar" ("blah"). N counts down, so the first document written carries
%% the highest number.
%%
%% A failed save now crashes with badmatch instead of being ignored, and a
%% negative or non-integer N is rejected with function_clause instead of
%% recursing forever.
test_write_docs(0, _Db, Output) ->
    lists:reverse(Output);
test_write_docs(N, Db, Output) when is_integer(N), N > 0 ->
    Doc = couch_doc:new(),
    Doc2 = couch_doc:set_field(Doc, "foo", [integer_to_list(N)]),
    Doc3 = couch_doc:set_field(Doc2, "num", [N]),
    Doc4 = couch_doc:set_field(Doc3, "bar", ["blah"]),
    {ok, _Rev} = save_doc(Db, Doc4, []),
    test_write_docs(N - 1, Db, [Doc3#doc.uuid | Output]).