add changelog for 1.13
[beanstalkd.git] / dat.h
blob94027545814043e85fb43d9c63845d6c814b6969
1 #include <stdint.h>
2 #include <stdlib.h>
// Short aliases for the integer types used by the declarations in this header.
// The sized names (int32, uint32, int64, uint64) map directly onto the
// fixed-width <stdint.h> types; byte is an alias for uchar (unsigned char).
4 typedef unsigned char uchar;
5 typedef uchar byte;
6 typedef unsigned int uint;
7 typedef int32_t int32;
8 typedef uint32_t uint32;
9 typedef int64_t int64;
10 typedef uint64_t uint64;
12 typedef struct Ms Ms;
13 typedef struct Job Job;
14 typedef struct Tube Tube;
15 typedef struct Conn Conn;
16 typedef struct Heap Heap;
17 typedef struct Jobrec Jobrec;
18 typedef struct File File;
19 typedef struct Socket Socket;
20 typedef struct Server Server;
21 typedef struct Wal Wal;
// Handle is the callback pointer type stored in Socket.f. Its first argument
// is the Socket's x pointer; the second is an int named rw.
// NOTE(review): rw is presumably the event kind ('r', 'w' or 'h') reported by
// socknext -- confirm in the callers.
23 typedef void(*Handle)(void*, int rw);
// FAlloc is a function type (not a pointer-to-function type) taking two ints
// and returning int, which is why the falloc variable is declared as FAlloc*.
24 typedef int(FAlloc)(int, int);
27 // NUM_PRIMES is used in the jobs hashing.
28 #if _LP64
29 #define NUM_PRIMES 48
30 #else
31 #define NUM_PRIMES 19
32 #endif
34 /* Some compilers (e.g. gcc on SmartOS) define NULL as 0.
35 * This is allowed by the C standard, but is unhelpful when
36 * using NULL in most pointer contexts with errors turned on. */
37 #if (defined(sun) || defined(__sun)) && (defined(__SVR4) || defined(__svr4__))
38 #ifdef NULL
39 #undef NULL
40 #endif
41 #define NULL ((void*)0)
42 #endif
44 // The name of a tube cannot be longer than MAX_TUBE_NAME_LEN-1
45 #define MAX_TUBE_NAME_LEN 201
47 // A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
48 // MUST be enough to hold the longest possible command ("pause-tube a{200} 4294967295\r\n")
49 // or reply line ("USING a{200}\r\n").
50 #define LINE_BUF_SIZE (11 + MAX_TUBE_NAME_LEN + 12)
// min yields the smaller of a and b. It is a plain macro, so the selected
// argument is evaluated twice; do not pass expressions with side effects.
52 #define min(a,b) ((a)<(b)?(a):(b))
54 // Jobs with priority less than URGENT_THRESHOLD are counted as urgent.
55 #define URGENT_THRESHOLD 1024
57 // The default maximum job size.
58 #define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)
60 // The maximum value that job_data_size_limit can be set to via "-z".
61 // It could be up to INT32_MAX-2 (~2GB), but set it to 1024^3 (1GB).
62 // The width is restricted by Jobrec.body_size that is int32.
63 #define JOB_DATA_SIZE_LIMIT_MAX 1073741824
65 // The default value for the fsync (-f) parameter, milliseconds.
66 #define DEFAULT_FSYNC_MS 50
68 // Use this macro to designate unused parameters in functions.
69 #define UNUSED_PARAMETER(x) (void)(x)
71 // version is defined in vers.c, see vers.sh for details.
72 extern const char version[];
74 // verbose holds the count of -V parameters; it's a verbosity level.
75 extern int verbose;
77 extern struct Server srv;
79 // Replaced by tests to simulate failures.
80 extern FAlloc *falloc;
82 // stats structure holds counters for operations, both globally and per tube.
83 struct stats {
84 uint64 urgent_ct;
85 uint64 waiting_ct;
86 uint64 buried_ct;
87 uint64 reserved_ct;
88 uint64 pause_ct;
89 uint64 total_delete_ct;
90 uint64 total_jobs_ct;
94 // less_fn is used by the binary heap to determine the order of elements.
95 typedef int(*less_fn)(void*, void*);
97 // setpos_fn is used by the binary heap to record the new positions of elements
98 // whenever they get moved or inserted.
99 typedef void(*setpos_fn)(void*, size_t);
101 struct Heap {
102 size_t cap; // capacity of the heap
103 size_t len; // amount of elements in the heap
104 void **data; // actual elements
106 less_fn less;
107 setpos_fn setpos;
109 int heapinsert(Heap *h, void *x);
110 void* heapremove(Heap *h, size_t k);
113 struct Socket {
114 // Descriptor for the socket.
115 int fd;
117 // f can point to srvaccept or prothandle.
118 Handle f;
120 // x is passed as first parameter to f.
121 void *x;
123 // added value is platform dependent: on OSX it can be > 1.
124 // Value of 1 - socket was already added to event notifications,
125 // otherwise it is 0.
126 int added;
129 int sockinit(void);
131 // sockwant updates event filter for the socket s. rw designates
132 // the kind of event we should be notified about:
133 // 'r' - read
134 // 'w' - write
135 // 'h' - hangup (closed connection)
136 // 0 - ignore this socket
137 int sockwant(Socket *s, int rw);
139 // socknext waits at most timeout nanoseconds for the next event.
140 // If an event happens before the timeout, then s points to the corresponding socket,
141 // and the kind of event is returned. In case of timeout, 0 is returned.
142 int socknext(Socket **s, int64 timeout);
145 // ms_event_fn is called with the element being inserted/removed and its position.
146 typedef void(*ms_event_fn)(Ms *a, void *item, size_t i);
148 // Resizable multiset
149 struct Ms {
150 size_t len; // amount of stored elements
151 size_t cap; // capacity
152 size_t last; // position of last taken element
153 void **items;
155 ms_event_fn oninsert; // called on insertion of an element
156 ms_event_fn onremove; // called on removal of an element
159 void ms_init(Ms *a, ms_event_fn oninsert, ms_event_fn onremove);
160 void ms_clear(Ms *a);
161 int ms_append(Ms *a, void *item);
162 int ms_remove(Ms *a, void *item);
163 int ms_contains(Ms *a, void *item);
164 void *ms_take(Ms *a);
167 enum // Jobrec.state
169 Invalid,
170 Ready,
171 Reserved,
172 Buried,
173 Delayed,
174 Copy
177 enum
179 Walver = 7
182 // If you modify Jobrec struct, you must increment Walver above.
184 // This workflow is expected:
185 // 1. If any change needs to be made to the format, first increment Walver.
186 // 2. If and only if this is the first such change since the last release:
187 // a. Copy-paste relevant file-reading functions in file.c and
188 // add the old version number to their names. For example,
189 // if you are incrementing Walver from 7 to 8, copy readrec to readrec7.
190 // (Currently, there is only one such function, readrec. But if
191 // a future readrec calls other version-specific functions,
192 // those will have to be copied too.)
193 // 3. Add a switch case to fileread for the old version.
194 // 4. Modify the current reading function (readrec) to reflect your change.
196 // Incrementing Walver for every change, even if not every version
197 // will be released, is helpful even if it "wastes" version numbers.
198 // It is a really easy thing to do and it means during development
199 // you won't have to worry about misinterpreting the contents of a binlog
200 // that you generated with a dev copy of beanstalkd.
202 struct Jobrec {
203 uint64 id;
204 uint32 pri;
205 int64 delay;
206 int64 ttr;
207 int32 body_size;
208 int64 created_at;
210 // deadline_at is a timestamp, in nsec, that points to:
211 // * time when job will become ready for delayed job,
212 // * time when TTR is about to expire for reserved job,
213 // * undefined otherwise.
214 int64 deadline_at;
216 uint32 reserve_ct;
217 uint32 timeout_ct;
218 uint32 release_ct;
219 uint32 bury_ct;
220 uint32 kick_ct;
221 byte state;
224 struct Job {
225 // persistent fields; these get written to the wal
226 Jobrec r;
228 // bookkeeping fields; these are in-memory only
229 char pad[6];
230 Tube *tube;
231 Job *prev, *next; // linked list of jobs
232 Job *ht_next; // Next job in a hash table list
233 size_t heap_index; // where is this job in its current heap
234 File *file;
235 Job *fnext;
236 Job *fprev;
237 void *reserver;
238 int walresv;
239 int walused;
241 char *body; // written separately to the wal
244 struct Tube {
245 uint refs;
246 char name[MAX_TUBE_NAME_LEN];
247 Heap ready;
248 Heap delay;
249 Ms waiting_conns; // conns waiting for the job at this moment
250 struct stats stat;
251 uint using_ct;
252 uint watching_ct;
254 // pause is set to the duration of the current pause, otherwise 0, in nsec.
255 int64 pause;
257 // unpause_at is a timestamp when to unpause the tube, in nsec.
258 int64 unpause_at;
260 Job buried; // linked list header
264 // Prints warning message on stderr in the format:
265 // <progname>: FILE:LINE in FUNC: <fmt>: <errno_msg>
266 #define twarn(...) __twarn(__VA_ARGS__, "")
268 // Hack to quiet the compiler. When VA_ARGS in twarn() has one element,
269 // e.g. twarn("OOM"), it's replaced with __twarn("OOM", ""),
270 // thus VA_ARGS is expanded to at least one element in warn().
271 #define __twarn(fmt, ...) \
272 warn("%s:%d in %s: " fmt "%s", __FILE__, __LINE__, __func__, __VA_ARGS__)
274 // Prints warning message on stderr in the format:
275 // <progname>: FILE:LINE in FUNC: <fmt>
276 #define twarnx(...) __twarnx(__VA_ARGS__, "")
278 // See __twarn macro.
279 #define __twarnx(fmt, ...) \
280 warnx("%s:%d in %s: " fmt "%s", __FILE__, __LINE__, __func__, __VA_ARGS__)
282 void warn(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
283 void warnx(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
284 char* fmtalloc(char *fmt, ...) __attribute__((format(printf, 1, 2)));
// NOTE(review): zalloc presumably returns n bytes of zero-filled memory, going
// by its name -- confirm against its definition, including the failure result.
285 void* zalloc(int n);
// new(T) expands to zalloc(sizeof(T)) and therefore yields a void*.
// sizeof(T) is a size_t, so it is converted to zalloc's int parameter.
286 #define new(T) zalloc(sizeof(T))
287 void optparse(Server*, char**);
289 extern const char *progname;
291 int64 nanoseconds(void);
292 int rawfalloc(int fd, int len);
294 // Take the ID for a job from next_id, then allocate and store the job.
// Expands to make_job_with_id with the same arguments and an id argument of 0.
295 #define make_job(pri,delay,ttr,body_size,tube) \
296 make_job_with_id(pri,delay,ttr,body_size,tube,0)
298 Job *allocate_job(int body_size);
299 Job *make_job_with_id(uint pri, int64 delay, int64 ttr,
300 int body_size, Tube *tube, uint64 id);
301 void job_free(Job *j);
303 /* Lookup a job by job ID */
304 Job *job_find(uint64 job_id);
306 /* the void* parameters are really job pointers */
307 void job_setpos(void *j, size_t pos);
308 int job_pri_less(void *ja, void *jb);
309 int job_delay_less(void *ja, void *jb);
311 Job *job_copy(Job *j);
313 const char * job_state(Job *j);
315 void job_list_reset(Job *head);
316 int job_list_is_empty(Job *head);
317 Job *job_list_remove(Job *j);
318 void job_list_insert(Job *head, Job *j);
320 /* for unit tests */
321 size_t get_all_jobs_used(void);
324 extern struct Ms tubes;
326 Tube *make_tube(const char *name);
327 void tube_dref(Tube *t);
328 void tube_iref(Tube *t);
329 Tube *tube_find(Ms *tubeset, const char *name);
330 Tube *tube_find_or_make(const char *name);
// TUBE_ASSIGN(a,b) calls tube_dref on the current value of a, assigns b to a,
// and then calls tube_iref on the new value of a, in that order (the steps are
// sequenced by the comma operator). a is expanded three times, so it must be
// an lvalue expression without side effects.
331 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
334 Conn *make_conn(int fd, char start_state, Tube *use, Tube *watch);
336 int count_cur_conns(void);
337 uint count_tot_conns(void);
338 int count_cur_producers(void);
339 int count_cur_workers(void);
342 extern size_t primes[];
345 extern size_t job_data_size_limit;
347 void prot_init(void);
348 int64 prottick(Server *s);
350 void remove_waiting_conn(Conn *c);
352 void enqueue_reserved_jobs(Conn *c);
354 void enter_drain_mode(int sig);
355 void h_accept(const int fd, const short which, Server *s);
356 int prot_replay(Server *s, Job *list);
359 int make_server_socket(char *host, char *port);
362 // CONN_TYPE_* are bit masks used to track the type of connection.
363 // A put command adds the PRODUCER type, "reserve*" adds the WORKER type.
364 // If the connection is waiting for data, then it has the WAITING type.
365 #define CONN_TYPE_PRODUCER 1
366 #define CONN_TYPE_WORKER 2
367 #define CONN_TYPE_WAITING 4
369 struct Conn {
370 Server *srv;
371 Socket sock;
372 char state; // see the STATE_* description
373 char type; // combination of CONN_TYPE_* values
374 Conn *next; // only used in epollq functions
375 Tube *use; // tube currently in use
376 int64 tickat; // time at which to do more work; determines pos in heap
377 size_t tickpos; // position in srv->conns, stale when in_conns=0
378 byte in_conns; // 1 if the conn is in srv->conns heap, 0 otherwise
379 Job *soonest_job;// memoization of the soonest job
380 int rw; // currently want: 'r', 'w', or 'h'
382 // How long client should "wait" for the next job; -1 means forever.
383 int pending_timeout;
385 // Used to inform state machine that client no longer waits for the data.
386 char halfclosed;
388 char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated
389 size_t cmd_len;
390 int cmd_read;
392 char *reply;
393 int reply_len;
394 int reply_sent;
395 char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated
397 // How many bytes of in_job->body have been read so far. If in_job is NULL
398 // while in_job_read is nonzero, we are in bit bucket mode and
399 // in_job_read's meaning is inverted -- then it counts the bytes that
400 // remain to be thrown away.
401 int64 in_job_read;
402 Job *in_job; // a job to be read from the client
404 Job *out_job; // a job to be sent to the client
405 int out_job_sent; // how many bytes of *out_job were sent already
407 Ms watch; // the set of watched tubes by the connection
408 Job reserved_jobs; // linked list header
410 int conn_less(void *ca, void *cb);
411 void conn_setpos(void *c, size_t i);
412 void connsched(Conn *c);
413 void connclose(Conn *c);
414 void connsetproducer(Conn *c);
415 void connsetworker(Conn *c);
416 Job *connsoonestjob(Conn *c);
417 int conndeadlinesoon(Conn *c);
418 int conn_ready(Conn *c);
419 void conn_reserve_job(Conn *c, Job *j);
// conn_waiting(c) is nonzero when the CONN_TYPE_WAITING bit is set in c->type,
// and 0 otherwise. c must be a pointer to a Conn.
420 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
425 enum
427 Filesizedef = (10 << 20)
430 struct Wal {
431 int filesize;
432 int use;
433 char *dir;
434 File *head;
435 File *cur;
436 File *tail;
437 int nfile;
438 int next;
439 int64 resv; // bytes reserved
440 int64 alive; // bytes in use
441 int64 nmig; // migrations
442 int64 nrec; // records written ever
443 int wantsync; // do we sync to disk?
444 int64 syncrate; // how often we sync to disk, in nanoseconds
445 int64 lastsync;
447 int waldirlock(Wal*);
448 void walinit(Wal*, Job *list);
449 int walwrite(Wal*, Job*);
450 void walmaint(Wal*);
451 int walresvput(Wal*, Job*);
452 int walresvupdate(Wal*);
453 void walgc(Wal*);
456 struct File {
457 File *next;
458 uint refs;
459 int seq;
460 int iswopen; // is open for writing
461 int fd;
462 int free;
463 int resv;
464 char *path;
465 Wal *w;
467 Job jlist; // jobs written in this file
469 int fileinit(File*, Wal*, int);
470 Wal* fileadd(File*, Wal*);
471 void fileincref(File*);
472 void filedecref(File*);
473 void fileaddjob(File*, Job*);
474 void filermjob(File*, Job*);
475 int fileread(File*, Job *list);
476 void filewopen(File*);
477 void filewclose(File*);
478 int filewrjobshort(File*, Job*);
479 int filewrjobfull(File*, Job*);
482 #define Portdef "11300"
484 struct Server {
485 char *port;
486 char *addr;
487 char *user;
489 Wal wal;
490 Socket sock;
492 // Connections that must produce deadline or timeout, ordered by the time.
493 Heap conns;
495 void srv_acquire_wal(Server *s);
496 void srvserve(Server *s);
497 void srvaccept(Server *s, int ev);