pohmelfs: Use current logging styles.
[pohmelfs.git] / fs/pohmelfs/net.c
blob 79079392a06fc886c2903aec877563fb86a6da81
/*
 * Copyright (C) 2011+ Evgeniy Polyakov <zbr@ioremap.net>
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/in.h>
#include <linux/in6.h>
#include <linux/net.h>

#include <net/sock.h>
#include <net/tcp.h>

#include "pohmelfs.h"

void *pohmelfs_scratch_buf;
int pohmelfs_scratch_buf_size = 4096;
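
/*
 * Log a message prefixed with the peer address: the address family is
 * taken from the sockaddr_storage and printed via the %pI4/%pI6 format
 * extensions together with the %pV-formatted message.
 */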
void pohmelfs_print_addr(struct sockaddr_storage *addr, const char *fmt, ...)
{
	struct sockaddr *sa = (struct sockaddr *)addr;
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)addr;
		pr_info("%pI4:%d: %pV",
			&sin->sin_addr.s_addr, ntohs(sin->sin_port), &vaf);
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin = (struct sockaddr_in6 *)addr;
		pr_info("%pI6:%d: %pV",
			&sin->sin6_addr, ntohs(sin->sin6_port), &vaf);
	}

	va_end(args);
}

/*
 * Basic network sending/receiving functions.
 * Blocking mode is used.
 */
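
/*
 * Receive up to @size bytes from the state's socket into @buf according
 * to @flags. Returns the number of bytes received or a negative error.
 */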
int pohmelfs_data_recv(struct pohmelfs_state *st, void *buf, u64 size, unsigned int flags)
{
	struct msghdr msg;
	struct kvec iov;
	int err;

	BUG_ON(!size);

	iov.iov_base = buf;
	iov.iov_len = size;

	msg.msg_iov = (struct iovec *)&iov;
	msg.msg_iovlen = 1;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = flags;

	err = kernel_recvmsg(st->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
	if (err < 0)
		goto err_out_exit;

err_out_exit:
	return err;
}
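
/*
 * Nonblocking receive of transaction reply data: advances t->io_offset
 * by the number of bytes actually read from the socket.
 */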
int pohmelfs_recv(struct pohmelfs_trans *t, struct pohmelfs_state *recv, void *data, int size)
{
	int err;

	err = pohmelfs_data_recv(recv, data, size, MSG_DONTWAIT);
	if (err < 0)
		return err;

	t->io_offset += err;
	return err;
}
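
/*
 * Send the command header and then the attached data buffer of a
 * transaction, resuming from t->io_offset. A partial send just returns;
 * the caller retries once the socket becomes writable again.
 */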
static int pohmelfs_data_send(struct pohmelfs_trans *t)
{
	struct msghdr msg;
	struct iovec io;
	int err;

	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = MSG_DONTWAIT;

	msg.msg_iov = &io;
	msg.msg_iovlen = 1;

	if (t->io_offset < t->header_size) {
		io.iov_base = (void *)(&t->cmd) + t->io_offset;
		io.iov_len = t->header_size - t->io_offset;

		err = kernel_sendmsg(t->st->sock, &msg, (struct kvec *)msg.msg_iov, 1, io.iov_len);
		if (err <= 0) {
			if (err == 0)
				err = -ECONNRESET;
			goto err_out_exit;
		}

		t->io_offset += err;
	}

	if ((t->io_offset >= t->header_size) && t->data) {
		size_t sent_size = t->io_offset - t->header_size;

		io.iov_base = t->data + sent_size;
		io.iov_len = t->data_size - sent_size;

		err = kernel_sendmsg(t->st->sock, &msg, (struct kvec *)msg.msg_iov, 1, io.iov_len);
		if (err <= 0) {
			if (err == 0)
				err = -ECONNRESET;
			goto err_out_exit;
		}

		t->io_offset += err;
	}

	err = 0;

err_out_exit:
	return err;
}
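
/*
 * Send a write transaction whose payload lives in the page vector of the
 * attached write control: the header goes out via kernel_sendmsg(), the
 * pages via kernel_sendpage(), skipping whatever t->io_offset says has
 * already been transmitted.
 */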
static int pohmelfs_page_send(struct pohmelfs_trans *t)
{
	struct pohmelfs_write_ctl *ctl = t->wctl;
	struct msghdr msg;
	struct iovec io;
	unsigned i;
	int err = -EINVAL;

	if (t->io_offset < t->header_size) {
		io.iov_base = (void *)(&t->cmd) + t->io_offset;
		io.iov_len = t->header_size - t->io_offset;

		msg.msg_name = NULL;
		msg.msg_namelen = 0;
		msg.msg_control = NULL;
		msg.msg_controllen = 0;
		msg.msg_flags = MSG_DONTWAIT;

		msg.msg_iov = &io;
		msg.msg_iovlen = 1;

		err = kernel_sendmsg(t->st->sock, &msg, (struct kvec *)msg.msg_iov, 1, io.iov_len);
		if (err <= 0) {
			if (err == 0)
				err = -ECONNRESET;
			goto err_out_exit;
		}

		t->io_offset += err;
	}

	if (t->io_offset >= t->header_size) {
		size_t skip_offset = 0;
		size_t size = le64_to_cpu(t->cmd.cmd.size) + sizeof(struct dnet_cmd) - t->io_offset;
		size_t current_io_offset = t->io_offset - t->header_size;

		for (i = 0; i < pagevec_count(&ctl->pvec); ++i) {
			struct page *page = ctl->pvec.pages[i];
			size_t sz = PAGE_CACHE_SIZE;

			if (sz > size)
				sz = size;

			if (current_io_offset > skip_offset + sz) {
				skip_offset += sz;
				continue;
			}

			sz -= current_io_offset - skip_offset;

			err = kernel_sendpage(t->st->sock, page, current_io_offset - skip_offset, sz, MSG_DONTWAIT);

			pr_debug("%s: %d/%d: total-size: %llu, io-offset: %llu, rest-size: %zd, current-io: %zd, skip-offset: %zd, sz: %zu: %d\n",
				 pohmelfs_dump_id(pohmelfs_inode(t->inode)->id.id),
				 i, pagevec_count(&ctl->pvec),
				 (unsigned long long)le64_to_cpu(t->cmd.cmd.size) + sizeof(struct dnet_cmd),
				 t->io_offset, size, current_io_offset,
				 skip_offset, sz, err);

			if (err <= 0) {
				if (err == 0)
					err = -ECONNRESET;
				goto err_out_exit;
			}

			current_io_offset += err;
			skip_offset = current_io_offset;
			size -= err;
			t->io_offset += err;
		}
	}

	err = 0;

err_out_exit:
	return err;
}

/*
 * Polling machinery.
 */
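
/*
 * A private wait-queue entry is attached to the socket's wait queue via
 * the poll_table callback below; when the socket becomes readable or
 * writable, pohmelfs_queue_wake() schedules the per-state io_work instead
 * of waking a sleeping task.
 */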
struct pohmelfs_poll_helper {
	poll_table		pt;
	struct pohmelfs_state	*st;
};

static int pohmelfs_queue_wake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	struct pohmelfs_state *st = container_of(wait, struct pohmelfs_state, wait);

	if (!st->conn->need_exit)
		queue_work(st->conn->wq, &st->io_work);
	return 0;
}

static void pohmelfs_queue_func(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{
	struct pohmelfs_state *st = container_of(pt, struct pohmelfs_poll_helper, pt)->st;

	st->whead = whead;
	init_waitqueue_func_entry(&st->wait, pohmelfs_queue_wake);
	add_wait_queue(whead, &st->wait);
}

static void pohmelfs_poll_exit(struct pohmelfs_state *st)
{
	if (st->whead) {
		remove_wait_queue(st->whead, &st->wait);
		st->whead = NULL;
	}
}

static int pohmelfs_poll_init(struct pohmelfs_state *st)
{
	struct pohmelfs_poll_helper ph;

	ph.st = st;
	init_poll_funcptr(&ph.pt, &pohmelfs_queue_func);

	st->sock->ops->poll(NULL, st->sock, &ph.pt);
	return 0;
}
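
/*
 * Check the socket for the events in @mask: returns 0 when ready,
 * -ECONNRESET on an error/hangup condition and -EAGAIN when there is
 * nothing to do yet.
 */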
static int pohmelfs_revents(struct pohmelfs_state *st, unsigned mask)
{
	unsigned revents;

	revents = st->sock->ops->poll(NULL, st->sock, NULL);
	if (revents & mask)
		return 0;

	if (revents & (POLLERR | POLLHUP | POLLNVAL | POLLRDHUP | POLLREMOVE)) {
		pohmelfs_print_addr(&st->sa, "error revents: %x\n", revents);
		return -ECONNRESET;
	}

	return -EAGAIN;
}
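
/*
 * Push the first queued transaction to the network if the socket is
 * writable. A fully sent transaction is moved from the send list into
 * the rb-tree of transactions waiting for a reply.
 */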
static int pohmelfs_state_send(struct pohmelfs_state *st)
{
	struct pohmelfs_trans *t = NULL;
	int trans_put = 0;
	size_t size;
	int err = -EAGAIN;

	mutex_lock(&st->trans_lock);
	if (!list_empty(&st->trans_list))
		t = list_first_entry(&st->trans_list, struct pohmelfs_trans, trans_entry);
	mutex_unlock(&st->trans_lock);

	if (!t)
		goto err_out_exit;

	err = pohmelfs_revents(st, POLLOUT);
	if (err)
		goto err_out_exit;

	size = le64_to_cpu(t->cmd.cmd.size) + sizeof(struct dnet_cmd);

	pr_debug("%s: starting sending: %llu/%zd\n",
		 pohmelfs_dump_id(pohmelfs_inode(t->inode)->id.id),
		 t->io_offset, size);

	if (t->wctl)
		err = pohmelfs_page_send(t);
	else
		err = pohmelfs_data_send(t);

	pr_debug("%s: sent: %llu/%zd: %d\n",
		 pohmelfs_dump_id(pohmelfs_inode(t->inode)->id.id),
		 t->io_offset, size, err);

	if (!err && (t->io_offset == size)) {
		mutex_lock(&st->trans_lock);
		list_del_init(&t->trans_entry);
		err = pohmelfs_trans_insert_tree(st, t);
		if (err)
			trans_put = 1;
		t->io_offset = 0;
		mutex_unlock(&st->trans_lock);
	}

	BUG_ON(t->io_offset > size);

	if (trans_put)
		pohmelfs_trans_put(t);

	if ((err < 0) && (err != -EAGAIN))
		goto err_out_exit;

err_out_exit:
	return err;
}
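
/*
 * Drain the body of a reply that has no matching transaction into the
 * global scratch buffer, so that the stream stays aligned with command
 * boundaries.
 */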
static void pohmelfs_suck_scratch(struct pohmelfs_state *st)
{
	struct dnet_cmd *cmd = &st->cmd;
	int err = 0;

	pr_debug("%llu\n", (unsigned long long)cmd->size);

	while (cmd->size) {
		int sz = pohmelfs_scratch_buf_size;

		if (cmd->size < sz)
			sz = cmd->size;

		err = pohmelfs_data_recv(st, pohmelfs_scratch_buf, sz, MSG_WAITALL);
		if (err < 0) {
			pohmelfs_print_addr(&st->sa, "recv-scratch err: %d\n", err);
			goto err_out_exit;
		}

		cmd->size -= err;
	}

err_out_exit:
	st->cmd_read = 1;
}
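
/*
 * Read the next command header (if needed), look up the transaction it
 * acknowledges and feed the reply to the transaction callbacks. Replies
 * without a matching transaction are discarded via the scratch buffer.
 */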
static int pohmelfs_state_recv(struct pohmelfs_state *st)
{
	struct dnet_cmd *cmd = &st->cmd;
	struct pohmelfs_trans *t;
	unsigned long long trans;
	int err;

	err = pohmelfs_revents(st, POLLIN);
	if (err)
		goto err_out_exit;

	if (st->cmd_read) {
		err = pohmelfs_data_recv(st, cmd, sizeof(struct dnet_cmd), MSG_WAITALL);
		if (err <= 0) {
			if (err == 0)
				err = -ECONNRESET;

			pohmelfs_print_addr(&st->sa, "recv error: %d\n", err);
			goto err_out_exit;
		}

		dnet_convert_cmd(cmd);

		trans = cmd->trans & ~DNET_TRANS_REPLY;
		st->cmd_read = 0;
	}

	t = pohmelfs_trans_lookup(st, cmd);
	if (!t) {
		pohmelfs_suck_scratch(st);

		err = 0;
		goto err_out_exit;
	}

	if (cmd->size && (t->io_offset != cmd->size)) {
		err = t->cb.recv_reply(t, st);
		if (err && (err != -EAGAIN)) {
			pohmelfs_print_addr(&st->sa, "recv-reply error: %d\n", err);
			goto err_out_remove;
		}

		if (t->io_offset != cmd->size)
			goto err_out_put;
	}

	err = t->cb.complete(t, st);
	if (err) {
		pohmelfs_print_addr(&st->sa, "recv-complete err: %d\n", err);
	}

	kfree(t->recv_data);
	t->recv_data = NULL;
	t->io_offset = 0;

err_out_remove:
	/* only remove and free the transaction if there was an error or there will be no more replies */
	if (!(cmd->flags & DNET_FLAGS_MORE) || err) {
		pohmelfs_trans_remove(t);

		/*
		 * refcnt was grabbed twice:
		 * in pohmelfs_trans_lookup()
		 * and at transaction creation
		 */
		pohmelfs_trans_put(t);
	}

	st->cmd_read = 1;
	if (err) {
		cmd->size -= t->io_offset;
		t->io_offset = 0;
	}

err_out_put:
	pohmelfs_trans_put(t);
err_out_exit:
	return err;
}
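
/*
 * Per-state work function: alternate between sending queued transactions
 * and receiving replies until both directions would block. A hard error
 * on either side schedules a reconnect.
 */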
static void pohmelfs_state_io_work(struct work_struct *work)
{
	struct pohmelfs_state *st = container_of(work, struct pohmelfs_state, io_work);
	int send_err, recv_err;

	send_err = recv_err = -EAGAIN;
	while (!st->conn->psb->need_exit) {
		send_err = pohmelfs_state_send(st);
		if (send_err && (send_err != -EAGAIN)) {
			pohmelfs_print_addr(&st->sa, "state send error: %d\n", send_err);
			goto err_out_exit;
		}

		recv_err = pohmelfs_state_recv(st);
		if (recv_err && (recv_err != -EAGAIN)) {
			pohmelfs_print_addr(&st->sa, "state recv error: %d\n", recv_err);
			goto err_out_exit;
		}

		if ((send_err == -EAGAIN) && (recv_err == -EAGAIN))
			break;
	}

err_out_exit:
	if ((send_err && (send_err != -EAGAIN)) || (recv_err && (recv_err != -EAGAIN))) {
		pohmelfs_state_add_reconnect(st);
	}
	return;
}
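
/*
 * Look up an existing state with the given address. Callers in this file
 * invoke it under conn->state_lock.
 */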
struct pohmelfs_state *pohmelfs_addr_exist(struct pohmelfs_connection *conn, struct sockaddr_storage *sa, int addrlen)
{
	struct pohmelfs_state *st;

	list_for_each_entry(st, &conn->state_list, state_entry) {
		if (st->addrlen != addrlen)
			continue;

		if (!memcmp(&st->sa, sa, addrlen)) {
			return st;
		}
	}

	return NULL;
}
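
/*
 * Allocate a new network state, connect its TCP socket to @sa, hook it
 * into the polling machinery and link it into the connection's state
 * list. Returns the new state or an ERR_PTR() value; -EEXIST means a
 * state for this address already exists.
 */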
struct pohmelfs_state *pohmelfs_state_create(struct pohmelfs_connection *conn, struct sockaddr_storage *sa, int addrlen,
		int ask_route, int group_id)
{
	int err = 0;
	struct pohmelfs_state *st;
	struct sockaddr *addr = (struct sockaddr *)sa;

	/* early check - this state can be inserted into route table, no need to create state and check again */
	spin_lock(&conn->state_lock);
	if (pohmelfs_addr_exist(conn, sa, addrlen))
		err = -EEXIST;
	spin_unlock(&conn->state_lock);

	if (err)
		goto err_out_exit;

	st = kzalloc(sizeof(struct pohmelfs_state), GFP_KERNEL);
	if (!st) {
		err = -ENOMEM;
		goto err_out_exit;
	}

	st->conn = conn;
	mutex_init(&st->trans_lock);
	INIT_LIST_HEAD(&st->trans_list);
	st->trans_root = RB_ROOT;

	st->group_id = group_id;

	kref_init(&st->refcnt);

	INIT_WORK(&st->io_work, pohmelfs_state_io_work);

	st->cmd_read = 1;

	err = sock_create_kern(addr->sa_family, SOCK_STREAM, IPPROTO_TCP, &st->sock);
	if (err) {
		pohmelfs_print_addr(sa, "sock_create: failed family: %d, err: %d\n", addr->sa_family, err);
		goto err_out_free;
	}

	st->sock->sk->sk_allocation = GFP_NOIO;
	st->sock->sk->sk_sndtimeo = st->sock->sk->sk_rcvtimeo = msecs_to_jiffies(60000);

	err = 1;
	sock_setsockopt(st->sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&err, 4);

	tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPIDLE, (char *)&conn->psb->keepalive_idle, 4);
	tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPINTVL, (char *)&conn->psb->keepalive_interval, 4);
	tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPCNT, (char *)&conn->psb->keepalive_cnt, 4);

	err = kernel_connect(st->sock, (struct sockaddr *)addr, addrlen, 0);
	if (err) {
		pohmelfs_print_addr(sa, "kernel_connect: failed family: %d, err: %d\n", addr->sa_family, err);
		goto err_out_release;
	}

	st->sock->sk->sk_sndtimeo = st->sock->sk->sk_rcvtimeo = msecs_to_jiffies(60000);

	memcpy(&st->sa, sa, sizeof(struct sockaddr_storage));
	st->addrlen = addrlen;

	err = pohmelfs_poll_init(st);
	if (err)
		goto err_out_shutdown;

	spin_lock(&conn->state_lock);
	err = -EEXIST;
	if (!pohmelfs_addr_exist(conn, sa, addrlen)) {
		list_add_tail(&st->state_entry, &conn->state_list);
		err = 0;
	}
	spin_unlock(&conn->state_lock);

	if (err)
		goto err_out_poll_exit;

	if (ask_route) {
		err = pohmelfs_route_request(st);
		if (err)
			goto err_out_poll_exit;
	}

	pohmelfs_print_addr(sa, "%d: connected\n", st->conn->idx);

	return st;

err_out_poll_exit:
	pohmelfs_poll_exit(st);
err_out_shutdown:
	st->sock->ops->shutdown(st->sock, 2);
err_out_release:
	sock_release(st->sock);
err_out_free:
	kfree(st);
err_out_exit:
	if (err != -EEXIST) {
		pohmelfs_print_addr(sa, "state creation failed: %d\n", err);
	}
	return ERR_PTR(err);
}
static void pohmelfs_state_exit(struct pohmelfs_state *st)
{
	if (!st->sock)
		return;

	pohmelfs_poll_exit(st);
	st->sock->ops->shutdown(st->sock, 2);

	pohmelfs_print_addr(&st->sa, "disconnected\n");
	sock_release(st->sock);
}

static void pohmelfs_state_release(struct kref *kref)
{
	struct pohmelfs_state *st = container_of(kref, struct pohmelfs_state, refcnt);
	pohmelfs_state_exit(st);
}

void pohmelfs_state_put(struct pohmelfs_state *st)
{
	kref_put(&st->refcnt, pohmelfs_state_release);
}
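
/*
 * Drop every transaction still owned by this state, both the ones queued
 * for sending and the ones waiting for replies in the rb-tree, and make
 * sure the io work is no longer running.
 */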
static void pohmelfs_state_clean(struct pohmelfs_state *st)
{
	struct pohmelfs_trans *t, *tmp;

	pohmelfs_route_remove_all(st);

	mutex_lock(&st->trans_lock);
	list_for_each_entry_safe(t, tmp, &st->trans_list, trans_entry) {
		list_del(&t->trans_entry);

		pohmelfs_trans_put(t);
	}

	while (1) {
		struct rb_node *n = rb_first(&st->trans_root);
		if (!n)
			break;

		t = rb_entry(n, struct pohmelfs_trans, trans_node);

		rb_erase(&t->trans_node, &st->trans_root);
		pohmelfs_trans_put(t);
	}
	mutex_unlock(&st->trans_lock);

	cancel_work_sync(&st->io_work);
}
void pohmelfs_state_kill(struct pohmelfs_state *st)
{
	BUG_ON(!list_empty(&st->state_entry));

	pohmelfs_state_clean(st);
	pohmelfs_state_put(st);
}
void pohmelfs_state_schedule(struct pohmelfs_state *st)
{
	if (!st->conn->need_exit)
		queue_work(st->conn->wq, &st->io_work);
}
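
/*
 * Schedule reconnection for a failed state: remember its address on the
 * connection's reconnect list (unless it is already there), move the
 * state itself to the kill list and kick the reconnect work.
 */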
int pohmelfs_state_add_reconnect(struct pohmelfs_state *st)
{
	struct pohmelfs_connection *conn = st->conn;
	struct pohmelfs_reconnect *r, *tmp;
	int err = 0;

	pohmelfs_route_remove_all(st);

	r = kzalloc(sizeof(struct pohmelfs_reconnect), GFP_NOIO);
	if (!r) {
		err = -ENOMEM;
		goto err_out_exit;
	}

	memcpy(&r->sa, &st->sa, sizeof(struct sockaddr_storage));
	r->addrlen = st->addrlen;
	r->group_id = st->group_id;

	mutex_lock(&conn->reconnect_lock);
	list_for_each_entry(tmp, &conn->reconnect_list, reconnect_entry) {
		if (tmp->addrlen != r->addrlen)
			continue;

		if (memcmp(&tmp->sa, &r->sa, r->addrlen))
			continue;

		err = -EEXIST;
		break;
	}

	if (!err) {
		list_add_tail(&r->reconnect_entry, &conn->reconnect_list);
	}
	mutex_unlock(&conn->reconnect_lock);

	if (err)
		goto err_out_free;

	pohmelfs_print_addr(&st->sa, "reconnection added\n");
	err = 0;
	goto err_out_exit;

err_out_free:
	kfree(r);
err_out_exit:
	spin_lock(&conn->state_lock);
	list_move(&st->state_entry, &conn->kill_state_list);
	spin_unlock(&conn->state_lock);

	/* we do not really care if this work will not be processed immediately */
	queue_delayed_work(conn->wq, &conn->reconnect_work, 0);

	return err;
}