// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "bkey_buf.h"
#include "btree_locking.h"
#include "btree_update.h"
#include "btree_update_interior.h"
#include "btree_write_buffer.h"
#include "disk_accounting.h"
#include "error.h"
#include "extents.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"

#include <linux/prefetch.h>
#include <linux/sort.h>

static int bch2_btree_write_buffer_journal_flush(struct journal *,
                                struct journal_entry_pin *, u64);

static int bch2_journal_keys_to_write_buffer(struct bch_fs *, struct journal_buf *);

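/*
 * struct wb_key_ref packs the sort key (btree id + key position) plus an index
 * into the flushing array into three machine words (hi, mi, lo), so that
 * ordering reduces to plain integer compares; the index occupies the low 24
 * bits of lo and is excluded from equality checks.
 */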
static inline bool __wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
{
        return (cmp_int(l->hi, r->hi) ?:
                cmp_int(l->mi, r->mi) ?:
                cmp_int(l->lo, r->lo)) >= 0;
}

static inline bool wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
{
#ifdef CONFIG_X86_64
        int cmp;

        asm("mov   (%[l]), %%rax;"
            "sub   (%[r]), %%rax;"
            "mov  8(%[l]), %%rax;"
            "sbb  8(%[r]), %%rax;"
            "mov 16(%[l]), %%rax;"
            "sbb 16(%[r]), %%rax;"
            : "=@ccae" (cmp)
            : [l] "r" (l), [r] "r" (r)
            : "rax", "cc");

        EBUG_ON(cmp != __wb_key_ref_cmp(l, r));
        return cmp;
#else
        return __wb_key_ref_cmp(l, r);
#endif
}

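/* Sort by journal sequence, for the slowpath that flushes in journal order: */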
static int wb_key_seq_cmp(const void *_l, const void *_r)
{
        const struct btree_write_buffered_key *l = _l;
        const struct btree_write_buffered_key *r = _r;

        return cmp_int(l->journal_seq, r->journal_seq);
}

/* Compare excluding idx, the low 24 bits: */
static inline bool wb_key_eq(const void *_l, const void *_r)
{
        const struct wb_key_ref *l = _l;
        const struct wb_key_ref *r = _r;

        return !((l->hi ^ r->hi)|
                 (l->mi ^ r->mi)|
                 ((l->lo >> 24) ^ (r->lo >> 24)));
}

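/*
 * Heapsort specialized to struct wb_key_ref: the comparison (wb_key_ref_cmp)
 * is called directly rather than through the indirect cmp_func pointer the
 * generic sort() would use.
 */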
static noinline void wb_sort(struct wb_key_ref *base, size_t num)
{
        size_t n = num, a = num / 2;

        if (!a)         /* num < 2 || size == 0 */
                return;

        for (;;) {
                size_t b, c, d;

                if (a)                  /* Building heap: sift down --a */
                        --a;
                else if (--n)           /* Sorting: Extract root to --n */
                        swap(base[0], base[n]);
                else                    /* Sort complete */
                        break;

                /*
                 * Sift element at "a" down into heap.  This is the
                 * "bottom-up" variant, which significantly reduces
                 * calls to cmp_func(): we find the sift-down path all
                 * the way to the leaves (one compare per level), then
                 * backtrack to find where to insert the target element.
                 *
                 * Because elements tend to sift down close to the leaves,
                 * this uses fewer compares than doing two per level
                 * on the way down.  (A bit more than half as many on
                 * average, 3/4 worst-case.)
                 */
                for (b = a; c = 2*b + 1, (d = c + 1) < n;)
                        b = wb_key_ref_cmp(base + c, base + d) ? c : d;
                if (d == n)             /* Special case last leaf with no sibling */
                        b = c;

                /* Now backtrack from "b" to the correct location for "a" */
                while (b != a && wb_key_ref_cmp(base + a, base + b))
                        b = (b - 1) / 2;
                c = b;                  /* Where "a" belongs */
                while (b != a) {        /* Shift it into place */
                        b = (b - 1) / 2;
                        swap(base[b], base[c]);
                }
        }
}

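/*
 * Slowpath for flushing a single write buffered key: drop the node write lock
 * and do a full transaction commit, reusing the journal seq of the original
 * write buffer insert rather than taking a new journal reservation:
 */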
static noinline int wb_flush_one_slowpath(struct btree_trans *trans,
                                          struct btree_iter *iter,
                                          struct btree_write_buffered_key *wb)
{
        struct btree_path *path = btree_iter_path(trans, iter);

        bch2_btree_node_unlock_write(trans, path, path->l[0].b);

        trans->journal_res.seq = wb->journal_seq;

        return bch2_trans_update(trans, iter, &wb->k,
                                 BTREE_UPDATE_internal_snapshot_node) ?:
                bch2_trans_commit(trans, NULL, NULL,
                                  BCH_TRANS_COMMIT_no_enospc|
                                  BCH_TRANS_COMMIT_no_check_rw|
                                  BCH_TRANS_COMMIT_no_journal_res|
                                  BCH_TRANS_COMMIT_journal_reclaim);
}

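/*
 * Fast path for flushing a single key: write lock the leaf node and insert
 * directly if the key fits, otherwise fall back to wb_flush_one_slowpath().
 * Accounting keys are first accumulated with the value already present in the
 * btree.
 */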
static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *iter,
                               struct btree_write_buffered_key *wb,
                               bool *write_locked,
                               bool *accounting_accumulated,
                               size_t *fast)
{
        struct btree_path *path;
        int ret;

        EBUG_ON(!wb->journal_seq);
        EBUG_ON(!trans->c->btree_write_buffer.flushing.pin.seq);
        EBUG_ON(trans->c->btree_write_buffer.flushing.pin.seq > wb->journal_seq);

        ret = bch2_btree_iter_traverse(iter);
        if (ret)
                return ret;

        if (!*accounting_accumulated && wb->k.k.type == KEY_TYPE_accounting) {
                struct bkey u;
                struct bkey_s_c k = bch2_btree_path_peek_slot_exact(btree_iter_path(trans, iter), &u);

                if (k.k->type == KEY_TYPE_accounting)
                        bch2_accounting_accumulate(bkey_i_to_accounting(&wb->k),
                                                   bkey_s_c_to_accounting(k));
        }
        *accounting_accumulated = true;

        /*
         * We can't clone a path that has write locks: unshare it now, before
         * set_pos and traverse():
         */
        if (btree_iter_path(trans, iter)->ref > 1)
                iter->path = __bch2_btree_path_make_mut(trans, iter->path, true, _THIS_IP_);

        path = btree_iter_path(trans, iter);

        if (!*write_locked) {
                ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c);
                if (ret)
                        return ret;

                bch2_btree_node_prep_for_write(trans, path, path->l[0].b);
                *write_locked = true;
        }

        if (unlikely(!bch2_btree_node_insert_fits(path->l[0].b, wb->k.k.u64s))) {
                *write_locked = false;
                return wb_flush_one_slowpath(trans, iter, wb);
        }

        bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq);
        (*fast)++;
        return 0;
}

/*
 * Update a btree with a write buffered key using the journal seq of the
 * original write buffer insert.
 *
 * It is not safe to rejournal the key once it has been inserted into the write
 * buffer because that may break recovery ordering. For example, the key may
 * have already been modified in the active write buffer in a seq that comes
 * before the current transaction. If we were to journal this key again and
 * crash, recovery would process updates in the wrong order.
 */
static int
btree_write_buffered_insert(struct btree_trans *trans,
                          struct btree_write_buffered_key *wb)
{
        struct btree_iter iter;
        int ret;

        bch2_trans_iter_init(trans, &iter, wb->btree, bkey_start_pos(&wb->k.k),
                             BTREE_ITER_cached|BTREE_ITER_intent);

        trans->journal_res.seq = wb->journal_seq;

        ret   = bch2_btree_iter_traverse(&iter) ?:
                bch2_trans_update(trans, &iter, &wb->k,
                                  BTREE_UPDATE_internal_snapshot_node);
        bch2_trans_iter_exit(trans, &iter);
        return ret;
}

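/*
 * Move keys from the incoming buffer (wb->inc) to the flushing buffer,
 * transferring the journal pin along with them; called with both wb->inc.lock
 * and wb->flushing.lock held.
 */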
static void move_keys_from_inc_to_flushing(struct btree_write_buffer *wb)
{
        struct bch_fs *c = container_of(wb, struct bch_fs, btree_write_buffer);
        struct journal *j = &c->journal;

        if (!wb->inc.keys.nr)
                return;

        bch2_journal_pin_add(j, wb->inc.keys.data[0].journal_seq, &wb->flushing.pin,
                             bch2_btree_write_buffer_journal_flush);

        darray_resize(&wb->flushing.keys, min_t(size_t, 1U << 20, wb->flushing.keys.nr + wb->inc.keys.nr));
        darray_resize(&wb->sorted, wb->flushing.keys.size);

        if (!wb->flushing.keys.nr && wb->sorted.size >= wb->inc.keys.nr) {
                swap(wb->flushing.keys, wb->inc.keys);
                goto out;
        }

        size_t nr = min(darray_room(wb->flushing.keys),
                        wb->sorted.size - wb->flushing.keys.nr);
        nr = min(nr, wb->inc.keys.nr);

        memcpy(&darray_top(wb->flushing.keys),
               wb->inc.keys.data,
               sizeof(wb->inc.keys.data[0]) * nr);

        memmove(wb->inc.keys.data,
                wb->inc.keys.data + nr,
                sizeof(wb->inc.keys.data[0]) * (wb->inc.keys.nr - nr));

        wb->flushing.keys.nr    += nr;
        wb->inc.keys.nr         -= nr;
out:
        if (!wb->inc.keys.nr)
                bch2_journal_pin_drop(j, &wb->inc.pin);
        else
                bch2_journal_pin_update(j, wb->inc.keys.data[0].journal_seq, &wb->inc.pin,
                                        bch2_btree_write_buffer_journal_flush);

        if (j->watermark) {
                spin_lock(&j->lock);
                bch2_journal_set_watermark(j);
                spin_unlock(&j->lock);
        }

        BUG_ON(wb->sorted.size < wb->flushing.keys.nr);
}

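/*
 * Flush everything currently in wb->flushing; called with wb->flushing.lock
 * held.
 */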
static int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
{
        struct bch_fs *c = trans->c;
        struct journal *j = &c->journal;
        struct btree_write_buffer *wb = &c->btree_write_buffer;
        struct btree_iter iter = { NULL };
        size_t overwritten = 0, fast = 0, slowpath = 0, could_not_insert = 0;
        bool write_locked = false;
        bool accounting_replay_done = test_bit(BCH_FS_accounting_replay_done, &c->flags);
        int ret = 0;

        ret = bch2_journal_error(&c->journal);
        if (ret)
                return ret;

        bch2_trans_unlock(trans);
        bch2_trans_begin(trans);

        mutex_lock(&wb->inc.lock);
        move_keys_from_inc_to_flushing(wb);
        mutex_unlock(&wb->inc.lock);

        for (size_t i = 0; i < wb->flushing.keys.nr; i++) {
                wb->sorted.data[i].idx = i;
                wb->sorted.data[i].btree = wb->flushing.keys.data[i].btree;
                memcpy(&wb->sorted.data[i].pos, &wb->flushing.keys.data[i].k.k.p, sizeof(struct bpos));
        }
        wb->sorted.nr = wb->flushing.keys.nr;

        /*
         * We first sort so that we can detect and skip redundant updates, and
         * then we attempt to flush in sorted btree order, as this is most
         * efficient.
         *
         * However, since we're not flushing in the order they appear in the
         * journal we won't be able to drop our journal pin until everything is
         * flushed - which means this could deadlock the journal if we weren't
         * passing BCH_TRANS_COMMIT_journal_reclaim. This causes the update to fail
         * if it would block taking a journal reservation.
         *
         * If that happens, simply skip the key so we can optimistically insert
         * as many keys as possible in the fast path.
         */
        wb_sort(wb->sorted.data, wb->sorted.nr);

        darray_for_each(wb->sorted, i) {
                struct btree_write_buffered_key *k = &wb->flushing.keys.data[i->idx];

                for (struct wb_key_ref *n = i + 1; n < min(i + 4, &darray_top(wb->sorted)); n++)
                        prefetch(&wb->flushing.keys.data[n->idx]);

                BUG_ON(!k->journal_seq);

                if (!accounting_replay_done &&
                    k->k.k.type == KEY_TYPE_accounting) {
                        slowpath++;
                        continue;
                }

                if (i + 1 < &darray_top(wb->sorted) &&
                    wb_key_eq(i, i + 1)) {
                        struct btree_write_buffered_key *n = &wb->flushing.keys.data[i[1].idx];

                        if (k->k.k.type == KEY_TYPE_accounting &&
                            n->k.k.type == KEY_TYPE_accounting)
                                bch2_accounting_accumulate(bkey_i_to_accounting(&n->k),
                                                           bkey_i_to_s_c_accounting(&k->k));

                        overwritten++;
                        n->journal_seq = min_t(u64, n->journal_seq, k->journal_seq);
                        k->journal_seq = 0;
                        continue;
                }

                if (write_locked) {
                        struct btree_path *path = btree_iter_path(trans, &iter);

                        if (path->btree_id != i->btree ||
                            bpos_gt(k->k.k.p, path->l[0].b->key.k.p)) {
                                bch2_btree_node_unlock_write(trans, path, path->l[0].b);
                                write_locked = false;

                                ret = lockrestart_do(trans,
                                        bch2_btree_iter_traverse(&iter) ?:
                                        bch2_foreground_maybe_merge(trans, iter.path, 0,
                                                        BCH_WATERMARK_reclaim|
                                                        BCH_TRANS_COMMIT_journal_reclaim|
                                                        BCH_TRANS_COMMIT_no_check_rw|
                                                        BCH_TRANS_COMMIT_no_enospc));
                                if (ret)
                                        goto err;
                        }
                }

                if (!iter.path || iter.btree_id != k->btree) {
                        bch2_trans_iter_exit(trans, &iter);
                        bch2_trans_iter_init(trans, &iter, k->btree, k->k.k.p,
                                             BTREE_ITER_intent|BTREE_ITER_all_snapshots);
                }

                bch2_btree_iter_set_pos(&iter, k->k.k.p);
                btree_iter_path(trans, &iter)->preserve = false;

                bool accounting_accumulated = false;
                do {
                        if (race_fault()) {
                                ret = -BCH_ERR_journal_reclaim_would_deadlock;
                                break;
                        }

                        ret = wb_flush_one(trans, &iter, k, &write_locked,
                                           &accounting_accumulated, &fast);
                        if (!write_locked)
                                bch2_trans_begin(trans);
                } while (bch2_err_matches(ret, BCH_ERR_transaction_restart));

                if (!ret) {
                        k->journal_seq = 0;
                } else if (ret == -BCH_ERR_journal_reclaim_would_deadlock) {
                        slowpath++;
                        ret = 0;
                } else
                        break;
        }

        if (write_locked) {
                struct btree_path *path = btree_iter_path(trans, &iter);
                bch2_btree_node_unlock_write(trans, path, path->l[0].b);
        }
        bch2_trans_iter_exit(trans, &iter);

        if (ret)
                goto err;

        if (slowpath) {
                /*
                 * Flush in the order they were present in the journal, so that
                 * we can release journal pins:
                 * The fastpath zeroed the seq of keys that were successfully
                 * flushed, so we can skip those here.
                 */
                trace_and_count(c, write_buffer_flush_slowpath, trans, slowpath, wb->flushing.keys.nr);

                sort(wb->flushing.keys.data,
                     wb->flushing.keys.nr,
                     sizeof(wb->flushing.keys.data[0]),
                     wb_key_seq_cmp, NULL);

                darray_for_each(wb->flushing.keys, i) {
                        if (!i->journal_seq)
                                continue;

                        if (!accounting_replay_done &&
                            i->k.k.type == KEY_TYPE_accounting) {
                                could_not_insert++;
                                continue;
                        }

                        if (!could_not_insert)
                                bch2_journal_pin_update(j, i->journal_seq, &wb->flushing.pin,
                                                        bch2_btree_write_buffer_journal_flush);

                        bch2_trans_begin(trans);

                        ret = commit_do(trans, NULL, NULL,
                                        BCH_WATERMARK_reclaim|
                                        BCH_TRANS_COMMIT_journal_reclaim|
                                        BCH_TRANS_COMMIT_no_check_rw|
                                        BCH_TRANS_COMMIT_no_enospc|
                                        BCH_TRANS_COMMIT_no_journal_res,
                                        btree_write_buffered_insert(trans, i));
                        if (ret)
                                goto err;

                        i->journal_seq = 0;
                }

                /*
                 * If journal replay hasn't finished with accounting keys we
                 * can't flush accounting keys at all - condense them and leave
                 * them for next time.
                 *
                 * Q: Can the write buffer overflow?
                 * A: Shouldn't be any actual risk. It's just new accounting
                 * updates that the write buffer can't flush, and those are only
                 * going to be generated by interior btree node updates as
                 * journal replay has to split/rewrite nodes to make room for
                 * its updates.
                 *
                 * And for those new accounting updates, updates to the same
                 * counters get accumulated as they're flushed from the journal
                 * to the write buffer - see the patch for eytzinger tree
                 * accumulation. So we could only overflow if the number of
                 * distinct counters touched somehow was very large.
                 */
                if (could_not_insert) {
                        struct btree_write_buffered_key *dst = wb->flushing.keys.data;

                        darray_for_each(wb->flushing.keys, i)
                                if (i->journal_seq)
                                        *dst++ = *i;
                        wb->flushing.keys.nr = dst - wb->flushing.keys.data;
                }
        }
err:
        if (ret || !could_not_insert) {
                bch2_journal_pin_drop(j, &wb->flushing.pin);
                wb->flushing.keys.nr = 0;
        }

        bch2_fs_fatal_err_on(ret, c, "%s", bch2_err_str(ret));
        trace_write_buffer_flush(trans, wb->flushing.keys.nr, overwritten, fast, 0);
        return ret;
}

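/*
 * Pull write buffer keys out of journal buffers that still need flushing, up
 * to @seq:
 */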
static int fetch_wb_keys_from_journal(struct bch_fs *c, u64 seq)
{
        struct journal *j = &c->journal;
        struct journal_buf *buf;
        int ret = 0;

        while (!ret && (buf = bch2_next_write_buffer_flush_journal_buf(j, seq))) {
                ret = bch2_journal_keys_to_write_buffer(c, buf);
                mutex_unlock(&j->buf_lock);
        }

        return ret;
}

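/*
 * Flush the write buffer up to the given journal sequence number, looping
 * until no journal pin at or before @seq remains:
 */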
static int btree_write_buffer_flush_seq(struct btree_trans *trans, u64 seq,
                                        bool *did_work)
{
        struct bch_fs *c = trans->c;
        struct btree_write_buffer *wb = &c->btree_write_buffer;
        int ret = 0, fetch_from_journal_err;

        do {
                bch2_trans_unlock(trans);

                fetch_from_journal_err = fetch_wb_keys_from_journal(c, seq);

                *did_work |= wb->inc.keys.nr || wb->flushing.keys.nr;

                /*
                 * On memory allocation failure, bch2_btree_write_buffer_flush_locked()
                 * is not guaranteed to empty wb->inc:
                 */
                mutex_lock(&wb->flushing.lock);
                ret = bch2_btree_write_buffer_flush_locked(trans);
                mutex_unlock(&wb->flushing.lock);
        } while (!ret &&
                 (fetch_from_journal_err ||
                  (wb->inc.pin.seq && wb->inc.pin.seq <= seq) ||
                  (wb->flushing.pin.seq && wb->flushing.pin.seq <= seq)));

        return ret;
}

static int bch2_btree_write_buffer_journal_flush(struct journal *j,
                                struct journal_entry_pin *_pin, u64 seq)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        bool did_work = false;

        return bch2_trans_run(c, btree_write_buffer_flush_seq(trans, seq, &did_work));
}

int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
{
        struct bch_fs *c = trans->c;
        bool did_work = false;

        trace_and_count(c, write_buffer_flush_sync, trans, _RET_IP_);

        return btree_write_buffer_flush_seq(trans, journal_cur_seq(&c->journal), &did_work);
}

/*
 * The write buffer requires flushing when going RO: keys in the journal for the
 * write buffer don't have a journal pin yet
 */
bool bch2_btree_write_buffer_flush_going_ro(struct bch_fs *c)
{
        if (bch2_journal_error(&c->journal))
                return false;

        bool did_work = false;
        bch2_trans_run(c, btree_write_buffer_flush_seq(trans,
                                journal_cur_seq(&c->journal), &did_work));
        return did_work;
}

int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *trans)
{
        struct bch_fs *c = trans->c;
        struct btree_write_buffer *wb = &c->btree_write_buffer;
        int ret = 0;

        if (mutex_trylock(&wb->flushing.lock)) {
                ret = bch2_btree_write_buffer_flush_locked(trans);
                mutex_unlock(&wb->flushing.lock);
        }

        return ret;
}

int bch2_btree_write_buffer_tryflush(struct btree_trans *trans)
{
        struct bch_fs *c = trans->c;

        if (!bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer))
                return -BCH_ERR_erofs_no_writes;

        int ret = bch2_btree_write_buffer_flush_nocheck_rw(trans);
        bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
        return ret;
}

/*
 * In check and repair code, when checking references to write buffer btrees we
 * need to issue a flush before we have a definitive error: this issues a flush
 * if this is a key we haven't yet checked.
 */
int bch2_btree_write_buffer_maybe_flush(struct btree_trans *trans,
                                        struct bkey_s_c referring_k,
                                        struct bkey_buf *last_flushed)
{
        struct bch_fs *c = trans->c;
        struct bkey_buf tmp;
        int ret = 0;

        bch2_bkey_buf_init(&tmp);

        if (!bkey_and_val_eq(referring_k, bkey_i_to_s_c(last_flushed->k))) {
                bch2_bkey_buf_reassemble(&tmp, c, referring_k);

                if (bkey_is_btree_ptr(referring_k.k)) {
                        bch2_trans_unlock(trans);
                        bch2_btree_interior_updates_flush(c);
                }

                ret = bch2_btree_write_buffer_flush_sync(trans);
                if (ret)
                        goto err;

                bch2_bkey_buf_copy(last_flushed, c, tmp.k);
                ret = -BCH_ERR_transaction_restart_write_buffer_flush;
        }
err:
        bch2_bkey_buf_exit(&tmp, c);
        return ret;
}

static void bch2_btree_write_buffer_flush_work(struct work_struct *work)
{
        struct bch_fs *c = container_of(work, struct bch_fs, btree_write_buffer.flush_work);
        struct btree_write_buffer *wb = &c->btree_write_buffer;
        int ret;

        mutex_lock(&wb->flushing.lock);
        do {
                ret = bch2_trans_run(c, bch2_btree_write_buffer_flush_locked(trans));
        } while (!ret && bch2_btree_write_buffer_should_flush(c));
        mutex_unlock(&wb->flushing.lock);

        bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
}

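/*
 * wb->accounting is kept in eytzinger order; it is re-sorted whenever a new
 * accounting key is added via bch2_accounting_key_to_wb_slowpath() below:
 */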
static void wb_accounting_sort(struct btree_write_buffer *wb)
{
        eytzinger0_sort(wb->accounting.data, wb->accounting.nr,
                        sizeof(wb->accounting.data[0]),
                        wb_key_cmp, NULL);
}

int bch2_accounting_key_to_wb_slowpath(struct bch_fs *c, enum btree_id btree,
                                       struct bkey_i_accounting *k)
{
        struct btree_write_buffer *wb = &c->btree_write_buffer;
        struct btree_write_buffered_key new = { .btree = btree };

        bkey_copy(&new.k, &k->k_i);

        int ret = darray_push(&wb->accounting, new);
        if (ret)
                return ret;

        wb_accounting_sort(wb);
        return 0;
}

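/*
 * Slowpath for adding a journal key to the write buffer, called when the
 * destination darray is out of room: grow it, or fall back from wb->flushing
 * to wb->inc if the allocation fails:
 */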
int bch2_journal_key_to_wb_slowpath(struct bch_fs *c,
                             struct journal_keys_to_wb *dst,
                             enum btree_id btree, struct bkey_i *k)
{
        struct btree_write_buffer *wb = &c->btree_write_buffer;
        int ret;
retry:
        ret = darray_make_room_gfp(&dst->wb->keys, 1, GFP_KERNEL);
        if (!ret && dst->wb == &wb->flushing)
                ret = darray_resize(&wb->sorted, wb->flushing.keys.size);

        if (unlikely(ret)) {
                if (dst->wb == &c->btree_write_buffer.flushing) {
                        mutex_unlock(&dst->wb->lock);
                        dst->wb = &c->btree_write_buffer.inc;
                        bch2_journal_pin_add(&c->journal, dst->seq, &dst->wb->pin,
                                             bch2_btree_write_buffer_journal_flush);
                        goto retry;
                }

                return ret;
        }

        dst->room = darray_room(dst->wb->keys);
        if (dst->wb == &wb->flushing)
                dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
        BUG_ON(!dst->room);
        BUG_ON(!dst->seq);

        struct btree_write_buffered_key *wb_k = &darray_top(dst->wb->keys);
        wb_k->journal_seq       = dst->seq;
        wb_k->btree             = btree;
        bkey_copy(&wb_k->k, k);
        dst->wb->keys.nr++;
        dst->room--;
        return 0;
}

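/*
 * Begin moving keys from a journal buffer into the write buffer: pick a
 * destination (wb->flushing if we can take its lock and wb->inc is empty,
 * otherwise wb->inc), pin the journal seq, and zero the values of the cached
 * accounting keys so they can accumulate this batch's updates:
 */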
void bch2_journal_keys_to_write_buffer_start(struct bch_fs *c, struct journal_keys_to_wb *dst, u64 seq)
{
        struct btree_write_buffer *wb = &c->btree_write_buffer;

        if (mutex_trylock(&wb->flushing.lock)) {
                mutex_lock(&wb->inc.lock);
                move_keys_from_inc_to_flushing(wb);

                /*
                 * Attempt to skip wb->inc, and add keys directly to
                 * wb->flushing, saving us a copy later:
                 */

                if (!wb->inc.keys.nr) {
                        dst->wb = &wb->flushing;
                } else {
                        mutex_unlock(&wb->flushing.lock);
                        dst->wb = &wb->inc;
                }
        } else {
                mutex_lock(&wb->inc.lock);
                dst->wb = &wb->inc;
        }

        dst->room = darray_room(dst->wb->keys);
        if (dst->wb == &wb->flushing)
                dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
        dst->seq = seq;

        bch2_journal_pin_add(&c->journal, seq, &dst->wb->pin,
                             bch2_btree_write_buffer_journal_flush);

        darray_for_each(wb->accounting, i)
                memset(&i->k.v, 0, bkey_val_bytes(&i->k.k));
}

int bch2_journal_keys_to_write_buffer_end(struct bch_fs *c, struct journal_keys_to_wb *dst)
{
        struct btree_write_buffer *wb = &c->btree_write_buffer;
        unsigned live_accounting_keys = 0;
        int ret = 0;

        darray_for_each(wb->accounting, i)
                if (!bch2_accounting_key_is_zero(bkey_i_to_s_c_accounting(&i->k))) {
                        i->journal_seq = dst->seq;
                        live_accounting_keys++;
                        ret = __bch2_journal_key_to_wb(c, dst, i->btree, &i->k);
                        if (ret)
                                break;
                }

        if (live_accounting_keys * 2 < wb->accounting.nr) {
                struct btree_write_buffered_key *dst = wb->accounting.data;

                darray_for_each(wb->accounting, src)
                        if (!bch2_accounting_key_is_zero(bkey_i_to_s_c_accounting(&src->k)))
                                *dst++ = *src;
                wb->accounting.nr = dst - wb->accounting.data;
                wb_accounting_sort(wb);
        }

        if (!dst->wb->keys.nr)
                bch2_journal_pin_drop(&c->journal, &dst->wb->pin);

        if (bch2_btree_write_buffer_should_flush(c) &&
            __bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer) &&
            !queue_work(system_unbound_wq, &c->btree_write_buffer.flush_work))
                bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);

        if (dst->wb == &wb->flushing)
                mutex_unlock(&wb->flushing.lock);
        mutex_unlock(&wb->inc.lock);

        return ret;
}

static int bch2_journal_keys_to_write_buffer(struct bch_fs *c, struct journal_buf *buf)
{
        struct journal_keys_to_wb dst;
        int ret = 0;

        bch2_journal_keys_to_write_buffer_start(c, &dst, le64_to_cpu(buf->data->seq));

        for_each_jset_entry_type(entry, buf->data, BCH_JSET_ENTRY_write_buffer_keys) {
                jset_entry_for_each_key(entry, k) {
                        ret = bch2_journal_key_to_wb(c, &dst, entry->btree_id, k);
                        if (ret)
                                goto out;
                }

                entry->type = BCH_JSET_ENTRY_btree_keys;
        }

        spin_lock(&c->journal.lock);
        buf->need_flush_to_write_buffer = false;
        spin_unlock(&c->journal.lock);
out:
        ret = bch2_journal_keys_to_write_buffer_end(c, &dst) ?: ret;
        return ret;
}

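/* Resizing, used by the journal when it wants a larger write buffer: */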
static int wb_keys_resize(struct btree_write_buffer_keys *wb, size_t new_size)
{
        if (wb->keys.size >= new_size)
                return 0;

        if (!mutex_trylock(&wb->lock))
                return -EINTR;

        int ret = darray_resize(&wb->keys, new_size);
        mutex_unlock(&wb->lock);
        return ret;
}

int bch2_btree_write_buffer_resize(struct bch_fs *c, size_t new_size)
{
        struct btree_write_buffer *wb = &c->btree_write_buffer;

        return wb_keys_resize(&wb->flushing, new_size) ?:
                wb_keys_resize(&wb->inc, new_size);
}

void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
{
        struct btree_write_buffer *wb = &c->btree_write_buffer;

        BUG_ON((wb->inc.keys.nr || wb->flushing.keys.nr) &&
               !bch2_journal_error(&c->journal));

        darray_exit(&wb->accounting);
        darray_exit(&wb->sorted);
        darray_exit(&wb->flushing.keys);
        darray_exit(&wb->inc.keys);
}

int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
{
        struct btree_write_buffer *wb = &c->btree_write_buffer;

        mutex_init(&wb->inc.lock);
        mutex_init(&wb->flushing.lock);
        INIT_WORK(&wb->flush_work, bch2_btree_write_buffer_flush_work);

        /* Will be resized by journal as needed: */
        unsigned initial_size = 1 << 16;

        return  darray_make_room(&wb->inc.keys, initial_size) ?:
                darray_make_room(&wb->flushing.keys, initial_size) ?:
                darray_make_room(&wb->sorted, initial_size);
}