// Short names for the integer types used throughout this header.
// byte and int64 are referenced by declarations further down
// (e.g. Conn.in_conns, socknext, nanoseconds), so they are declared here too.
typedef unsigned char uchar;
typedef uchar         byte;
typedef unsigned int  uint;
typedef uint32_t      uint32;
typedef int64_t       int64;
typedef uint64_t      uint64;
// Forward declarations so the core structures can refer to each other
// through pointers before their definitions appear.
// Ms is included because it is used as a type name (ms_event_fn,
// Tube.waiting_conns, tube_find) before any other declaration of it here.
typedef struct Job    Job;
typedef struct Tube   Tube;
typedef struct Conn   Conn;
typedef struct Heap   Heap;
typedef struct Jobrec Jobrec;
typedef struct File   File;
typedef struct Socket Socket;
typedef struct Server Server;
typedef struct Wal    Wal;
typedef struct Ms     Ms;
// Handle is a callback taking an opaque pointer and an event kind (rw).
typedef void (*Handle)(void *, int rw);

// FAlloc is the function type of a file space allocator; it has the same
// signature as rawfalloc(int fd, int len). The global falloc points to one
// and is replaced by tests to simulate failures.
typedef int (FAlloc)(int, int);
// NUM_PRIMES is used in the jobs hashing.
/* Some compilers (e.g. gcc on SmartOS) define NULL as 0.
 * This is allowed by the C standard, but is unhelpful when
 * using NULL in most pointer contexts with errors turned on. */
#if (defined(sun) || defined(__sun)) && (defined(__SVR4) || defined(__svr4__))
#ifdef NULL
#undef NULL /* avoid a macro-redefinition diagnostic */
#endif
#define NULL ((void*)0)
#endif
// The name of a tube cannot be longer than MAX_TUBE_NAME_LEN-1
#define MAX_TUBE_NAME_LEN 201

// A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
// MUST be enough to hold the longest possible command ("pause-tube a{200} 4294967295\r\n")
// or reply line ("USING a{200}\r\n").
// Breakdown: 11 bytes for "pause-tube ", MAX_TUBE_NAME_LEN-1 bytes of tube
// name, and 13 bytes for " 4294967295\r\n" -- 224 bytes in total.
#define LINE_BUF_SIZE (11 + (MAX_TUBE_NAME_LEN - 1) + 13)

// Evaluates to the smaller of a and b. Both arguments are evaluated in the
// comparison and the selected one is evaluated again, so never pass
// expressions with side effects.
#define min(a,b) ((a)<(b)?(a):(b))

// Jobs with priority less than URGENT_THRESHOLD are counted as urgent.
#define URGENT_THRESHOLD 1024

// The default maximum job size.
#define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)

// The maximum value that job_data_size_limit can be set to via "-z".
// It could be up to INT32_MAX-2 (~2GB), but set it to 1024^3 (1GB).
// The width is restricted by Jobrec.body_size that is int32.
#define JOB_DATA_SIZE_LIMIT_MAX 1073741824

// The default value for the fsync (-f) parameter, milliseconds.
#define DEFAULT_FSYNC_MS 50

// Use this macro to designate unused parameters in functions.
#define UNUSED_PARAMETER(x) (void)(x)
71 // version is defined in vers.c, see vers.sh for details.
72 extern const char version
[];
74 // verbose holds the count of -V parameters; it's a verbosity level.
77 extern struct Server srv
;
79 // Replaced by tests to simulate failures.
80 extern FAlloc
*falloc
;
// NOTE(review): the leading numbers on the lines of this fragment look like
// stray line numbers left by an extraction step; they are not valid C.
// NOTE(review): only one field of the stats structure is present here; the
// struct header, its other counters and its closing brace are not visible.
82 // stats structure holds counters for operations, both globally and per tube.
89 uint64 total_delete_ct
; // uint64 counter; presumably counts delete operations -- confirm with its users
// less_fn is used by the binary heap to determine the order of elements.
typedef int (*less_fn)(void *, void *);

// setpos_fn is used by the binary heap to record the new positions of elements
// whenever they get moved or inserted.
typedef void (*setpos_fn)(void *, size_t);
// NOTE(review): fields of the binary heap structure (presumably struct Heap);
// the struct header, any remaining fields and the closing brace are not
// visible in this fragment. The leading numbers look like stray line numbers
// and are not valid C.
102 size_t cap
; // capacity of the heap
103 size_t len
; // amount of elements in the heap
104 void **data
; // actual elements
109 int heapinsert(Heap
*h
, void *x
);
110 void* heapremove(Heap
*h
, size_t k
);
// Descriptor for the socket.
// f can point to srvaccept or prothandle.
// x is passed as first parameter to f.
// The value of added is platform dependent: on OSX it can be > 1.
// Value of 1 - socket was already added to event notifications,
// otherwise it is 0.
131 // sockwant updates event filter for the socket s. rw designates
132 // the kind of event we should be notified about:
135 // 'h' - hangup (closed connection)
136 // 0 - ignore this socket
137 int sockwant(Socket
*s
, int rw
);
139 // socknext waits for the next event at most timeout nanoseconds.
140 // If event happens before timeout then s points to the corresponding socket,
141 // and the kind of event is returned. In case of timeout, 0 is returned.
142 int socknext(Socket
**s
, int64 timeout
);
145 // ms_event_fn is called with the element being inserted/removed and its position.
146 typedef void(*ms_event_fn
)(Ms
*a
, void *item
, size_t i
);
// NOTE(review): fields of the resizable multiset (presumably struct Ms); the
// struct header, any remaining fields and the closing brace are not visible
// in this fragment. The leading numbers look like stray line numbers and are
// not valid C.
148 // Resizable multiset
150 size_t len
; // amount of stored elements
151 size_t cap
; // capacity
152 size_t last
; // position of last taken element
155 ms_event_fn oninsert
; // called on insertion of an element
156 ms_event_fn onremove
; // called on removal of an element
159 void ms_init(Ms
*a
, ms_event_fn oninsert
, ms_event_fn onremove
);
160 void ms_clear(Ms
*a
);
161 int ms_append(Ms
*a
, void *item
);
162 int ms_remove(Ms
*a
, void *item
);
163 int ms_contains(Ms
*a
, void *item
);
164 void *ms_take(Ms
*a
);
// If you modify the Jobrec struct, you must increment Walver above.
//
// This workflow is expected:
// 1. If any change needs to be made to the format, first increment Walver.
// 2. If and only if this is the first such change since the last release:
//    a. Copy-paste relevant file-reading functions in file.c and
//       add the old version number to their names. For example,
//       if you are incrementing Walver from 7 to 8, copy readrec to readrec7.
//       (Currently, there is only one such function, readrec. But if
//       a future readrec calls other version-specific functions,
//       those will have to be copied too.)
// 3. Add a switch case to fileread for the old version.
// 4. Modify the current reading function (readrec) to reflect your change.
//
// Incrementing Walver for every change, even if not every version
// will be released, is helpful even if it "wastes" version numbers.
// It is a really easy thing to do and it means during development
// you won't have to worry about misinterpreting the contents of a binlog
// that you generated with a dev copy of beanstalkd.
// deadline_at is a timestamp, in nsec, that points to:
// * the time when the job will become ready, for a delayed job,
// * the time when the TTR is about to expire, for a reserved job,
// * undefined otherwise.
// NOTE(review): fields of the job structure (presumably struct Job); the
// struct header, the persistent fields announced by the first comment and the
// closing brace are not visible in this fragment. The leading numbers look
// like stray line numbers and are not valid C.
225 // persistent fields; these get written to the wal
228 // bookkeeping fields; these are in-memory only
231 Job
*prev
, *next
; // linked list of jobs
232 Job
*ht_next
; // Next job in a hash table list
233 size_t heap_index
; // where is this job in its current heap
241 char *body
; // written separately to the wal
// NOTE(review): fields of the tube structure (presumably struct Tube); the
// struct header, the closing brace and the fields described by the pause and
// unpause_at comments below are not visible in this fragment. The leading
// numbers look like stray line numbers and are not valid C.
246 char name
[MAX_TUBE_NAME_LEN
]; // tube name, at most MAX_TUBE_NAME_LEN-1 characters
249 Ms waiting_conns
; // conns waiting for the job at this moment
254 // pause is set to the duration of the current pause, otherwise 0, in nsec.
257 // unpause_at is a timestamp when to unpause the tube, in nsec.
260 Job buried
; // linked list header
// Prints warning message on stderr in the format:
// <progname>: FILE:LINE in FUNC: <fmt>: <errno_msg>
#define twarn(...) __twarn(__VA_ARGS__, "")

// Hack to quiet the compiler. When __VA_ARGS__ in twarn() has one element,
// e.g. twarn("OOM"), it's replaced with __twarn("OOM", ""),
// thus __VA_ARGS__ is expanded to at least one element in warn().
// NOTE(review): names containing "__" are reserved for the implementation;
// they are kept unchanged here in case other code expands them directly.
#define __twarn(fmt, ...) \
    warn("%s:%d in %s: " fmt "%s", __FILE__, __LINE__, __func__, __VA_ARGS__)

// Prints warning message on stderr in the format:
// <progname>: FILE:LINE in FUNC: <fmt>
#define twarnx(...) __twarnx(__VA_ARGS__, "")

// See __twarn macro.
#define __twarnx(fmt, ...) \
    warnx("%s:%d in %s: " fmt "%s", __FILE__, __LINE__, __func__, __VA_ARGS__)
282 void warn(const char *fmt
, ...) __attribute__((format(printf
, 1, 2)));
283 void warnx(const char *fmt
, ...) __attribute__((format(printf
, 1, 2)));
284 char* fmtalloc(char *fmt
, ...) __attribute__((format(printf
, 1, 2)));
286 #define new(T) zalloc(sizeof(T))
287 void optparse(Server
*, char**);
289 extern const char *progname
;
291 int64
nanoseconds(void);
292 int rawfalloc(int fd
, int len
);
294 // Take ID for a jobs from next_id and allocate and store the job.
295 #define make_job(pri,delay,ttr,body_size,tube) \
296 make_job_with_id(pri,delay,ttr,body_size,tube,0)
298 Job
*allocate_job(int body_size
);
299 Job
*make_job_with_id(uint pri
, int64 delay
, int64 ttr
,
300 int body_size
, Tube
*tube
, uint64 id
);
301 void job_free(Job
*j
);
303 /* Lookup a job by job ID */
304 Job
*job_find(uint64 job_id
);
306 /* the void* parameters are really job pointers */
307 void job_setpos(void *j
, size_t pos
);
308 int job_pri_less(void *ja
, void *jb
);
309 int job_delay_less(void *ja
, void *jb
);
311 Job
*job_copy(Job
*j
);
313 const char * job_state(Job
*j
);
315 void job_list_reset(Job
*head
);
316 int job_list_is_empty(Job
*head
);
317 Job
*job_list_remove(Job
*j
);
318 void job_list_insert(Job
*head
, Job
*j
);
321 size_t get_all_jobs_used(void);
324 extern struct Ms tubes
;
326 Tube
*make_tube(const char *name
);
327 void tube_dref(Tube
*t
);
328 void tube_iref(Tube
*t
);
329 Tube
*tube_find(Ms
*tubeset
, const char *name
);
330 Tube
*tube_find_or_make(const char *name
);
331 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
334 Conn
*make_conn(int fd
, char start_state
, Tube
*use
, Tube
*watch
);
336 int count_cur_conns(void);
337 uint
count_tot_conns(void);
338 int count_cur_producers(void);
339 int count_cur_workers(void);
342 extern size_t primes
[];
345 extern size_t job_data_size_limit
;
347 void prot_init(void);
348 int64
prottick(Server
*s
);
350 void remove_waiting_conn(Conn
*c
);
352 void enqueue_reserved_jobs(Conn
*c
);
354 void enter_drain_mode(int sig
);
355 void h_accept(const int fd
, const short which
, Server
*s
);
356 int prot_replay(Server
*s
, Job
*list
);
359 int make_server_socket(char *host
, char *port
);
// CONN_TYPE_* are bit masks used to track the type of connection.
// A put command adds the PRODUCER type, "reserve*" adds the WORKER type.
// If connection awaits for data, then it has WAITING type.
#define CONN_TYPE_PRODUCER 1
#define CONN_TYPE_WORKER   2
#define CONN_TYPE_WAITING  4
// NOTE(review): fields of the connection structure (presumably struct Conn);
// the struct header, the closing brace and the fields described by the
// "How long client should wait", "Used to inform state machine" and
// in_job_read comments below are not visible in this fragment. The leading
// numbers look like stray line numbers and are not valid C.
372 char state
; // see the STATE_* description
373 char type
; // combination of CONN_TYPE_* values
374 Conn
*next
; // only used in epollq functions
375 Tube
*use
; // tube currently in use
376 int64 tickat
; // time at which to do more work; determines pos in heap
377 size_t tickpos
; // position in srv->conns, stale when in_conns=0
378 byte in_conns
; // 1 if the conn is in srv->conns heap, 0 otherwise
379 Job
*soonest_job
;// memoization of the soonest job
380 int rw
; // currently want: 'r', 'w', or 'h'
382 // How long client should "wait" for the next job; -1 means forever.
385 // Used to inform state machine that client no longer waits for the data.
388 char cmd
[LINE_BUF_SIZE
]; // this string is NOT NUL-terminated
395 char reply_buf
[LINE_BUF_SIZE
]; // this string IS NUL-terminated
397 // How many bytes of in_job->body have been read so far. If in_job is NULL
398 // while in_job_read is nonzero, we are in bit bucket mode and
399 // in_job_read's meaning is inverted -- then it counts the bytes that
400 // remain to be thrown away.
402 Job
*in_job
; // a job to be read from the client
404 Job
*out_job
; // a job to be sent to the client
405 int out_job_sent
; // how many bytes of *out_job were sent already
407 Ms watch
; // the set of watched tubes by the connection
408 Job reserved_jobs
; // linked list header
410 int conn_less(void *ca
, void *cb
);
411 void conn_setpos(void *c
, size_t i
);
412 void connsched(Conn
*c
);
413 void connclose(Conn
*c
);
414 void connsetproducer(Conn
*c
);
415 void connsetworker(Conn
*c
);
416 Job
*connsoonestjob(Conn
*c
);
417 int conndeadlinesoon(Conn
*c
);
418 int conn_ready(Conn
*c
);
419 void conn_reserve_job(Conn
*c
, Job
*j
);
420 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
// NOTE(review): presumably an enumerator whose enclosing enum declaration is
// not visible here. 10 << 20 is 10485760 (10 MiB); presumably the default
// file size in bytes -- confirm. The leading number looks like a stray line
// number and is not valid C.
427 Filesizedef
= (10 << 20)
// NOTE(review): fields of the wal structure (presumably struct Wal); the
// struct header, remaining fields and closing brace are not visible in this
// fragment. The leading numbers look like stray line numbers and are not
// valid C.
439 int64 resv
; // bytes reserved
440 int64 alive
; // bytes in use
441 int64 nmig
; // migrations
442 int64 nrec
; // records written ever
443 int wantsync
; // do we sync to disk?
444 int64 syncrate
; // how often we sync to disk, in nanoseconds
447 int waldirlock(Wal
*);
448 void walinit(Wal
*, Job
*list
);
449 int walwrite(Wal
*, Job
*);
451 int walresvput(Wal
*, Job
*);
452 int walresvupdate(Wal
*);
// NOTE(review): fields of the file structure (presumably struct File); the
// struct header, remaining fields and closing brace are not visible in this
// fragment. The leading numbers look like stray line numbers and are not
// valid C.
460 int iswopen
; // is open for writing
467 Job jlist
; // jobs written in this file
469 int fileinit(File
*, Wal
*, int);
470 Wal
* fileadd(File
*, Wal
*);
471 void fileincref(File
*);
472 void filedecref(File
*);
473 void fileaddjob(File
*, Job
*);
474 void filermjob(File
*, Job
*);
475 int fileread(File
*, Job
*list
);
476 void filewopen(File
*);
477 void filewclose(File
*);
478 int filewrjobshort(File
*, Job
*);
479 int filewrjobfull(File
*, Job
*);
// Default port, as a string (make_server_socket takes the port as char *).
#define Portdef "11300"
// Connections that must produce a deadline or timeout, ordered by time.
495 void srv_acquire_wal(Server
*s
);
496 void srvserve(Server
*s
);
497 void srvaccept(Server
*s
, int ev
);