/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 *
 * File descriptor-based memory allocation.  We have a fixed slot of
 * 128 bytes for every file descriptor.  Once a file descriptor is
 * allocated by the OS, we use mog_fd_init()/mog_fd_get() to reserve
 * userspace memory for that FD.  We release that memory by calling
 * close(2) (via the mog_close() wrapper) in mog_fd_put().
 *
 * mog_fd_get() is a simple offset lookup based on the file
 * descriptor, so the "allocation" itself is trivial.
 *
 * This memory is never returned to the kernel, but it is bounded by
 * the file descriptor limit (RLIMIT_NOFILE, "ulimit -n") of the
 * process.  Allowing 20000 file descriptors uses only 2.5 MB
 * of userspace memory.
 *
 * Any sane OS keeps file descriptor numbers low and reuses
 * low-numbered descriptors as they become available, reducing
 * fragmentation from unused slots.  We allocate memory aligned
 * to 128 bytes (matching the slot size).
 *
 * The 128-byte alignment and slot size were chosen because they are:
 * a) large enough to hold per-client data in common cases without malloc()
 * b) easy to align with the cache line sizes of modern (200x-201x) CPUs,
 *    avoiding unnecessary cache flushing
 *
 * This 128-byte alignment will need to grow to 256 bytes if/when
 * 128-bit general purpose CPUs become available.
 */
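
/*
 * Illustrative arithmetic (comment only, not part of the build): with
 * the 128-byte slots defined below, "ulimit -n" = 20000 gives
 * 20000 * 128 bytes = 2,560,000 bytes (~2.5 MB) of slot memory.
 * A lookup is a plain offset calculation, roughly:
 *
 *	mfd = (struct mog_fd *)(fd_map[fd / FD_PER_HEAP]
 *			+ (fd % FD_PER_HEAP) * FD_PAD_SIZE);
 *
 * which is exactly what aref() below does, so no per-FD malloc()/free()
 * is needed on the connection fast path.
 */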

#include "cmogstored.h"
#define FD_PAD_SIZE ((size_t)128)
verify(sizeof(struct mog_fd) <= FD_PAD_SIZE);

static size_t fd_heaps;
static const size_t FD_PER_HEAP = 256;
static unsigned char **fd_map;
static pthread_mutex_t fd_lock = PTHREAD_MUTEX_INITIALIZER;
static int max_fd; /* one past the highest fd currently covered by fd_map */
size_t mog_nr_active_at_quit;

static inline struct mog_fd *aref(size_t fd)
{
	unsigned char *base = fd_map[fd / FD_PER_HEAP];

	return (struct mog_fd *)(base + (fd % FD_PER_HEAP) * FD_PAD_SIZE);
}

/* only for pedantic correctness, only one thread is running here */
static void destroy_spinlocks(void)
{
	int fd;
	struct mog_fd *mfd;

	for (fd = 0; fd < max_fd; fd++) {
		mfd = aref(fd);
		CHECK(int, 0, pthread_spin_destroy(&mfd->expiring));
	}
}

static void fd_map_atexit(void)
{
	destroy_spinlocks();

	while (fd_heaps-- > 0)
		free(fd_map[fd_heaps]);
	free(fd_map);
}

static void fd_map_init(void)
{
	long open_max = sysconf(_SC_OPEN_MAX);
	size_t slots = open_max / FD_PER_HEAP + 1;
	size_t size = slots * sizeof(void *);

	assert(fd_map == NULL && "fd_map reinitialized?");
	fd_map = mog_cachealign(size);
	atexit(fd_map_atexit);
}
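
/*
 * Sizing sketch (comment only, example numbers): with
 * _SC_OPEN_MAX = 20000 and FD_PER_HEAP = 256, fd_map_init() above
 * allocates 20000 / 256 + 1 = 79 heap pointers (632 bytes on 64-bit);
 * the 128-byte slots themselves are allocated lazily, one 256-slot
 * (32 KB) heap at a time, by grow_ref() below.
 */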

MOG_NOINLINE static struct mog_fd * grow_ref(size_t fd)
{
	int fd_max;

	assert(fd < INT_MAX && "fd too large");
	CHECK(int, 0, pthread_mutex_lock(&fd_lock));

	if (!fd_map) fd_map_init();
	while (fd >= (size_t)(fd_max = mog_sync_fetch(&max_fd))) {
		unsigned char *base = mog_cachealign(FD_PAD_SIZE * FD_PER_HEAP);
		struct mog_fd *tmp;
		size_t i;
		int rc;

		/* every new slot starts out unused with its own spinlock */
		for (i = 0; i < FD_PER_HEAP; i++) {
			tmp = (struct mog_fd *)(base + (i * FD_PAD_SIZE));
			tmp->fd_type = MOG_FD_TYPE_UNUSED;

			rc = pthread_spin_init(&tmp->expiring, 0);
			if (rc != 0)
				die_errno("pthread_spin_init() failed");

			tmp->fd = fd_max + i;
		}

		fd_map[fd_heaps++] = base;
		(void)mog_sync_add_and_fetch(&max_fd, FD_PER_HEAP);
	}

	CHECK(int, 0, pthread_mutex_unlock(&fd_lock));

	return aref(fd);
}
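
/*
 * Growth example (illustrative numbers): the first call that sees an
 * unmapped descriptor, e.g. grow_ref(300) starting from an empty map,
 * appends 256-slot heaps until max_fd covers it (0 -> 256 -> 512), and
 * fd 300 then lives in fd_map[1] at offset (300 % 256) * 128 = 5632.
 */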

/*
 * Look up a mog_fd structure based on fd.  This means memory is reused
 * by us just as FDs are reused by the kernel.
 */
static struct mog_fd *mog_fd_get(int fd)
{
	assert(fd >= 0 && "FD is negative");
	if (MOG_LIKELY(fd < mog_sync_fetch(&max_fd)))
		return aref((size_t)fd);

	return grow_ref((size_t)fd);
}
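
/*
 * Note on the two paths above: the common case is the MOG_LIKELY
 * branch, a lock-free aref() offset lookup; only a descriptor beyond
 * the currently mapped range falls through to grow_ref(), which takes
 * fd_lock and extends the map.
 */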

static inline bool mfd_expiring_trylock(struct mog_fd *mfd)
{
	int rc = pthread_spin_trylock(&mfd->expiring);

	if (MOG_LIKELY(rc == 0))
		return true;

	assert(rc == EBUSY && "pthread_spin_trylock error");
	return false;
}

static inline void mfd_expiring_lock(struct mog_fd *mfd)
{
	CHECK(int, 0, pthread_spin_lock(&mfd->expiring));
}

static inline void mfd_expiring_unlock(struct mog_fd *mfd)
{
	CHECK(int, 0, pthread_spin_unlock(&mfd->expiring));
}

/*
 * Releases the memory used by mfd and releases the file descriptor
 * back to the OS.  mfd is unusable after this.
 */
void mog_fd_put(struct mog_fd *mfd)
{
	int fd = mfd->fd;

	assert(fd >= 0 && "FD is negative");
	assert(fd < mog_sync_fetch(&max_fd) && "FD too large");
	assert(aref(fd) == mfd && "tried to put incorrect mog_fd back in");

	mfd_expiring_lock(mfd);
	mfd->fd_type = MOG_FD_TYPE_UNUSED;
	mfd_expiring_unlock(mfd);

	/* mog_fd_get(fd) may be called here in another thread */
	mog_close(fd);
}
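
/*
 * Typical lifecycle (illustrative sketch, not a real call site): once
 * the OS hands us a descriptor, e.g. from accept4(), the slot is
 * claimed and later released roughly like this:
 *
 *	struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_HTTP);
 *	... serve the client using the per-FD slot ...
 *	mog_fd_put(mfd);	(closes fd, marks the slot UNUSED)
 *
 * The same slot is handed out again as soon as the kernel reuses that
 * descriptor number.
 */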

/* called during shutdown, no other threads are running when this is called */
void mog_fdmap_requeue(struct mog_queue *quit_queue)
{
	int fd;
	struct mog_fd *mfd;

	for (fd = max_fd - 1; fd >= 0; fd--) {
		mfd = aref(fd);

		switch (mfd->fd_type) {
		case MOG_FD_TYPE_MGMT:
			/* ignore fsck priority in shutdown: */
			mfd->as.mgmt.prio = MOG_PRIO_NONE;
			/* fall through */
		case MOG_FD_TYPE_HTTP:
		case MOG_FD_TYPE_HTTPGET:
			mog_activeq_add(quit_queue, mfd);
			mog_nr_active_at_quit++;
			break;
		default:
			break;
		}
	}
}

struct mog_fd * mog_fd_init(int fd, enum mog_fd_type fd_type)
{
	struct mog_fd *mfd = mog_fd_get(fd);

	assert(mfd->fd == fd && "mfd->fd incorrect");
	mfd_expiring_lock(mfd);
	mfd->fd_type = fd_type;
	mfd->ioq_blocked = 0;
	mfd_expiring_unlock(mfd);

	return mfd;
}

#ifndef __linux__
/* ugh, FreeBSD implements TCP_INFO but doesn't expose the fields we need */
size_t mog_fdmap_expire(uint32_t sec)
{
	(void)sec;	/* no usable TCP_INFO fields, nothing to expire */
	return 0;
}
#else /* Linux TCP_INFO tracks last_data_{sent,recv} */

static bool tcp_timedout(struct tcp_info *info, uint32_t msec)
{
	bool send_timedout = !!(info->tcpi_last_data_sent > msec);

	/*
	 * tcpi_last_data_recv is not valid unless
	 * tcpi_ato (ACK timeout) is set
	 */
	if (info->tcpi_ato == 0)
		return send_timedout && (info->tcpi_last_ack_recv > msec);

	return send_timedout && (info->tcpi_last_data_recv > msec);
}

static size_t expire_http(struct mog_fd *mfd, uint32_t msec)
{
	struct tcp_info info;
	socklen_t len = (socklen_t)sizeof(struct tcp_info);

	if (getsockopt(mfd->fd, IPPROTO_TCP, TCP_INFO, &info, &len) == 0) {
		if (info.tcpi_state == TCP_ESTABLISHED &&
		    tcp_timedout(&info, msec)) {
			if (shutdown(mfd->fd, SHUT_RDWR) == 0)
				return 1;
			if (errno != ENOTCONN)
				syslog(LOG_WARNING,
					"BUG? expire_http,shutdown: %m");
		}
	} else {
		assert(errno != EINVAL && "BUG: getsockopt: EINVAL");
		assert(errno != EFAULT && "BUG: getsockopt: EFAULT");
		syslog(LOG_WARNING, "BUG? expire_http,getsockopt: %m");
	}

	return 0;
}

size_t mog_fdmap_expire(uint32_t sec)
{
	int fd;
	size_t expired = 0;
	struct mog_fd *mfd;
	uint32_t msec = sec * 1000;
	static time_t last_expire;
	time_t now = time(NULL);
	int rc = pthread_mutex_trylock(&fd_lock);

	if (rc != 0) {
		assert(rc == EBUSY && "pthread_mutex_trylock failed" && rc);

		/* sleep on the lock, another thread already doing work */
		CHECK(int, 0, pthread_mutex_lock(&fd_lock));
		CHECK(int, 0, pthread_mutex_unlock(&fd_lock));
		return 0;
	}

	/* rate-limit: do not scan more than once per second */
	if (now == last_expire)
		goto out;

	/* skip stdin, stdout, stderr */
	for (fd = 3; fd < max_fd; fd++) {
		mfd = aref(fd);

		/* bail if another thread just locked it (for close) */
		if (mfd_expiring_trylock(mfd)) {
			switch (mfd->fd_type) {
			case MOG_FD_TYPE_HTTP:
			case MOG_FD_TYPE_HTTPGET:
				expired += expire_http(mfd, msec);
				break;
			default:
				break;
			}
			mfd_expiring_unlock(mfd);
		}
	}

	/* note when the scan finished; log even an empty scan that crossed a second */
	last_expire = time(NULL);
	if (expired > 0 || last_expire != now)
		syslog(LOG_NOTICE, "expired %llu idle connections (>%u sec)",
			(unsigned long long)expired, (unsigned)sec);
out:
	CHECK(int, 0, pthread_mutex_unlock(&fd_lock));

	/*
	 * Yield the CPU a few times so the threads watching the expired
	 * sockets get a chance to:
	 * 1) wake up from epoll_wait()
	 * 2) attempt to read/write
	 * 3) see the error and release the mog_fd via mog_fd_put()
	 */
	for (fd = (int)expired * 8; --fd >= 0; )
		sched_yield();

	return expired;
}
#endif /* Linux-only */
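
/*
 * Usage note (illustrative sketch, not taken from a real call site):
 * a periodic housekeeping thread can reap idle clients with something
 * like
 *
 *	size_t n = mog_fdmap_expire(60);	(60 second idle timeout)
 *
 * Concurrent callers are harmless: whoever loses the fd_lock trylock
 * above simply sleeps until the working thread finishes and returns 0.
 */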