// SPDX-License-Identifier: GPL-2.0
/* Watch queue and general notification mechanism, built on pipes
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * See Documentation/watch_queue.rst
 */

#define pr_fmt(fmt) "watchq: " fmt
#include <linux/module.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/printk.h>
#include <linux/miscdevice.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/poll.h>
#include <linux/uaccess.h>
#include <linux/vmalloc.h>
#include <linux/file.h>
#include <linux/security.h>
#include <linux/cred.h>
#include <linux/sched/signal.h>
#include <linux/watch_queue.h>
#include <linux/pipe_fs_i.h>

MODULE_DESCRIPTION("Watch queue");
MODULE_AUTHOR("Red Hat, Inc.");
MODULE_LICENSE("GPL");

#define WATCH_QUEUE_NOTE_SIZE 128
#define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)
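/*
 * A worked example of the sizing above (assuming the common 4 KiB PAGE_SIZE;
 * illustrative only): each preallocated page holds 4096 / 128 = 32
 * notification slots, so a queue sized for 256 notes needs 256 / 32 = 8
 * pages and a 256-bit free-slot bitmap (four unsigned longs on a 64-bit
 * machine).
 */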
static void watch_queue_pipe_buf_release(struct pipe_inode_info *pipe,
					 struct pipe_buffer *buf)
{
	struct watch_queue *wqueue = (struct watch_queue *)buf->private;
	struct page *page;
	unsigned int bit;

	/* We need to work out which note within the page this refers to, but
	 * the note might have been maximum size, so merely ANDing the offset
	 * off doesn't work.  OTOH, the note must've been more than zero size.
	 */
	bit = buf->offset + buf->len;
	if ((bit & (WATCH_QUEUE_NOTE_SIZE - 1)) == 0)
		bit -= WATCH_QUEUE_NOTE_SIZE;
	bit /= WATCH_QUEUE_NOTE_SIZE;

	page = buf->page;
	bit += page->index;

	set_bit(bit, wqueue->notes_bitmap);
	generic_pipe_buf_release(pipe, buf);
}

// No try_steal function => no stealing
#define watch_queue_pipe_buf_try_steal NULL
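/*
 * A worked example of the slot calculation in watch_queue_pipe_buf_release()
 * above (illustrative): a note stored at page offset 256 with len 128 gives
 * bit = 256 + 128 = 384; that is a multiple of WATCH_QUEUE_NOTE_SIZE, so it
 * is pulled back to 256, and 256 / 128 = 2, i.e. the third slot in the page,
 * which (plus page->index) is the bit returned to notes_bitmap.
 */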
/* New data written to a pipe may be appended to a buffer with this type. */
static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
	.release	= watch_queue_pipe_buf_release,
	.try_steal	= watch_queue_pipe_buf_try_steal,
	.get		= generic_pipe_buf_get,
};
/*
 * Post a notification to a watch queue.
 */
static bool post_one_notification(struct watch_queue *wqueue,
				  struct watch_notification *n)
{
	void *p;
	struct pipe_inode_info *pipe = wqueue->pipe;
	struct pipe_buffer *buf;
	struct page *page;
	unsigned int head, tail, mask, note, offset, len;
	bool done = false;

	if (!pipe)
		return false;

	spin_lock_irq(&pipe->rd_wait.lock);

	if (wqueue->defunct)
		goto out;

	mask = pipe->ring_size - 1;
	head = pipe->head;
	tail = pipe->tail;
	if (pipe_full(head, tail, pipe->ring_size))
		goto lost;

	note = find_first_bit(wqueue->notes_bitmap, wqueue->nr_notes);
	if (note >= wqueue->nr_notes)
		goto lost;

	page = wqueue->notes[note / WATCH_QUEUE_NOTES_PER_PAGE];
	offset = note % WATCH_QUEUE_NOTES_PER_PAGE * WATCH_QUEUE_NOTE_SIZE;
	get_page(page);
	len = n->info & WATCH_INFO_LENGTH;
	p = kmap_atomic(page);
	memcpy(p + offset, n, len);
	kunmap_atomic(p);

	buf = &pipe->bufs[head & mask];
	buf->page = page;
	buf->private = (unsigned long)wqueue;
	buf->ops = &watch_queue_pipe_buf_ops;
	buf->offset = offset;
	buf->len = len;
	buf->flags = PIPE_BUF_FLAG_WHOLE;
	pipe->head = head + 1;

	if (!test_and_clear_bit(note, wqueue->notes_bitmap)) {
		spin_unlock_irq(&pipe->rd_wait.lock);
		BUG();
	}
	wake_up_interruptible_sync_poll_locked(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
	done = true;

out:
	spin_unlock_irq(&pipe->rd_wait.lock);
	if (done)
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	return done;

lost:
	/* Mark the most recent buffer so the reader can see that a
	 * notification was lost.
	 */
	buf = &pipe->bufs[(head - 1) & mask];
	buf->flags |= PIPE_BUF_FLAG_LOSS;
	goto out;
}
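/*
 * For reference, a sketch of the reading side (illustrative only, error
 * handling omitted; see Documentation/watch_queue.rst).  Because every
 * buffer is marked PIPE_BUF_FLAG_WHOLE, read() returns whole notifications,
 * which can be walked by their embedded lengths:
 *
 *	unsigned char buf[BUFSIZ], *p = buf;
 *	ssize_t len = read(fd, buf, sizeof(buf));
 *
 *	while (p < buf + len) {
 *		struct watch_notification *n = (void *)p;
 *
 *		... dispatch on n->type and n->subtype ...
 *		p += (n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT;
 *	}
 */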
/*
 * Apply filter rules to a notification.
 */
static bool filter_watch_notification(const struct watch_filter *wf,
				      const struct watch_notification *n)
{
	const struct watch_type_filter *wt;
	unsigned int st_bits = sizeof(wt->subtype_filter[0]) * 8;
	unsigned int st_index = n->subtype / st_bits;
	unsigned int st_bit = 1U << (n->subtype % st_bits);
	int i;

	if (!test_bit(n->type, wf->type_filter))
		return false;

	for (i = 0; i < wf->nr_filters; i++) {
		wt = &wf->filters[i];
		if (n->type == wt->type &&
		    (wt->subtype_filter[st_index] & st_bit) &&
		    (n->info & wt->info_mask) == wt->info_filter)
			return true;
	}

	return false; /* If there is a filter, the default is to reject. */
}
/**
 * __post_watch_notification - Post an event notification
 * @wlist: The watch list to post the event to.
 * @n: The notification record to post.
 * @cred: The creds of the process that triggered the notification.
 * @id: The ID to match on the watch.
 *
 * Post a notification of an event into a set of watch queues and let the users
 * know.
 *
 * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and
 * should be in units of sizeof(*n).
 */
void __post_watch_notification(struct watch_list *wlist,
			       struct watch_notification *n,
			       const struct cred *cred,
			       u64 id)
{
	const struct watch_filter *wf;
	struct watch_queue *wqueue;
	struct watch *watch;

	if (((n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT) == 0) {
		WARN_ON(1);
		return;
	}

	rcu_read_lock();

	hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) {
		if (watch->id != id)
			continue;
		n->info &= ~WATCH_INFO_ID;
		n->info |= watch->info_id;

		wqueue = rcu_dereference(watch->queue);
		wf = rcu_dereference(wqueue->filter);
		if (wf && !filter_watch_notification(wf, n))
			continue;

		if (security_post_notification(watch->cred, cred, n) < 0)
			continue;

		post_one_notification(wqueue, n);
	}

	rcu_read_unlock();
}
EXPORT_SYMBOL(__post_watch_notification);
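/*
 * A minimal sketch of how a subsystem might call the above (illustrative
 * only; "WATCH_TYPE_EXAMPLE" and "obj->watchers" are assumed placeholders,
 * not part of this file).  The caller fills in a struct watch_notification,
 * encodes its size with watch_sizeof() and posts it against the object's
 * watch_list under the RCU read lock via the post_watch_notification()
 * wrapper from <linux/watch_queue.h>:
 *
 *	struct watch_notification n = {
 *		.type		= WATCH_TYPE_EXAMPLE,	// hypothetical type
 *		.subtype	= 0,
 *		.info		= watch_sizeof(n),
 *	};
 *
 *	post_watch_notification(obj->watchers, &n, current_cred(), 0);
 */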
/*
 * Allocate sufficient pages for preallocation of the requested number of
 * notifications.
 */
long watch_queue_set_size(struct pipe_inode_info *pipe, unsigned int nr_notes)
{
	struct watch_queue *wqueue = pipe->watch_queue;
	struct page **pages;
	unsigned long *bitmap;
	unsigned long user_bufs;
	unsigned int bmsize;
	int ret, i, nr_pages;

	if (!wqueue)
		return -ENODEV;
	if (wqueue->notes)
		return -EBUSY;
	if (nr_notes < 1 ||
	    nr_notes > 512) /* TODO: choose a better hard limit */
		return -EINVAL;

	nr_pages = (nr_notes + WATCH_QUEUE_NOTES_PER_PAGE - 1);
	nr_pages /= WATCH_QUEUE_NOTES_PER_PAGE;
	user_bufs = account_pipe_buffers(pipe->user, pipe->nr_accounted, nr_pages);

	if (nr_pages > pipe->max_usage &&
	    (too_many_pipe_buffers_hard(user_bufs) ||
	     too_many_pipe_buffers_soft(user_bufs)) &&
	    pipe_is_unprivileged_user()) {
		ret = -EPERM;
		goto error;
	}

	ret = pipe_resize_ring(pipe, nr_notes);
	if (ret < 0)
		goto error;

	ret = -ENOMEM;
	pages = kcalloc(sizeof(struct page *), nr_pages, GFP_KERNEL);
	if (!pages)
		goto error;

	for (i = 0; i < nr_pages; i++) {
		pages[i] = alloc_page(GFP_KERNEL);
		if (!pages[i])
			goto error_p;
		pages[i]->index = i * WATCH_QUEUE_NOTES_PER_PAGE;
	}

	bmsize = (nr_notes + BITS_PER_LONG - 1) / BITS_PER_LONG;
	bmsize *= sizeof(unsigned long);
	bitmap = kmalloc(bmsize, GFP_KERNEL);
	if (!bitmap)
		goto error_p;

	memset(bitmap, 0xff, bmsize);
	wqueue->notes = pages;
	wqueue->notes_bitmap = bitmap;
	wqueue->nr_pages = nr_pages;
	wqueue->nr_notes = nr_pages * WATCH_QUEUE_NOTES_PER_PAGE;
	return 0;

error_p:
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	kfree(pages);
error:
	(void) account_pipe_buffers(pipe->user, nr_pages, pipe->nr_accounted);
	return ret;
}
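/*
 * For reference, a sketch of the userspace side that ends up here
 * (illustrative only, error handling omitted; see
 * Documentation/watch_queue.rst):
 *
 *	int fds[2];
 *
 *	pipe2(fds, O_NOTIFICATION_PIPE);
 *	ioctl(fds[0], IOC_WATCH_QUEUE_SET_SIZE, 256);
 */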
/*
 * Set the filter on a watch queue.
 */
long watch_queue_set_filter(struct pipe_inode_info *pipe,
			    struct watch_notification_filter __user *_filter)
{
	struct watch_notification_type_filter *tf;
	struct watch_notification_filter filter;
	struct watch_type_filter *q;
	struct watch_filter *wfilter;
	struct watch_queue *wqueue = pipe->watch_queue;
	int ret, nr_filter = 0, i;

	if (!wqueue)
		return -ENODEV;

	if (!_filter) {
		/* Remove the old filter */
		wfilter = NULL;
		goto set;
	}

	/* Grab the user's filter specification */
	if (copy_from_user(&filter, _filter, sizeof(filter)) != 0)
		return -EFAULT;
	if (filter.nr_filters == 0 ||
	    filter.nr_filters > 16 ||
	    filter.__reserved != 0)
		return -EINVAL;

	tf = memdup_user(_filter->filters, filter.nr_filters * sizeof(*tf));
	if (IS_ERR(tf))
		return PTR_ERR(tf);

	ret = -EINVAL;
	for (i = 0; i < filter.nr_filters; i++) {
		if ((tf[i].info_filter & ~tf[i].info_mask) ||
		    tf[i].info_mask & WATCH_INFO_LENGTH)
			goto err_filter;
		/* Ignore any unknown types */
		if (tf[i].type >= sizeof(wfilter->type_filter) * 8)
			continue;
		nr_filter++;
	}

	/* Now we need to build the internal filter from only the relevant
	 * user-specified filters.
	 */
	ret = -ENOMEM;
	wfilter = kzalloc(struct_size(wfilter, filters, nr_filter), GFP_KERNEL);
	if (!wfilter)
		goto err_filter;
	wfilter->nr_filters = nr_filter;

	q = wfilter->filters;
	for (i = 0; i < filter.nr_filters; i++) {
		/* Skip the same unknown types that were skipped when counting
		 * nr_filter above, so the allocation cannot be overrun.
		 */
		if (tf[i].type >= sizeof(wfilter->type_filter) * 8)
			continue;

		q->type			= tf[i].type;
		q->info_filter		= tf[i].info_filter;
		q->info_mask		= tf[i].info_mask;
		q->subtype_filter[0]	= tf[i].subtype_filter[0];
		__set_bit(q->type, wfilter->type_filter);
		q++;
	}

	kfree(tf);
set:
	pipe_lock(pipe);
	wfilter = rcu_replace_pointer(wqueue->filter, wfilter,
				      lockdep_is_held(&pipe->mutex));
	pipe_unlock(pipe);
	if (wfilter)
		kfree_rcu(wfilter, rcu);
	return 0;

err_filter:
	kfree(tf);
	return ret;
}
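/*
 * For reference, a sketch of a matching userspace filter specification
 * (illustrative; the type and subtype values are placeholders).  The watcher
 * builds a struct watch_notification_filter with one
 * struct watch_notification_type_filter entry and installs it with
 * ioctl(fd, IOC_WATCH_QUEUE_SET_FILTER, &filter), which lands here:
 *
 *	struct {
 *		struct watch_notification_filter hdr;
 *		struct watch_notification_type_filter tf[1];
 *	} filter = {
 *		.hdr.nr_filters		 = 1,
 *		.tf[0].type		 = WATCH_TYPE_META,	// e.g. meta events
 *		.tf[0].subtype_filter[0] = ~0U,			// all subtypes
 *	};
 */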
static void __put_watch_queue(struct kref *kref)
{
	struct watch_queue *wqueue =
		container_of(kref, struct watch_queue, usage);
	struct watch_filter *wfilter;
	int i;

	for (i = 0; i < wqueue->nr_pages; i++)
		__free_page(wqueue->notes[i]);
	kfree(wqueue->notes);
	kfree(wqueue->notes_bitmap);

	wfilter = rcu_access_pointer(wqueue->filter);
	if (wfilter)
		kfree_rcu(wfilter, rcu);
	kfree_rcu(wqueue, rcu);
}

/**
 * put_watch_queue - Dispose of a ref on a watchqueue.
 * @wqueue: The watch queue to unref.
 */
void put_watch_queue(struct watch_queue *wqueue)
{
	kref_put(&wqueue->usage, __put_watch_queue);
}
EXPORT_SYMBOL(put_watch_queue);
static void free_watch(struct rcu_head *rcu)
{
	struct watch *watch = container_of(rcu, struct watch, rcu);

	put_watch_queue(rcu_access_pointer(watch->queue));
	atomic_dec(&watch->cred->user->nr_watches);
	put_cred(watch->cred);
	kfree(watch);
}

static void __put_watch(struct kref *kref)
{
	struct watch *watch = container_of(kref, struct watch, usage);

	call_rcu(&watch->rcu, free_watch);
}

/*
 * Discard a watch.
 */
static void put_watch(struct watch *watch)
{
	kref_put(&watch->usage, __put_watch);
}
/**
 * init_watch - Initialise a watch
 * @watch: The watch to initialise.
 * @wqueue: The queue to assign.
 *
 * Initialise a watch and set the watch queue.
 */
void init_watch(struct watch *watch, struct watch_queue *wqueue)
{
	kref_init(&watch->usage);
	INIT_HLIST_NODE(&watch->list_node);
	INIT_HLIST_NODE(&watch->queue_node);
	rcu_assign_pointer(watch->queue, wqueue);
}
/**
 * add_watch_to_object - Add a watch on an object to a watch list
 * @watch: The watch to add
 * @wlist: The watch list to add to
 *
 * @watch->queue must have been set to point to the queue to post notifications
 * to and the watch list of the object to be watched.  @watch->cred must also
 * have been set to the appropriate credentials and a ref taken on them.
 *
 * The caller must pin the queue and the list both and must hold the list
 * locked against racing watch additions/removals.
 */
int add_watch_to_object(struct watch *watch, struct watch_list *wlist)
{
	struct watch_queue *wqueue = rcu_access_pointer(watch->queue);
	struct watch *w;

	hlist_for_each_entry(w, &wlist->watchers, list_node) {
		struct watch_queue *wq = rcu_access_pointer(w->queue);
		if (wqueue == wq && watch->id == w->id)
			return -EBUSY;
	}

	watch->cred = get_current_cred();
	rcu_assign_pointer(watch->watch_list, wlist);

	if (atomic_inc_return(&watch->cred->user->nr_watches) >
	    task_rlimit(current, RLIMIT_NOFILE)) {
		atomic_dec(&watch->cred->user->nr_watches);
		put_cred(watch->cred);
		return -EAGAIN;
	}

	spin_lock_bh(&wqueue->lock);
	kref_get(&wqueue->usage);
	kref_get(&watch->usage);
	hlist_add_head(&watch->queue_node, &wqueue->watches);
	spin_unlock_bh(&wqueue->lock);

	hlist_add_head(&watch->list_node, &wlist->watchers);
	return 0;
}
EXPORT_SYMBOL(add_watch_to_object);
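/*
 * A minimal sketch of the expected calling pattern (illustrative only;
 * "obj", its ->watchers list and its ->watchers_lock are assumed
 * placeholders for the watched object's own structures):
 *
 *	struct watch *watch = kzalloc(sizeof(*watch), GFP_KERNEL);
 *
 *	init_watch(watch, wqueue);
 *	watch->id	= (unsigned long)obj;
 *	watch->info_id	= (u32)watcher_tag << WATCH_INFO_ID__SHIFT;
 *
 *	down_write(&obj->watchers_lock);
 *	ret = add_watch_to_object(watch, &obj->watchers);
 *	up_write(&obj->watchers_lock);
 */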
/**
 * remove_watch_from_object - Remove a watch or all watches from an object.
 * @wlist: The watch list to remove from
 * @wq: The watch queue of interest (ignored if @all is true)
 * @id: The ID of the watch to remove (ignored if @all is true)
 * @all: True to remove all watches
 *
 * Remove a specific watch or all watches from an object.  A notification is
 * sent to the watcher to tell them that this happened.
 */
int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
			     u64 id, bool all)
{
	struct watch_notification_removal n;
	struct watch_queue *wqueue;
	struct watch *watch;
	int ret = -EBADSLT;

	rcu_read_lock();

again:
	spin_lock(&wlist->lock);
	hlist_for_each_entry(watch, &wlist->watchers, list_node) {
		if (all ||
		    (watch->id == id && rcu_access_pointer(watch->queue) == wq))
			goto found;
	}
	spin_unlock(&wlist->lock);
	goto out;

found:
	ret = 0;
	hlist_del_init_rcu(&watch->list_node);
	rcu_assign_pointer(watch->watch_list, NULL);
	spin_unlock(&wlist->lock);

	/* We now own the reference on watch that used to belong to wlist. */

	n.watch.type = WATCH_TYPE_META;
	n.watch.subtype = WATCH_META_REMOVAL_NOTIFICATION;
	n.watch.info = watch->info_id | watch_sizeof(n.watch);
	n.id = id;
	if (id != 0)
		n.watch.info = watch->info_id | watch_sizeof(n);

	wqueue = rcu_dereference(watch->queue);

	/* We don't need the watch list lock for the next bit as RCU is
	 * protecting *wqueue from deallocation.
	 */
	if (wqueue) {
		post_one_notification(wqueue, &n.watch);

		spin_lock_bh(&wqueue->lock);
		if (!hlist_unhashed(&watch->queue_node)) {
			hlist_del_init_rcu(&watch->queue_node);
			put_watch(watch);
		}
		spin_unlock_bh(&wqueue->lock);
	}

	if (wlist->release_watch) {
		void (*release_watch)(struct watch *);

		release_watch = wlist->release_watch;
		rcu_read_unlock();
		(*release_watch)(watch);
		rcu_read_lock();
	}
	put_watch(watch);

	if (all && !hlist_empty(&wlist->watchers))
		goto again;
out:
	rcu_read_unlock();
	return ret;
}
EXPORT_SYMBOL(remove_watch_from_object);
/*
 * Remove all the watches that are contributory to a queue.  This has the
 * potential to race with removal of the watches by the destruction of the
 * objects being watched or with the distribution of notifications.
 */
void watch_queue_clear(struct watch_queue *wqueue)
{
	struct watch_list *wlist;
	struct watch *watch;
	bool release;

	rcu_read_lock();
	spin_lock_bh(&wqueue->lock);

	/* Prevent new additions and prevent notifications from happening */
	wqueue->defunct = true;

	while (!hlist_empty(&wqueue->watches)) {
		watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
		hlist_del_init_rcu(&watch->queue_node);
		/* We now own a ref on the watch. */
		spin_unlock_bh(&wqueue->lock);

		/* We can't do the next bit under the queue lock as we need to
		 * get the list lock - which would cause a deadlock if someone
		 * was removing from the opposite direction at the same time or
		 * posting a notification.
		 */
		wlist = rcu_dereference(watch->watch_list);
		if (wlist) {
			void (*release_watch)(struct watch *);

			spin_lock(&wlist->lock);
			release = !hlist_unhashed(&watch->list_node);
			if (release) {
				hlist_del_init_rcu(&watch->list_node);
				rcu_assign_pointer(watch->watch_list, NULL);
				/* We now own a second ref on the watch. */
			}
			release_watch = wlist->release_watch;
			spin_unlock(&wlist->lock);

			if (release) {
				if (release_watch) {
					rcu_read_unlock();
					/* This might need to call dput(), so
					 * we have to drop all the locks.
					 */
					(*release_watch)(watch);
					rcu_read_lock();
				}
				put_watch(watch);
			}
		}

		put_watch(watch);
		spin_lock_bh(&wqueue->lock);
	}

	spin_unlock_bh(&wqueue->lock);
	rcu_read_unlock();
}
/**
 * get_watch_queue - Get a watch queue from its file descriptor.
 * @fd: The fd to query.
 */
struct watch_queue *get_watch_queue(int fd)
{
	struct pipe_inode_info *pipe;
	struct watch_queue *wqueue = ERR_PTR(-EINVAL);
	struct fd f;

	f = fdget(fd);
	if (f.file) {
		pipe = get_pipe_info(f.file, false);
		if (pipe && pipe->watch_queue) {
			wqueue = pipe->watch_queue;
			kref_get(&wqueue->usage);
		}
		fdput(f);
	}

	return wqueue;
}
EXPORT_SYMBOL(get_watch_queue);
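/*
 * For reference (illustrative): this is how a watch-enabled syscall turns a
 * userspace-supplied pipe fd back into its queue, e.g.
 * keyctl(KEYCTL_WATCH_KEY, key, fd, 0x01) passes @fd down to a caller of
 * get_watch_queue(), which takes a ref that the caller must later drop with
 * put_watch_queue().
 */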
/*
 * Initialise a watch queue
 */
int watch_queue_init(struct pipe_inode_info *pipe)
{
	struct watch_queue *wqueue;

	wqueue = kzalloc(sizeof(*wqueue), GFP_KERNEL);
	if (!wqueue)
		return -ENOMEM;

	wqueue->pipe = pipe;
	kref_init(&wqueue->usage);
	spin_lock_init(&wqueue->lock);
	INIT_HLIST_HEAD(&wqueue->watches);

	pipe->watch_queue = wqueue;
	return 0;
}