2 * replay.c : entry point for replay RA functions for ra_serf
4 * ====================================================================
5 * Copyright (c) 2006-2007 CollabNet. All rights reserved.
7 * This software is licensed as described in the file COPYING, which
8 * you should have received as part of this distribution. The terms
9 * are also available at http://subversion.tigris.org/license-1.html.
10 * If newer versions of this license are posted there, you may use a
11 * newer version instead, at your option.
13 * This software consists of voluntary contributions made by many
14 * individuals. For exact contribution history, see the revision
15 * history and replays, available at http://subversion.tigris.org/.
16 * ====================================================================
27 #include "svn_pools.h"
31 #include "../libsvn_ra/ra_loader.h"
32 #include "svn_config.h"
33 #include "svn_delta.h"
34 #include "svn_base64.h"
35 #include "svn_version.h"
37 #include "svn_private_config.h"
43 * This enum represents the current state of our XML parsing.
57 typedef struct replay_info_t replay_info_t
;
59 struct replay_info_t
{
65 replay_info_t
*parent
;
69 (*change_prop_t
)(void *baton
,
71 const svn_string_t
*value
,
80 svn_boolean_t del_prop
;
85 replay_info_t
*parent
;
91 /* Are we done fetching this file? */
93 svn_ra_serf__list_t
**done_list
;
94 svn_ra_serf__list_t done_item
;
96 /* callback to get an editor */
97 svn_ra_replay_revstart_callback_t revstart_func
;
98 svn_ra_replay_revfinish_callback_t revfinish_func
;
101 /* replay receiver function and baton */
102 const svn_delta_editor_t
*editor
;
105 /* current revision */
106 svn_revnum_t revision
;
108 /* Information needed to create the replay report body */
109 svn_revnum_t low_water_mark
;
110 svn_boolean_t send_deltas
;
115 /* Revision properties for this revision. */
116 apr_hash_t
*revs_props
;
123 push_state(svn_ra_serf__xml_parser_t
*parser
,
124 replay_context_t
*replay_ctx
,
125 replay_state_e state
)
127 svn_ra_serf__xml_push_state(parser
, state
);
129 if (state
== OPEN_DIR
|| state
== ADD_DIR
||
130 state
== OPEN_FILE
|| state
== ADD_FILE
)
134 info
= apr_palloc(parser
->state
->pool
, sizeof(*info
));
136 info
->pool
= parser
->state
->pool
;
137 info
->parent
= parser
->state
->private;
141 parser
->state
->private = info
;
143 else if (state
== CHANGE_PROP
)
147 info
= apr_pcalloc(parser
->state
->pool
, sizeof(*info
));
149 info
->pool
= parser
->state
->pool
;
150 info
->parent
= parser
->state
->private;
152 parser
->state
->private = info
;
155 return parser
->state
->private;
159 start_replay(svn_ra_serf__xml_parser_t
*parser
,
161 svn_ra_serf__dav_props_t name
,
164 replay_context_t
*ctx
= userData
;
165 replay_state_e state
;
167 state
= parser
->state
->current_state
;
170 strcmp(name
.name
, "editor-report") == 0)
172 push_state(parser
, ctx
, REPORT
);
173 ctx
->props
= apr_hash_make(ctx
->pool
);
175 svn_ra_serf__walk_all_props(ctx
->revs_props
, ctx
->vcc_url
, ctx
->revision
,
176 svn_ra_serf__set_bare_props
,
177 ctx
->props
, ctx
->pool
);
178 SVN_ERR(ctx
->revstart_func(ctx
->revision
, ctx
->replay_baton
,
179 &ctx
->editor
, &ctx
->editor_baton
,
183 else if (state
== REPORT
&&
184 strcmp(name
.name
, "target-revision") == 0)
188 rev
= svn_xml_get_attr_value("rev", attrs
);
191 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
192 _("Missing revision attr in target-revision element"));
195 SVN_ERR(ctx
->editor
->set_target_revision(ctx
->editor_baton
,
197 parser
->state
->pool
));
199 else if (state
== REPORT
&&
200 strcmp(name
.name
, "open-root") == 0)
205 rev
= svn_xml_get_attr_value("rev", attrs
);
209 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
210 _("Missing revision attr in open-root element"));
213 info
= push_state(parser
, ctx
, OPEN_DIR
);
215 SVN_ERR(ctx
->editor
->open_root(ctx
->editor_baton
,
216 SVN_STR_TO_REV(rev
), parser
->state
->pool
,
219 else if ((state
== OPEN_DIR
|| state
== ADD_DIR
) &&
220 strcmp(name
.name
, "delete-entry") == 0)
222 const char *file_name
, *rev
;
225 file_name
= svn_xml_get_attr_value("name", attrs
);
228 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
229 _("Missing name attr in delete-entry element"));
231 rev
= svn_xml_get_attr_value("rev", attrs
);
234 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
235 _("Missing revision attr in delete-entry element"));
238 info
= push_state(parser
, ctx
, DELETE_ENTRY
);
240 SVN_ERR(ctx
->editor
->delete_entry(file_name
, SVN_STR_TO_REV(rev
),
241 info
->baton
, parser
->state
->pool
));
243 svn_ra_serf__xml_pop_state(parser
);
245 else if ((state
== OPEN_DIR
|| state
== ADD_DIR
) &&
246 strcmp(name
.name
, "open-directory") == 0)
248 const char *rev
, *dirname
;
251 dirname
= svn_xml_get_attr_value("name", attrs
);
254 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
255 _("Missing name attr in open-directory element"));
257 rev
= svn_xml_get_attr_value("rev", attrs
);
260 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
261 _("Missing revision attr in open-directory element"));
264 info
= push_state(parser
, ctx
, OPEN_DIR
);
266 SVN_ERR(ctx
->editor
->open_directory(dirname
, info
->parent
->baton
,
268 parser
->state
->pool
, &info
->baton
));
270 else if ((state
== OPEN_DIR
|| state
== ADD_DIR
) &&
271 strcmp(name
.name
, "add-directory") == 0)
273 const char *dir_name
, *copyfrom
, *copyrev
;
277 dir_name
= svn_xml_get_attr_value("name", attrs
);
280 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
281 _("Missing name attr in add-directory element"));
283 copyfrom
= svn_xml_get_attr_value("copyfrom-path", attrs
);
284 copyrev
= svn_xml_get_attr_value("copyfrom-rev", attrs
);
287 rev
= SVN_STR_TO_REV(copyrev
);
289 rev
= SVN_INVALID_REVNUM
;
291 info
= push_state(parser
, ctx
, ADD_DIR
);
293 SVN_ERR(ctx
->editor
->add_directory(dir_name
, info
->parent
->baton
,
295 parser
->state
->pool
, &info
->baton
));
297 else if ((state
== OPEN_DIR
|| state
== ADD_DIR
) &&
298 strcmp(name
.name
, "close-directory") == 0)
300 replay_info_t
*info
= parser
->state
->private;
302 SVN_ERR(ctx
->editor
->close_directory(info
->baton
, parser
->state
->pool
));
304 svn_ra_serf__xml_pop_state(parser
);
306 else if ((state
== OPEN_DIR
|| state
== ADD_DIR
) &&
307 strcmp(name
.name
, "open-file") == 0)
309 const char *file_name
, *rev
;
312 file_name
= svn_xml_get_attr_value("name", attrs
);
315 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
316 _("Missing name attr in open-file element"));
318 rev
= svn_xml_get_attr_value("rev", attrs
);
321 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
322 _("Missing revision attr in open-file element"));
325 info
= push_state(parser
, ctx
, OPEN_FILE
);
327 SVN_ERR(ctx
->editor
->open_file(file_name
, info
->parent
->baton
,
329 parser
->state
->pool
, &info
->baton
));
331 else if ((state
== OPEN_DIR
|| state
== ADD_DIR
) &&
332 strcmp(name
.name
, "add-file") == 0)
334 const char *file_name
, *copyfrom
, *copyrev
;
338 file_name
= svn_xml_get_attr_value("name", attrs
);
341 return svn_error_create(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
342 _("Missing name attr in add-file element"));
344 copyfrom
= svn_xml_get_attr_value("copyfrom-path", attrs
);
345 copyrev
= svn_xml_get_attr_value("copyfrom-rev", attrs
);
347 info
= push_state(parser
, ctx
, ADD_FILE
);
350 rev
= SVN_STR_TO_REV(copyrev
);
352 rev
= SVN_INVALID_REVNUM
;
354 SVN_ERR(ctx
->editor
->add_file(file_name
, info
->parent
->baton
,
356 parser
->state
->pool
, &info
->baton
));
358 else if ((state
== OPEN_FILE
|| state
== ADD_FILE
) &&
359 strcmp(name
.name
, "apply-textdelta") == 0)
361 const char *checksum
;
363 svn_txdelta_window_handler_t textdelta
;
364 void *textdelta_baton
;
365 svn_stream_t
*delta_stream
;
367 info
= push_state(parser
, ctx
, APPLY_TEXTDELTA
);
369 checksum
= svn_xml_get_attr_value("checksum", attrs
);
372 checksum
= apr_pstrdup(info
->pool
, checksum
);
375 SVN_ERR(ctx
->editor
->apply_textdelta(info
->baton
, checksum
,
380 delta_stream
= svn_txdelta_parse_svndiff(textdelta
, textdelta_baton
,
382 info
->stream
= svn_base64_decode(delta_stream
, info
->pool
);
384 else if ((state
== OPEN_FILE
|| state
== ADD_FILE
) &&
385 strcmp(name
.name
, "close-file") == 0)
387 replay_info_t
*info
= parser
->state
->private;
388 const char *checksum
;
390 checksum
= svn_xml_get_attr_value("checksum", attrs
);
392 SVN_ERR(ctx
->editor
->close_file(info
->baton
, checksum
,
393 parser
->state
->pool
));
395 svn_ra_serf__xml_pop_state(parser
);
397 else if (((state
== OPEN_FILE
|| state
== ADD_FILE
) &&
398 strcmp(name
.name
, "change-file-prop") == 0) ||
399 ((state
== OPEN_DIR
|| state
== ADD_DIR
) &&
400 strcmp(name
.name
, "change-dir-prop") == 0))
402 const char *prop_name
;
405 prop_name
= svn_xml_get_attr_value("name", attrs
);
408 return svn_error_createf(SVN_ERR_RA_DAV_MALFORMED_DATA
, NULL
,
409 _("Missing name attr in %s element"),
413 info
= push_state(parser
, ctx
, CHANGE_PROP
);
415 info
->name
= apr_pstrdup(parser
->state
->pool
, prop_name
);
417 if (svn_xml_get_attr_value("del", attrs
))
418 info
->del_prop
= TRUE
;
420 info
->del_prop
= FALSE
;
422 if (state
== OPEN_FILE
|| state
== ADD_FILE
)
423 info
->change
= ctx
->editor
->change_file_prop
;
425 info
->change
= ctx
->editor
->change_dir_prop
;
433 end_replay(svn_ra_serf__xml_parser_t
*parser
,
435 svn_ra_serf__dav_props_t name
)
437 replay_context_t
*ctx
= userData
;
438 replay_state_e state
;
442 state
= parser
->state
->current_state
;
444 if (state
== REPORT
&&
445 strcmp(name
.name
, "editor-report") == 0)
447 svn_ra_serf__xml_pop_state(parser
);
448 SVN_ERR(ctx
->revfinish_func(ctx
->revision
, ctx
->replay_baton
,
449 ctx
->editor
, ctx
->editor_baton
,
453 else if (state
== OPEN_DIR
&& strcmp(name
.name
, "open-directory") == 0)
455 /* Don't do anything. */
457 else if (state
== ADD_DIR
&& strcmp(name
.name
, "add-directory") == 0)
459 /* Don't do anything. */
461 else if (state
== OPEN_FILE
&& strcmp(name
.name
, "open-file") == 0)
463 /* Don't do anything. */
465 else if (state
== ADD_FILE
&& strcmp(name
.name
, "add-file") == 0)
467 /* Don't do anything. */
469 else if ((state
== OPEN_FILE
|| state
== ADD_FILE
) &&
470 strcmp(name
.name
, "close-file") == 0)
472 /* Don't do anything. */
474 else if ((state
== APPLY_TEXTDELTA
) &&
475 strcmp(name
.name
, "apply-textdelta") == 0)
477 replay_info_t
*info
= parser
->state
->private;
478 SVN_ERR(svn_stream_close(info
->stream
));
479 svn_ra_serf__xml_pop_state(parser
);
481 else if (state
== CHANGE_PROP
&&
482 (strcmp(name
.name
, "change-file-prop") == 0 ||
483 strcmp(name
.name
, "change-dir-prop") == 0))
485 prop_info_t
*info
= parser
->state
->private;
486 const svn_string_t
*prop_val
;
488 if (info
->del_prop
== TRUE
)
494 svn_string_t tmp_prop
;
496 tmp_prop
.data
= info
->data
;
497 tmp_prop
.len
= info
->len
;
499 prop_val
= svn_base64_decode_string(&tmp_prop
, parser
->state
->pool
);
502 SVN_ERR(info
->change(info
->parent
->baton
, info
->name
, prop_val
,
503 info
->parent
->pool
));
504 svn_ra_serf__xml_pop_state(parser
);
511 cdata_replay(svn_ra_serf__xml_parser_t
*parser
,
516 replay_context_t
*replay_ctx
= userData
;
517 replay_state_e state
;
519 UNUSED_CTX(replay_ctx
);
521 state
= parser
->state
->current_state
;
523 if (state
== APPLY_TEXTDELTA
)
525 replay_info_t
*info
= parser
->state
->private;
530 SVN_ERR(svn_stream_write(info
->stream
, data
, &written
));
533 return svn_error_create(SVN_ERR_STREAM_UNEXPECTED_EOF
, NULL
,
534 _("Error writing stream: unexpected EOF"));
536 else if (state
== CHANGE_PROP
)
538 prop_info_t
*info
= parser
->state
->private;
540 svn_ra_serf__expand_string(&info
->data
, &info
->len
,
541 data
, len
, parser
->state
->pool
);
547 static serf_bucket_t
*
548 create_replay_body(void *baton
,
549 serf_bucket_alloc_t
*alloc
,
552 replay_context_t
*ctx
= baton
;
553 serf_bucket_t
*body_bkt
, *tmp
;
555 body_bkt
= serf_bucket_aggregate_create(alloc
);
557 tmp
= SERF_BUCKET_SIMPLE_STRING_LEN("<S:replay-report xmlns:S=\"",
558 sizeof("<S:replay-report xmlns:S=\"")-1,
560 serf_bucket_aggregate_append(body_bkt
, tmp
);
562 tmp
= SERF_BUCKET_SIMPLE_STRING_LEN(SVN_XML_NAMESPACE
,
563 sizeof(SVN_XML_NAMESPACE
)-1,
565 serf_bucket_aggregate_append(body_bkt
, tmp
);
567 tmp
= SERF_BUCKET_SIMPLE_STRING_LEN("\">",
570 serf_bucket_aggregate_append(body_bkt
, tmp
);
572 svn_ra_serf__add_tag_buckets(body_bkt
,
573 "S:revision", apr_ltoa(ctx
->pool
, ctx
->revision
),
575 svn_ra_serf__add_tag_buckets(body_bkt
,
577 apr_ltoa(ctx
->pool
, ctx
->low_water_mark
),
580 svn_ra_serf__add_tag_buckets(body_bkt
,
582 apr_ltoa(ctx
->pool
, ctx
->send_deltas
),
585 tmp
= SERF_BUCKET_SIMPLE_STRING_LEN("</S:replay-report>",
586 sizeof("</S:replay-report>")-1,
588 serf_bucket_aggregate_append(body_bkt
, tmp
);
594 svn_ra_serf__replay(svn_ra_session_t
*ra_session
,
595 svn_revnum_t revision
,
596 svn_revnum_t low_water_mark
,
597 svn_boolean_t send_deltas
,
598 const svn_delta_editor_t
*editor
,
602 replay_context_t
*replay_ctx
;
603 svn_ra_serf__session_t
*session
= ra_session
->priv
;
604 svn_ra_serf__handler_t
*handler
;
605 svn_ra_serf__xml_parser_t
*parser_ctx
;
607 replay_ctx
= apr_pcalloc(pool
, sizeof(*replay_ctx
));
608 replay_ctx
->pool
= pool
;
609 replay_ctx
->editor
= editor
;
610 replay_ctx
->editor_baton
= edit_baton
;
611 replay_ctx
->done
= FALSE
;
612 replay_ctx
->low_water_mark
= low_water_mark
;
613 replay_ctx
->send_deltas
= send_deltas
;
615 handler
= apr_pcalloc(pool
, sizeof(*handler
));
617 handler
->method
= "REPORT";
618 handler
->path
= session
->repos_url_str
;
619 handler
->body_delegate
= create_replay_body
;
620 handler
->body_delegate_baton
= replay_ctx
;
621 handler
->body_type
= "text/xml";
622 handler
->conn
= session
->conns
[0];
623 handler
->session
= session
;
625 parser_ctx
= apr_pcalloc(pool
, sizeof(*parser_ctx
));
627 parser_ctx
->pool
= pool
;
628 parser_ctx
->user_data
= replay_ctx
;
629 parser_ctx
->start
= start_replay
;
630 parser_ctx
->end
= end_replay
;
631 parser_ctx
->cdata
= cdata_replay
;
632 parser_ctx
->done
= &replay_ctx
->done
;
634 handler
->response_handler
= svn_ra_serf__handle_xml_parser
;
635 handler
->response_baton
= parser_ctx
;
637 svn_ra_serf__request_create(handler
);
/* The maximum number of outstanding requests at any time. When this number is
 * reached, ra_serf will stop sending requests until responses on the previous
 * requests are received and handled.
 *
 * Some observations about serf which led us to the current value.
 * ----------------------------------------------------------------
 * We aim to keep serf's outgoing queue filled with enough requests so the
 * network bandwidth and server capacity are used optimally. Originally we used
 * 5 as the max. number of outstanding requests, but this turned out to be too
 * low.
 * Serf doesn't exit out of the serf_context_run loop as long as it has
 * data to send or receive. With small responses (revs of a few kB), serf
 * doesn't come out of this loop at all. So with MAX_OUTSTANDING_REQUESTS set
 * to a low number, there's a big chance that serf handles those requests
 * completely in its internal loop, and only then gives us a chance to create
 * new requests. This results in hiccups, slowing down the whole process.
 *
 * With a larger MAX_OUTSTANDING_REQUESTS, like 100 or more, there's more chance
 * that serf can come out of its internal loop so we can replenish the outgoing
 * request queue.
 * There's no real disadvantage of using a large number here, besides the memory
 * used to store the message, parser and handler objects (approx. 250 bytes).
 *
 * In my test setup peak performance was reached at max. 30-35 requests. So I
 * added a small margin and chose 50.
 */
668 #define MAX_OUTSTANDING_REQUESTS 50
671 svn_ra_serf__replay_range(svn_ra_session_t
*ra_session
,
672 svn_revnum_t start_revision
,
673 svn_revnum_t end_revision
,
674 svn_revnum_t low_water_mark
,
675 svn_boolean_t send_deltas
,
676 svn_ra_replay_revstart_callback_t revstart_func
,
677 svn_ra_replay_revfinish_callback_t revfinish_func
,
681 svn_ra_serf__session_t
*session
= ra_session
->priv
;
682 svn_revnum_t rev
= start_revision
;
684 int active_reports
= 0;
686 SVN_ERR(svn_ra_serf__discover_root(&vcc_url
, NULL
,
687 session
, session
->conns
[0],
688 session
->repos_url
.path
, pool
));
690 while (active_reports
|| rev
<= end_revision
)
693 svn_ra_serf__list_t
*done_list
;
694 svn_ra_serf__list_t
*done_reports
= NULL
;
695 replay_context_t
*replay_ctx
;
697 /* Send pending requests, if any. Limit the number of outstanding
698 requests to MAX_OUTSTANDING_REQUESTS. */
699 if (rev
<= end_revision
&& active_reports
< MAX_OUTSTANDING_REQUESTS
)
701 svn_ra_serf__propfind_context_t
*prop_ctx
= NULL
;
702 svn_ra_serf__handler_t
*handler
;
703 svn_ra_serf__xml_parser_t
*parser_ctx
;
704 apr_pool_t
*ctx_pool
= svn_pool_create(pool
);
706 replay_ctx
= apr_pcalloc(ctx_pool
, sizeof(*replay_ctx
));
707 replay_ctx
->pool
= ctx_pool
;
708 replay_ctx
->revstart_func
= revstart_func
;
709 replay_ctx
->revfinish_func
= revfinish_func
;
710 replay_ctx
->replay_baton
= replay_baton
;
711 replay_ctx
->done
= FALSE
;
712 replay_ctx
->revision
= rev
;
713 replay_ctx
->low_water_mark
= low_water_mark
;
714 replay_ctx
->send_deltas
= send_deltas
;
715 replay_ctx
->done_item
.data
= replay_ctx
;
716 /* Request all properties of a certain revision. */
717 replay_ctx
->vcc_url
= vcc_url
;
718 replay_ctx
->revs_props
= apr_hash_make(replay_ctx
->pool
);
719 SVN_ERR(svn_ra_serf__deliver_props(&prop_ctx
,
720 replay_ctx
->revs_props
, session
,
721 session
->conns
[0], vcc_url
,
723 TRUE
, NULL
, replay_ctx
->pool
));
725 /* Send the replay report request. */
726 handler
= apr_pcalloc(replay_ctx
->pool
, sizeof(*handler
));
728 handler
->method
= "REPORT";
729 handler
->path
= session
->repos_url_str
;
730 handler
->body_delegate
= create_replay_body
;
731 handler
->body_delegate_baton
= replay_ctx
;
732 handler
->conn
= session
->conns
[0];
733 handler
->session
= session
;
735 parser_ctx
= apr_pcalloc(replay_ctx
->pool
, sizeof(*parser_ctx
));
737 /* Setup the XML parser context.
738 Because we have not one but a list of requests, the 'done' property
739 on the replay_ctx is not of much use. Instead, use 'done_list'.
740 On each handled response (succesfully or not), the parser will add
741 done_item to done_list, so by keeping track of the state of
742 done_list we know how many requests have been handled completely.
744 parser_ctx
->pool
= replay_ctx
->pool
;
745 parser_ctx
->user_data
= replay_ctx
;
746 parser_ctx
->start
= start_replay
;
747 parser_ctx
->end
= end_replay
;
748 parser_ctx
->cdata
= cdata_replay
;
749 parser_ctx
->done
= &replay_ctx
->done
;
750 parser_ctx
->done_list
= &done_reports
;
751 parser_ctx
->done_item
= &replay_ctx
->done_item
;
752 handler
->response_handler
= svn_ra_serf__handle_xml_parser
;
753 handler
->response_baton
= parser_ctx
;
755 svn_ra_serf__request_create(handler
);
761 /* Run the serf loop, send outgoing and process incoming requests.
762 This request will block when there are no more requests to send or
763 responses to receive, so we have to be careful on our bookkeeping. */
764 status
= serf_context_run(session
->context
, SERF_DURATION_FOREVER
,
768 SVN_ERR(session
->pending_error
);
770 return svn_error_wrap_apr(status
,
771 _("Error retrieving replay REPORT (%d)"),
775 /* Substract the number of completely handled responses from our
776 total nr. of open requests', so we'll know when to stop this loop.
777 Since the message is completely handled, we can destroy its pool. */
778 done_list
= done_reports
;
781 replay_context_t
*ctx
= (replay_context_t
*)done_list
->data
;
782 done_list
= done_list
->next
;
783 svn_pool_destroy(ctx
->pool
);
791 #undef MAX_OUTSTANDING_REQUESTS