1 // SPDX-License-Identifier: GPL-2.0-only
3 * Copyright 2023 Red Hat
6 #include "index-session.h"
8 #include <linux/atomic.h>
11 #include "memory-alloc.h"
12 #include "time-utils.h"
14 #include "funnel-requestqueue.h"
16 #include "index-layout.h"
19 * The index session contains a lock (the request_mutex) which ensures that only one thread can
20 * change the state of its index at a time. The state field indicates the current state of the
21 * index through a set of descriptive flags. The request_mutex must be notified whenever a
22 * non-transient state flag is cleared. The request_mutex is also used to count the number of
23 * requests currently in progress so that they can be drained when suspending or closing the index.
25 * If the index session is suspended shortly after opening an index, it may have to suspend during
26 * a rebuild. Depending on the size of the index, a rebuild may take a significant amount of time,
27 * so UDS allows the rebuild to be paused in order to suspend the session in a timely manner. When
28 * the index session is resumed, the rebuild can continue from where it left off. If the index
29 * session is shut down with a suspended rebuild, the rebuild progress is abandoned and the rebuild
30 * will start from the beginning the next time the index is loaded. The mutex and status fields in
31 * the index_load_context are used to record the state of any interrupted rebuild.
34 enum index_session_flag_bit
{
35 IS_FLAG_BIT_START
= 8,
36 /* The session has started loading an index but not completed it. */
37 IS_FLAG_BIT_LOADING
= IS_FLAG_BIT_START
,
38 /* The session has loaded an index, which can handle requests. */
40 /* The session's index has been permanently disabled. */
42 /* The session's index is suspended. */
43 IS_FLAG_BIT_SUSPENDED
,
44 /* The session is handling some index state change. */
46 /* The session's index is closing and draining requests. */
48 /* The session is being destroyed and is draining requests. */
49 IS_FLAG_BIT_DESTROYING
,
52 enum index_session_flag
{
53 IS_FLAG_LOADED
= (1 << IS_FLAG_BIT_LOADED
),
54 IS_FLAG_LOADING
= (1 << IS_FLAG_BIT_LOADING
),
55 IS_FLAG_DISABLED
= (1 << IS_FLAG_BIT_DISABLED
),
56 IS_FLAG_SUSPENDED
= (1 << IS_FLAG_BIT_SUSPENDED
),
57 IS_FLAG_WAITING
= (1 << IS_FLAG_BIT_WAITING
),
58 IS_FLAG_CLOSING
= (1 << IS_FLAG_BIT_CLOSING
),
59 IS_FLAG_DESTROYING
= (1 << IS_FLAG_BIT_DESTROYING
),
62 /* Release a reference to an index session. */
63 static void release_index_session(struct uds_index_session
*index_session
)
65 mutex_lock(&index_session
->request_mutex
);
66 if (--index_session
->request_count
== 0)
67 uds_broadcast_cond(&index_session
->request_cond
);
68 mutex_unlock(&index_session
->request_mutex
);
72 * Acquire a reference to the index session for an asynchronous index request. The reference must
73 * eventually be released with a corresponding call to release_index_session().
75 static int get_index_session(struct uds_index_session
*index_session
)
78 int result
= UDS_SUCCESS
;
80 mutex_lock(&index_session
->request_mutex
);
81 index_session
->request_count
++;
82 state
= index_session
->state
;
83 mutex_unlock(&index_session
->request_mutex
);
85 if (state
== IS_FLAG_LOADED
) {
87 } else if (state
& IS_FLAG_DISABLED
) {
88 result
= UDS_DISABLED
;
89 } else if ((state
& IS_FLAG_LOADING
) ||
90 (state
& IS_FLAG_SUSPENDED
) ||
91 (state
& IS_FLAG_WAITING
)) {
94 result
= UDS_NO_INDEX
;
97 release_index_session(index_session
);
101 int uds_launch_request(struct uds_request
*request
)
103 size_t internal_size
;
106 if (request
->callback
== NULL
) {
107 vdo_log_error("missing required callback");
111 switch (request
->type
) {
115 case UDS_QUERY_NO_UPDATE
:
119 vdo_log_error("received invalid callback type");
123 /* Reset all internal fields before processing. */
125 sizeof(struct uds_request
) - offsetof(struct uds_request
, zone_number
);
126 // FIXME should be using struct_group for this instead
127 memset((char *) request
+ sizeof(*request
) - internal_size
, 0, internal_size
);
129 result
= get_index_session(request
->session
);
130 if (result
!= UDS_SUCCESS
)
133 request
->found
= false;
134 request
->unbatched
= false;
135 request
->index
= request
->session
->index
;
137 uds_enqueue_request(request
, STAGE_TRIAGE
);
141 static void enter_callback_stage(struct uds_request
*request
)
143 if (request
->status
!= UDS_SUCCESS
) {
144 /* All request errors are considered unrecoverable */
145 mutex_lock(&request
->session
->request_mutex
);
146 request
->session
->state
|= IS_FLAG_DISABLED
;
147 mutex_unlock(&request
->session
->request_mutex
);
150 uds_request_queue_enqueue(request
->session
->callback_queue
, request
);
153 static inline void count_once(u64
*count_ptr
)
155 WRITE_ONCE(*count_ptr
, READ_ONCE(*count_ptr
) + 1);
158 static void update_session_stats(struct uds_request
*request
)
160 struct session_stats
*session_stats
= &request
->session
->stats
;
162 count_once(&session_stats
->requests
);
164 switch (request
->type
) {
167 count_once(&session_stats
->posts_found
);
169 count_once(&session_stats
->posts_not_found
);
171 if (request
->location
== UDS_LOCATION_IN_OPEN_CHAPTER
)
172 count_once(&session_stats
->posts_found_open_chapter
);
173 else if (request
->location
== UDS_LOCATION_IN_DENSE
)
174 count_once(&session_stats
->posts_found_dense
);
175 else if (request
->location
== UDS_LOCATION_IN_SPARSE
)
176 count_once(&session_stats
->posts_found_sparse
);
181 count_once(&session_stats
->updates_found
);
183 count_once(&session_stats
->updates_not_found
);
188 count_once(&session_stats
->deletions_found
);
190 count_once(&session_stats
->deletions_not_found
);
194 case UDS_QUERY_NO_UPDATE
:
196 count_once(&session_stats
->queries_found
);
198 count_once(&session_stats
->queries_not_found
);
202 request
->status
= VDO_ASSERT(false, "unknown request type: %d",
207 static void handle_callbacks(struct uds_request
*request
)
209 struct uds_index_session
*index_session
= request
->session
;
211 if (request
->status
== UDS_SUCCESS
)
212 update_session_stats(request
);
214 request
->status
= uds_status_to_errno(request
->status
);
215 request
->callback(request
);
216 release_index_session(index_session
);
219 static int __must_check
make_empty_index_session(struct uds_index_session
**index_session_ptr
)
222 struct uds_index_session
*session
;
224 result
= vdo_allocate(1, struct uds_index_session
, __func__
, &session
);
225 if (result
!= VDO_SUCCESS
)
228 mutex_init(&session
->request_mutex
);
229 uds_init_cond(&session
->request_cond
);
230 mutex_init(&session
->load_context
.mutex
);
231 uds_init_cond(&session
->load_context
.cond
);
233 result
= uds_make_request_queue("callbackW", &handle_callbacks
,
234 &session
->callback_queue
);
235 if (result
!= UDS_SUCCESS
) {
240 *index_session_ptr
= session
;
244 int uds_create_index_session(struct uds_index_session
**session
)
246 if (session
== NULL
) {
247 vdo_log_error("missing session pointer");
251 return uds_status_to_errno(make_empty_index_session(session
));
254 static int __must_check
start_loading_index_session(struct uds_index_session
*index_session
)
258 mutex_lock(&index_session
->request_mutex
);
259 if (index_session
->state
& IS_FLAG_SUSPENDED
) {
260 vdo_log_info("Index session is suspended");
262 } else if (index_session
->state
!= 0) {
263 vdo_log_info("Index is already loaded");
266 index_session
->state
|= IS_FLAG_LOADING
;
267 result
= UDS_SUCCESS
;
269 mutex_unlock(&index_session
->request_mutex
);
273 static void finish_loading_index_session(struct uds_index_session
*index_session
,
276 mutex_lock(&index_session
->request_mutex
);
277 index_session
->state
&= ~IS_FLAG_LOADING
;
278 if (result
== UDS_SUCCESS
)
279 index_session
->state
|= IS_FLAG_LOADED
;
281 uds_broadcast_cond(&index_session
->request_cond
);
282 mutex_unlock(&index_session
->request_mutex
);
285 static int initialize_index_session(struct uds_index_session
*index_session
,
286 enum uds_open_index_type open_type
)
289 struct uds_configuration
*config
;
291 result
= uds_make_configuration(&index_session
->parameters
, &config
);
292 if (result
!= UDS_SUCCESS
) {
293 vdo_log_error_strerror(result
, "Failed to allocate config");
297 memset(&index_session
->stats
, 0, sizeof(index_session
->stats
));
298 result
= uds_make_index(config
, open_type
, &index_session
->load_context
,
299 enter_callback_stage
, &index_session
->index
);
300 if (result
!= UDS_SUCCESS
)
301 vdo_log_error_strerror(result
, "Failed to make index");
303 uds_log_configuration(config
);
305 uds_free_configuration(config
);
309 static const char *get_open_type_string(enum uds_open_index_type open_type
)
313 return "creating index";
315 return "loading or rebuilding index";
317 return "loading index";
319 return "unknown open method";
324 * Open an index under the given session. This operation will fail if the
325 * index session is suspended, or if there is already an open index.
327 int uds_open_index(enum uds_open_index_type open_type
,
328 const struct uds_parameters
*parameters
,
329 struct uds_index_session
*session
)
332 char name
[BDEVNAME_SIZE
];
334 if (parameters
== NULL
) {
335 vdo_log_error("missing required parameters");
338 if (parameters
->bdev
== NULL
) {
339 vdo_log_error("missing required block device");
342 if (session
== NULL
) {
343 vdo_log_error("missing required session pointer");
347 result
= start_loading_index_session(session
);
348 if (result
!= UDS_SUCCESS
)
349 return uds_status_to_errno(result
);
351 session
->parameters
= *parameters
;
352 format_dev_t(name
, parameters
->bdev
->bd_dev
);
353 vdo_log_info("%s: %s", get_open_type_string(open_type
), name
);
355 result
= initialize_index_session(session
, open_type
);
356 if (result
!= UDS_SUCCESS
)
357 vdo_log_error_strerror(result
, "Failed %s",
358 get_open_type_string(open_type
));
360 finish_loading_index_session(session
, result
);
361 return uds_status_to_errno(result
);
364 static void wait_for_no_requests_in_progress(struct uds_index_session
*index_session
)
366 mutex_lock(&index_session
->request_mutex
);
367 while (index_session
->request_count
> 0) {
368 uds_wait_cond(&index_session
->request_cond
,
369 &index_session
->request_mutex
);
371 mutex_unlock(&index_session
->request_mutex
);
374 static int __must_check
save_index(struct uds_index_session
*index_session
)
376 wait_for_no_requests_in_progress(index_session
);
377 return uds_save_index(index_session
->index
);
380 static void suspend_rebuild(struct uds_index_session
*session
)
382 mutex_lock(&session
->load_context
.mutex
);
383 switch (session
->load_context
.status
) {
385 session
->load_context
.status
= INDEX_SUSPENDING
;
387 /* Wait until the index indicates that it is not replaying. */
388 while ((session
->load_context
.status
!= INDEX_SUSPENDED
) &&
389 (session
->load_context
.status
!= INDEX_READY
)) {
390 uds_wait_cond(&session
->load_context
.cond
,
391 &session
->load_context
.mutex
);
397 /* Index load does not need to be suspended. */
400 case INDEX_SUSPENDED
:
401 case INDEX_SUSPENDING
:
404 /* These cases should not happen. */
405 VDO_ASSERT_LOG_ONLY(false, "Bad load context state %u",
406 session
->load_context
.status
);
409 mutex_unlock(&session
->load_context
.mutex
);
413 * Suspend index operation, draining all current index requests and preventing new index requests
414 * from starting. Optionally saves all index data before returning.
416 int uds_suspend_index_session(struct uds_index_session
*session
, bool save
)
418 int result
= UDS_SUCCESS
;
419 bool no_work
= false;
420 bool rebuilding
= false;
422 /* Wait for any current index state change to complete. */
423 mutex_lock(&session
->request_mutex
);
424 while (session
->state
& IS_FLAG_CLOSING
)
425 uds_wait_cond(&session
->request_cond
, &session
->request_mutex
);
427 if ((session
->state
& IS_FLAG_WAITING
) || (session
->state
& IS_FLAG_DESTROYING
)) {
429 vdo_log_info("Index session is already changing state");
431 } else if (session
->state
& IS_FLAG_SUSPENDED
) {
433 } else if (session
->state
& IS_FLAG_LOADING
) {
434 session
->state
|= IS_FLAG_WAITING
;
436 } else if (session
->state
& IS_FLAG_LOADED
) {
437 session
->state
|= IS_FLAG_WAITING
;
440 session
->state
|= IS_FLAG_SUSPENDED
;
441 uds_broadcast_cond(&session
->request_cond
);
443 mutex_unlock(&session
->request_mutex
);
446 return uds_status_to_errno(result
);
449 suspend_rebuild(session
);
451 result
= save_index(session
);
453 result
= uds_flush_index_session(session
);
455 mutex_lock(&session
->request_mutex
);
456 session
->state
&= ~IS_FLAG_WAITING
;
457 session
->state
|= IS_FLAG_SUSPENDED
;
458 uds_broadcast_cond(&session
->request_cond
);
459 mutex_unlock(&session
->request_mutex
);
460 return uds_status_to_errno(result
);
463 static int replace_device(struct uds_index_session
*session
, struct block_device
*bdev
)
467 result
= uds_replace_index_storage(session
->index
, bdev
);
468 if (result
!= UDS_SUCCESS
)
471 session
->parameters
.bdev
= bdev
;
476 * Resume index operation after being suspended. If the index is suspended and the supplied block
477 * device differs from the current backing store, the index will start using the new backing store.
479 int uds_resume_index_session(struct uds_index_session
*session
,
480 struct block_device
*bdev
)
482 int result
= UDS_SUCCESS
;
483 bool no_work
= false;
484 bool resume_replay
= false;
486 mutex_lock(&session
->request_mutex
);
487 if (session
->state
& IS_FLAG_WAITING
) {
488 vdo_log_info("Index session is already changing state");
491 } else if (!(session
->state
& IS_FLAG_SUSPENDED
)) {
492 /* If not suspended, just succeed. */
494 result
= UDS_SUCCESS
;
496 session
->state
|= IS_FLAG_WAITING
;
497 if (session
->state
& IS_FLAG_LOADING
)
498 resume_replay
= true;
500 mutex_unlock(&session
->request_mutex
);
505 if ((session
->index
!= NULL
) && (bdev
!= session
->parameters
.bdev
)) {
506 result
= replace_device(session
, bdev
);
507 if (result
!= UDS_SUCCESS
) {
508 mutex_lock(&session
->request_mutex
);
509 session
->state
&= ~IS_FLAG_WAITING
;
510 uds_broadcast_cond(&session
->request_cond
);
511 mutex_unlock(&session
->request_mutex
);
512 return uds_status_to_errno(result
);
517 mutex_lock(&session
->load_context
.mutex
);
518 switch (session
->load_context
.status
) {
519 case INDEX_SUSPENDED
:
520 session
->load_context
.status
= INDEX_OPENING
;
521 /* Notify the index to start replaying again. */
522 uds_broadcast_cond(&session
->load_context
.cond
);
526 /* There is no index rebuild to resume. */
530 case INDEX_SUSPENDING
:
533 /* These cases should not happen; do nothing. */
534 VDO_ASSERT_LOG_ONLY(false, "Bad load context state %u",
535 session
->load_context
.status
);
538 mutex_unlock(&session
->load_context
.mutex
);
541 mutex_lock(&session
->request_mutex
);
542 session
->state
&= ~IS_FLAG_WAITING
;
543 session
->state
&= ~IS_FLAG_SUSPENDED
;
544 uds_broadcast_cond(&session
->request_cond
);
545 mutex_unlock(&session
->request_mutex
);
549 static int save_and_free_index(struct uds_index_session
*index_session
)
551 int result
= UDS_SUCCESS
;
553 struct uds_index
*index
= index_session
->index
;
558 mutex_lock(&index_session
->request_mutex
);
559 suspended
= (index_session
->state
& IS_FLAG_SUSPENDED
);
560 mutex_unlock(&index_session
->request_mutex
);
563 result
= uds_save_index(index
);
564 if (result
!= UDS_SUCCESS
)
565 vdo_log_warning_strerror(result
,
566 "ignoring error from save_index");
568 uds_free_index(index
);
569 index_session
->index
= NULL
;
572 * Reset all index state that happens to be in the index
573 * session, so it doesn't affect any future index.
575 mutex_lock(&index_session
->load_context
.mutex
);
576 index_session
->load_context
.status
= INDEX_OPENING
;
577 mutex_unlock(&index_session
->load_context
.mutex
);
579 mutex_lock(&index_session
->request_mutex
);
580 /* Only the suspend bit will remain relevant. */
581 index_session
->state
&= IS_FLAG_SUSPENDED
;
582 mutex_unlock(&index_session
->request_mutex
);
587 /* Save and close the current index. */
588 int uds_close_index(struct uds_index_session
*index_session
)
590 int result
= UDS_SUCCESS
;
592 /* Wait for any current index state change to complete. */
593 mutex_lock(&index_session
->request_mutex
);
594 while ((index_session
->state
& IS_FLAG_WAITING
) ||
595 (index_session
->state
& IS_FLAG_CLOSING
)) {
596 uds_wait_cond(&index_session
->request_cond
,
597 &index_session
->request_mutex
);
600 if (index_session
->state
& IS_FLAG_SUSPENDED
) {
601 vdo_log_info("Index session is suspended");
603 } else if ((index_session
->state
& IS_FLAG_DESTROYING
) ||
604 !(index_session
->state
& IS_FLAG_LOADED
)) {
605 /* The index doesn't exist, hasn't finished loading, or is being destroyed. */
606 result
= UDS_NO_INDEX
;
608 index_session
->state
|= IS_FLAG_CLOSING
;
610 mutex_unlock(&index_session
->request_mutex
);
611 if (result
!= UDS_SUCCESS
)
612 return uds_status_to_errno(result
);
614 vdo_log_debug("Closing index");
615 wait_for_no_requests_in_progress(index_session
);
616 result
= save_and_free_index(index_session
);
617 vdo_log_debug("Closed index");
619 mutex_lock(&index_session
->request_mutex
);
620 index_session
->state
&= ~IS_FLAG_CLOSING
;
621 uds_broadcast_cond(&index_session
->request_cond
);
622 mutex_unlock(&index_session
->request_mutex
);
623 return uds_status_to_errno(result
);
626 /* This will save and close an open index before destroying the session. */
627 int uds_destroy_index_session(struct uds_index_session
*index_session
)
630 bool load_pending
= false;
632 vdo_log_debug("Destroying index session");
634 /* Wait for any current index state change to complete. */
635 mutex_lock(&index_session
->request_mutex
);
636 while ((index_session
->state
& IS_FLAG_WAITING
) ||
637 (index_session
->state
& IS_FLAG_CLOSING
)) {
638 uds_wait_cond(&index_session
->request_cond
,
639 &index_session
->request_mutex
);
642 if (index_session
->state
& IS_FLAG_DESTROYING
) {
643 mutex_unlock(&index_session
->request_mutex
);
644 vdo_log_info("Index session is already closing");
648 index_session
->state
|= IS_FLAG_DESTROYING
;
649 load_pending
= ((index_session
->state
& IS_FLAG_LOADING
) &&
650 (index_session
->state
& IS_FLAG_SUSPENDED
));
651 mutex_unlock(&index_session
->request_mutex
);
654 /* Tell the index to terminate the rebuild. */
655 mutex_lock(&index_session
->load_context
.mutex
);
656 if (index_session
->load_context
.status
== INDEX_SUSPENDED
) {
657 index_session
->load_context
.status
= INDEX_FREEING
;
658 uds_broadcast_cond(&index_session
->load_context
.cond
);
660 mutex_unlock(&index_session
->load_context
.mutex
);
662 /* Wait until the load exits before proceeding. */
663 mutex_lock(&index_session
->request_mutex
);
664 while (index_session
->state
& IS_FLAG_LOADING
) {
665 uds_wait_cond(&index_session
->request_cond
,
666 &index_session
->request_mutex
);
668 mutex_unlock(&index_session
->request_mutex
);
671 wait_for_no_requests_in_progress(index_session
);
672 result
= save_and_free_index(index_session
);
673 uds_request_queue_finish(index_session
->callback_queue
);
674 index_session
->callback_queue
= NULL
;
675 vdo_log_debug("Destroyed index session");
676 vdo_free(index_session
);
677 return uds_status_to_errno(result
);
680 /* Wait until all callbacks for index operations are complete. */
681 int uds_flush_index_session(struct uds_index_session
*index_session
)
683 wait_for_no_requests_in_progress(index_session
);
684 uds_wait_for_idle_index(index_session
->index
);
688 /* Statistics collection is intended to be thread-safe. */
689 static void collect_stats(const struct uds_index_session
*index_session
,
690 struct uds_index_stats
*stats
)
692 const struct session_stats
*session_stats
= &index_session
->stats
;
694 stats
->current_time
= ktime_to_seconds(current_time_ns(CLOCK_REALTIME
));
695 stats
->posts_found
= READ_ONCE(session_stats
->posts_found
);
696 stats
->in_memory_posts_found
= READ_ONCE(session_stats
->posts_found_open_chapter
);
697 stats
->dense_posts_found
= READ_ONCE(session_stats
->posts_found_dense
);
698 stats
->sparse_posts_found
= READ_ONCE(session_stats
->posts_found_sparse
);
699 stats
->posts_not_found
= READ_ONCE(session_stats
->posts_not_found
);
700 stats
->updates_found
= READ_ONCE(session_stats
->updates_found
);
701 stats
->updates_not_found
= READ_ONCE(session_stats
->updates_not_found
);
702 stats
->deletions_found
= READ_ONCE(session_stats
->deletions_found
);
703 stats
->deletions_not_found
= READ_ONCE(session_stats
->deletions_not_found
);
704 stats
->queries_found
= READ_ONCE(session_stats
->queries_found
);
705 stats
->queries_not_found
= READ_ONCE(session_stats
->queries_not_found
);
706 stats
->requests
= READ_ONCE(session_stats
->requests
);
709 int uds_get_index_session_stats(struct uds_index_session
*index_session
,
710 struct uds_index_stats
*stats
)
713 vdo_log_error("received a NULL index stats pointer");
717 collect_stats(index_session
, stats
);
718 if (index_session
->index
!= NULL
) {
719 uds_get_index_stats(index_session
->index
, stats
);
721 stats
->entries_indexed
= 0;
722 stats
->memory_used
= 0;
723 stats
->collisions
= 0;
724 stats
->entries_discarded
= 0;
730 void uds_wait_cond(struct cond_var
*cv
, struct mutex
*mutex
)
734 prepare_to_wait(&cv
->wait_queue
, &__wait
, TASK_IDLE
);
737 finish_wait(&cv
->wait_queue
, &__wait
);