2 %% Copyright (C) 2006 Damien Katz
4 %% This program is free software; you can redistribute it and/or
5 %% modify it under the terms of the GNU General Public License
6 %% as published by the Free Software Foundation; either version 2
7 %% of the License, or (at your option) any later version.
9 %% This program is distributed in the hope that it will be useful,
10 %% but WITHOUT ANY WARRANTY; without even the implied warranty of
11 %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 %% GNU General Public License for more details.
14 %% You should have received a copy of the GNU General Public License
15 %% along with this program; if not, write to the Free Software
16 %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20 -include("couch_db.hrl").
22 -export([replicate
/2, replicate
/3, test
/0, test_write_docs
/3]).
%% replicate(DbNameA, DbNameB)
%%
%% Convenience entry point: replicates between the two named databases using
%% an empty option list. Equivalent to replicate(DbNameA, DbNameB, []).
replicate(DbNameA, DbNameB) ->
    NoOptions = [],
    replicate(DbNameA, DbNameB, NoOptions).
%% replicate(DbNameA, DbNameB, Options)
%%
%% Replicates between two databases, pulling in both directions.
%% NOTE(review): several lines of this function are not visible in this text,
%% so the summary below covers only the visible parts.
%%
%%  * A name that starts with "http://" is used as-is as the database
%%    reference; any other name is opened through couch_server:open/1.
%%  * A replication-record key string is built from the two names joined with
%%    "|" (ordered so the argument order does not matter), MD5-hashed, read as
%%    a 128-bit integer and rendered as a decimal string (RepRecHashedKey).
%%    The local host name is also fetched; its use is in a line not shown.
%%  * Options is tested for the atom 'full'. The replication records stored in
%%    both databases are read with get_rep_rec/2; when they compare equal their
%%    seq_a/seq_b values are returned as the starting sequence numbers,
%%    otherwise "Performing full replication." is printed (presumably the
%%    starting points then fall back to the beginning -- confirm).
%%  * pull_rep/3 is run once per direction. If neither returned sequence
%%    number differs from its starting value nothing is recorded; otherwise a
%%    new record (with a fresh couch_util:rand32() session_id) is written to
%%    both databases via set_rep_rec/3.
35 replicate(DbNameA
, DbNameB
, Options
) ->
38 "http://" ++ _
-> {ok
, DbNameA
};
39 _
-> couch_server:open(DbNameA
)
44 "http://" ++ _
-> {ok
, DbNameB
};
45 _
-> couch_server:open(DbNameB
)
48 {ok
, HostName
} = inet:gethostname(),
52 case DbNameA
> DbNameB
of
53 % order the names consistently, in case they
54 % are entered in a different order in the args
55 true
-> DbNameA
++ "|" ++ DbNameB
;
56 false
-> DbNameB
++ "|" ++ DbNameA
59 <<RepRecHashedKeyInt:128/integer>> = erlang:md5(RepRecKeyString
),
60 RepRecHashedKey
= integer_to_list(RepRecHashedKeyInt
),
63 case lists:member(full
, Options
) of
67 RepRecA
= get_rep_rec(DbA
, RepRecHashedKey
),
68 RepRecB
= get_rep_rec(DbB
, RepRecHashedKey
),
70 case RepRecA
== RepRecB
of
71 % if the records are identical, then we have a valid replication history
73 {RepRecA#rep_rec
.seq_a
, RepRecA#rep_rec
.seq_b
};
75 io:format("Performing full replication."),
80 NewSeqNumA
= pull_rep(DbB
, DbA
, SeqNumA
),
81 NewSeqNumB
= pull_rep(DbA
, DbB
, SeqNumB
),
83 case NewSeqNumA
== SeqNumA andalso NewSeqNumB
== SeqNumB
of
85 % nothing changed, don't record results
88 % something changed, record results
92 session_id
= couch_util:rand32()},
94 ok
= set_rep_rec(DbA
, RepRecHashedKey
, NewRec
),
95 ok
= set_rep_rec(DbB
, RepRecHashedKey
, NewRec
)
%% pull_rep(DbTarget, DbSource, SourceSeqNum)
%%
%% Enumerates the documents of DbSource updated since SourceSeqNum (through
%% enum_docs_since/4) and passes each #doc_info{} to maybe_save_docs/3, which
%% copies revisions that DbTarget is missing.
%% NOTE(review): the rest of the callback and the return expression are not
%% visible here. replicate/3 binds the result as a new sequence number, so
%% presumably the last update_seq seen is returned -- confirm.
100 pull_rep(DbTarget
, DbSource
, SourceSeqNum
) ->
102 enum_docs_since(DbSource
, SourceSeqNum
,
103 fun(#doc_info
{update_seq
=Seq
}=SrcDocInfo
, _
) ->
104 maybe_save_docs(DbTarget
, DbSource
, SrcDocInfo
),
%% maybe_save_docs(DbTarget, DbSource, DocInfo)
%%
%% Copies to DbTarget whichever revisions of one source document it lacks.
%% The source's current revision plus its conflict revisions are sent to
%% get_missing_revs/2 on the target; the revisions reported missing are opened
%% from DbSource with the 'latest' option, loads that did not return {ok, Doc}
%% are dropped, and the remaining docs are written with save_doc_revs/3.
%% NOTE(review): some lines between these steps are not visible here
%% (presumably checks for empty MissingRevs / Docs -- confirm).
110 maybe_save_docs(DbTarget
, DbSource
, #doc_info
{uuid
=Uuid
, revision
=SrcRev
, conflict_revs
=SrcConflicts
}) ->
111 SrcRevs
= [SrcRev
| SrcConflicts
],
112 {ok
, [{Uuid
, MissingRevs
}]} = get_missing_revs(DbTarget
, [{Uuid
, SrcRevs
}]),
119 % this 'ok' validates no serious or transaction oriented failures.
120 {ok
, DocResults
} = open_doc_revs(DbSource
, Uuid
, MissingRevs
, [latest
]),
122 % now we gloss over any errors opening the individual docs (security errors, etc).
123 Docs
= [RevDoc
|| {ok
, RevDoc
} <- DocResults
], % only match successful loads
130 % this 'ok' validates no serious or transaction oriented failures on save.
131 {ok
, _RevUpdatesResults
} = save_doc_revs(DbTarget
, Docs
, [])
134 % _RevUpdatesResults reports whether each revision was successfully
135 % saved. It is ignored for now, maybe forever: a security or validation
136 % model may reject individual saves, and that is allowable in a
137 % successful replication.
%% get_rep_rec(Db, Key)
%%
%% Reads the replication record kept in Db under the document id
%% ?NON_REP_DOC_PREFIX ++ Key. When a document is returned, its "seq_a",
%% "seq_b" and "session_id" fields (each defaulting to [0]) are unwrapped from
%% their single-element lists into a #rep_rec{}.
%% NOTE(review): the pattern that binds Doc and the body of the
%% {not_found, missing} branch are not visible here.
144 get_rep_rec(Db
, Key
) ->
145 case open_doc(Db
, ?NON_REP_DOC_PREFIX
++ Key
, []) of
147 [SeqA
] = couch_doc:get_field(Doc
, "seq_a", [0]),
148 [SeqB
] = couch_doc:get_field(Doc
, "seq_b", [0]),
149 [SessionId
] = couch_doc:get_field(Doc
, "session_id", [0]),
150 #rep_rec
{seq_a
=SeqA
, seq_b
=SeqB
, session_id
=SessionId
};
151 {not_found
, missing
} ->
%% set_rep_rec(Db, Key, RepRec)
%%
%% Stores a #rep_rec{} in Db as a document whose uuid is
%% ?NON_REP_DOC_PREFIX ++ Key. Values are written as single-element field
%% lists; only the "session_id" entry is visible here (presumably "seq_a" and
%% "seq_b" are set the same way -- confirm). The save is asserted to return
%% exactly {ok, 0}.
%% NOTE(review): the final expression is not visible; replicate/3 matches this
%% function's result against 'ok'.
155 set_rep_rec(Db
, Key
, #rep_rec
{seq_a
=SeqA
, seq_b
=SeqB
, session_id
=SessionId
}) ->
156 Doc
= #doc
{uuid
= ?NON_REP_DOC_PREFIX
++ Key
},
157 Doc2
= couch_doc:set_fields(Doc
,
160 {"session_id", [SessionId
]}]),
161 {ok
, 0} = save_doc(Db
, Doc2
, []),
%% do_http_request(Url, Action, XmlBody)
%%
%% Sends an HTTP request with http:request/4 (Action is the method) and, when
%% the response status code is >= 200 and < 500, returns the body as parsed by
%% mod_couch:parse_response/1. A request tuple carrying XmlBody uses the
%% content type "application/xml; charset=utf-8".
%% NOTE(review): how Request is chosen per Action and what happens for other
%% status codes are in lines not visible here. Any result other than
%% {ok, ...} from http:request/4 fails the match and crashes the caller.
164 do_http_request(Url
, Action
, XmlBody
) ->
170 {Url
, [], "application/xml; charset=utf-8", XmlBody
}
172 {ok
, {{_
, ResponseCode
,_
},_Headers
, ResponseBody
}} = http:request(Action
, Request
, [], []),
174 ResponseCode
>= 200, ResponseCode
< 500 ->
175 mod_couch:parse_response(ResponseBody
)
%% enum_docs_since(Db, StartSeq, InFun, InAcc) -- remote clause
%%
%% Selected when the database reference is a list (a URL string). GETs
%% DbUrl ++ "$all_docs_by_update_seq?startkey=" ++ integer_to_list(StartSeq)
%% and, on a {docs_by_update_seq, DocInfos} reply, folds InFun(DocInfo, Acc)
%% over DocInfos starting from {ok, InAcc}. A throw of {stop, Acc} ends the
%% fold early; the surrounding 'catch' turns it into the fold's value.
%% NOTE(review): the handling of InFun's results and of other reply shapes is
%% in lines not visible here.
179 enum_docs_since(DbUrl
, StartSeq
, InFun
, InAcc
) when is_list(DbUrl
) ->
180 Url
= DbUrl
++ "$all_docs_by_update_seq?startkey=" ++ integer_to_list(StartSeq
),
181 case do_http_request(Url
, get, []) of
182 {docs_by_update_seq
, DocInfos
} ->
184 (catch lists:foldl(fun(DocInfo
, {ok
, LocalAcc
}) ->
185 case InFun(DocInfo
, LocalAcc
) of
189 throw({stop
, LocalAcc2
})
192 {ok
, InAcc
}, DocInfos
)),
%% enum_docs_since/4 -- local clause.
%% Reached when the preceding URL clause did not match; the enumeration is
%% handed unchanged to couch_db:enum_docs_since/4.
enum_docs_since(Db, SinceSeq, Callback, Acc0) ->
    couch_db:enum_docs_since(Db, SinceSeq, Callback, Acc0).
%% get_missing_revs(Db, DocIdRevsList) -- remote clause
%%
%% Selected when the database reference is a list (a URL string). Each
%% {DocId, Revs} pair becomes a simple-form xmerl 'doc' element (attribute
%% id = DocId) holding one 'rev' element per revision, the revision id being
%% carried in its 'rev' attribute. The elements are wrapped in 'missing_revs',
%% exported with xmerl_xml, flattened and POSTed to DbUrl ++ "$missing_revs".
%% A {missing_revs, List} reply is returned as {ok, List}.
%% NOTE(review): handling of other replies is not visible here.
205 get_missing_revs(DbUrl
, DocIdRevsList
) when is_list(DbUrl
) ->
206 Url
= DbUrl
++ "$missing_revs",
208 lists:map( fun({DocId
, Revs
}) ->
209 XmerlRevs
= [{rev
, [{rev
, couch_util:revid_to_list(Rev
)}], []} || Rev
<- Revs
],
210 {doc
, [{id
, DocId
}], XmerlRevs
}
213 Xml
= lists:flatten(xmerl:export_simple([{missing_revs
, [], DocsXmerl
}], xmerl_xml
, [])),
214 case do_http_request(Url
, post
, Xml
) of
215 {missing_revs
, DocMissingRevsList
} ->
216 {ok
, DocMissingRevsList
};
%% get_missing_revs/2 -- local clause.
%% Reached when the preceding URL clause did not match; the query is passed
%% unchanged to couch_db:get_missing_revs/2.
get_missing_revs(Db, IdRevs) ->
    couch_db:get_missing_revs(Db, IdRevs).
%% save_doc(Db, Doc, Options) -- remote clause
%%
%% Selected when the database reference is a list (a URL string) and Doc is a
%% #doc{}. The options suppress_new_rev and {overwrite_rev, Rev} are turned
%% into query-string parameters, Doc is serialized with mod_couch:doc_to_xml/1
%% and PUT to DbUrl ++ DocId ++ "?" ++ the "&"-joined parameters.
%% NOTE(review): the handling of the reply is not visible here. Any option
%% other than the two listed would not match the visible fun clauses.
224 save_doc(DbUrl
, #doc
{uuid
=DocId
}=Doc
, Options
) when is_list(DbUrl
) ->
226 lists:map(fun(suppress_new_rev
) ->
227 "suppress_new_rev=true";
228 ({overwrite_rev
, Rev
}) ->
229 "overwrite_rev=" ++ couch_util:revid_to_list(Rev
)
231 Xml
= mod_couch:doc_to_xml(Doc
),
232 Url
= DbUrl
++ DocId
++ "?" ++ couch_util:implode(QueryOptionStrs
, "&"),
233 case do_http_request(Url
, put, Xml
) of
%% save_doc/3 -- local clause.
%% Reached when the preceding URL clause did not match; the document is saved
%% directly through couch_db:save_doc/3 and that call's result is returned.
save_doc(Db, Document, SaveOptions) ->
    couch_db:save_doc(Db, Document, SaveOptions).
%% save_doc_revs(Db, Docs, Options) -- remote clause
%%
%% Matches only a list (URL string) database reference, a non-empty Docs list
%% and an empty option list. Every doc is converted with
%% mod_couch:doc_to_xmerl/2, the results are wrapped in one 'docs' element
%% whose id attribute is the uuid of the first doc, and the XML is PUT to
%% DbUrl ++ DocId. A {update_results, Results} reply is expected.
%% NOTE(review): the handling of the reply is not visible here. An empty Docs
%% list or non-empty Options falls through to the next clause even for a URL.
243 save_doc_revs(DbUrl
, [#doc
{uuid
=DocId
}|_
]=Docs
, []) when is_list(DbUrl
) ->
245 [mod_couch:doc_to_xmerl(Doc
, true
) || Doc
<- Docs
],
246 Xml
= lists:flatten(xmerl:export_simple([{docs
, [{id
,DocId
}], XmerlDocs
}], xmerl_xml
, [])),
247 Url
= DbUrl
++ DocId
,
248 case do_http_request(Url
, put, Xml
) of
249 {update_results
, Results
} ->
%% save_doc_revs/3 -- local clause.
%% Reached when the preceding URL clause did not match; the revisions are
%% written through couch_db:save_doc_revs/3 and its result is returned as-is.
save_doc_revs(Db, RevDocs, SaveOptions) ->
    couch_db:save_doc_revs(Db, RevDocs, SaveOptions).
%% open_doc(Db, DocId, Options) -- remote clause
%%
%% Matches only a list (URL string) database reference with an empty option
%% list, and issues a GET for DbUrl ++ DocId.
%% NOTE(review): the handling of the reply is not visible here.
258 open_doc(DbUrl
, DocId
, []) when is_list(DbUrl
) ->
259 case do_http_request(DbUrl
++ DocId
, get, []) of
%% open_doc/3 -- local clause.
%% Only accepts a database reference that is not a list, so a URL string
%% combined with non-empty options fails with function_clause instead of being
%% passed to couch_db. Delegates to couch_db:open_doc/3.
open_doc(Db, Id, OpenOptions) when not is_list(Db) ->
    couch_db:open_doc(Db, Id, OpenOptions).
%% open_doc_revs(Db, DocId, Revs, Options) -- remote clause
%%
%% Selected when the database reference is a list (a URL string). Options are
%% mapped to query-string parameters (only the 'latest' case is visible here),
%% each revision becomes a "rev=..." parameter, and a GET is issued for
%% DbUrl ++ DocId ++ "?" ++ the "&"-joined list, which always starts with
%% "full=true". A {doc_revs, Results} reply is expected.
%% NOTE(review): the handling of the reply is not visible here.
269 open_doc_revs(DbUrl
, DocId
, Revs
, Options
) when is_list(DbUrl
) ->
271 lists:map(fun(latest
) ->
275 RevsQueryStrs
= ["rev=" ++ couch_util:revid_to_list(Rev
) || Rev
<- Revs
],
276 Url
= DbUrl
++ DocId
++ "?" ++ couch_util:implode(["full=true"] ++ QueryOptionStrs
++ RevsQueryStrs
, "&"),
277 case do_http_request(Url
, get, []) of
278 {doc_revs
, Results
} ->
%% open_doc_revs/4 -- local clause.
%% Reached when the preceding URL clause did not match; the requested
%% revisions are opened through couch_db:open_doc_revs/4.
open_doc_revs(Db, Id, RevList, OpenOptions) ->
    couch_db:open_doc_revs(Db, Id, RevList, OpenOptions).
%% Body of the exported test/0 smoke test (NOTE(review): the function head and
%% the end of the body are not visible here).
%% Starts couch_server, creates the databases "replica_a" and "replica_b" with
%% the 'overwrite' option, writes 10 test documents into replica_a through
%% test_write_docs/3, then deletes the first of them. DbA and DbB hold the
%% HTTP URLs of the two replicas; the replicate/2 call below is commented out.
291 couch_server:start(),
292 %{ok, LocalA} = couch_server:open("replica_a"),
293 {ok
, LocalA
} = couch_server:create("replica_a", [overwrite
]),
294 {ok
, _
} = couch_server:create("replica_b", [overwrite
]),
296 DbA
= "http://localhost:8888/replica_a/",
298 DbB
= "http://localhost:8888/replica_b/",
299 DocUnids
= test_write_docs(10, LocalA
, []),
301 {ok
, _Rev
} = couch_db:delete_doc(LocalA
, lists:nth(1, DocUnids
), any
),
302 % replicate(DbA, DbB),
%% test_write_docs(N, Db, Output) -> [Uuid]
%%
%% Test helper: creates N documents in the local database Db and returns their
%% uuids. Each document gets the fields "foo" = [integer_to_list(N)],
%% "num" = [N] and "bar" = ["blah"], with N counting down to 1.
%%
%% Output is the accumulator (callers pass []). Uuids are prepended as the
%% documents are written and the list is reversed once N reaches 0, so with an
%% empty accumulator the result is in creation order.
%%
%% Every save is asserted to return {ok, _}. A failed write therefore crashes
%% here with badmatch, rather than going unnoticed and leaving an uuid in the
%% result for a document that was never stored. N must be a non-negative
%% integer; a negative N fails with function_clause instead of recursing
%% without end.
test_write_docs(0, _Db, Output) ->
    lists:reverse(Output);
test_write_docs(N, Db, Output) when is_integer(N), N > 0 ->
    Doc = couch_doc:new(),
    Doc2 = couch_doc:set_field(Doc, "foo", [integer_to_list(N)]),
    Doc3 = couch_doc:set_field(Doc2, "num", [N]),
    Doc4 = couch_doc:set_field(Doc3, "bar", ["blah"]),
    {ok, _Rev} = couch_db:save_doc(Db, Doc4, []),
    %% The uuid is read from the same intermediate document as before
    %% (presumably set_field/3 leaves the uuid untouched -- confirm).
    test_write_docs(N - 1, Db, [Doc3#doc.uuid | Output]).