/*
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source.  A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 */

/*
 * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
 */

#include <sys/bqueue.h>
#include <sys/zfs_context.h>

static inline bqueue_node_t *
obj2node(bqueue_t *q, void *data)
{
	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
}

/*
 * Initialize a blocking queue.  The maximum capacity of the queue is set to
 * size.  Types that are stored in a bqueue must contain a bqueue_node_t, and
 * node_offset must be its offset from the start of the struct.  fill_fraction
 * is a performance tuning value; when the queue is full, any threads
 * attempting to enqueue records will block.  They will block until they're
 * signaled, which will occur when the queue is at least 1/fill_fraction
 * empty.  Similar behavior occurs on dequeue; if the queue is empty, threads
 * block.  They will be signaled when the queue is at least 1/fill_fraction
 * full.  As a result, you must call bqueue_enqueue_flush() when you enqueue
 * your final record on a thread, in case the dequeuing threads are currently
 * blocked and that enqueue does not cause them to be woken.  Alternatively,
 * this behavior can be disabled (causing signaling to happen immediately) by
 * setting fill_fraction to any value larger than size.  Return 0 on success,
 * or -1 on failure.
 *
 * Note: The caller must ensure that for a given bqueue_t, there's only a
 * single call to bqueue_enqueue() running at a time (e.g. by calling only
 * from a single thread, or with locking around the call).  Similarly, the
 * caller must ensure that there's only a single call to bqueue_dequeue()
 * running at a time.  However, the one call to bqueue_enqueue() may be
 * invoked concurrently with the one call to bqueue_dequeue().
 */
int
bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
{
	if (fill_fraction == 0) {
		return (-1);
	}
	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
	    node_offset + offsetof(bqueue_node_t, bqn_node));
	list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
	    node_offset + offsetof(bqueue_node_t, bqn_node));
	list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
	    node_offset + offsetof(bqueue_node_t, bqn_node));
	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
	q->bq_node_offset = node_offset;
	q->bq_size = 0;
	q->bq_dequeuing_size = 0;
	q->bq_enqueuing_size = 0;
	q->bq_maxsize = size;
	q->bq_fill_fraction = fill_fraction;
	return (0);
}
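
/*
 * Usage sketch (illustrative only; my_record_t, its members, and the values
 * below are hypothetical, not part of this file): a type stored in a bqueue
 * embeds a bqueue_node_t and passes that member's offset to bqueue_init().
 *
 *	typedef struct my_record {
 *		bqueue_node_t	mr_node;
 *		uint64_t	mr_payload;
 *	} my_record_t;
 *
 *	bqueue_t q;
 *	VERIFY0(bqueue_init(&q, 4, 1024 * 1024,
 *	    offsetof(my_record_t, mr_node)));
 */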

/*
 * Destroy a blocking queue.  This function asserts that there are no
 * elements in the queue, and no one is blocked on the condition
 * variables.
 */
void
bqueue_destroy(bqueue_t *q)
{
	mutex_enter(&q->bq_lock);
	ASSERT0(q->bq_size);
	ASSERT0(q->bq_dequeuing_size);
	ASSERT0(q->bq_enqueuing_size);
	cv_destroy(&q->bq_add_cv);
	cv_destroy(&q->bq_pop_cv);
	list_destroy(&q->bq_list);
	list_destroy(&q->bq_dequeuing_list);
	list_destroy(&q->bq_enqueuing_list);
	mutex_exit(&q->bq_lock);
	mutex_destroy(&q->bq_lock);
}

static void
bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size,
    boolean_t flush)
{
	ASSERT3U(item_size, >, 0);
	ASSERT3U(item_size, <=, q->bq_maxsize);

	obj2node(q, data)->bqn_size = item_size;
	q->bq_enqueuing_size += item_size;
	list_insert_tail(&q->bq_enqueuing_list, data);

	if (flush ||
	    q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
		/* Append the enqueuing list to the shared list. */
		mutex_enter(&q->bq_lock);
		while (q->bq_size > q->bq_maxsize) {
			cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
		}
		q->bq_size += q->bq_enqueuing_size;
		list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
		q->bq_enqueuing_size = 0;
		cv_broadcast(&q->bq_pop_cv);
		mutex_exit(&q->bq_lock);
	}
}

/*
 * Add data to q, consuming size units of capacity.  If there is insufficient
 * capacity to consume size units, block until capacity exists.  Asserts size
 * is > 0.
 */
void
bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
{
	bqueue_enqueue_impl(q, data, item_size, B_FALSE);
}
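
/*
 * Producer-side sketch (illustrative only; next_record() and my_record_t are
 * hypothetical): the single enqueuing thread sizes each record and pushes it;
 * bqueue_enqueue() blocks if the shared queue is over capacity.
 *
 *	my_record_t *rec;
 *	while ((rec = next_record()) != NULL)
 *		bqueue_enqueue(&q, rec, sizeof (*rec));
 */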

/*
 * Enqueue an entry, and then flush the queue.  This forces the popping threads
 * to wake up, even if we're below the fill fraction.  We have this in a single
 * function, rather than having a separate call, because it prevents race
 * conditions between the enqueuing thread and the dequeuing thread, where the
 * enqueuing thread would wake up the dequeuing thread, and that thread would
 * destroy the condvar before the enqueuing thread is done.
 */
void
bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
{
	bqueue_enqueue_impl(q, data, item_size, B_TRUE);
}
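
/*
 * Termination sketch (illustrative only; the mr_eos end-of-stream field is
 * hypothetical): the final record is enqueued with bqueue_enqueue_flush() so
 * that a dequeuing thread blocked below the fill fraction is woken and can
 * observe the end of the stream.
 *
 *	my_record_t *done = kmem_zalloc(sizeof (*done), KM_SLEEP);
 *	done->mr_eos = B_TRUE;
 *	bqueue_enqueue_flush(&q, done, sizeof (*done));
 */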

/*
 * Take the first element off of q.  If there are no elements on the queue,
 * wait until one is put there.  Return the removed element.
 */
void *
bqueue_dequeue(bqueue_t *q)
{
	void *ret = list_remove_head(&q->bq_dequeuing_list);
	if (ret == NULL) {
		/*
		 * Dequeuing list is empty.  Wait for there to be something on
		 * the shared list, then move the entire shared list to the
		 * dequeuing list.
		 */
		mutex_enter(&q->bq_lock);
		while (q->bq_size == 0) {
			cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
		}
		ASSERT0(q->bq_dequeuing_size);
		ASSERT(list_is_empty(&q->bq_dequeuing_list));
		list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
		q->bq_dequeuing_size = q->bq_size;
		q->bq_size = 0;
		cv_broadcast(&q->bq_add_cv);
		mutex_exit(&q->bq_lock);
		ret = list_remove_head(&q->bq_dequeuing_list);
	}
	q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
	return (ret);
}
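
/*
 * Consumer-side sketch (illustrative only; process() and the mr_eos sentinel
 * field are hypothetical): the single dequeuing thread pops records until it
 * sees the end-of-stream marker, then the owner tears the queue down once
 * both threads are done with it.
 *
 *	my_record_t *rec;
 *	while (!(rec = bqueue_dequeue(&q))->mr_eos) {
 *		process(rec);
 *		kmem_free(rec, sizeof (*rec));
 *	}
 *	kmem_free(rec, sizeof (*rec));
 *	bqueue_destroy(&q);
 */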