10 #include <sys/resource.h>
12 #include <sys/types.h>
13 #include <sys/utsname.h>
14 #include <sys/socket.h>
20 /* job body cannot be greater than this many bytes long */
21 size_t job_data_size_limit
= JOB_DATA_SIZE_LIMIT_DEFAULT
;
24 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
25 "abcdefghijklmnopqrstuvwxyz" \
28 #define CMD_PUT "put "
29 #define CMD_PEEKJOB "peek "
30 #define CMD_PEEK_READY "peek-ready"
31 #define CMD_PEEK_DELAYED "peek-delayed"
32 #define CMD_PEEK_BURIED "peek-buried"
33 #define CMD_RESERVE "reserve"
34 #define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
35 #define CMD_RESERVE_JOB "reserve-job "
36 #define CMD_DELETE "delete "
37 #define CMD_RELEASE "release "
38 #define CMD_BURY "bury "
39 #define CMD_KICK "kick "
40 #define CMD_KICKJOB "kick-job "
41 #define CMD_TOUCH "touch "
42 #define CMD_STATS "stats"
43 #define CMD_STATSJOB "stats-job "
44 #define CMD_USE "use "
45 #define CMD_WATCH "watch "
46 #define CMD_IGNORE "ignore "
47 #define CMD_LIST_TUBES "list-tubes"
48 #define CMD_LIST_TUBE_USED "list-tube-used"
49 #define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
50 #define CMD_STATS_TUBE "stats-tube "
51 #define CMD_QUIT "quit"
52 #define CMD_PAUSE_TUBE "pause-tube"
54 #define CONSTSTRLEN(m) (sizeof(m) - 1)
56 #define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
57 #define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
58 #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
59 #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
60 #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
61 #define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
62 #define CMD_RESERVE_JOB_LEN CONSTSTRLEN(CMD_RESERVE_JOB)
63 #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
64 #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
65 #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
66 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
67 #define CMD_KICKJOB_LEN CONSTSTRLEN(CMD_KICKJOB)
68 #define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
69 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
70 #define CMD_STATSJOB_LEN CONSTSTRLEN(CMD_STATSJOB)
71 #define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
72 #define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
73 #define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
74 #define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
75 #define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
76 #define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
77 #define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
78 #define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
80 #define MSG_FOUND "FOUND"
81 #define MSG_NOTFOUND "NOT_FOUND\r\n"
82 #define MSG_RESERVED "RESERVED"
83 #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
84 #define MSG_TIMED_OUT "TIMED_OUT\r\n"
85 #define MSG_DELETED "DELETED\r\n"
86 #define MSG_RELEASED "RELEASED\r\n"
87 #define MSG_BURIED "BURIED\r\n"
88 #define MSG_KICKED "KICKED\r\n"
89 #define MSG_TOUCHED "TOUCHED\r\n"
90 #define MSG_BURIED_FMT "BURIED %"PRIu64"\r\n"
91 #define MSG_INSERTED_FMT "INSERTED %"PRIu64"\r\n"
92 #define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
94 #define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
95 #define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
96 #define MSG_DRAINING "DRAINING\r\n"
97 #define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
98 #define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
99 #define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
100 #define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
102 // Connection can be in one of these states:
103 #define STATE_WANT_COMMAND 0 // conn expects a command from the client
104 #define STATE_WANT_DATA 1 // conn expects a job data
105 #define STATE_SEND_JOB 2 // conn sends job to the client
106 #define STATE_SEND_WORD 3 // conn sends a line reply
107 #define STATE_WAIT 4 // client awaits for the job reservation
108 #define STATE_BITBUCKET 5 // conn discards content
109 #define STATE_CLOSE 6 // conn should be closed
110 #define STATE_WANT_ENDLINE 7 // skip until the end of a line
121 #define OP_STATSJOB 9
122 #define OP_PEEK_BURIED 10
126 #define OP_LIST_TUBES 14
127 #define OP_LIST_TUBE_USED 15
128 #define OP_LIST_TUBES_WATCHED 16
129 #define OP_STATS_TUBE 17
130 #define OP_PEEK_READY 18
131 #define OP_PEEK_DELAYED 19
132 #define OP_RESERVE_TIMEOUT 20
135 #define OP_PAUSE_TUBE 23
136 #define OP_KICKJOB 24
137 #define OP_RESERVE_JOB 25
140 #define STATS_FMT "---\n" \
141 "current-jobs-urgent: %" PRIu64 "\n" \
142 "current-jobs-ready: %" PRIu64 "\n" \
143 "current-jobs-reserved: %" PRIu64 "\n" \
144 "current-jobs-delayed: %u\n" \
145 "current-jobs-buried: %" PRIu64 "\n" \
146 "cmd-put: %" PRIu64 "\n" \
147 "cmd-peek: %" PRIu64 "\n" \
148 "cmd-peek-ready: %" PRIu64 "\n" \
149 "cmd-peek-delayed: %" PRIu64 "\n" \
150 "cmd-peek-buried: %" PRIu64 "\n" \
151 "cmd-reserve: %" PRIu64 "\n" \
152 "cmd-reserve-with-timeout: %" PRIu64 "\n" \
153 "cmd-delete: %" PRIu64 "\n" \
154 "cmd-release: %" PRIu64 "\n" \
155 "cmd-use: %" PRIu64 "\n" \
156 "cmd-watch: %" PRIu64 "\n" \
157 "cmd-ignore: %" PRIu64 "\n" \
158 "cmd-bury: %" PRIu64 "\n" \
159 "cmd-kick: %" PRIu64 "\n" \
160 "cmd-touch: %" PRIu64 "\n" \
161 "cmd-stats: %" PRIu64 "\n" \
162 "cmd-stats-job: %" PRIu64 "\n" \
163 "cmd-stats-tube: %" PRIu64 "\n" \
164 "cmd-list-tubes: %" PRIu64 "\n" \
165 "cmd-list-tube-used: %" PRIu64 "\n" \
166 "cmd-list-tubes-watched: %" PRIu64 "\n" \
167 "cmd-pause-tube: %" PRIu64 "\n" \
168 "job-timeouts: %" PRIu64 "\n" \
169 "total-jobs: %" PRIu64 "\n" \
170 "max-job-size: %zu\n" \
171 "current-tubes: %zu\n" \
172 "current-connections: %u\n" \
173 "current-producers: %u\n" \
174 "current-workers: %u\n" \
175 "current-waiting: %" PRIu64 "\n" \
176 "total-connections: %u\n" \
178 "version: \"%s\"\n" \
179 "rusage-utime: %d.%06d\n" \
180 "rusage-stime: %d.%06d\n" \
182 "binlog-oldest-index: %d\n" \
183 "binlog-current-index: %d\n" \
184 "binlog-records-migrated: %" PRId64 "\n" \
185 "binlog-records-written: %" PRId64 "\n" \
186 "binlog-max-size: %d\n" \
189 "hostname: \"%s\"\n" \
191 "platform: \"%s\"\n" \
194 #define STATS_TUBE_FMT "---\n" \
196 "current-jobs-urgent: %" PRIu64 "\n" \
197 "current-jobs-ready: %zu\n" \
198 "current-jobs-reserved: %" PRIu64 "\n" \
199 "current-jobs-delayed: %zu\n" \
200 "current-jobs-buried: %" PRIu64 "\n" \
201 "total-jobs: %" PRIu64 "\n" \
202 "current-using: %u\n" \
203 "current-watching: %u\n" \
204 "current-waiting: %" PRIu64 "\n" \
205 "cmd-delete: %" PRIu64 "\n" \
206 "cmd-pause-tube: %" PRIu64 "\n" \
207 "pause: %" PRIu64 "\n" \
208 "pause-time-left: %" PRId64 "\n" \
211 #define STATS_JOB_FMT "---\n" \
212 "id: %" PRIu64 "\n" \
216 "age: %" PRId64 "\n" \
217 "delay: %" PRId64 "\n" \
218 "ttr: %" PRId64 "\n" \
219 "time-left: %" PRId64 "\n" \
228 // The size of the throw-away (BITBUCKET) buffer. Arbitrary.
229 #define BUCKET_BUF_SIZE 1024
231 static uint64 ready_ct
= 0;
232 static uint64 timeout_ct
= 0;
233 static uint64 op_ct
[TOTAL_OPS
] = {0};
234 static struct stats global_stat
= {0};
236 static Tube
*default_tube
;
238 // If drain_mode is 1, then server does not accept new jobs.
239 // Variable is set by the SIGUSR1 handler.
240 static volatile sig_atomic_t drain_mode
= 0;
242 static int64 started_at
;
244 enum { instance_id_bytes
= 8 };
245 static char instance_hex
[instance_id_bytes
* 2 + 1]; // hex-encoded len of instance_id_bytes
247 static struct utsname node_info
;
249 // Single linked list with connections that require updates
250 // in the event notification mechanism.
253 static const char * op_names
[] = {
270 CMD_LIST_TUBES_WATCHED
,
282 static Job
*remove_ready_job(Job
*j
);
283 static Job
*remove_buried_job(Job
*j
);
285 // epollq_add schedules connection c in the s->conns heap, adds c
286 // to the epollq list to change expected operation in event notifications.
287 // rw='w' means to notify when socket is writeable, 'r' - readable, 'h' - closed.
289 epollq_add(Conn
*c
, char rw
) {
296 // epollq_rmconn removes connection c from the epollq.
298 epollq_rmconn(Conn
*c
)
300 Conn
*x
, *newhead
= NULL
;
303 // x as next element from epollq.
305 epollq
= epollq
->next
;
308 // put x back into newhead list.
317 // Propagate changes to event notification mechanism about expected operations
318 // in connections' sockets. Clear the epollq list.
326 epollq
= epollq
->next
;
328 int r
= sockwant(&c
->sock
, c
->rw
);
336 #define reply_msg(c, m) \
337 reply((c), (m), CONSTSTRLEN(m), STATE_SEND_WORD)
339 #define reply_serr(c, e) \
340 (twarnx("server error: %s", (e)), reply_msg((c), (e)))
343 reply(Conn
*c
, char *line
, int len
, int state
)
355 printf(">%d reply %.*s\n", c
->sock
.fd
, len
-2, line
);
360 reply_line(Conn
*, int, const char*, ...)
361 __attribute__((format(printf
, 3, 4)));
363 // reply_line prints *fmt into c->reply_buffer and
364 // calls reply() for the string and state.
366 reply_line(Conn
*c
, int state
, const char *fmt
, ...)
372 r
= vsnprintf(c
->reply_buf
, LINE_BUF_SIZE
, fmt
, ap
);
375 /* Make sure the buffer was big enough. If not, we have a bug. */
376 if (r
>= LINE_BUF_SIZE
) {
377 reply_serr(c
, MSG_INTERNAL_ERROR
);
381 reply(c
, c
->reply_buf
, r
, state
);
384 // reply_job tells the connection c which job to send,
385 // and replies with this line: <msg> <job_id> <job_size>.
387 reply_job(Conn
*c
, Job
*j
, const char *msg
)
391 reply_line(c
, STATE_SEND_JOB
, "%s %"PRIu64
" %u\r\n",
392 msg
, j
->r
.id
, j
->r
.body_size
- 2);
395 // remove_waiting_conn unsets CONN_TYPE_WAITING for the connection,
396 // removes it from the waiting_conns set of every tube it's watching.
397 // Noop if connection is not waiting.
399 remove_waiting_conn(Conn
*c
)
401 if (!conn_waiting(c
))
404 c
->type
&= ~CONN_TYPE_WAITING
;
405 global_stat
.waiting_ct
--;
407 for (i
= 0; i
< c
->watch
.len
; i
++) {
408 Tube
*t
= c
->watch
.items
[i
];
409 t
->stat
.waiting_ct
--;
410 ms_remove(&t
->waiting_conns
, c
);
414 // enqueue_waiting_conn sets CONN_TYPE_WAITING for the connection,
415 // adds it to the waiting_conns set of every tube it's watching.
417 enqueue_waiting_conn(Conn
*c
)
419 c
->type
|= CONN_TYPE_WAITING
;
420 global_stat
.waiting_ct
++;
422 for (i
= 0; i
< c
->watch
.len
; i
++) {
423 Tube
*t
= c
->watch
.items
[i
];
424 t
->stat
.waiting_ct
++;
425 ms_append(&t
->waiting_conns
, c
);
429 // next_awaited_job iterates through all the tubes with awaiting connections,
430 // returns the next ready job with the smallest priority.
431 // If jobs have the same priority, it picks the job with the smaller id.
432 // All tubes with expired pause are unpaused.
434 next_awaited_job(int64 now
)
439 for (i
= 0; i
< tubes
.len
; i
++) {
440 Tube
*t
= tubes
.items
[i
];
442 if (t
->unpause_at
> now
)
446 if (t
->waiting_conns
.len
&& t
->ready
.len
) {
447 Job
*candidate
= t
->ready
.data
[0];
448 if (!j
|| job_pri_less(candidate
, j
)) {
456 // process_queue performs a reservation for every job that is awaited.
461 int64 now
= nanoseconds();
463 while ((j
= next_awaited_job(now
))) {
464 j
= remove_ready_job(j
);
466 twarnx("job not ready");
469 Conn
*c
= ms_take(&j
->tube
->waiting_conns
);
471 twarnx("waiting_conns is empty");
474 global_stat
.reserved_ct
++;
476 remove_waiting_conn(c
);
477 conn_reserve_job(c
, j
);
478 reply_job(c
, j
, MSG_RESERVED
);
482 // soonest_delayed_job returns the delayed job
483 // with the smallest deadline_at among all tubes.
485 soonest_delayed_job()
490 for (i
= 0; i
< tubes
.len
; i
++) {
491 Tube
*t
= tubes
.items
[i
];
492 if (t
->delay
.len
== 0) {
495 Job
*nj
= t
->delay
.data
[0];
496 if (!j
|| nj
->r
.deadline_at
< j
->r
.deadline_at
)
502 // enqueue_job inserts job j in the tube, returns 1 on success, otherwise 0.
503 // If update_store then it writes an entry to WAL.
504 // On success it processes the queue.
505 // BUG: If maintenance of the WAL has failed, it is not reported as an error.
507 enqueue_job(Server
*s
, Job
*j
, int64 delay
, char update_store
)
513 j
->r
.deadline_at
= nanoseconds() + delay
;
514 r
= heapinsert(&j
->tube
->delay
, j
);
517 j
->r
.state
= Delayed
;
519 r
= heapinsert(&j
->tube
->ready
, j
);
524 if (j
->r
.pri
< URGENT_THRESHOLD
) {
525 global_stat
.urgent_ct
++;
526 j
->tube
->stat
.urgent_ct
++;
531 if (!walwrite(&s
->wal
, j
)) {
537 // The call below makes this function do too much.
538 // TODO: refactor this call outside so the call is explicit (not hidden)?
544 bury_job(Server
*s
, Job
*j
, char update_store
)
547 int z
= walresvupdate(&s
->wal
);
553 job_list_insert(&j
->tube
->buried
, j
);
554 global_stat
.buried_ct
++;
555 j
->tube
->stat
.buried_ct
++;
561 if (!walwrite(&s
->wal
, j
)) {
571 enqueue_reserved_jobs(Conn
*c
)
573 while (!job_list_is_empty(&c
->reserved_jobs
)) {
574 Job
*j
= job_list_remove(c
->reserved_jobs
.next
);
575 int r
= enqueue_job(c
->srv
, j
, 0, 0);
577 bury_job(c
->srv
, j
, 0);
578 global_stat
.reserved_ct
--;
579 j
->tube
->stat
.reserved_ct
--;
580 c
->soonest_job
= NULL
;
585 kick_buried_job(Server
*s
, Job
*j
)
590 z
= walresvupdate(&s
->wal
);
595 remove_buried_job(j
);
598 r
= enqueue_job(s
, j
, 0, 1);
602 /* ready queue is full, so bury it */
613 for (i
= 0; i
< tubes
.len
; i
++) {
614 Tube
*t
= tubes
.items
[i
];
615 count
+= t
->delay
.len
;
621 kick_delayed_job(Server
*s
, Job
*j
)
626 z
= walresvupdate(&s
->wal
);
631 heapremove(&j
->tube
->delay
, j
->heap_index
);
634 r
= enqueue_job(s
, j
, 0, 1);
638 /* ready queue is full, so delay it again */
639 r
= enqueue_job(s
, j
, j
->r
.delay
, 0);
649 buried_job_p(Tube
*t
)
651 // this function does not do much. inline?
652 return !job_list_is_empty(&t
->buried
);
655 /* return the number of jobs successfully kicked */
657 kick_buried_jobs(Server
*s
, Tube
*t
, uint n
)
660 for (i
= 0; (i
< n
) && buried_job_p(t
); ++i
) {
661 kick_buried_job(s
, t
->buried
.next
);
666 /* return the number of jobs successfully kicked */
668 kick_delayed_jobs(Server
*s
, Tube
*t
, uint n
)
671 for (i
= 0; (i
< n
) && (t
->delay
.len
> 0); ++i
) {
672 kick_delayed_job(s
, (Job
*)t
->delay
.data
[0]);
678 kick_jobs(Server
*s
, Tube
*t
, uint n
)
681 return kick_buried_jobs(s
, t
, n
);
682 return kick_delayed_jobs(s
, t
, n
);
685 // remove_buried_job returns non-NULL value if job j was in the buried state.
686 // It excludes the job from the buried list and updates counters.
688 remove_buried_job(Job
*j
)
690 if (!j
|| j
->r
.state
!= Buried
)
692 j
= job_list_remove(j
);
694 global_stat
.buried_ct
--;
695 j
->tube
->stat
.buried_ct
--;
700 // remove_delayed_job returns non-NULL value if job j was in the delayed state.
701 // It removes the job from the tube delayed heap.
703 remove_delayed_job(Job
*j
)
705 if (!j
|| j
->r
.state
!= Delayed
)
707 heapremove(&j
->tube
->delay
, j
->heap_index
);
712 // remove_ready_job returns non-NULL value if job j was in the ready state.
713 // It removes the job from the tube ready heap and updates counters.
715 remove_ready_job(Job
*j
)
717 if (!j
|| j
->r
.state
!= Ready
)
719 heapremove(&j
->tube
->ready
, j
->heap_index
);
721 if (j
->r
.pri
< URGENT_THRESHOLD
) {
722 global_stat
.urgent_ct
--;
723 j
->tube
->stat
.urgent_ct
--;
729 is_job_reserved_by_conn(Conn
*c
, Job
*j
)
731 return j
&& j
->reserver
== c
&& j
->r
.state
== Reserved
;
735 touch_job(Conn
*c
, Job
*j
)
737 if (is_job_reserved_by_conn(c
, j
)) {
738 j
->r
.deadline_at
= nanoseconds() + j
->r
.ttr
;
739 c
->soonest_job
= NULL
;
746 check_err(Conn
*c
, const char *s
)
752 if (errno
== EWOULDBLOCK
)
756 c
->state
= STATE_CLOSE
;
759 /* Scan the given string for the sequence "\r\n" and return the line length.
760 * Always returns at least 2 if a match is found. Returns 0 if no match. */
762 scan_line_end(const char *s
, int size
)
766 match
= memchr(s
, '\r', size
- 1);
770 /* this is safe because we only scan size - 1 chars above */
771 if (match
[1] == '\n')
772 return match
- s
+ 2;
777 /* parse the command line */
781 #define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
782 TEST_CMD(c
->cmd
, CMD_PUT
, OP_PUT
);
783 TEST_CMD(c
->cmd
, CMD_PEEKJOB
, OP_PEEKJOB
);
784 TEST_CMD(c
->cmd
, CMD_PEEK_READY
, OP_PEEK_READY
);
785 TEST_CMD(c
->cmd
, CMD_PEEK_DELAYED
, OP_PEEK_DELAYED
);
786 TEST_CMD(c
->cmd
, CMD_PEEK_BURIED
, OP_PEEK_BURIED
);
787 TEST_CMD(c
->cmd
, CMD_RESERVE_TIMEOUT
, OP_RESERVE_TIMEOUT
);
788 TEST_CMD(c
->cmd
, CMD_RESERVE_JOB
, OP_RESERVE_JOB
);
789 TEST_CMD(c
->cmd
, CMD_RESERVE
, OP_RESERVE
);
790 TEST_CMD(c
->cmd
, CMD_DELETE
, OP_DELETE
);
791 TEST_CMD(c
->cmd
, CMD_RELEASE
, OP_RELEASE
);
792 TEST_CMD(c
->cmd
, CMD_BURY
, OP_BURY
);
793 TEST_CMD(c
->cmd
, CMD_KICK
, OP_KICK
);
794 TEST_CMD(c
->cmd
, CMD_KICKJOB
, OP_KICKJOB
);
795 TEST_CMD(c
->cmd
, CMD_TOUCH
, OP_TOUCH
);
796 TEST_CMD(c
->cmd
, CMD_STATSJOB
, OP_STATSJOB
);
797 TEST_CMD(c
->cmd
, CMD_STATS_TUBE
, OP_STATS_TUBE
);
798 TEST_CMD(c
->cmd
, CMD_STATS
, OP_STATS
);
799 TEST_CMD(c
->cmd
, CMD_USE
, OP_USE
);
800 TEST_CMD(c
->cmd
, CMD_WATCH
, OP_WATCH
);
801 TEST_CMD(c
->cmd
, CMD_IGNORE
, OP_IGNORE
);
802 TEST_CMD(c
->cmd
, CMD_LIST_TUBES_WATCHED
, OP_LIST_TUBES_WATCHED
);
803 TEST_CMD(c
->cmd
, CMD_LIST_TUBE_USED
, OP_LIST_TUBE_USED
);
804 TEST_CMD(c
->cmd
, CMD_LIST_TUBES
, OP_LIST_TUBES
);
805 TEST_CMD(c
->cmd
, CMD_QUIT
, OP_QUIT
);
806 TEST_CMD(c
->cmd
, CMD_PAUSE_TUBE
, OP_PAUSE_TUBE
);
810 /* Copy up to body_size trailing bytes into the job, then the rest into the cmd
811 * buffer. If c->in_job exists, this assumes that c->in_job->body is empty.
812 * This function is idempotent. */
814 fill_extra_data(Conn
*c
)
817 return; /* the connection was closed */
819 return; /* we don't have a complete command */
821 /* how many extra bytes did we read? */
822 int64 extra_bytes
= c
->cmd_read
- c
->cmd_len
;
824 int64 job_data_bytes
= 0;
825 /* how many bytes should we put into the job body? */
827 job_data_bytes
= min(extra_bytes
, c
->in_job
->r
.body_size
);
828 memcpy(c
->in_job
->body
, c
->cmd
+ c
->cmd_len
, job_data_bytes
);
829 c
->in_job_read
= job_data_bytes
;
830 } else if (c
->in_job_read
) {
831 /* we are in bit-bucket mode, throwing away data */
832 job_data_bytes
= min(extra_bytes
, c
->in_job_read
);
833 c
->in_job_read
-= job_data_bytes
;
836 /* how many bytes are left to go into the future cmd? */
837 int64 cmd_bytes
= extra_bytes
- job_data_bytes
;
838 memmove(c
->cmd
, c
->cmd
+ c
->cmd_len
+ job_data_bytes
, cmd_bytes
);
839 c
->cmd_read
= cmd_bytes
;
840 c
->cmd_len
= 0; /* we no longer know the length of the new command */
843 #define skip(conn,n,msg) (_skip(conn, n, msg, CONSTSTRLEN(msg)))
846 _skip(Conn
*c
, int64 n
, char *msg
, int msglen
)
848 /* Invert the meaning of in_job_read while throwing away data -- it
849 * counts the bytes that remain to be thrown away. */
854 if (c
->in_job_read
== 0) {
855 reply(c
, msg
, msglen
, STATE_SEND_WORD
);
860 c
->reply_len
= msglen
;
862 c
->state
= STATE_BITBUCKET
;
866 enqueue_incoming_job(Conn
*c
)
871 c
->in_job
= NULL
; /* the connection no longer owns this job */
874 /* check if the trailer is present and correct */
875 if (memcmp(j
->body
+ j
->r
.body_size
- 2, "\r\n", 2)) {
877 reply_msg(c
, MSG_EXPECTED_CRLF
);
882 printf("<%d job %"PRIu64
"\n", c
->sock
.fd
, j
->r
.id
);
887 reply_serr(c
, MSG_DRAINING
);
892 reply_serr(c
, MSG_INTERNAL_ERROR
);
895 j
->walresv
= walresvput(&c
->srv
->wal
, j
);
897 reply_serr(c
, MSG_OUT_OF_MEMORY
);
901 /* we have a complete job, so let's stick it in the pqueue */
902 r
= enqueue_job(c
->srv
, j
, j
->r
.delay
, 1);
904 // Dead code: this condition cannot happen; r can take only the values 0 or 1.
906 reply_serr(c
, MSG_INTERNAL_ERROR
);
910 global_stat
.total_jobs_ct
++;
911 j
->tube
->stat
.total_jobs_ct
++;
914 reply_line(c
, STATE_SEND_WORD
, MSG_INSERTED_FMT
, j
->r
.id
);
918 /* out of memory trying to grow the queue, so it gets buried */
919 bury_job(c
->srv
, j
, 0);
920 reply_line(c
, STATE_SEND_WORD
, MSG_BURIED_FMT
, j
->r
.id
);
926 return (nanoseconds() - started_at
) / 1000000000;
930 fmt_stats(char *buf
, size_t size
, void *x
)
932 int whead
= 0, wcur
= 0;
939 whead
= s
->wal
.head
->seq
;
943 wcur
= s
->wal
.cur
->seq
;
946 getrusage(RUSAGE_SELF
, &ru
); /* don't care if it fails */
947 return snprintf(buf
, size
, STATS_FMT
,
948 global_stat
.urgent_ct
,
950 global_stat
.reserved_ct
,
951 get_delayed_job_ct(),
952 global_stat
.buried_ct
,
955 op_ct
[OP_PEEK_READY
],
956 op_ct
[OP_PEEK_DELAYED
],
957 op_ct
[OP_PEEK_BURIED
],
959 op_ct
[OP_RESERVE_TIMEOUT
],
970 op_ct
[OP_STATS_TUBE
],
971 op_ct
[OP_LIST_TUBES
],
972 op_ct
[OP_LIST_TUBE_USED
],
973 op_ct
[OP_LIST_TUBES_WATCHED
],
974 op_ct
[OP_PAUSE_TUBE
],
976 global_stat
.total_jobs_ct
,
980 count_cur_producers(),
982 global_stat
.waiting_ct
,
986 (int) ru
.ru_utime
.tv_sec
, (int) ru
.ru_utime
.tv_usec
,
987 (int) ru
.ru_stime
.tv_sec
, (int) ru
.ru_stime
.tv_usec
,
994 drain_mode
? "true" : "false",
1001 /* Read an integer from the given buffer and place it in num.
1002 * Parsed integer should fit into uint64.
1003 * Update end to point to the address after the last character consumed.
1004 * num and end can be NULL. If they are both NULL, read_u64() will do the
1005 * conversion and return the status code but not update any values.
1006 * This is an easy way to check for errors.
1007 * If end is NULL, read_u64() will also check that the entire input string
1008 * was consumed and return an error code otherwise.
1009 * Return 0 on success, or nonzero on failure.
1010 * If a failure occurs, num and end are not modified. */
1012 read_u64(uint64
*num
, const char *buf
, char **end
)
1018 while (buf
[0] == ' ')
1020 if (buf
[0] < '0' || '9' < buf
[0])
1022 tnum
= strtoumax(buf
, &tend
, 10);
1027 if (!end
&& tend
[0] != '\0')
1029 if (tnum
> UINT64_MAX
)
1032 if (num
) *num
= (uint64
)tnum
;
1033 if (end
) *end
= tend
;
1037 // Identical to read_u64() but instead reads into uint32.
1039 read_u32(uint32
*num
, const char *buf
, char **end
)
1045 while (buf
[0] == ' ')
1047 if (buf
[0] < '0' || '9' < buf
[0])
1049 tnum
= strtoumax(buf
, &tend
, 10);
1054 if (!end
&& tend
[0] != '\0')
1056 if (tnum
> UINT32_MAX
)
1059 if (num
) *num
= (uint32
)tnum
;
1060 if (end
) *end
= tend
;
1064 /* Read a delay value in seconds from the given buffer and
1065 place it in duration in nanoseconds.
1066 The interface and behavior are analogous to read_u32(). */
1068 read_duration(int64
*duration
, const char *buf
, char **end
)
1073 r
= read_u32(&dur_sec
, buf
, end
);
1076 *duration
= ((int64
) dur_sec
) * 1000000000;
1080 /* Read a tube name from the given buffer moving the buffer to the name start */
1082 read_tube_name(char **tubename
, char *buf
, char **end
)
1086 while (buf
[0] == ' ')
1088 len
= strspn(buf
, NAME_CHARS
);
1099 wait_for_job(Conn
*c
, int timeout
)
1101 c
->state
= STATE_WAIT
;
1102 enqueue_waiting_conn(c
);
1104 /* Set the pending timeout to the requested timeout amount */
1105 c
->pending_timeout
= timeout
;
1107 // only care if they hang up
1111 typedef int(*fmt_fn
)(char *, size_t, void *);
1114 do_stats(Conn
*c
, fmt_fn fmt
, void *data
)
1118 /* first, measure how big a buffer we will need */
1119 stats_len
= fmt(NULL
, 0, data
) + 16;
1121 c
->out_job
= allocate_job(stats_len
); /* fake job to hold stats data */
1123 reply_serr(c
, MSG_OUT_OF_MEMORY
);
1127 /* Mark this job as a copy so it can be appropriately freed later on */
1128 c
->out_job
->r
.state
= Copy
;
1130 /* now actually format the stats data */
1131 r
= fmt(c
->out_job
->body
, stats_len
, data
);
1132 /* and set the actual body size */
1133 c
->out_job
->r
.body_size
= r
;
1134 if (r
> stats_len
) {
1135 reply_serr(c
, MSG_INTERNAL_ERROR
);
1139 c
->out_job_sent
= 0;
1140 reply_line(c
, STATE_SEND_JOB
, "OK %d\r\n", r
- 2);
1144 do_list_tubes(Conn
*c
, Ms
*l
)
1150 /* first, measure how big a buffer we will need */
1151 resp_z
= 6; /* initial "---\n" and final "\r\n" */
1152 for (i
= 0; i
< l
->len
; i
++) {
1154 resp_z
+= 3 + strlen(t
->name
); /* including "- " and "\n" */
1157 c
->out_job
= allocate_job(resp_z
); /* fake job to hold response data */
1159 reply_serr(c
, MSG_OUT_OF_MEMORY
);
1163 /* Mark this job as a copy so it can be appropriately freed later on */
1164 c
->out_job
->r
.state
= Copy
;
1166 /* now actually format the response */
1167 buf
= c
->out_job
->body
;
1168 buf
+= snprintf(buf
, 5, "---\n");
1169 for (i
= 0; i
< l
->len
; i
++) {
1171 buf
+= snprintf(buf
, 4 + strlen(t
->name
), "- %s\n", t
->name
);
1176 c
->out_job_sent
= 0;
1177 reply_line(c
, STATE_SEND_JOB
, "OK %zu\r\n", resp_z
- 2);
1181 fmt_job_stats(char *buf
, size_t size
, Job
*j
)
1188 if (j
->r
.state
== Reserved
|| j
->r
.state
== Delayed
) {
1189 time_left
= (j
->r
.deadline_at
- t
) / 1000000000;
1194 file
= j
->file
->seq
;
1196 return snprintf(buf
, size
, STATS_JOB_FMT
,
1201 (t
- j
->r
.created_at
) / 1000000000,
1202 j
->r
.delay
/ 1000000000,
1203 j
->r
.ttr
/ 1000000000,
1214 fmt_stats_tube(char *buf
, size_t size
, Tube
*t
)
1219 time_left
= (t
->unpause_at
- nanoseconds()) / 1000000000;
1223 return snprintf(buf
, size
, STATS_TUBE_FMT
,
1227 t
->stat
.reserved_ct
,
1230 t
->stat
.total_jobs_ct
,
1234 t
->stat
.total_delete_ct
,
1236 t
->pause
/ 1000000000,
1241 maybe_enqueue_incoming_job(Conn
*c
)
1245 /* do we have a complete job? */
1246 if (c
->in_job_read
== j
->r
.body_size
) {
1247 enqueue_incoming_job(c
);
1251 /* otherwise we have incomplete data, so just keep waiting */
1252 c
->state
= STATE_WANT_DATA
;
1257 remove_this_reserved_job(Conn
*c
, Job
*j
)
1259 j
= job_list_remove(j
);
1261 global_stat
.reserved_ct
--;
1262 j
->tube
->stat
.reserved_ct
--;
1265 c
->soonest_job
= NULL
;
1270 remove_reserved_job(Conn
*c
, Job
*j
)
1272 if (!is_job_reserved_by_conn(c
, j
))
1274 return remove_this_reserved_job(c
, j
);
1278 is_valid_tube(const char *name
, size_t max
)
1280 size_t len
= strlen(name
);
1281 return 0 < len
&& len
<= max
&&
1282 strspn(name
, NAME_CHARS
) == len
&&
1287 dispatch_cmd(Conn
*c
)
1289 int r
, timeout
= -1;
1294 char *size_buf
, *delay_buf
, *ttr_buf
, *pri_buf
, *end_buf
, *name
;
1301 /* NUL-terminate this string so we can use strtol and friends */
1302 c
->cmd
[c
->cmd_len
- 2] = '\0';
1304 /* check for possible maliciousness */
1305 if (strlen(c
->cmd
) != c
->cmd_len
- 2) {
1306 reply_msg(c
, MSG_BAD_FORMAT
);
1310 type
= which_cmd(c
);
1312 printf("<%d command %s\n", c
->sock
.fd
, op_names
[type
]);
1317 if (read_u32(&pri
, c
->cmd
+ 4, &delay_buf
) ||
1318 read_duration(&delay
, delay_buf
, &ttr_buf
) ||
1319 read_duration(&ttr
, ttr_buf
, &size_buf
) ||
1320 read_u32(&body_size
, size_buf
, &end_buf
)) {
1321 reply_msg(c
, MSG_BAD_FORMAT
);
1326 if (body_size
> job_data_size_limit
) {
1327 /* throw away the job body and respond with JOB_TOO_BIG */
1328 skip(c
, (int64
)body_size
+ 2, MSG_JOB_TOO_BIG
);
1332 /* don't allow trailing garbage */
1333 if (end_buf
[0] != '\0') {
1334 reply_msg(c
, MSG_BAD_FORMAT
);
1340 if (ttr
< 1000000000) {
1344 c
->in_job
= make_job(pri
, delay
, ttr
, body_size
+ 2, c
->use
);
1348 /* throw away the job body and respond with OUT_OF_MEMORY */
1349 twarnx("server error: " MSG_OUT_OF_MEMORY
);
1350 skip(c
, body_size
+ 2, MSG_OUT_OF_MEMORY
);
1356 /* it's possible we already have a complete job */
1357 maybe_enqueue_incoming_job(c
);
1361 /* don't allow trailing garbage */
1362 if (c
->cmd_len
!= CMD_PEEK_READY_LEN
+ 2) {
1363 reply_msg(c
, MSG_BAD_FORMAT
);
1368 if (c
->use
->ready
.len
) {
1369 j
= job_copy(c
->use
->ready
.data
[0]);
1373 reply_msg(c
, MSG_NOTFOUND
);
1376 reply_job(c
, j
, MSG_FOUND
);
1379 case OP_PEEK_DELAYED
:
1380 /* don't allow trailing garbage */
1381 if (c
->cmd_len
!= CMD_PEEK_DELAYED_LEN
+ 2) {
1382 reply_msg(c
, MSG_BAD_FORMAT
);
1387 if (c
->use
->delay
.len
) {
1388 j
= job_copy(c
->use
->delay
.data
[0]);
1392 reply_msg(c
, MSG_NOTFOUND
);
1395 reply_job(c
, j
, MSG_FOUND
);
1398 case OP_PEEK_BURIED
:
1399 /* don't allow trailing garbage */
1400 if (c
->cmd_len
!= CMD_PEEK_BURIED_LEN
+ 2) {
1401 reply_msg(c
, MSG_BAD_FORMAT
);
1406 if (buried_job_p(c
->use
))
1407 j
= job_copy(c
->use
->buried
.next
);
1412 reply_msg(c
, MSG_NOTFOUND
);
1415 reply_job(c
, j
, MSG_FOUND
);
1419 if (read_u64(&id
, c
->cmd
+ CMD_PEEKJOB_LEN
, NULL
)) {
1420 reply_msg(c
, MSG_BAD_FORMAT
);
1425 /* So, peek is annoying, because some other connection might free the
1426 * job while we are still trying to write it out. So we copy it and
1427 * free the copy when it's done sending, in the "conn_want_command" function. */
1428 j
= job_copy(job_find(id
));
1431 reply_msg(c
, MSG_NOTFOUND
);
1434 reply_job(c
, j
, MSG_FOUND
);
1437 case OP_RESERVE_TIMEOUT
:
1439 uint32 utimeout
= 0;
1440 if (read_u32(&utimeout
, c
->cmd
+ CMD_RESERVE_TIMEOUT_LEN
, &end_buf
) != 0 || utimeout
> INT_MAX
) {
1441 reply_msg(c
, MSG_BAD_FORMAT
);
1444 timeout
= (int)utimeout
;
1448 /* don't allow trailing garbage */
1449 if (type
== OP_RESERVE
&& c
->cmd_len
!= CMD_RESERVE_LEN
+ 2) {
1450 reply_msg(c
, MSG_BAD_FORMAT
);
1456 if (conndeadlinesoon(c
) && !conn_ready(c
)) {
1457 reply_msg(c
, MSG_DEADLINE_SOON
);
1461 /* try to get a new job for this guy */
1462 wait_for_job(c
, timeout
);
1466 case OP_RESERVE_JOB
:
1467 if (read_u64(&id
, c
->cmd
+ CMD_RESERVE_JOB_LEN
, NULL
)) {
1468 reply_msg(c
, MSG_BAD_FORMAT
);
1473 // This command could produce "deadline soon" if
1474 // the connection has a reservation about to expire.
1475 // We choose not to do it, because this command does not block
1476 // for an arbitrary amount of time as reserve and reserve-with-timeout.
1480 reply_msg(c
, MSG_NOTFOUND
);
1483 // Check if this job is already reserved.
1484 if (j
->r
.state
== Reserved
|| j
->r
.state
== Invalid
) {
1485 reply_msg(c
, MSG_NOTFOUND
);
1489 // Job can be in ready, buried or delayed states.
1490 if (j
->r
.state
== Ready
) {
1491 j
= remove_ready_job(j
);
1492 } else if (j
->r
.state
== Buried
) {
1493 j
= remove_buried_job(j
);
1494 } else if (j
->r
.state
== Delayed
) {
1495 j
= remove_delayed_job(j
);
1497 reply_serr(c
, MSG_INTERNAL_ERROR
);
1502 global_stat
.reserved_ct
++;
1504 conn_reserve_job(c
, j
);
1505 reply_job(c
, j
, MSG_RESERVED
);
1509 if (read_u64(&id
, c
->cmd
+ CMD_DELETE_LEN
, NULL
)) {
1510 reply_msg(c
, MSG_BAD_FORMAT
);
1516 Job
*jf
= job_find(id
);
1517 j
= remove_reserved_job(c
, jf
);
1519 j
= remove_ready_job(jf
);
1521 j
= remove_buried_job(jf
);
1523 j
= remove_delayed_job(jf
);
1527 reply_msg(c
, MSG_NOTFOUND
);
1531 j
->tube
->stat
.total_delete_ct
++;
1533 j
->r
.state
= Invalid
;
1534 r
= walwrite(&c
->srv
->wal
, j
);
1535 walmaint(&c
->srv
->wal
);
1539 reply_serr(c
, MSG_INTERNAL_ERROR
);
1542 reply_msg(c
, MSG_DELETED
);
1546 if (read_u64(&id
, c
->cmd
+ CMD_RELEASE_LEN
, &pri_buf
) ||
1547 read_u32(&pri
, pri_buf
, &delay_buf
) ||
1548 read_duration(&delay
, delay_buf
, NULL
)) {
1549 reply_msg(c
, MSG_BAD_FORMAT
);
1554 j
= remove_reserved_job(c
, job_find(id
));
1557 reply_msg(c
, MSG_NOTFOUND
);
1561 /* We want to update the delay deadline on disk, so reserve space for
1564 int z
= walresvupdate(&c
->srv
->wal
);
1566 reply_serr(c
, MSG_OUT_OF_MEMORY
);
1576 r
= enqueue_job(c
->srv
, j
, delay
, !!delay
);
1578 reply_serr(c
, MSG_INTERNAL_ERROR
);
1582 reply_msg(c
, MSG_RELEASED
);
1586 /* out of memory trying to grow the queue, so it gets buried */
1587 bury_job(c
->srv
, j
, 0);
1588 reply_msg(c
, MSG_BURIED
);
1592 if (read_u64(&id
, c
->cmd
+ CMD_BURY_LEN
, &pri_buf
) ||
1593 read_u32(&pri
, pri_buf
, NULL
)) {
1594 reply_msg(c
, MSG_BAD_FORMAT
);
1600 j
= remove_reserved_job(c
, job_find(id
));
1603 reply_msg(c
, MSG_NOTFOUND
);
1608 r
= bury_job(c
->srv
, j
, 1);
1610 reply_serr(c
, MSG_INTERNAL_ERROR
);
1613 reply_msg(c
, MSG_BURIED
);
1618 count
= strtoul(c
->cmd
+ CMD_KICK_LEN
, &end_buf
, 10);
1619 if (end_buf
== c
->cmd
+ CMD_KICK_LEN
|| errno
) {
1620 reply_msg(c
, MSG_BAD_FORMAT
);
1626 i
= kick_jobs(c
->srv
, c
->use
, count
);
1627 reply_line(c
, STATE_SEND_WORD
, "KICKED %u\r\n", i
);
1631 if (read_u64(&id
, c
->cmd
+ CMD_KICKJOB_LEN
, NULL
)) {
1632 reply_msg(c
, MSG_BAD_FORMAT
);
1640 reply_msg(c
, MSG_NOTFOUND
);
1644 if ((j
->r
.state
== Buried
&& kick_buried_job(c
->srv
, j
)) ||
1645 (j
->r
.state
== Delayed
&& kick_delayed_job(c
->srv
, j
))) {
1646 reply_msg(c
, MSG_KICKED
);
1648 reply_msg(c
, MSG_NOTFOUND
);
1653 if (read_u64(&id
, c
->cmd
+ CMD_TOUCH_LEN
, NULL
)) {
1654 reply_msg(c
, MSG_BAD_FORMAT
);
1659 if (touch_job(c
, job_find(id
))) {
1660 reply_msg(c
, MSG_TOUCHED
);
1662 reply_msg(c
, MSG_NOTFOUND
);
1667 /* don't allow trailing garbage */
1668 if (c
->cmd_len
!= CMD_STATS_LEN
+ 2) {
1669 reply_msg(c
, MSG_BAD_FORMAT
);
1674 do_stats(c
, fmt_stats
, c
->srv
);
1678 if (read_u64(&id
, c
->cmd
+ CMD_STATSJOB_LEN
, NULL
)) {
1679 reply_msg(c
, MSG_BAD_FORMAT
);
1686 reply_msg(c
, MSG_NOTFOUND
);
1691 reply_serr(c
, MSG_INTERNAL_ERROR
);
1694 do_stats(c
, (fmt_fn
) fmt_job_stats
, j
);
1698 name
= c
->cmd
+ CMD_STATS_TUBE_LEN
;
1699 if (!is_valid_tube(name
, MAX_TUBE_NAME_LEN
- 1)) {
1700 reply_msg(c
, MSG_BAD_FORMAT
);
1705 t
= tube_find(&tubes
, name
);
1707 reply_msg(c
, MSG_NOTFOUND
);
1710 do_stats(c
, (fmt_fn
) fmt_stats_tube
, t
);
1715 /* don't allow trailing garbage */
1716 if (c
->cmd_len
!= CMD_LIST_TUBES_LEN
+ 2) {
1717 reply_msg(c
, MSG_BAD_FORMAT
);
1721 do_list_tubes(c
, &tubes
);
1724 case OP_LIST_TUBE_USED
:
1725 /* don't allow trailing garbage */
1726 if (c
->cmd_len
!= CMD_LIST_TUBE_USED_LEN
+ 2) {
1727 reply_msg(c
, MSG_BAD_FORMAT
);
1731 reply_line(c
, STATE_SEND_WORD
, "USING %s\r\n", c
->use
->name
);
1734 case OP_LIST_TUBES_WATCHED
:
1735 /* don't allow trailing garbage */
1736 if (c
->cmd_len
!= CMD_LIST_TUBES_WATCHED_LEN
+ 2) {
1737 reply_msg(c
, MSG_BAD_FORMAT
);
1741 do_list_tubes(c
, &c
->watch
);
1745 name
= c
->cmd
+ CMD_USE_LEN
;
1746 if (!is_valid_tube(name
, MAX_TUBE_NAME_LEN
- 1)) {
1747 reply_msg(c
, MSG_BAD_FORMAT
);
1752 TUBE_ASSIGN(t
, tube_find_or_make(name
));
1754 reply_serr(c
, MSG_OUT_OF_MEMORY
);
1759 TUBE_ASSIGN(c
->use
, t
);
1760 TUBE_ASSIGN(t
, NULL
);
1763 reply_line(c
, STATE_SEND_WORD
, "USING %s\r\n", c
->use
->name
);
1767 name
= c
->cmd
+ CMD_WATCH_LEN
;
1768 if (!is_valid_tube(name
, MAX_TUBE_NAME_LEN
- 1)) {
1769 reply_msg(c
, MSG_BAD_FORMAT
);
1774 TUBE_ASSIGN(t
, tube_find_or_make(name
));
1776 reply_serr(c
, MSG_OUT_OF_MEMORY
);
1781 if (!ms_contains(&c
->watch
, t
))
1782 r
= ms_append(&c
->watch
, t
);
1783 TUBE_ASSIGN(t
, NULL
);
1785 reply_serr(c
, MSG_OUT_OF_MEMORY
);
1788 reply_line(c
, STATE_SEND_WORD
, "WATCHING %zu\r\n", c
->watch
.len
);
1792 name
= c
->cmd
+ CMD_IGNORE_LEN
;
1793 if (!is_valid_tube(name
, MAX_TUBE_NAME_LEN
- 1)) {
1794 reply_msg(c
, MSG_BAD_FORMAT
);
1799 t
= tube_find(&c
->watch
, name
);
1800 if (t
&& c
->watch
.len
< 2) {
1801 reply_msg(c
, MSG_NOT_IGNORED
);
1806 ms_remove(&c
->watch
, t
); /* may free t if refcount => 0 */
1808 reply_line(c
, STATE_SEND_WORD
, "WATCHING %zu\r\n", c
->watch
.len
);
1812 c
->state
= STATE_CLOSE
;
1816 if (read_tube_name(&name
, c
->cmd
+ CMD_PAUSE_TUBE_LEN
, &delay_buf
) ||
1817 read_duration(&delay
, delay_buf
, NULL
)) {
1818 reply_msg(c
, MSG_BAD_FORMAT
);
1824 if (!is_valid_tube(name
, MAX_TUBE_NAME_LEN
- 1)) {
1825 reply_msg(c
, MSG_BAD_FORMAT
);
1828 t
= tube_find(&tubes
, name
);
1830 reply_msg(c
, MSG_NOTFOUND
);
1834 // Always pause for a positive amount of time, to make sure
1835 // that waiting clients wake up when the deadline arrives.
1840 t
->unpause_at
= nanoseconds() + delay
;
1844 reply_line(c
, STATE_SEND_WORD
, "PAUSED\r\n");
1848 reply_msg(c
, MSG_UNKNOWN_COMMAND
);
1852 /* There are three reasons this function may be called. We need to check for
1855 * 1. A reserved job has run out of time.
1856 * 2. A waiting client's reserved job has entered the safety margin.
1857 * 3. A waiting client's requested timeout has occurred.
1859 * If any of these happen, we must do the appropriate thing. */
/* NOTE(review): this extract is line-mangled and omits several original
 * lines (return type, braces, and the statement that sets should_timeout),
 * so the annotations below only describe what is visible here. */
1861 conn_timeout(Conn
*c
)
/* should_timeout: set when a waiting client's reserved job enters the
 * deadline-soon safety margin (case 2 above); drives the reply below. */
1863 int should_timeout
= 0;
1866 /* Check if the client was trying to reserve a job. */
1867 if (conn_waiting(c
) && conndeadlinesoon(c
))
/* NOTE(review): the consequent that presumably sets should_timeout = 1 is
 * not visible in this extract -- confirm against the full source. */
1870 /* Check if any reserved jobs have run out of time. We should do this
1871 * whether or not the client is waiting for a new reservation. */
1872 while ((j
= connsoonestjob(c
))) {
/* Jobs whose TTR deadline is still in the future stop the sweep. */
1873 if (j
->r
.deadline_at
>= nanoseconds())
1876 /* This job is in the middle of being written out. If we return it to
1877 * the ready queue, someone might free it before we finish writing it
1878 * out to the socket. So we'll copy it here and free the copy when it's
1880 if (j
== c
->out_job
) {
1881 c
->out_job
= job_copy(c
->out_job
);
1884 timeout_ct
++; /* stats */
/* The expired reservation is removed from this conn and the job is put
 * back on the ready queue; on enqueue failure it is buried instead. */
1886 int r
= enqueue_job(c
->srv
, remove_this_reserved_job(c
, j
), 0, 0);
1888 bury_job(c
->srv
, j
, 0); /* out of memory, so bury it */
/* Case 2: warn the waiting client that its reservation deadline is near. */
1892 if (should_timeout
) {
1893 remove_waiting_conn(c
);
1894 reply_msg(c
, MSG_DEADLINE_SOON
);
/* Case 3: a waiting client's reserve-with-timeout expired; clear the
 * pending timeout and reply TIMED_OUT. */
1895 } else if (conn_waiting(c
) && c
->pending_timeout
>= 0) {
1896 c
->pending_timeout
= -1;
1897 remove_waiting_conn(c
);
1898 reply_msg(c
, MSG_TIMED_OUT
);
/* Signal handler. Presumably flips a global drain flag so the server stops
 * accepting new jobs -- NOTE(review): the assignment itself is not visible
 * in this extract; only the unused-parameter marker survives. */
1903 enter_drain_mode(int sig
)
1905 UNUSED_PARAMETER(sig
);
/* Reset connection c so its next I/O cycle reads a fresh command line:
 * release any temporary out_job copy, clear the reply progress counter,
 * and return the state machine to STATE_WANT_COMMAND. */
1910 conn_want_command(Conn
*c
)
1914 /* was this a peek or stats command? */
/* A job in state Copy was duplicated solely for sending (see the
 * job_copy in conn_timeout); it is owned by this conn and freed here.
 * NOTE(review): the line clearing c->out_job after job_free is not
 * visible in this extract -- verify it to rule out a dangling pointer. */
1915 if (c
->out_job
&& c
->out_job
->r
.state
== Copy
)
1916 job_free(c
->out_job
);
1919 c
->reply_sent
= 0; /* now that we're done, reset this */
1920 c
->state
= STATE_WANT_COMMAND
;
/* Per-connection I/O state machine. Each case performs one nonblocking
 * read/write step for the conn's current state and advances the state on
 * completion. NOTE(review): this extract drops many original lines (the
 * switch header, r<=0 checks, returns, braces), so the comments below
 * describe only the visible fragments. */
1924 conn_process_io(Conn
*c
)
1929 struct iovec iov
[2];
/* Reading a command line into c->cmd; a read of 0/err closes the conn. */
1932 case STATE_WANT_COMMAND
:
1933 r
= read(c
->sock
.fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1935 check_err(c
, "read()");
1939 c
->state
= STATE_CLOSE
;
1944 c
->cmd_len
= scan_line_end(c
->cmd
, c
->cmd_read
);
1946 // We found complete command line. Bail out to h_conn.
1950 // c->cmd_read > LINE_BUF_SIZE can't happen
1952 if (c
->cmd_read
== LINE_BUF_SIZE
) {
1953 // Command line too long.
1954 // Put connection into special state that discards
1955 // the command line until the end line is found.
1956 c
->cmd_read
= 0; // discard the input so far
1957 c
->state
= STATE_WANT_ENDLINE
;
1959 // We have an incomplete line, so just keep waiting.
/* Discarding an over-long command line until its terminating EOL. */
1962 case STATE_WANT_ENDLINE
:
1963 r
= read(c
->sock
.fd
, c
->cmd
+ c
->cmd_read
, LINE_BUF_SIZE
- c
->cmd_read
);
1965 check_err(c
, "read()");
1969 c
->state
= STATE_CLOSE
;
1974 c
->cmd_len
= scan_line_end(c
->cmd
, c
->cmd_read
);
1976 // Found the EOL. Reply and reuse whatever was read afer the EOL.
1977 reply_msg(c
, MSG_BAD_FORMAT
);
1982 // c->cmd_read > LINE_BUF_SIZE can't happen
1984 if (c
->cmd_read
== LINE_BUF_SIZE
) {
1985 // Keep discarding the input since no EOL was found.
/* Throwing away an oversized job body into a scratch buffer. */
1990 case STATE_BITBUCKET
: {
1991 /* Invert the meaning of in_job_read while throwing away data -- it
1992 * counts the bytes that remain to be thrown away. */
1993 static char bucket
[BUCKET_BUF_SIZE
];
1994 to_read
= min(c
->in_job_read
, BUCKET_BUF_SIZE
);
1995 r
= read(c
->sock
.fd
, bucket
, to_read
);
1997 check_err(c
, "read()");
2001 c
->state
= STATE_CLOSE
;
2005 c
->in_job_read
-= r
; /* we got some bytes */
2007 /* (c->in_job_read < 0) can't happen */
/* All discarded: send the prepared reply (e.g. JOB_TOO_BIG). */
2009 if (c
->in_job_read
== 0) {
2010 reply(c
, c
->reply
, c
->reply_len
, STATE_SEND_WORD
);
/* Reading an incoming job body directly into j->body. */
2014 case STATE_WANT_DATA
:
2017 r
= read(c
->sock
.fd
, j
->body
+ c
->in_job_read
, j
->r
.body_size
-c
->in_job_read
);
2019 check_err(c
, "read()");
2023 c
->state
= STATE_CLOSE
;
2027 c
->in_job_read
+= r
; /* we got some bytes */
2029 /* (j->in_job_read > j->r.body_size) can't happen */
2031 maybe_enqueue_incoming_job(c
);
/* Writing a plain one-line reply from c->reply. */
2033 case STATE_SEND_WORD
:
2034 r
= write(c
->sock
.fd
, c
->reply
+ c
->reply_sent
, c
->reply_len
- c
->reply_sent
);
2036 check_err(c
, "write()");
2040 c
->state
= STATE_CLOSE
;
2044 c
->reply_sent
+= r
; /* we got some bytes */
2046 /* (c->reply_sent > c->reply_len) can't happen */
2048 if (c
->reply_sent
== c
->reply_len
) {
2049 conn_want_command(c
);
2053 /* otherwise we sent an incomplete reply, so just keep waiting */
/* Writing reply line + job body in one writev; iov[0] may be empty if
 * the reply line has already been fully sent. */
2055 case STATE_SEND_JOB
:
2058 iov
[0].iov_base
= (void *)(c
->reply
+ c
->reply_sent
);
2059 iov
[0].iov_len
= c
->reply_len
- c
->reply_sent
; /* maybe 0 */
2060 iov
[1].iov_base
= j
->body
+ c
->out_job_sent
;
2061 iov
[1].iov_len
= j
->r
.body_size
- c
->out_job_sent
;
2063 r
= writev(c
->sock
.fd
, iov
, 2);
2065 check_err(c
, "writev()");
2069 c
->state
= STATE_CLOSE
;
2073 /* update the sent values */
/* Any bytes written beyond the reply line count toward the job body. */
2075 if (c
->reply_sent
>= c
->reply_len
) {
2076 c
->out_job_sent
+= c
->reply_sent
- c
->reply_len
;
2077 c
->reply_sent
= c
->reply_len
;
2080 /* (c->out_job_sent > j->r.body_size) can't happen */
2083 if (c
->out_job_sent
== j
->r
.body_size
) {
/* Debug/verbose trace of the fully sent job -- presumably gated by a
 * verbosity check on a line not visible in this extract. */
2085 printf(">%d job %"PRIu64
"\n", c
->sock
.fd
, j
->r
.id
);
2087 conn_want_command(c
);
2091 /* otherwise we sent incomplete data, so just keep waiting */
/* Peer half-closed while we were waiting: cancel any pending reserve
 * timeout and reply TIMED_OUT. */
2094 if (c
->halfclosed
) {
2095 c
->pending_timeout
= -1;
2096 remove_waiting_conn(c
);
2097 reply_msg(c
, MSG_TIMED_OUT
);
/* want_command: conn has an open socket and is waiting for a command line.
 * cmd_data_ready: additionally, some command bytes have already been read. */
2104 #define want_command(c) ((c)->sock.fd && ((c)->state == STATE_WANT_COMMAND))
2105 #define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
/* Socket-event handler for an established connection: sanity-checks the fd,
 * runs the I/O state machine, then dispatches every complete command line
 * already buffered in c->cmd. NOTE(review): the calls between these visible
 * fragments (e.g. conn_process_io, dispatch) were dropped by the extraction. */
2108 h_conn(const int fd
, const short which
, Conn
*c
)
/* Defensive check: the event's fd must be this conn's socket fd. */
2110 if (fd
!= c
->sock
.fd
) {
2111 twarnx("Argh! event fd doesn't match conn fd.");
/* Pipelining: keep dispatching as long as a full line ends in the buffer. */
2123 while (cmd_data_ready(c
) && (c
->cmd_len
= scan_line_end(c
->cmd
, c
->cmd_read
))) {
2127 if (c
->state
== STATE_CLOSE
) {
/* Thin adapter matching the Handle callback signature (see h_accept's
 * c->sock.f assignment): forwards the socket event to h_conn. */
2135 prothandle(Conn
*c
, int ev
)
2137 h_conn(c
->sock
.fd
, ev
, c
);
2140 // prottick returns nanoseconds till the next work.
/* NOTE(review): the function signature (presumably taking Server *s) was
 * dropped by this extraction, along with several loop-body lines; comments
 * below cover only the visible fragments. */
/* period starts at one hour and is shrunk to the soonest future deadline
 * found among delayed jobs, paused tubes, and connection timers. */
2147 int64 period
= 0x34630B8A000LL
; /* 1 hour in nanoseconds */
2150 now
= nanoseconds();
2152 // Enqueue all jobs that are no longer delayed.
2153 // Capture the smallest period from the soonest delayed job.
2154 while ((j
= soonest_delayed_job())) {
2155 d
= j
->r
.deadline_at
- now
;
/* d > 0 here: job still delayed; remember it and stop the sweep
 * (the surrounding conditional/break is not visible in this extract). */
2157 period
= min(period
, d
);
/* Deadline reached: pull the job off its tube's delay heap and make it
 * ready; bury on enqueue failure. */
2160 heapremove(&j
->tube
->delay
, j
->heap_index
);
2161 int r
= enqueue_job(s
, j
, 0, 0);
2163 bury_job(s
, j
, 0); /* out of memory */
2166 // Unpause every possible tube and process the queue.
2167 // Capture the smallest period from the soonest pause deadline.
2169 for (i
= 0; i
< tubes
.len
; i
++) {
2171 d
= t
->unpause_at
- now
;
2172 if (t
->pause
&& d
<= 0) {
2177 period
= min(period
, d
);
2181 // Process connections with pending timeouts. Release jobs with expired ttr.
2182 // Capture the smallest period from the soonest connection.
/* s->conns is a min-heap keyed on tickat; conns whose tick time has
 * arrived are popped and handled (handler call not visible here). */
2183 while (s
->conns
.len
) {
2184 Conn
*c
= s
->conns
.data
[0];
2185 d
= c
->tickat
- now
;
2187 period
= min(period
, d
);
2190 heapremove(&s
->conns
, 0);
/* Listening-socket event handler: accept one client, set it nonblocking,
 * wrap it in a Conn watching the default tube, and register it for read
 * events. The repeated printf("close %d\n", cfd) fragments are the
 * (presumably verbose-gated) traces on each error-cleanup path; the
 * close(cfd)/return lines between them were dropped by this extraction. */
2201 h_accept(const int fd
, const short which
, Server
*s
)
2203 UNUSED_PARAMETER(which
);
2204 struct sockaddr_storage addr
;
2206 socklen_t addrlen
= sizeof addr
;
2207 int cfd
= accept(fd
, (struct sockaddr
*)&addr
, &addrlen
);
/* accept() failed: EAGAIN/EWOULDBLOCK are expected on a nonblocking
 * listener and stay silent; anything else is warned about. */
2209 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) twarn("accept()");
2214 printf("accept %d\n", cfd
);
/* Make the client socket nonblocking (get flags, then OR in O_NONBLOCK). */
2217 int flags
= fcntl(cfd
, F_GETFL
, 0);
2219 twarn("getting flags");
2222 printf("close %d\n", cfd
);
2228 int r
= fcntl(cfd
, F_SETFL
, flags
| O_NONBLOCK
);
2230 twarn("setting O_NONBLOCK");
2233 printf("close %d\n", cfd
);
/* New conn starts in STATE_WANT_COMMAND, using and watching default_tube. */
2239 Conn
*c
= make_conn(cfd
, STATE_WANT_COMMAND
, default_tube
, default_tube
);
2241 twarnx("make_conn() failed");
2244 printf("close %d\n", cfd
);
/* Route future socket events for this conn through prothandle. */
2251 c
->sock
.f
= (Handle
)prothandle
;
2254 r
= sockwant(&c
->sock
, 'r');
2259 printf("close %d\n", cfd
);
/* Protocol/server startup fragment -- NOTE(review): the enclosing function
 * header (presumably prot_init) and the error-handling/return lines between
 * these fragments were dropped by this extraction. Visible work: record
 * start time, zero per-op stats, derive a random instance id, capture the
 * host name, and set up the tube registry with the "default" tube. */
2268 started_at
= nanoseconds();
2269 memset(op_ct
, 0, sizeof(op_ct
));
/* Random instance id: read instance_id_bytes from /dev/urandom and render
 * them as lowercase hex into instance_hex. */
2271 int dev_random
= open("/dev/urandom", O_RDONLY
);
2272 if (dev_random
< 0) {
2273 twarn("open /dev/urandom");
2278 byte rand_data
[instance_id_bytes
];
2279 r
= read(dev_random
, &rand_data
, instance_id_bytes
);
/* A short read is treated the same as a failed read. */
2280 if (r
!= instance_id_bytes
) {
2281 twarn("read /dev/urandom");
2284 for (i
= 0; i
< instance_id_bytes
; i
++) {
2285 sprintf(instance_hex
+ (i
* 2), "%02x", rand_data
[i
]);
/* Host identification for stats output. */
2289 if (uname(&node_info
) == -1) {
/* Tube registry starts empty; the "default" tube must exist up front. */
2294 ms_init(&tubes
, NULL
, NULL
);
2296 TUBE_ASSIGN(default_tube
, tube_find_or_make("default"));
2298 twarnx("Out of memory during startup!");
2301 // For each job in list, inserts the job into the appropriate data
2302 // structures and adds it to the log.
2304 // Returns 1 on success, 0 on failure.
2306 prot_replay(Server
*s
, Job
*list
)
2312 for (j
= list
->next
; j
!= list
; j
= nj
) {
2315 int z
= walresvupdate(&s
->wal
);
2317 twarnx("failed to reserve space");
2321 switch (j
->r
.state
) {
2328 if (t
< j
->r
.deadline_at
) {
2329 delay
= j
->r
.deadline_at
- t
;
2333 r
= enqueue_job(s
, j
, delay
, 0);
2335 twarnx("error recovering job %"PRIu64
, j
->r
.id
);