Changes in v9fs broke pipesrv. Fix it.
[npfs.git] / libnpfs / conn.c
blobd652298efa1523cb0a1e89a84e10e59e61ecebc6
1 /*
2 * Copyright (C) 2005 by Latchesar Ionkov <lucho@ionkov.net>
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice (including the next
12 * paragraph) shall be included in all copies or substantial portions of the
13 * Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18 * LATCHESAR IONKOV AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
19 * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
20 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <string.h>
26 #include <errno.h>
27 #include <assert.h>
28 #include "npfs.h"
29 #include "npfsimpl.h"
31 extern int printfcall(FILE *f, Npfcall *fc, int dotu);
33 static Npfcall *np_conn_new_incall(Npconn *conn);
34 static void np_conn_free_incall(Npconn *, Npfcall *, int);
35 static void *np_conn_read_proc(void *);
37 Npconn*
38 np_conn_create(Npsrv *srv, Nptrans *trans)
40 Npconn *conn;
42 conn = malloc(sizeof(*conn));
43 if (!conn)
44 return NULL;
46 //fprintf(stderr, "np_conn_create %p\n", conn);
47 pthread_mutex_init(&conn->lock, NULL);
48 pthread_mutex_init(&conn->wlock, NULL);
49 pthread_cond_init(&conn->resetcond, NULL);
50 pthread_cond_init(&conn->resetdonecond, NULL);
51 conn->refcount = 0;
52 conn->resetting = 0;
53 conn->srv = srv;
54 conn->msize = srv->msize;
55 conn->dotu = srv->dotu;
56 conn->shutdown = 0;
57 conn->fidpool = np_fidpool_create();
58 conn->trans = trans;
59 conn->aux = NULL;
60 conn->freercnum = 0;
61 conn->freerclist = NULL;
62 np_srv_add_conn(srv, conn);
64 pthread_create(&conn->rthread, NULL, np_conn_read_proc, conn);
65 return conn;
68 void
69 np_conn_incref(Npconn *conn)
71 pthread_mutex_lock(&conn->lock);
72 conn->refcount++;
73 pthread_mutex_unlock(&conn->lock);
76 void
77 np_conn_decref(Npconn *conn)
79 Npfcall *fc, *fc1;
81 pthread_mutex_lock(&conn->lock);
82 assert(conn->refcount > 0);
83 conn->refcount--;
84 if (conn->refcount) {
85 pthread_mutex_unlock(&conn->lock);
86 return;
89 if (conn->fidpool) {
90 np_fidpool_destroy(conn->fidpool);
91 conn->fidpool = NULL;
94 fc = conn->freerclist;
95 conn->freerclist = NULL;
96 while (fc != NULL) {
97 fc1 = fc->next;
98 free(fc);
99 fc = fc1;
102 pthread_mutex_unlock(&conn->lock);
103 pthread_mutex_destroy(&conn->lock);
104 pthread_cond_destroy(&conn->resetcond);
105 pthread_cond_destroy(&conn->resetdonecond);
106 free(conn);
109 static void *
110 np_conn_read_proc(void *a)
112 int i, n, size, msize;
113 Npsrv *srv;
114 Npconn *conn;
115 Nptrans *trans;
116 Npreq *req;
117 Npfcall *fc, *fc1;
119 pthread_detach(pthread_self());
120 conn = a;
121 np_conn_incref(conn);
122 srv = conn->srv;
123 msize = conn->msize;
124 fc = np_conn_new_incall(conn);
125 n = 0;
126 while (conn->trans && (i = np_trans_read(conn->trans, fc->pkt + n, msize - n)) > 0) {
127 pthread_mutex_lock(&conn->lock);
128 if (conn->resetting) {
129 pthread_cond_wait(&conn->resetdonecond, &conn->lock);
130 n = 0; /* discard all input */
131 i = 0;
133 pthread_mutex_unlock(&conn->lock);
134 n += i;
136 again:
137 if (n < 4)
138 continue;
140 size = fc->pkt[0] | (fc->pkt[1]<<8) | (fc->pkt[2]<<16) | (fc->pkt[3]<<24);
141 if (n < size)
142 continue;
144 if (!np_deserialize(fc, fc->pkt, conn->dotu))
145 break;
147 if (conn->srv->debuglevel) {
148 fprintf(stderr, "<<< (%p) ", conn);
149 np_printfcall(stderr, fc, conn->dotu);
150 fprintf(stderr, "\n");
153 fc1 = np_conn_new_incall(conn);
154 if (n > size)
155 memmove(fc1->pkt, fc->pkt + size, n - size);
156 n -= size;
158 req = np_req_alloc(conn, fc);
159 pthread_mutex_lock(&srv->lock);
160 if (!conn->resetting)
161 np_srv_add_req(srv, req);
162 else
163 np_req_unref(req);
164 pthread_mutex_unlock(&srv->lock);
165 fc = fc1;
166 if (n > 0)
167 goto again;
171 pthread_mutex_lock(&conn->lock);
172 trans = conn->trans;
173 conn->trans = NULL;
174 np_conn_free_incall(conn, fc, 0);
175 pthread_mutex_unlock(&conn->lock);
177 np_srv_remove_conn(conn->srv, conn);
178 np_conn_reset(conn, 0, 0);
180 if (trans)
181 np_trans_destroy(trans);
183 np_conn_decref(conn);
184 return NULL;
/*
 * np_conn_reset - flush the requests of a connection and reset its state.
 *
 * conn:  connection to reset.
 * msize: only compared against zero here; the value is not stored by this
 *        function.  np_conn_read_proc passes 0 during its teardown.  When
 *        it is non-zero, working Tversion requests are left alone, a new
 *        fid pool is created and resetdonecond is broadcast.
 * dotu:  copied into conn->dotu when msize is non-zero.
 *
 * Steps, in order: set conn->resetting; under srv->lock unlink this
 * connection's queued requests and take references on its working
 * requests; respond to the unlinked requests; call the server's flush hook
 * (when set) for each working request; wait on resetcond until those
 * requests are responded to and none remain in srv->workreqs; free the
 * cached fcalls; destroy the fid pool; clear conn->resetting; drop the
 * references taken on the working requests.
 */
187 void
188 np_conn_reset(Npconn *conn, u32 msize, int dotu)
190 int i, n;
191 Npsrv *srv;
192 Npreq *req, *req1, *preqs, **reqs;
193 Npfcall *fc, *fc1;
/*
 * While this flag is set np_conn_read_proc discards its input and
 * np_conn_respond does not write replies to the transport.
 */
195 pthread_mutex_lock(&conn->lock);
196 conn->resetting = 1;
197 pthread_mutex_unlock(&conn->lock);
199 pthread_mutex_lock(&conn->srv->lock);
200 srv = conn->srv;
201 // first flush all outstanding requests
/* matching requests are unlinked from the server queue and chained on preqs */
202 preqs = NULL;
203 req = srv->reqs_first;
204 while (req != NULL) {
205 req1 = req->next;
206 if (req->conn == conn) {
207 np_srv_remove_req(srv, req);
208 req->next = preqs;
209 preqs = req;
211 req = req1;
214 // then flush all working requests
/* first pass only counts the matching entries so the array can be sized */
215 n = 0;
216 req = conn->srv->workreqs;
217 while (req != NULL) {
218 if (req->conn == conn && (msize==0 || req->tcall->type != Tversion))
219 n++;
221 req = req->next;
/* NOTE(review): the malloc result is not checked before reqs[] is written below */
224 reqs = malloc(n * sizeof(Npreq *));
/* second pass stores a reference to each matching working request */
225 n = 0;
226 req = conn->srv->workreqs;
227 while (req != NULL) {
228 if (req->conn == conn && (msize==0 || req->tcall->type != Tversion))
229 reqs[n++] = np_req_ref(req);
230 req = req->next;
232 pthread_mutex_unlock(&conn->srv->lock);
/* the unlinked requests are answered after srv->lock has been released */
234 req = preqs;
235 while (req != NULL) {
236 req1 = req->next;
237 np_conn_respond(req);
238 np_req_unref(req);
239 req = req1;
/* give the server's optional flush hook a chance to cancel each working request */
242 for(i = 0; i < n; i++) {
243 req = reqs[i];
244 if (req->conn->srv->flush)
245 (*req->conn->srv->flush)(req);
/*
 * NOTE(review): resetcond is waited on below first with conn->lock and
 * then with srv->lock, while np_conn_respond broadcasts it holding only
 * srv->lock.  conn->lock also appears to stay held from here until the
 * unlock near the end of the function.  Confirm this against the code
 * that sets req->responded and removes entries from srv->workreqs.
 */
248 /* wait until all working requests finish */
250 pthread_mutex_lock(&conn->lock);
251 while (1) {
252 for(i = 0; i < n; i++)
253 if (!reqs[i]->responded)
254 break;
256 if (i >= n)
257 break;
259 pthread_cond_wait(&conn->resetcond, &conn->lock);
262 pthread_mutex_lock(&srv->lock);
263 while (1) {
264 for(req = srv->workreqs; req != NULL; req = req->next)
265 if (req->conn == conn && (msize==0 || req->tcall->type != Tversion))
266 break;
268 if (req == NULL)
269 break;
271 pthread_cond_wait(&conn->resetcond, &srv->lock);
273 pthread_mutex_unlock(&srv->lock);
275 /* free old pool of fcalls */
/* NOTE(review): conn->freercnum is not reset when the list is emptied here */
276 fc = conn->freerclist;
277 conn->freerclist = NULL;
278 while (fc != NULL) {
279 fc1 = fc->next;
280 free(fc);
281 fc = fc1;
284 if (conn->fidpool) {
285 np_fidpool_destroy(conn->fidpool);
286 conn->fidpool = NULL;
/*
 * Non-zero msize: record dotu, create a fresh fid pool and wake the
 * reader thread waiting on resetdonecond in np_conn_read_proc.
 */
289 if (msize) {
290 conn->dotu = dotu;
291 conn->resetting = 0;
292 conn->fidpool = np_fidpool_create();
293 pthread_cond_broadcast(&conn->resetdonecond);
295 conn->resetting = 0;
296 pthread_mutex_unlock(&conn->lock);
298 /* free the working requests */
299 for(i = 0; i < n; i++)
300 np_req_unref(reqs[i]);
301 free(reqs);
304 void
305 np_conn_shutdown(Npconn *conn)
307 Nptrans *trans;
309 pthread_mutex_lock(&conn->lock);
310 trans = conn->trans;
311 conn->trans = NULL;
312 pthread_mutex_unlock(&conn->lock);
314 if (trans)
315 np_trans_destroy(trans);
318 void
319 np_conn_respond(Npreq *req)
321 int n, send;
322 Npconn *conn;
323 Nptrans *trans;
324 Npfcall *rc;
326 trans = NULL;
327 conn = req->conn;
328 rc = req->rcall;
329 if (!rc)
330 goto done;
332 pthread_mutex_lock(&conn->lock);
333 send = conn->trans && !conn->resetting;
334 pthread_mutex_unlock(&conn->lock);
336 if (send) {
337 pthread_mutex_lock(&conn->wlock);
338 if (conn->srv->debuglevel) {
339 fprintf(stderr, ">>> (%p) ", conn);
340 np_printfcall(stderr, rc, conn->dotu);
341 fprintf(stderr, "\n");
343 n = np_trans_write(conn->trans, rc->pkt, rc->size);
344 pthread_mutex_unlock(&conn->wlock);
346 if (n <= 0) {
347 pthread_mutex_lock(&conn->lock);
348 trans = conn->trans;
349 conn->trans = NULL;
350 pthread_mutex_unlock(&conn->lock);
354 done:
355 np_conn_free_incall(req->conn, req->tcall, 1);
356 free(req->rcall);
357 req->tcall = NULL;
358 req->rcall = NULL;
360 if (conn->resetting) {
361 pthread_mutex_lock(&conn->srv->lock);
362 pthread_cond_broadcast(&conn->resetcond);
363 pthread_mutex_unlock(&conn->srv->lock);
366 if (trans)
367 np_trans_destroy(trans); /* np_conn_read_proc will take care of resetting */
370 static Npfcall *
371 np_conn_new_incall(Npconn *conn)
373 Npfcall *fc;
375 pthread_mutex_lock(&conn->lock);
376 // if (!conn->trans) {
377 // pthread_mutex_unlock(&conn->lock);
378 // return NULL;
379 // }
381 if (conn->freerclist) {
382 fc = conn->freerclist;
383 conn->freerclist = fc->next;
384 conn->freercnum--;
385 } else {
386 fc = malloc(sizeof(*fc) + conn->msize);
389 if (!fc) {
390 pthread_mutex_unlock(&conn->lock);
391 return NULL;
394 fc->pkt = (u8*) fc + sizeof(*fc);
395 pthread_mutex_unlock(&conn->lock);
397 return fc;
400 static void
401 np_conn_free_incall(Npconn* conn, Npfcall *rc, int lock)
403 if (!rc)
404 return;
406 if (lock)
407 pthread_mutex_lock(&conn->lock);
409 if (conn->freercnum < 64) {
410 rc->next = conn->freerclist;
411 conn->freerclist = rc;
412 rc = NULL;
415 if (lock)
416 pthread_mutex_unlock(&conn->lock);
418 if (rc)
419 free(rc);