add C lib package
[couchdbimport.git] / CouchProjects / CouchDb / couch_rep.erl
blob5a587b7c6acb146229462b4c9289bc2c9398bef8
1 %% CouchDb
2 %% Copyright (C) 2006 Damien Katz
3 %%
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
8 %%
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
13 %%
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 -module(couch_rep).
20 -include("couch_db.hrl").
22 -export([replicate/2, replicate/3, test/0, test_write_docs/3]).
24 -record(rep_rec,
25 {seq_a = 0,
26 seq_b = 0,
27 session_id = 0
28 }).
32 replicate(DbNameA, DbNameB) ->
33 replicate(DbNameA, DbNameB, []).
35 replicate(DbNameA, DbNameB, Options) ->
36 {ok, DbA} =
37 case DbNameA of
38 "http://" ++ _ -> {ok, DbNameA};
39 _ -> couch_server:open(DbNameA)
40 end,
42 {ok, DbB} =
43 case DbNameB of
44 "http://" ++ _ -> {ok, DbNameB};
45 _ -> couch_server:open(DbNameB)
46 end,
48 {ok, HostName} = inet:gethostname(),
50 RepRecKeyString =
51 HostName ++ "|" ++
52 case DbNameA > DbNameB of
53 % order the names consistently, in case they
54 % are entered in a different order in the args
55 true -> DbNameA ++ "|" ++ DbNameB;
56 false -> DbNameB ++ "|" ++ DbNameA
57 end,
59 <<RepRecHashedKeyInt:128/integer>> = erlang:md5(RepRecKeyString),
60 RepRecHashedKey = integer_to_list(RepRecHashedKeyInt),
62 {SeqNumA, SeqNumB} =
63 case lists:member(full, Options) of
64 true ->
65 {0, 0};
66 false ->
67 RepRecA = get_rep_rec(DbA, RepRecHashedKey),
68 RepRecB = get_rep_rec(DbB, RepRecHashedKey),
70 case RepRecA == RepRecB of
71 % if the records are identical, then we have a valid replication history
72 true ->
73 {RepRecA#rep_rec.seq_a, RepRecA#rep_rec.seq_b};
74 false ->
75 io:format("Performing full replication."),
76 {0, 0}
77 end
78 end,
80 NewSeqNumA = pull_rep(DbB, DbA, SeqNumA),
81 NewSeqNumB = pull_rep(DbA, DbB, SeqNumB),
83 case NewSeqNumA == SeqNumA andalso NewSeqNumB == SeqNumB of
84 true ->
85 % nothing changed, don't record results
86 ok;
87 false ->
88 % something changed, record results
89 NewRec = #rep_rec{
90 seq_a = NewSeqNumA,
91 seq_b = NewSeqNumB,
92 session_id = couch_util:rand32()},
94 ok = set_rep_rec(DbA, RepRecHashedKey, NewRec),
95 ok = set_rep_rec(DbB, RepRecHashedKey, NewRec)
96 end.
100 pull_rep(DbTarget, DbSource, SourceSeqNum) ->
101 {ok, NewSeq} =
102 enum_docs_since(DbSource, SourceSeqNum,
103 fun(#doc_info{update_seq=Seq}=SrcDocInfo, _) ->
104 maybe_save_docs(DbTarget, DbSource, SrcDocInfo),
105 {ok, Seq}
106 end, SourceSeqNum),
107 NewSeq.
110 maybe_save_docs(DbTarget, DbSource, #doc_info{uuid=Uuid, revision=SrcRev, conflict_revs=SrcConflicts}) ->
111 SrcRevs = [SrcRev | SrcConflicts],
112 {ok, [{Uuid, MissingRevs}]} = get_missing_revs(DbTarget, [{Uuid, SrcRevs}]),
114 case MissingRevs of
115 [] ->
117 _Else ->
119 % this 'ok' validates no serious or transaction oriented failures.
120 {ok, DocResults} = open_doc_revs(DbSource, Uuid, MissingRevs, [latest]),
122 % now we gloss over any errors opening the individual docs (security errors, etc).
123 Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads
126 case Docs of
127 [] ->
129 _ ->
130 % this 'ok' validates no serious or transaction oriented failures on save.
131 {ok, _RevUpdatesResults} = save_doc_revs(DbTarget, Docs, [])
132 end,
134 % RevUpdatesResults contains information about if each revision
135 % was successfully saved. Ignore for now, maybe forever. There may be
136 % a security, validation model failing each save, but that's allowable
137 % in succesful replication.
140 end.
144 get_rep_rec(Db, Key) ->
145 case open_doc(Db, ?NON_REP_DOC_PREFIX ++ Key, []) of
146 {ok, Doc} ->
147 [SeqA] = couch_doc:get_field(Doc, "seq_a", [0]),
148 [SeqB] = couch_doc:get_field(Doc, "seq_b", [0]),
149 [SessionId] = couch_doc:get_field(Doc, "session_id", [0]),
150 #rep_rec{seq_a=SeqA, seq_b=SeqB, session_id=SessionId};
151 {not_found, missing} ->
152 #rep_rec{}
153 end.
155 set_rep_rec(Db, Key, #rep_rec{seq_a=SeqA, seq_b=SeqB, session_id=SessionId}) ->
156 Doc = #doc{uuid= ?NON_REP_DOC_PREFIX ++ Key},
157 Doc2 = couch_doc:set_fields(Doc,
158 [{"seq_a", [SeqA]},
159 {"seq_b", [SeqB]},
160 {"session_id", [SessionId]}]),
161 {ok, 0} = save_doc(Db, Doc2, []),
164 do_http_request(Url, Action, XmlBody) ->
165 Request =
166 case XmlBody of
167 [] ->
168 {Url, []};
169 _ ->
170 {Url, [], "application/xml; charset=utf-8", XmlBody}
171 end,
172 {ok, {{_, ResponseCode,_},_Headers, ResponseBody}} = http:request(Action, Request, [], []),
174 ResponseCode >= 200, ResponseCode < 500 ->
175 mod_couch:parse_response(ResponseBody)
176 end.
179 enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
180 Url = DbUrl ++ "$all_docs_by_update_seq?startkey=" ++ integer_to_list(StartSeq),
181 case do_http_request(Url, get, []) of
182 {docs_by_update_seq, DocInfos} ->
183 Result =
184 (catch lists:foldl(fun(DocInfo, {ok, LocalAcc}) ->
185 case InFun(DocInfo, LocalAcc) of
186 {ok, LocalAcc2} ->
187 {ok, LocalAcc2};
188 {stop, LocalAcc2} ->
189 throw({stop, LocalAcc2})
191 end,
192 {ok, InAcc}, DocInfos)),
193 case Result of
194 {stop, InAcc2} ->
195 {ok, InAcc2};
196 {ok, InAcc2} ->
197 {ok, InAcc2}
198 end;
199 {error, Error} ->
200 throw(Error)
201 end;
202 enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
203 couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).
205 get_missing_revs(DbUrl, DocIdRevsList) when is_list(DbUrl) ->
206 Url = DbUrl ++ "$missing_revs",
207 DocsXmerl =
208 lists:map( fun({DocId, Revs}) ->
209 XmerlRevs = [{rev, [{rev, couch_util:revid_to_list(Rev)}], []} || Rev <- Revs],
210 {doc, [{id, DocId}], XmerlRevs}
211 end, DocIdRevsList),
213 Xml = lists:flatten(xmerl:export_simple([{missing_revs, [], DocsXmerl}], xmerl_xml, [])),
214 case do_http_request(Url, post, Xml) of
215 {missing_revs, DocMissingRevsList} ->
216 {ok, DocMissingRevsList};
217 {error, Error} ->
218 Error
219 end;
220 get_missing_revs(Db, DocId) ->
221 couch_db:get_missing_revs(Db, DocId).
224 save_doc(DbUrl, #doc{uuid=DocId}=Doc, Options) when is_list(DbUrl) ->
225 QueryOptionStrs =
226 lists:map(fun(suppress_new_rev) ->
227 "suppress_new_rev=true";
228 ({overwrite_rev, Rev}) ->
229 "overwrite_rev=" ++ couch_util:revid_to_list(Rev)
230 end, Options),
231 Xml = mod_couch:doc_to_xml(Doc),
232 Url = DbUrl ++ DocId ++ "?" ++ couch_util:implode(QueryOptionStrs, "&"),
233 case do_http_request(Url, put, Xml) of
234 {rev, RevId} ->
235 {ok, RevId};
236 {error, Error} ->
237 Error
238 end;
239 save_doc(Db, Doc, Options) ->
240 couch_db:save_doc(Db, Doc, Options).
243 save_doc_revs(DbUrl, [#doc{uuid=DocId}|_]=Docs, []) when is_list(DbUrl) ->
244 XmerlDocs =
245 [mod_couch:doc_to_xmerl(Doc, true) || Doc <- Docs],
246 Xml = lists:flatten(xmerl:export_simple([{docs, [{id,DocId}], XmerlDocs}], xmerl_xml, [])),
247 Url = DbUrl ++ DocId,
248 case do_http_request(Url, put, Xml) of
249 {update_results, Results} ->
250 {ok, Results};
251 {error, Error} ->
252 Error
253 end;
254 save_doc_revs(Db, Docs, Options) ->
255 couch_db:save_doc_revs(Db, Docs, Options).
258 open_doc(DbUrl, DocId, []) when is_list(DbUrl) ->
259 case do_http_request(DbUrl ++ DocId, get, []) of
260 {doc, Doc} ->
261 {ok, Doc};
262 {error, Error} ->
263 Error
264 end;
265 open_doc(Db, DocId, Options) when not is_list(Db) ->
266 couch_db:open_doc(Db, DocId, Options).
269 open_doc_revs(DbUrl, DocId, Revs, Options) when is_list(DbUrl) ->
270 QueryOptionStrs =
271 lists:map(fun(latest) ->
272 "latest=true"
273 end, Options),
275 RevsQueryStrs = ["rev=" ++ couch_util:revid_to_list(Rev) || Rev <- Revs],
276 Url = DbUrl ++ DocId ++ "?" ++ couch_util:implode(["full=true"] ++ QueryOptionStrs ++ RevsQueryStrs, "&"),
277 case do_http_request(Url, get, []) of
278 {doc_revs, Results} ->
279 {ok, Results};
280 {error, Error} ->
281 Error
282 end;
283 open_doc_revs(Db, DocId, Revs, Options) ->
284 couch_db:open_doc_revs(Db, DocId, Revs, Options).
290 test() ->
291 couch_server:start(),
292 %{ok, LocalA} = couch_server:open("replica_a"),
293 {ok, LocalA} = couch_server:create("replica_a", [overwrite]),
294 {ok, _} = couch_server:create("replica_b", [overwrite]),
295 %DbA = "replica_a",
296 DbA = "http://localhost:8888/replica_a/",
297 %DbB = "replica_b",
298 DbB = "http://localhost:8888/replica_b/",
299 DocUnids = test_write_docs(10, LocalA, []),
300 replicate(DbA, DbB),
301 {ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any),
302 % replicate(DbA, DbB),
305 test_write_docs(0, _Db, Output) ->
306 lists:reverse(Output);
307 test_write_docs(N, Db, Output) ->
308 Doc = couch_doc:new(),
309 Doc2 = couch_doc:set_field(Doc, "foo", [integer_to_list(N)]),
310 Doc3 = couch_doc:set_field(Doc2, "num", [N]),
311 Doc4 = couch_doc:set_field(Doc3, "bar", ["blah"]),
312 couch_db:save_doc(Db, Doc4, []),
313 test_write_docs(N-1, Db, [Doc3#doc.uuid | Output]).