10 #define SAFETY_MARGIN (1000000000) /* 1 second */
12 static int cur_conn_ct
= 0, cur_worker_ct
= 0, cur_producer_ct
= 0;
13 static uint tot_conn_ct
= 0;
17 on_watch(Ms
*a
, Tube
*t
, size_t i
)
26 on_ignore(Ms
*a
, Tube
*t
, size_t i
)
35 make_conn(int fd
, char start_state
, Tube
*use
, Tube
*watch
)
43 ms_init(&c
->watch
, (ms_event_fn
) on_watch
, (ms_event_fn
) on_ignore
);
44 if (!ms_append(&c
->watch
, watch
)) {
50 TUBE_ASSIGN(c
->use
, use
);
53 c
->state
= start_state
;
54 c
->pending_timeout
= -1;
55 c
->tickpos
= 0; // Does not mean anything if in_conns is set to 0.
59 job_list_reset(&c
->reserved_jobs
);
69 connsetproducer(Conn
*c
)
71 if (c
->type
& CONN_TYPE_PRODUCER
) return;
72 c
->type
|= CONN_TYPE_PRODUCER
;
73 cur_producer_ct
++; /* stats */
77 connsetworker(Conn
*c
)
79 if (c
->type
& CONN_TYPE_WORKER
) return;
80 c
->type
|= CONN_TYPE_WORKER
;
81 cur_worker_ct
++; /* stats */
99 return cur_producer_ct
;
105 return cur_worker_ct
;
109 has_reserved_job(Conn
*c
)
111 return !job_list_is_empty(&c
->reserved_jobs
);
115 // Returns positive nanoseconds when c should tick, 0 otherwise.
119 int margin
= 0, should_timeout
= 0;
122 if (conn_waiting(c
)) {
123 margin
= SAFETY_MARGIN
;
126 if (has_reserved_job(c
)) {
127 t
= connsoonestjob(c
)->r
.deadline_at
- nanoseconds() - margin
;
130 if (c
->pending_timeout
>= 0) {
131 t
= min(t
, ((int64
)c
->pending_timeout
) * 1000000000);
135 if (should_timeout
) {
136 return nanoseconds() + t
;
142 // Remove c from the c->srv heap and reschedule it using the value
143 // returned by conntickat if there is an outstanding timeout in the c.
148 heapremove(&c
->srv
->conns
, c
->tickpos
);
151 c
->tickat
= conntickat(c
);
153 heapinsert(&c
->srv
->conns
, c
);
158 // conn_set_soonestjob updates c->soonest_job with j
159 // if j should be handled sooner than c->soonest_job.
161 conn_set_soonestjob(Conn
*c
, Job
*j
) {
162 if (!c
->soonest_job
|| j
->r
.deadline_at
< c
->soonest_job
->r
.deadline_at
) {
167 // Return the reserved job with the earliest deadline,
168 // or NULL if there's no reserved job.
170 connsoonestjob(Conn
*c
)
172 // use cached value and bail out.
173 if (c
->soonest_job
!= NULL
)
174 return c
->soonest_job
;
177 for (j
= c
->reserved_jobs
.next
; j
!= &c
->reserved_jobs
; j
= j
->next
) {
178 conn_set_soonestjob(c
, j
);
180 return c
->soonest_job
;
184 conn_reserve_job(Conn
*c
, Job
*j
) {
185 j
->tube
->stat
.reserved_ct
++;
188 j
->r
.deadline_at
= nanoseconds() + j
->r
.ttr
;
189 j
->r
.state
= Reserved
;
190 job_list_insert(&c
->reserved_jobs
, j
);
192 c
->pending_timeout
= -1;
193 conn_set_soonestjob(c
, j
);
196 // Return true if c has a reserved job with less than one second until its
199 conndeadlinesoon(Conn
*c
)
201 int64 t
= nanoseconds();
202 Job
*j
= connsoonestjob(c
);
204 return j
&& t
>= j
->r
.deadline_at
- SAFETY_MARGIN
;
212 for (i
= 0; i
< c
->watch
.len
; i
++) {
213 if (((Tube
*) c
->watch
.items
[i
])->ready
.len
)
221 conn_less(void *ca
, void *cb
)
223 Conn
*a
= (Conn
*)ca
;
224 Conn
*b
= (Conn
*)cb
;
225 return a
->tickat
< b
->tickat
;
230 conn_setpos(void *c
, size_t i
)
232 ((Conn
*)c
)->tickpos
= i
;
239 sockwant(&c
->sock
, 0);
242 printf("close %d\n", c
->sock
.fd
);
247 /* was this a peek or stats command? */
248 if (c
->out_job
&& c
->out_job
->r
.state
== Copy
)
249 job_free(c
->out_job
);
251 c
->in_job
= c
->out_job
= NULL
;
254 if (c
->type
& CONN_TYPE_PRODUCER
) cur_producer_ct
--; /* stats */
255 if (c
->type
& CONN_TYPE_WORKER
) cur_worker_ct
--; /* stats */
257 cur_conn_ct
--; /* stats */
259 remove_waiting_conn(c
);
260 if (has_reserved_job(c
))
261 enqueue_reserved_jobs(c
);
265 TUBE_ASSIGN(c
->use
, NULL
);
268 heapremove(&c
->srv
->conns
, c
->tickpos
);