// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "recovery-journal.h"

#include <linux/atomic.h>
#include <linux/bio.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "block-map.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "slab-depot.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

static const u64 RECOVERY_COUNT_MASK = 0xff;

/*
 * The number of reserved blocks must be large enough to prevent a new recovery journal
 * block write from overwriting a block which appears to still be a valid head block of the
 * journal. Currently, that means reserving enough space for all 2048 data_vios.
 */
#define RECOVERY_JOURNAL_RESERVED_BLOCKS \
	((MAXIMUM_VDO_USER_VIOS / RECOVERY_JOURNAL_ENTRIES_PER_BLOCK) + 2)
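/*
 * For example, assuming MAXIMUM_VDO_USER_VIOS is the 2048 noted above, this reserves
 * (2048 / RECOVERY_JOURNAL_ENTRIES_PER_BLOCK) + 2 whole blocks: one entry's worth of
 * space per data_vio, rounded down, plus two extra blocks of slack.
 */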
/*
 * A lock_counter is intended to keep all of the locks for the blocks in the recovery journal. The
 * per-zone counters are all kept in a single array which is arranged by zone (i.e. zone 0's lock 0
 * is at index 0, zone 0's lock 1 is at index 1, and zone 1's lock 0 is at index 'locks'). This
 * arrangement is intended to minimize cache-line contention for counters from different zones.
 *
 * The locks are implemented as a single object instead of as a lock counter per lock both to
 * afford this opportunity to reduce cache line contention and also to eliminate the need to have a
 * completion per lock.
 *
 * Lock sets are laid out with the set for recovery journal first, followed by the logical zones,
 * and then the physical zones.
 */
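/*
 * For example, with 'locks' == 8, zone 0's locks occupy indexes 0..7 of a zone-type
 * array and zone 1's lock 3 is at index (8 * 1) + 3 == 11 (see get_counter() below).
 */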
enum lock_counter_state {
	LOCK_COUNTER_STATE_NOT_NOTIFYING,
	LOCK_COUNTER_STATE_NOTIFYING,
	LOCK_COUNTER_STATE_SUSPENDED,
};
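/*
 * Sketch of the notification state machine implemented below: releasing the last
 * reference on a block moves NOT_NOTIFYING -> NOTIFYING and launches the lock counter
 * completion (vdo_release_recovery_journal_block_reference()); the journal thread
 * acknowledges with NOTIFYING -> NOT_NOTIFYING (reap_recovery_journal_callback());
 * draining moves NOT_NOTIFYING -> SUSPENDED (suspend_lock_counter()), and
 * resume_lock_counter() undoes it.
 */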
/**
 * get_zone_count_ptr() - Get a pointer to the zone count for a given lock on a given zone.
 * @journal: The recovery journal.
 * @lock_number: The lock to get.
 * @zone_type: The zone type whose count is desired.
 *
 * Return: A pointer to the zone count for the given lock and zone.
 */
static inline atomic_t *get_zone_count_ptr(struct recovery_journal *journal,
					   block_count_t lock_number,
					   enum vdo_zone_type zone_type)
{
	return ((zone_type == VDO_ZONE_TYPE_LOGICAL)
		? &journal->lock_counter.logical_zone_counts[lock_number]
		: &journal->lock_counter.physical_zone_counts[lock_number]);
}

/**
 * get_counter() - Get the zone counter for a given lock on a given zone.
 * @journal: The recovery journal.
 * @lock_number: The lock to get.
 * @zone_type: The zone type whose count is desired.
 * @zone_id: The zone index whose count is desired.
 *
 * Return: The counter for the given lock and zone.
 */
static inline u16 *get_counter(struct recovery_journal *journal,
			       block_count_t lock_number, enum vdo_zone_type zone_type,
			       zone_count_t zone_id)
{
	struct lock_counter *counter = &journal->lock_counter;
	block_count_t zone_counter = (counter->locks * zone_id) + lock_number;

	if (zone_type == VDO_ZONE_TYPE_JOURNAL)
		return &counter->journal_counters[zone_counter];

	if (zone_type == VDO_ZONE_TYPE_LOGICAL)
		return &counter->logical_counters[zone_counter];

	return &counter->physical_counters[zone_counter];
}
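/*
 * The journal zone always passes zone_id 0, so its u16 counters are indexed directly by
 * lock number; only the logical and physical zone arrays use the (locks * zone_id)
 * striding described above.
 */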
static atomic_t *get_decrement_counter(struct recovery_journal *journal,
				       block_count_t lock_number)
{
	return &journal->lock_counter.journal_decrement_counts[lock_number];
}

/**
 * is_journal_zone_locked() - Check whether the journal zone is locked for a given lock.
 * @journal: The recovery journal.
 * @lock_number: The lock to check.
 *
 * Return: true if the journal zone is locked.
 */
static bool is_journal_zone_locked(struct recovery_journal *journal,
				   block_count_t lock_number)
{
	u16 journal_value = *get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
	u32 decrements = atomic_read(get_decrement_counter(journal, lock_number));

	/* Pairs with barrier in vdo_release_journal_entry_lock() */
	smp_rmb();
	VDO_ASSERT_LOG_ONLY((decrements <= journal_value),
			    "journal zone lock counter must not underflow");
	return (journal_value != decrements);
}
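/*
 * Note on the journal-zone accounting: the u16 journal counter is written only from the
 * journal zone (see initialize_lock_count()), while per-entry releases arrive from other
 * threads as atomic increments of the separate decrement counter
 * (vdo_release_journal_entry_lock()). The zone is unlocked exactly when the two counts
 * are equal.
 */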
/**
 * vdo_release_recovery_journal_block_reference() - Release a reference to a recovery journal
 *                                                  block.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @zone_type: The type of the zone making the adjustment.
 * @zone_id: The ID of the zone making the adjustment.
 *
 * If this is the last reference for a given zone type, an attempt will be made to reap the
 * journal.
 */
void vdo_release_recovery_journal_block_reference(struct recovery_journal *journal,
						  sequence_number_t sequence_number,
						  enum vdo_zone_type zone_type,
						  zone_count_t zone_id)
{
	u16 *current_value;
	block_count_t lock_number;
	int prior_state;

	if (sequence_number == 0)
		return;

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	current_value = get_counter(journal, lock_number, zone_type, zone_id);

	VDO_ASSERT_LOG_ONLY((*current_value >= 1),
			    "decrement of lock counter must not underflow");
	*current_value -= 1;

	if (zone_type == VDO_ZONE_TYPE_JOURNAL) {
		if (is_journal_zone_locked(journal, lock_number))
			return;
	} else {
		atomic_t *zone_count;

		if (*current_value != 0)
			return;

		zone_count = get_zone_count_ptr(journal, lock_number, zone_type);

		if (atomic_add_return(-1, zone_count) > 0)
			return;
	}

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&journal->lock_counter.state,
				     LOCK_COUNTER_STATE_NOT_NOTIFYING,
				     LOCK_COUNTER_STATE_NOTIFYING);
	/* same as before_atomic */
	smp_mb__after_atomic();

	if (prior_state != LOCK_COUNTER_STATE_NOT_NOTIFYING)
		return;

	vdo_launch_completion(&journal->lock_counter.completion);
}
static inline struct recovery_journal_block * __must_check get_journal_block(struct list_head *list)
{
	return list_first_entry_or_null(list, struct recovery_journal_block, list_node);
}

/**
 * pop_free_list() - Get a block from the end of the free list.
 * @journal: The journal.
 *
 * Return: The block or NULL if the list is empty.
 */
static struct recovery_journal_block * __must_check pop_free_list(struct recovery_journal *journal)
{
	struct recovery_journal_block *block;

	if (list_empty(&journal->free_tail_blocks))
		return NULL;

	block = list_last_entry(&journal->free_tail_blocks,
				struct recovery_journal_block, list_node);
	list_del_init(&block->list_node);
	return block;
}

/**
 * is_block_dirty() - Check whether a recovery block is dirty.
 * @block: The block to check.
 *
 * Indicates it has any uncommitted entries, which includes both entries not written and entries
 * written but not yet acknowledged.
 *
 * Return: true if the block has any uncommitted entries.
 */
static inline bool __must_check is_block_dirty(const struct recovery_journal_block *block)
{
	return (block->uncommitted_entry_count > 0);
}

/**
 * is_block_empty() - Check whether a journal block is empty.
 * @block: The block to check.
 *
 * Return: true if the block has no entries.
 */
static inline bool __must_check is_block_empty(const struct recovery_journal_block *block)
{
	return (block->entry_count == 0);
}

/**
 * is_block_full() - Check whether a journal block is full.
 * @block: The block to check.
 *
 * Return: true if the block is full.
 */
static inline bool __must_check is_block_full(const struct recovery_journal_block *block)
{
	return ((block == NULL) || (block->journal->entries_per_block == block->entry_count));
}
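/*
 * Note: a NULL block counts as full so that callers such as prepare_to_assign_entry()
 * will attempt to advance the tail and obtain a new active block.
 */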
/**
 * assert_on_journal_thread() - Assert that we are running on the journal thread.
 * @journal: The journal.
 * @function_name: The function doing the check (for logging).
 */
static void assert_on_journal_thread(struct recovery_journal *journal,
				     const char *function_name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == journal->thread_id),
			    "%s() called on journal thread", function_name);
}

/**
 * continue_waiter() - Release a data_vio from the journal.
 *
 * Invoked whenever a data_vio is to be released from the journal, either because its entry was
 * committed to disk, or because there was an error. Implements waiter_callback_fn.
 */
static void continue_waiter(struct vdo_waiter *waiter, void *context)
{
	continue_data_vio_with_error(vdo_waiter_as_data_vio(waiter), *((int *) context));
}

/**
 * has_block_waiters() - Check whether the journal has any waiters on any blocks.
 * @journal: The journal in question.
 *
 * Return: true if any block has a waiter.
 */
static inline bool has_block_waiters(struct recovery_journal *journal)
{
	struct recovery_journal_block *block = get_journal_block(&journal->active_tail_blocks);

	/*
	 * Either the first active tail block (if it exists) has waiters, or no active tail block
	 * has waiters.
	 */
	return ((block != NULL) &&
		(vdo_waitq_has_waiters(&block->entry_waiters) ||
		 vdo_waitq_has_waiters(&block->commit_waiters)));
}

static void recycle_journal_blocks(struct recovery_journal *journal);
static void recycle_journal_block(struct recovery_journal_block *block);
static void notify_commit_waiters(struct recovery_journal *journal);

/**
 * suspend_lock_counter() - Prevent the lock counter from notifying.
 * @counter: The counter.
 *
 * Return: true if the lock counter was not notifying and hence the suspend was efficacious.
 */
static bool suspend_lock_counter(struct lock_counter *counter)
{
	int prior_state;

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_NOT_NOTIFYING,
				     LOCK_COUNTER_STATE_SUSPENDED);
	/* same as before_atomic */
	smp_mb__after_atomic();

	return ((prior_state == LOCK_COUNTER_STATE_SUSPENDED) ||
		(prior_state == LOCK_COUNTER_STATE_NOT_NOTIFYING));
}

static inline bool is_read_only(struct recovery_journal *journal)
{
	return vdo_is_read_only(journal->flush_vio->completion.vdo);
}
/**
 * check_for_drain_complete() - Check whether the journal has drained.
 * @journal: The journal which may have just drained.
 */
static void check_for_drain_complete(struct recovery_journal *journal)
{
	int result = VDO_SUCCESS;

	if (is_read_only(journal)) {
		result = VDO_READ_ONLY;
		/*
		 * Clean up any full active blocks which were not written due to read-only mode.
		 *
		 * FIXME: This would probably be better as a short-circuit in write_block().
		 */
		notify_commit_waiters(journal);
		recycle_journal_blocks(journal);

		/* Release any data_vios waiting to be assigned entries. */
		vdo_waitq_notify_all_waiters(&journal->entry_waiters,
					     continue_waiter, &result);
	}

	if (!vdo_is_state_draining(&journal->state) ||
	    journal->reaping ||
	    has_block_waiters(journal) ||
	    vdo_waitq_has_waiters(&journal->entry_waiters) ||
	    !suspend_lock_counter(&journal->lock_counter))
		return;

	if (vdo_is_state_saving(&journal->state)) {
		if (journal->active_block != NULL) {
			VDO_ASSERT_LOG_ONLY(((result == VDO_READ_ONLY) ||
					     !is_block_dirty(journal->active_block)),
					    "journal being saved has clean active block");
			recycle_journal_block(journal->active_block);
		}

		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
				    "all blocks in a journal being saved must be inactive");
	}

	vdo_finish_draining_with_result(&journal->state, result);
}

/**
 * notify_recovery_journal_of_read_only_mode() - Notify a recovery journal that the VDO has gone
 *                                               read-only.
 * @listener: The journal.
 * @parent: The completion to notify in order to acknowledge the notification.
 *
 * Implements vdo_read_only_notification_fn.
 */
static void notify_recovery_journal_of_read_only_mode(void *listener,
						      struct vdo_completion *parent)
{
	check_for_drain_complete(listener);
	vdo_finish_completion(parent);
}

/**
 * enter_journal_read_only_mode() - Put the journal in read-only mode.
 * @journal: The journal which has failed.
 * @error_code: The error result triggering this call.
 *
 * All attempts to add entries after this function is called will fail. All VIOs waiting for
 * commits will be awakened with an error.
 */
static void enter_journal_read_only_mode(struct recovery_journal *journal,
					 int error_code)
{
	vdo_enter_read_only_mode(journal->flush_vio->completion.vdo, error_code);
	check_for_drain_complete(journal);
}
/**
 * vdo_get_recovery_journal_current_sequence_number() - Obtain the recovery journal's current
 *                                                      sequence number.
 * @journal: The journal in question.
 *
 * Exposed only so the block map can be initialized therefrom.
 *
 * Return: The sequence number of the tail block.
 */
sequence_number_t vdo_get_recovery_journal_current_sequence_number(struct recovery_journal *journal)
{
	return journal->tail;
}

/**
 * get_recovery_journal_head() - Get the head of the recovery journal.
 * @journal: The journal.
 *
 * The head is the lowest sequence number of the block map head and the slab journal head.
 *
 * Return: the head of the journal.
 */
static inline sequence_number_t get_recovery_journal_head(const struct recovery_journal *journal)
{
	return min(journal->block_map_head, journal->slab_journal_head);
}

/**
 * compute_recovery_count_byte() - Compute the recovery count byte for a given recovery count.
 * @recovery_count: The recovery count.
 *
 * Return: The byte corresponding to the recovery count.
 */
static inline u8 __must_check compute_recovery_count_byte(u64 recovery_count)
{
	return (u8)(recovery_count & RECOVERY_COUNT_MASK);
}

/**
 * check_slab_journal_commit_threshold() - Check whether the journal is over the threshold, and if
 *                                         so, force the oldest slab journal tail block to commit.
 * @journal: The journal.
 */
static void check_slab_journal_commit_threshold(struct recovery_journal *journal)
{
	block_count_t current_length = journal->tail - journal->slab_journal_head;

	if (current_length > journal->slab_journal_commit_threshold) {
		journal->events.slab_journal_commits_requested++;
		vdo_commit_oldest_slab_journal_tail_blocks(journal->depot,
							   journal->slab_journal_head);
	}
}
static void reap_recovery_journal(struct recovery_journal *journal);
static void assign_entries(struct recovery_journal *journal);

/**
 * finish_reaping() - Finish reaping the journal.
 * @journal: The journal being reaped.
 */
static void finish_reaping(struct recovery_journal *journal)
{
	block_count_t blocks_reaped;
	sequence_number_t old_head = get_recovery_journal_head(journal);

	journal->block_map_head = journal->block_map_reap_head;
	journal->slab_journal_head = journal->slab_journal_reap_head;
	blocks_reaped = get_recovery_journal_head(journal) - old_head;
	journal->available_space += blocks_reaped * journal->entries_per_block;
	journal->reaping = false;
	check_slab_journal_commit_threshold(journal);
	assign_entries(journal);
	check_for_drain_complete(journal);
}

/**
 * complete_reaping() - Finish reaping the journal after flushing the lower layer.
 * @completion: The journal's flush VIO.
 *
 * This is the callback registered in reap_recovery_journal().
 */
static void complete_reaping(struct vdo_completion *completion)
{
	struct recovery_journal *journal = completion->parent;

	finish_reaping(journal);

	/* Try reaping again in case more locks were released while flush was out. */
	reap_recovery_journal(journal);
}

/**
 * handle_flush_error() - Handle an error when flushing the lower layer due to reaping.
 * @completion: The journal's flush VIO.
 */
static void handle_flush_error(struct vdo_completion *completion)
{
	struct recovery_journal *journal = completion->parent;

	vio_record_metadata_io_error(as_vio(completion));
	journal->reaping = false;
	enter_journal_read_only_mode(journal, completion->result);
}

static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct recovery_journal *journal = vio->completion.parent;

	continue_vio_after_io(vio, complete_reaping, journal->thread_id);
}
/**
 * initialize_journal_state() - Set all journal fields appropriately to start journaling from the
 *                              current active block.
 * @journal: The journal to be reset based on its active block.
 */
static void initialize_journal_state(struct recovery_journal *journal)
{
	journal->append_point.sequence_number = journal->tail;
	journal->last_write_acknowledged = journal->tail;
	journal->block_map_head = journal->tail;
	journal->slab_journal_head = journal->tail;
	journal->block_map_reap_head = journal->tail;
	journal->slab_journal_reap_head = journal->tail;
	journal->block_map_head_block_number =
		vdo_get_recovery_journal_block_number(journal, journal->block_map_head);
	journal->slab_journal_head_block_number =
		vdo_get_recovery_journal_block_number(journal,
						      journal->slab_journal_head);
	journal->available_space =
		(journal->entries_per_block * vdo_get_recovery_journal_length(journal->size));
}

/**
 * vdo_get_recovery_journal_length() - Get the number of usable recovery journal blocks.
 * @journal_size: The size of the recovery journal in blocks.
 *
 * Return: the number of recovery journal blocks usable for entries.
 */
block_count_t vdo_get_recovery_journal_length(block_count_t journal_size)
{
	block_count_t reserved_blocks = journal_size / 4;

	if (reserved_blocks > RECOVERY_JOURNAL_RESERVED_BLOCKS)
		reserved_blocks = RECOVERY_JOURNAL_RESERVED_BLOCKS;
	return (journal_size - reserved_blocks);
}
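/*
 * For example, a 16-block journal reserves 16 / 4 == 4 blocks (assuming the
 * RECOVERY_JOURNAL_RESERVED_BLOCKS cap is at least 4), leaving 12 blocks usable for
 * entries.
 */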
/**
 * reap_recovery_journal_callback() - Attempt to reap the journal.
 * @completion: The lock counter completion.
 *
 * Attempts to reap the journal now that all the locks on some journal block have been released.
 * This is the callback registered with the lock counter.
 */
static void reap_recovery_journal_callback(struct vdo_completion *completion)
{
	struct recovery_journal *journal = (struct recovery_journal *) completion->parent;
	/*
	 * The acknowledgment must be done before reaping so that there is no race between
	 * acknowledging the notification and unlocks wishing to notify.
	 */
	atomic_set(&journal->lock_counter.state, LOCK_COUNTER_STATE_NOT_NOTIFYING);

	if (vdo_is_state_quiescing(&journal->state)) {
		/*
		 * Don't start reaping when the journal is trying to quiesce. Do check if this
		 * notification is the last thing the drain is waiting on.
		 */
		check_for_drain_complete(journal);
		return;
	}

	reap_recovery_journal(journal);
	check_slab_journal_commit_threshold(journal);
}
/**
 * initialize_lock_counter() - Initialize a lock counter.
 *
 * @journal: The recovery journal.
 * @vdo: The vdo.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int __must_check initialize_lock_counter(struct recovery_journal *journal,
						struct vdo *vdo)
{
	int result;
	struct thread_config *config = &vdo->thread_config;
	struct lock_counter *counter = &journal->lock_counter;

	result = vdo_allocate(journal->size, u16, __func__, &counter->journal_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->journal_decrement_counts);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size * config->logical_zone_count, u16, __func__,
			      &counter->logical_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->logical_zone_counts);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size * config->physical_zone_count, u16, __func__,
			      &counter->physical_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->physical_zone_counts);
	if (result != VDO_SUCCESS)
		return result;

	vdo_initialize_completion(&counter->completion, vdo,
				  VDO_LOCK_COUNTER_COMPLETION);
	vdo_prepare_completion(&counter->completion, reap_recovery_journal_callback,
			       reap_recovery_journal_callback, config->journal_thread,
			       journal);
	counter->logical_zones = config->logical_zone_count;
	counter->physical_zones = config->physical_zone_count;
	counter->locks = journal->size;
	return VDO_SUCCESS;
}
/**
 * set_journal_tail() - Set the journal's tail sequence number.
 * @journal: The journal whose tail is to be set.
 * @tail: The new tail value.
 */
static void set_journal_tail(struct recovery_journal *journal, sequence_number_t tail)
{
	/* VDO does not support sequence numbers above 1 << 48 in the slab journal. */
	if (tail >= (1ULL << 48))
		enter_journal_read_only_mode(journal, VDO_JOURNAL_OVERFLOW);

	journal->tail = tail;
}

/**
 * initialize_recovery_block() - Initialize a journal block.
 * @vdo: The vdo from which to construct vios.
 * @journal: The journal to which the block will belong.
 * @block: The block to initialize.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_recovery_block(struct vdo *vdo, struct recovery_journal *journal,
				     struct recovery_journal_block *block)
{
	char *data;
	int result;

	/*
	 * Ensure that a block is large enough to store RECOVERY_JOURNAL_ENTRIES_PER_BLOCK entries.
	 */
	BUILD_BUG_ON(RECOVERY_JOURNAL_ENTRIES_PER_BLOCK >
		     ((VDO_BLOCK_SIZE - sizeof(struct packed_journal_header)) /
		      sizeof(struct packed_recovery_journal_entry)));

	/*
	 * Allocate a full block for the journal block even though not all of the space is used
	 * since the VIO needs to write a full disk block.
	 */
	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &data);
	if (result != VDO_SUCCESS)
		return result;

	result = allocate_vio_components(vdo, VIO_TYPE_RECOVERY_JOURNAL,
					 VIO_PRIORITY_HIGH, block, 1, data, &block->vio);
	if (result != VDO_SUCCESS) {
		vdo_free(data);
		return result;
	}

	list_add_tail(&block->list_node, &journal->free_tail_blocks);
	block->journal = journal;
	return VDO_SUCCESS;
}
/**
 * vdo_decode_recovery_journal() - Make a recovery journal and initialize it with the state that
 *                                 was decoded from the super block.
 *
 * @state: The decoded state of the journal.
 * @nonce: The nonce of the VDO.
 * @vdo: The VDO.
 * @partition: The partition for the journal.
 * @recovery_count: The VDO's number of completed recoveries.
 * @journal_size: The number of blocks in the journal on disk.
 * @journal_ptr: The pointer to hold the new recovery journal.
 *
 * Return: A success or error code.
 */
int vdo_decode_recovery_journal(struct recovery_journal_state_7_0 state, nonce_t nonce,
				struct vdo *vdo, struct partition *partition,
				u64 recovery_count, block_count_t journal_size,
				struct recovery_journal **journal_ptr)
{
	block_count_t i;
	struct recovery_journal *journal;
	int result;

	result = vdo_allocate_extended(struct recovery_journal,
				       RECOVERY_JOURNAL_RESERVED_BLOCKS,
				       struct recovery_journal_block, __func__,
				       &journal);
	if (result != VDO_SUCCESS)
		return result;

	INIT_LIST_HEAD(&journal->free_tail_blocks);
	INIT_LIST_HEAD(&journal->active_tail_blocks);
	vdo_waitq_init(&journal->pending_writes);

	journal->thread_id = vdo->thread_config.journal_thread;
	journal->origin = partition->offset;
	journal->nonce = nonce;
	journal->recovery_count = compute_recovery_count_byte(recovery_count);
	journal->size = journal_size;
	journal->slab_journal_commit_threshold = (journal_size * 2) / 3;
	journal->logical_blocks_used = state.logical_blocks_used;
	journal->block_map_data_blocks = state.block_map_data_blocks;
	journal->entries_per_block = RECOVERY_JOURNAL_ENTRIES_PER_BLOCK;
	set_journal_tail(journal, state.journal_start);
	initialize_journal_state(journal);
	/* TODO: this will have to change if we make initial resume of a VDO a real resume */
	vdo_set_admin_state_code(&journal->state, VDO_ADMIN_STATE_SUSPENDED);

	for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) {
		struct recovery_journal_block *block = &journal->blocks[i];

		result = initialize_recovery_block(vdo, journal, block);
		if (result != VDO_SUCCESS) {
			vdo_free_recovery_journal(journal);
			return result;
		}
	}

	result = initialize_lock_counter(journal, vdo);
	if (result != VDO_SUCCESS) {
		vdo_free_recovery_journal(journal);
		return result;
	}

	result = create_metadata_vio(vdo, VIO_TYPE_RECOVERY_JOURNAL, VIO_PRIORITY_HIGH,
				     journal, NULL, &journal->flush_vio);
	if (result != VDO_SUCCESS) {
		vdo_free_recovery_journal(journal);
		return result;
	}

	result = vdo_register_read_only_listener(vdo, journal,
						 notify_recovery_journal_of_read_only_mode,
						 journal->thread_id);
	if (result != VDO_SUCCESS) {
		vdo_free_recovery_journal(journal);
		return result;
	}

	result = vdo_make_default_thread(vdo, journal->thread_id);
	if (result != VDO_SUCCESS) {
		vdo_free_recovery_journal(journal);
		return result;
	}

	journal->flush_vio->completion.callback_thread_id = journal->thread_id;
	*journal_ptr = journal;
	return VDO_SUCCESS;
}
/**
 * vdo_free_recovery_journal() - Free a recovery journal.
 * @journal: The recovery journal to free.
 */
void vdo_free_recovery_journal(struct recovery_journal *journal)
{
	block_count_t i;

	if (journal == NULL)
		return;

	vdo_free(vdo_forget(journal->lock_counter.logical_zone_counts));
	vdo_free(vdo_forget(journal->lock_counter.physical_zone_counts));
	vdo_free(vdo_forget(journal->lock_counter.journal_counters));
	vdo_free(vdo_forget(journal->lock_counter.journal_decrement_counts));
	vdo_free(vdo_forget(journal->lock_counter.logical_counters));
	vdo_free(vdo_forget(journal->lock_counter.physical_counters));
	free_vio(vdo_forget(journal->flush_vio));

	/*
	 * FIXME: eventually, the journal should be constructed in a quiescent state which
	 * requires opening before use.
	 */
	if (!vdo_is_state_quiescent(&journal->state)) {
		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
				    "journal being freed has no active tail blocks");
	} else if (!vdo_is_state_saved(&journal->state) &&
		   !list_empty(&journal->active_tail_blocks)) {
		vdo_log_warning("journal being freed has uncommitted entries");
	}

	for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) {
		struct recovery_journal_block *block = &journal->blocks[i];

		vdo_free(vdo_forget(block->vio.data));
		free_vio_components(&block->vio);
	}

	vdo_free(journal);
}
/**
 * vdo_initialize_recovery_journal_post_repair() - Initialize the journal after a repair.
 * @journal: The journal in question.
 * @recovery_count: The number of completed recoveries.
 * @tail: The new tail block sequence number.
 * @logical_blocks_used: The new number of logical blocks used.
 * @block_map_data_blocks: The new number of block map data blocks.
 */
void vdo_initialize_recovery_journal_post_repair(struct recovery_journal *journal,
						 u64 recovery_count,
						 sequence_number_t tail,
						 block_count_t logical_blocks_used,
						 block_count_t block_map_data_blocks)
{
	set_journal_tail(journal, tail + 1);
	journal->recovery_count = compute_recovery_count_byte(recovery_count);
	initialize_journal_state(journal);
	journal->logical_blocks_used = logical_blocks_used;
	journal->block_map_data_blocks = block_map_data_blocks;
}

/**
 * vdo_get_journal_block_map_data_blocks_used() - Get the number of block map pages, allocated from
 *                                                data blocks, currently in use.
 * @journal: The journal in question.
 *
 * Return: The number of block map pages allocated from slabs.
 */
block_count_t vdo_get_journal_block_map_data_blocks_used(struct recovery_journal *journal)
{
	return journal->block_map_data_blocks;
}

/**
 * vdo_get_recovery_journal_thread_id() - Get the ID of a recovery journal's thread.
 * @journal: The journal to query.
 *
 * Return: The ID of the journal's thread.
 */
thread_id_t vdo_get_recovery_journal_thread_id(struct recovery_journal *journal)
{
	return journal->thread_id;
}

/**
 * vdo_open_recovery_journal() - Prepare the journal for new entries.
 * @journal: The journal in question.
 * @depot: The slab depot for this VDO.
 * @block_map: The block map for this VDO.
 */
void vdo_open_recovery_journal(struct recovery_journal *journal,
			       struct slab_depot *depot, struct block_map *block_map)
{
	journal->depot = depot;
	journal->block_map = block_map;
	WRITE_ONCE(journal->state.current_state, VDO_ADMIN_STATE_NORMAL_OPERATION);
}
/**
 * vdo_record_recovery_journal() - Record the state of a recovery journal for encoding in the super
 *                                 block.
 * @journal: the recovery journal.
 *
 * Return: the state of the journal.
 */
struct recovery_journal_state_7_0
vdo_record_recovery_journal(const struct recovery_journal *journal)
{
	struct recovery_journal_state_7_0 state = {
		.logical_blocks_used = journal->logical_blocks_used,
		.block_map_data_blocks = journal->block_map_data_blocks,
	};

	if (vdo_is_state_saved(&journal->state)) {
		/*
		 * If the journal is saved, we should start one past the active block (since the
		 * active block is not guaranteed to be empty).
		 */
		state.journal_start = journal->tail;
	} else {
		/*
		 * When we're merely suspended or have gone read-only, we must record the first
		 * block that might have entries that need to be applied.
		 */
		state.journal_start = get_recovery_journal_head(journal);
	}

	return state;
}

/**
 * get_block_header() - Get a pointer to the packed journal block header in the block buffer.
 * @block: The recovery block.
 *
 * Return: The block's header.
 */
static inline struct packed_journal_header *
get_block_header(const struct recovery_journal_block *block)
{
	return (struct packed_journal_header *) block->vio.data;
}

/**
 * set_active_sector() - Set the current sector of the current block and initialize it.
 * @block: The block to update.
 * @sector: A pointer to the first byte of the new sector.
 */
static void set_active_sector(struct recovery_journal_block *block, void *sector)
{
	block->sector = sector;
	block->sector->check_byte = get_block_header(block)->check_byte;
	block->sector->recovery_count = block->journal->recovery_count;
	block->sector->entry_count = 0;
}
/**
 * advance_tail() - Advance the tail of the journal.
 * @journal: The journal whose tail should be advanced.
 *
 * Return: true if the tail was advanced.
 */
static bool advance_tail(struct recovery_journal *journal)
{
	struct recovery_block_header unpacked;
	struct packed_journal_header *header;
	struct recovery_journal_block *block;

	block = journal->active_block = pop_free_list(journal);
	if (block == NULL)
		return false;

	list_move_tail(&block->list_node, &journal->active_tail_blocks);

	unpacked = (struct recovery_block_header) {
		.metadata_type = VDO_METADATA_RECOVERY_JOURNAL_2,
		.block_map_data_blocks = journal->block_map_data_blocks,
		.logical_blocks_used = journal->logical_blocks_used,
		.nonce = journal->nonce,
		.recovery_count = journal->recovery_count,
		.sequence_number = journal->tail,
		.check_byte = vdo_compute_recovery_journal_check_byte(journal,
								      journal->tail),
	};

	header = get_block_header(block);
	memset(block->vio.data, 0x0, VDO_BLOCK_SIZE);
	block->sequence_number = journal->tail;
	block->entry_count = 0;
	block->uncommitted_entry_count = 0;
	block->block_number = vdo_get_recovery_journal_block_number(journal,
								    journal->tail);

	vdo_pack_recovery_block_header(&unpacked, header);
	set_active_sector(block, vdo_get_journal_block_sector(header, 1));
	set_journal_tail(journal, journal->tail + 1);
	vdo_advance_block_map_era(journal->block_map, journal->tail);
	return true;
}
/**
 * initialize_lock_count() - Initialize the value of the journal zone's counter for a given lock.
 * @journal: The recovery journal.
 *
 * Context: This must be called from the journal zone.
 */
static void initialize_lock_count(struct recovery_journal *journal)
{
	u16 *journal_value;
	block_count_t lock_number = journal->active_block->block_number;
	atomic_t *decrement_counter = get_decrement_counter(journal, lock_number);

	journal_value = get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
	VDO_ASSERT_LOG_ONLY((*journal_value == atomic_read(decrement_counter)),
			    "count to be initialized not in use");
	*journal_value = journal->entries_per_block + 1;
	atomic_set(decrement_counter, 0);
}
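/*
 * The count starts at entries_per_block + 1: one reference per potential entry in the
 * block, plus one for the block itself; recycle_journal_block() releases the unused
 * entry references and then the block's own reference.
 */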
/**
 * prepare_to_assign_entry() - Prepare the currently active block to receive an entry and check
 *                             whether an entry of the given type may be assigned at this time.
 * @journal: The journal receiving an entry.
 *
 * Return: true if there is space in the journal to store an entry of the specified type.
 */
static bool prepare_to_assign_entry(struct recovery_journal *journal)
{
	if (journal->available_space == 0)
		return false;

	if (is_block_full(journal->active_block) && !advance_tail(journal))
		return false;

	if (!is_block_empty(journal->active_block))
		return true;

	if ((journal->tail - get_recovery_journal_head(journal)) > journal->size) {
		/* Cannot use this block since the journal is full. */
		journal->events.disk_full++;
		return false;
	}

	/*
	 * Don't allow the new block to be reaped until all of its entries have been committed to
	 * the block map and until the journal block has been fully committed as well. Because the
	 * block map update is done only after any slab journal entries have been made, the
	 * per-entry lock for the block map entry serves to protect those as well.
	 */
	initialize_lock_count(journal);
	return true;
}
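/*
 * Example of the fullness check above: with size == 64, head == 100, and tail == 165,
 * (165 - 100) > 64, so using this block would wrap the on-disk ring onto a block which
 * may still be a valid head, and the journal reports disk_full instead.
 */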
static void write_blocks(struct recovery_journal *journal);

/**
 * schedule_block_write() - Queue a block for writing.
 * @journal: The journal in question.
 * @block: The block which is now ready to write.
 *
 * The block is expected to be full. If the block is currently writing, this is a noop as the block
 * will be queued for writing when the write finishes. The block must not currently be queued for
 * writing.
 */
static void schedule_block_write(struct recovery_journal *journal,
				 struct recovery_journal_block *block)
{
	if (!block->committing)
		vdo_waitq_enqueue_waiter(&journal->pending_writes, &block->write_waiter);
	/*
	 * At the end of adding entries, or discovering this partial block is now full and ready to
	 * rewrite, we will call write_blocks() and write a whole batch.
	 */
}

/**
 * release_journal_block_reference() - Release a reference to a journal block.
 * @block: The journal block from which to release a reference.
 */
static void release_journal_block_reference(struct recovery_journal_block *block)
{
	vdo_release_recovery_journal_block_reference(block->journal,
						     block->sequence_number,
						     VDO_ZONE_TYPE_JOURNAL, 0);
}

static void update_usages(struct recovery_journal *journal, struct data_vio *data_vio)
{
	if (data_vio->increment_updater.operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
		journal->block_map_data_blocks++;
		return;
	}

	if (data_vio->new_mapped.state != VDO_MAPPING_STATE_UNMAPPED)
		journal->logical_blocks_used++;

	if (data_vio->mapped.state != VDO_MAPPING_STATE_UNMAPPED)
		journal->logical_blocks_used--;
}
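/*
 * For example: a write of new data to a previously unmapped LBN only increments
 * logical_blocks_used; overwriting an already-mapped LBN both increments and decrements
 * it for no net change; and a discard of a mapped LBN only decrements it.
 */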
/**
 * assign_entry() - Assign an entry waiter to the active block.
 *
 * Implements waiter_callback_fn.
 */
static void assign_entry(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct recovery_journal_block *block = context;
	struct recovery_journal *journal = block->journal;

	/* Record the point at which we will make the journal entry. */
	data_vio->recovery_journal_point = (struct journal_point) {
		.sequence_number = block->sequence_number,
		.entry_count = block->entry_count,
	};

	update_usages(journal, data_vio);
	journal->available_space--;

	if (!vdo_waitq_has_waiters(&block->entry_waiters))
		journal->events.blocks.started++;

	vdo_waitq_enqueue_waiter(&block->entry_waiters, &data_vio->waiter);
	block->entry_count++;
	block->uncommitted_entry_count++;
	journal->events.entries.started++;

	if (is_block_full(block)) {
		/*
		 * The block is full, so we can write it anytime henceforth. If it is already
		 * committing, we'll queue it for writing when it comes back.
		 */
		schedule_block_write(journal, block);
	}

	/* Force out slab journal tail blocks when threshold is reached. */
	check_slab_journal_commit_threshold(journal);
}

static void assign_entries(struct recovery_journal *journal)
{
	if (journal->adding_entries) {
		/* Protect against re-entrancy. */
		return;
	}

	journal->adding_entries = true;
	while (vdo_waitq_has_waiters(&journal->entry_waiters) &&
	       prepare_to_assign_entry(journal)) {
		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
					     assign_entry, journal->active_block);
	}

	/* Now that we've finished with entries, see if we have a batch of blocks to write. */
	write_blocks(journal);
	journal->adding_entries = false;
}
/**
 * recycle_journal_block() - Prepare an in-memory journal block to be reused now that it has been
 *                           fully committed.
 * @block: The block to be recycled.
 */
static void recycle_journal_block(struct recovery_journal_block *block)
{
	struct recovery_journal *journal = block->journal;
	block_count_t i;

	list_move_tail(&block->list_node, &journal->free_tail_blocks);

	/* Release any unused entry locks. */
	for (i = block->entry_count; i < journal->entries_per_block; i++)
		release_journal_block_reference(block);

	/*
	 * Release our own lock against reaping now that the block is completely committed, or
	 * we're giving up because we're in read-only mode.
	 */
	if (block->entry_count > 0)
		release_journal_block_reference(block);

	if (block == journal->active_block)
		journal->active_block = NULL;
}

/**
 * continue_committed_waiter() - invoked whenever a VIO is to be released from the journal because
 *                               its entry was committed to disk.
 *
 * Implements waiter_callback_fn.
 */
static void continue_committed_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct recovery_journal *journal = context;
	int result = (is_read_only(journal) ? VDO_READ_ONLY : VDO_SUCCESS);
	bool has_decrement;

	VDO_ASSERT_LOG_ONLY(vdo_before_journal_point(&journal->commit_point,
						     &data_vio->recovery_journal_point),
			    "DataVIOs released from recovery journal in order. Recovery journal point is (%llu, %u), but commit waiter point is (%llu, %u)",
			    (unsigned long long) journal->commit_point.sequence_number,
			    journal->commit_point.entry_count,
			    (unsigned long long) data_vio->recovery_journal_point.sequence_number,
			    data_vio->recovery_journal_point.entry_count);

	journal->commit_point = data_vio->recovery_journal_point;
	data_vio->last_async_operation = VIO_ASYNC_OP_UPDATE_REFERENCE_COUNTS;
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}

	/*
	 * The increment must be launched first since it must come before the
	 * decrement if they are in the same slab.
	 */
	has_decrement = (data_vio->decrement_updater.zpbn.pbn != VDO_ZERO_BLOCK);
	if ((data_vio->increment_updater.zpbn.pbn != VDO_ZERO_BLOCK) || !has_decrement)
		continue_data_vio(data_vio);

	if (has_decrement)
		vdo_launch_completion(&data_vio->decrement_completion);
}
/**
 * notify_commit_waiters() - Notify any VIOs whose entries have now committed.
 * @journal: The recovery journal to update.
 */
static void notify_commit_waiters(struct recovery_journal *journal)
{
	struct recovery_journal_block *block;

	list_for_each_entry(block, &journal->active_tail_blocks, list_node) {
		if (block->committing)
			return;

		vdo_waitq_notify_all_waiters(&block->commit_waiters,
					     continue_committed_waiter, journal);
		if (is_read_only(journal)) {
			vdo_waitq_notify_all_waiters(&block->entry_waiters,
						     continue_committed_waiter,
						     journal);
		} else if (is_block_dirty(block) || !is_block_full(block)) {
			/* Stop at partially-committed or partially-filled blocks. */
			return;
		}
	}
}

/**
 * recycle_journal_blocks() - Recycle any journal blocks which have been fully committed.
 * @journal: The recovery journal to update.
 */
static void recycle_journal_blocks(struct recovery_journal *journal)
{
	struct recovery_journal_block *block, *tmp;

	list_for_each_entry_safe(block, tmp, &journal->active_tail_blocks, list_node) {
		if (block->committing) {
			/* Don't recycle committing blocks. */
			return;
		}

		if (!is_read_only(journal) &&
		    (is_block_dirty(block) || !is_block_full(block))) {
			/*
			 * Don't recycle partially written or partially full blocks, except in
			 * read-only mode.
			 */
			return;
		}

		recycle_journal_block(block);
	}
}
/**
 * complete_write() - Handle post-commit processing.
 * @completion: The completion of the VIO writing this block.
 *
 * This is the callback registered by write_block(). If more entries accumulated in the block being
 * committed while the commit was in progress, another commit will be initiated.
 */
static void complete_write(struct vdo_completion *completion)
{
	struct recovery_journal_block *block = completion->parent;
	struct recovery_journal *journal = block->journal;
	struct recovery_journal_block *last_active_block;

	assert_on_journal_thread(journal, __func__);

	journal->pending_write_count -= 1;
	journal->events.blocks.committed += 1;
	journal->events.entries.committed += block->entries_in_commit;
	block->uncommitted_entry_count -= block->entries_in_commit;
	block->entries_in_commit = 0;
	block->committing = false;

	/* If this block is the latest block to be acknowledged, record that fact. */
	if (block->sequence_number > journal->last_write_acknowledged)
		journal->last_write_acknowledged = block->sequence_number;

	last_active_block = get_journal_block(&journal->active_tail_blocks);
	VDO_ASSERT_LOG_ONLY((block->sequence_number >= last_active_block->sequence_number),
			    "completed journal write is still active");

	notify_commit_waiters(journal);

	/*
	 * Is this block now full? Reaping, and adding entries, might have already sent it off for
	 * rewriting; else, queue it for rewrite.
	 */
	if (is_block_dirty(block) && is_block_full(block))
		schedule_block_write(journal, block);

	recycle_journal_blocks(journal);
	write_blocks(journal);

	check_for_drain_complete(journal);
}

static void handle_write_error(struct vdo_completion *completion)
{
	struct recovery_journal_block *block = completion->parent;
	struct recovery_journal *journal = block->journal;

	vio_record_metadata_io_error(as_vio(completion));
	vdo_log_error_strerror(completion->result,
			       "cannot write recovery journal block %llu",
			       (unsigned long long) block->sequence_number);
	enter_journal_read_only_mode(journal, completion->result);
	complete_write(completion);
}

static void complete_write_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct recovery_journal_block *block = vio->completion.parent;
	struct recovery_journal *journal = block->journal;

	continue_vio_after_io(vio, complete_write, journal->thread_id);
}
/**
 * add_queued_recovery_entries() - Actually add entries from the queue to the given block.
 * @block: The journal block.
 */
static void add_queued_recovery_entries(struct recovery_journal_block *block)
{
	while (vdo_waitq_has_waiters(&block->entry_waiters)) {
		struct data_vio *data_vio =
			vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&block->entry_waiters));
		struct tree_lock *lock = &data_vio->tree_lock;
		struct packed_recovery_journal_entry *packed_entry;
		struct recovery_journal_entry new_entry;

		if (block->sector->entry_count == RECOVERY_JOURNAL_ENTRIES_PER_SECTOR)
			set_active_sector(block,
					  (char *) block->sector + VDO_SECTOR_SIZE);

		/* Compose and encode the entry. */
		packed_entry = &block->sector->entries[block->sector->entry_count++];
		new_entry = (struct recovery_journal_entry) {
			.mapping = {
				.pbn = data_vio->increment_updater.zpbn.pbn,
				.state = data_vio->increment_updater.zpbn.state,
			},
			.unmapping = {
				.pbn = data_vio->decrement_updater.zpbn.pbn,
				.state = data_vio->decrement_updater.zpbn.state,
			},
			.operation = data_vio->increment_updater.operation,
			.slot = lock->tree_slots[lock->height].block_map_slot,
		};
		*packed_entry = vdo_pack_recovery_journal_entry(&new_entry);
		data_vio->recovery_sequence_number = block->sequence_number;

		/* Enqueue the data_vio to wait for its entry to commit. */
		vdo_waitq_enqueue_waiter(&block->commit_waiters, &data_vio->waiter);
	}
}
/**
 * write_block() - Issue a block for writing.
 *
 * Implements waiter_callback_fn.
 */
static void write_block(struct vdo_waiter *waiter, void __always_unused *context)
{
	struct recovery_journal_block *block =
		container_of(waiter, struct recovery_journal_block, write_waiter);
	struct recovery_journal *journal = block->journal;
	struct packed_journal_header *header = get_block_header(block);

	if (block->committing || !vdo_waitq_has_waiters(&block->entry_waiters) ||
	    is_read_only(journal))
		return;

	block->entries_in_commit = vdo_waitq_num_waiters(&block->entry_waiters);
	add_queued_recovery_entries(block);

	journal->pending_write_count += 1;
	journal->events.blocks.written += 1;
	journal->events.entries.written += block->entries_in_commit;

	header->block_map_head = __cpu_to_le64(journal->block_map_head);
	header->slab_journal_head = __cpu_to_le64(journal->slab_journal_head);
	header->entry_count = __cpu_to_le16(block->entry_count);

	block->committing = true;

	/*
	 * We must issue a flush and a FUA for every commit. The flush is necessary to ensure that
	 * the data being referenced is stable. The FUA is necessary to ensure that the journal
	 * block itself is stable before allowing overwrites of the lbn's previous data.
	 */
	vdo_submit_metadata_vio(&block->vio, journal->origin + block->block_number,
				complete_write_endio, handle_write_error,
				REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH | REQ_SYNC | REQ_FUA);
}
/**
 * write_blocks() - Attempt to commit blocks, according to write policy.
 * @journal: The recovery journal.
 */
static void write_blocks(struct recovery_journal *journal)
{
	assert_on_journal_thread(journal, __func__);
	/*
	 * We call this function after adding entries to the journal and after finishing a block
	 * write. Thus, when this function terminates we must either have no VIOs waiting in the
	 * journal or have some outstanding IO to provide a future wakeup.
	 *
	 * We want to only issue full blocks if there are no pending writes. However, if there are
	 * no outstanding writes and some unwritten entries, we must issue a block, even if it's
	 * the active block and it isn't full.
	 */
	if (journal->pending_write_count > 0)
		return;

	/* Write all the full blocks. */
	vdo_waitq_notify_all_waiters(&journal->pending_writes, write_block, NULL);

	/*
	 * Do we need to write the active block? Only if we have no outstanding writes, even after
	 * issuing all of the full writes.
	 */
	if ((journal->pending_write_count == 0) && (journal->active_block != NULL))
		write_block(&journal->active_block->write_waiter, NULL);
}

/**
 * vdo_add_recovery_journal_entry() - Add an entry to a recovery journal.
 * @journal: The journal in which to make an entry.
 * @data_vio: The data_vio for which to add the entry. The entry will be taken
 *            from the logical and new_mapped fields of the data_vio. The
 *            data_vio's recovery_sequence_number field will be set to the
 *            sequence number of the journal block in which the entry was
 *            made.
 *
 * This method is asynchronous. The data_vio will not be called back until the entry is committed
 * to the on-disk journal.
 */
void vdo_add_recovery_journal_entry(struct recovery_journal *journal,
				    struct data_vio *data_vio)
{
	assert_on_journal_thread(journal, __func__);
	if (!vdo_is_state_normal(&journal->state)) {
		continue_data_vio_with_error(data_vio, VDO_INVALID_ADMIN_STATE);
		return;
	}

	if (is_read_only(journal)) {
		continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
		return;
	}

	VDO_ASSERT_LOG_ONLY(data_vio->recovery_sequence_number == 0,
			    "journal lock not held for new entry");

	vdo_advance_journal_point(&journal->append_point, journal->entries_per_block);
	vdo_waitq_enqueue_waiter(&journal->entry_waiters, &data_vio->waiter);
	assign_entries(journal);
}
/**
 * is_lock_locked() - Check whether a lock is locked for a zone type.
 * @journal: The recovery journal.
 * @lock_number: The lock to check.
 * @zone_type: The type of the zone.
 *
 * If the recovery journal has a lock on the lock number, both logical and physical zones are
 * considered locked.
 *
 * Return: true if the specified lock has references (is locked).
 */
static bool is_lock_locked(struct recovery_journal *journal, block_count_t lock_number,
			   enum vdo_zone_type zone_type)
{
	atomic_t *zone_count;
	bool locked;

	if (is_journal_zone_locked(journal, lock_number))
		return true;

	zone_count = get_zone_count_ptr(journal, lock_number, zone_type);
	locked = (atomic_read(zone_count) != 0);
	/* Pairs with implicit barrier in vdo_release_recovery_journal_block_reference() */
	smp_rmb();
	return locked;
}

/**
 * reap_recovery_journal() - Conduct a sweep on a recovery journal to reclaim unreferenced blocks.
 * @journal: The recovery journal.
 */
static void reap_recovery_journal(struct recovery_journal *journal)
{
	if (journal->reaping) {
		/*
		 * We already have an outstanding reap in progress. We need to wait for it to
		 * finish.
		 */
		return;
	}

	if (vdo_is_state_quiescent(&journal->state)) {
		/* We are supposed to not do IO. Don't botch it by reaping. */
		return;
	}

	/*
	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
	 * block is referenced.
	 */
	while ((journal->block_map_reap_head < journal->last_write_acknowledged) &&
	       !is_lock_locked(journal, journal->block_map_head_block_number,
			       VDO_ZONE_TYPE_LOGICAL)) {
		journal->block_map_reap_head++;
		if (++journal->block_map_head_block_number == journal->size)
			journal->block_map_head_block_number = 0;
	}

	while ((journal->slab_journal_reap_head < journal->last_write_acknowledged) &&
	       !is_lock_locked(journal, journal->slab_journal_head_block_number,
			       VDO_ZONE_TYPE_PHYSICAL)) {
		journal->slab_journal_reap_head++;
		if (++journal->slab_journal_head_block_number == journal->size)
			journal->slab_journal_head_block_number = 0;
	}

	if ((journal->block_map_reap_head == journal->block_map_head) &&
	    (journal->slab_journal_reap_head == journal->slab_journal_head)) {
		/* Nothing happened. */
		return;
	}

	/*
	 * If the block map head will advance, we must flush any block map page modified by the
	 * entries we are reaping. If the slab journal head will advance, we must flush the slab
	 * summary update covering the slab journal that just released some lock.
	 */
	journal->reaping = true;
	vdo_submit_flush_vio(journal->flush_vio, flush_endio, handle_flush_error);
}
/**
 * vdo_acquire_recovery_journal_block_reference() - Acquire a reference to a recovery journal block
 *                                                  from somewhere other than the journal itself.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @zone_type: The type of the zone making the adjustment.
 * @zone_id: The ID of the zone making the adjustment.
 */
void vdo_acquire_recovery_journal_block_reference(struct recovery_journal *journal,
						  sequence_number_t sequence_number,
						  enum vdo_zone_type zone_type,
						  zone_count_t zone_id)
{
	u16 *current_value;
	block_count_t lock_number;

	if (sequence_number == 0)
		return;

	VDO_ASSERT_LOG_ONLY((zone_type != VDO_ZONE_TYPE_JOURNAL),
			    "invalid lock count increment from journal zone");

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	current_value = get_counter(journal, lock_number, zone_type, zone_id);
	VDO_ASSERT_LOG_ONLY(*current_value < U16_MAX,
			    "increment of lock counter must not overflow");

	if (*current_value == 0) {
		/*
		 * This zone is acquiring this lock for the first time. Extra barriers because this
		 * was originally developed using an atomic add operation that implicitly had them.
		 */
		smp_mb__before_atomic();
		atomic_inc(get_zone_count_ptr(journal, lock_number, zone_type));
		/* same as before_atomic */
		smp_mb__after_atomic();
	}

	*current_value += 1;
}
/**
 * vdo_release_journal_entry_lock() - Release a single per-entry reference count for a recovery
 *                                    journal block.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 */
void vdo_release_journal_entry_lock(struct recovery_journal *journal,
				    sequence_number_t sequence_number)
{
	block_count_t lock_number;

	if (sequence_number == 0)
		return;

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	/*
	 * Extra barriers because this was originally developed using an atomic add operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	atomic_inc(get_decrement_counter(journal, lock_number));
	/* same as before_atomic */
	smp_mb__after_atomic();
}
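/*
 * The barriers around the increment above pair with the smp_rmb() in
 * is_journal_zone_locked() (see the comment there), so the journal thread observes a
 * consistent pairing of the journal counter and the decrement counter.
 */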
/**
 * initiate_drain() - Initiate a drain.
 *
 * Implements vdo_admin_initiator_fn.
 */
static void initiate_drain(struct admin_state *state)
{
	check_for_drain_complete(container_of(state, struct recovery_journal, state));
}

/**
 * vdo_drain_recovery_journal() - Drain recovery journal I/O.
 * @journal: The journal to drain.
 * @operation: The drain operation (suspend or save).
 * @parent: The completion to notify once the journal is drained.
 *
 * All uncommitted entries will be written out.
 */
void vdo_drain_recovery_journal(struct recovery_journal *journal,
				const struct admin_state_code *operation,
				struct vdo_completion *parent)
{
	assert_on_journal_thread(journal, __func__);
	vdo_start_draining(&journal->state, operation, parent, initiate_drain);
}
/**
 * resume_lock_counter() - Re-allow notifications from a suspended lock counter.
 * @counter: The counter.
 *
 * Return: true if the lock counter was suspended.
 */
static bool resume_lock_counter(struct lock_counter *counter)
{
	int prior_state;

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_SUSPENDED,
				     LOCK_COUNTER_STATE_NOT_NOTIFYING);
	/* same as before_atomic */
	smp_mb__after_atomic();

	return (prior_state == LOCK_COUNTER_STATE_SUSPENDED);
}
/**
 * vdo_resume_recovery_journal() - Resume a recovery journal which has been drained.
 * @journal: The journal to resume.
 * @parent: The completion to finish once the journal is resumed.
 */
void vdo_resume_recovery_journal(struct recovery_journal *journal,
				 struct vdo_completion *parent)
{
	bool saved;

	assert_on_journal_thread(journal, __func__);
	saved = vdo_is_state_saved(&journal->state);
	vdo_set_completion_result(parent, vdo_resume_if_quiescent(&journal->state));
	if (is_read_only(journal)) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	if (saved)
		initialize_journal_state(journal);

	if (resume_lock_counter(&journal->lock_counter)) {
		/* We might have missed a notification. */
		reap_recovery_journal(journal);
	}

	vdo_launch_completion(parent);
}

/**
 * vdo_get_recovery_journal_logical_blocks_used() - Get the number of logical blocks in use by the
 *                                                  VDO.
 * @journal: The journal.
 *
 * Return: The number of logical blocks in use by the VDO.
 */
block_count_t vdo_get_recovery_journal_logical_blocks_used(const struct recovery_journal *journal)
{
	return journal->logical_blocks_used;
}

/**
 * vdo_get_recovery_journal_statistics() - Get the current statistics from the recovery journal.
 * @journal: The recovery journal to query.
 *
 * Return: A copy of the current statistics for the journal.
 */
struct recovery_journal_statistics
vdo_get_recovery_journal_statistics(const struct recovery_journal *journal)
{
	return journal->events;
}

/**
 * dump_recovery_block() - Dump the contents of the recovery block to the log.
 * @block: The block to dump.
 */
static void dump_recovery_block(const struct recovery_journal_block *block)
{
	vdo_log_info("  sequence number %llu; entries %u; %s; %zu entry waiters; %zu commit waiters",
		     (unsigned long long) block->sequence_number, block->entry_count,
		     (block->committing ? "committing" : "waiting"),
		     vdo_waitq_num_waiters(&block->entry_waiters),
		     vdo_waitq_num_waiters(&block->commit_waiters));
}

/**
 * vdo_dump_recovery_journal_statistics() - Dump some current statistics and other debug info from
 *                                          the recovery journal.
 * @journal: The recovery journal to dump.
 */
void vdo_dump_recovery_journal_statistics(const struct recovery_journal *journal)
{
	const struct recovery_journal_block *block;
	struct recovery_journal_statistics stats = vdo_get_recovery_journal_statistics(journal);

	vdo_log_info("Recovery Journal");
	vdo_log_info("  block_map_head=%llu slab_journal_head=%llu last_write_acknowledged=%llu tail=%llu block_map_reap_head=%llu slab_journal_reap_head=%llu disk_full=%llu slab_journal_commits_requested=%llu entry_waiters=%zu",
		     (unsigned long long) journal->block_map_head,
		     (unsigned long long) journal->slab_journal_head,
		     (unsigned long long) journal->last_write_acknowledged,
		     (unsigned long long) journal->tail,
		     (unsigned long long) journal->block_map_reap_head,
		     (unsigned long long) journal->slab_journal_reap_head,
		     (unsigned long long) stats.disk_full,
		     (unsigned long long) stats.slab_journal_commits_requested,
		     vdo_waitq_num_waiters(&journal->entry_waiters));
	vdo_log_info("  entries: started=%llu written=%llu committed=%llu",
		     (unsigned long long) stats.entries.started,
		     (unsigned long long) stats.entries.written,
		     (unsigned long long) stats.entries.committed);
	vdo_log_info("  blocks: started=%llu written=%llu committed=%llu",
		     (unsigned long long) stats.blocks.started,
		     (unsigned long long) stats.blocks.written,
		     (unsigned long long) stats.blocks.committed);
	vdo_log_info("  active blocks:");
	list_for_each_entry(block, &journal->active_tail_blocks, list_node)
		dump_recovery_block(block);
}