2 * Copyright (C) 2011+ Evgeniy Polyakov <zbr@ioremap.net>
5 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
16 void *pohmelfs_scratch_buf
;
17 int pohmelfs_scratch_buf_size
= 4096;
19 void pohmelfs_print_addr(struct sockaddr_storage
*addr
, const char *fmt
, ...)
21 struct sockaddr
*sa
= (struct sockaddr
*)addr
;
29 if (sa
->sa_family
== AF_INET
) {
30 struct sockaddr_in
*sin
= (struct sockaddr_in
*)addr
;
31 pr_info("%pI4:%d: %pV",
32 &sin
->sin_addr
.s_addr
, ntohs(sin
->sin_port
), &vaf
);
33 } else if (sa
->sa_family
== AF_INET6
) {
34 struct sockaddr_in6
*sin
= (struct sockaddr_in6
*)addr
;
35 pr_info("%pI6:%d: %pV",
36 &sin
->sin6_addr
, ntohs(sin
->sin6_port
), &vaf
);
43 * Basic network sending/receiving functions.
44 * Blocking mode is used.
46 int pohmelfs_data_recv(struct pohmelfs_state
*st
, void *buf
, u64 size
, unsigned int flags
)
57 msg
.msg_iov
= (struct iovec
*)&iov
;
61 msg
.msg_control
= NULL
;
62 msg
.msg_controllen
= 0;
63 msg
.msg_flags
= flags
;
65 err
= kernel_recvmsg(st
->sock
, &msg
, &iov
, 1, iov
.iov_len
, msg
.msg_flags
);
73 int pohmelfs_recv(struct pohmelfs_trans
*t
, struct pohmelfs_state
*recv
, void *data
, int size
)
77 err
= pohmelfs_data_recv(recv
, data
, size
, MSG_DONTWAIT
);
85 static int pohmelfs_data_send(struct pohmelfs_trans
*t
)
93 msg
.msg_control
= NULL
;
94 msg
.msg_controllen
= 0;
95 msg
.msg_flags
= MSG_DONTWAIT
;
101 if (t
->io_offset
< t
->header_size
) {
102 io
.iov_base
= (void *)(&t
->cmd
) + t
->io_offset
;
103 io
.iov_len
= t
->header_size
- t
->io_offset
;
105 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
115 if ((t
->io_offset
>= t
->header_size
) && t
->data
) {
116 size_t sent_size
= t
->io_offset
- t
->header_size
;
117 io
.iov_base
= t
->data
+ sent_size
;
118 io
.iov_len
= t
->data_size
- sent_size
;
120 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
137 static int pohmelfs_page_send(struct pohmelfs_trans
*t
)
139 struct pohmelfs_write_ctl
*ctl
= t
->wctl
;
145 if (t
->io_offset
< t
->header_size
) {
146 io
.iov_base
= (void *)(&t
->cmd
) + t
->io_offset
;
147 io
.iov_len
= t
->header_size
- t
->io_offset
;
151 msg
.msg_control
= NULL
;
152 msg
.msg_controllen
= 0;
153 msg
.msg_flags
= MSG_DONTWAIT
;
158 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
168 if (t
->io_offset
>= t
->header_size
) {
169 size_t skip_offset
= 0;
170 size_t size
= le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
) - t
->io_offset
;
171 size_t current_io_offset
= t
->io_offset
- t
->header_size
;
173 for (i
= 0; i
< pagevec_count(&ctl
->pvec
); ++i
) {
174 struct page
*page
= ctl
->pvec
.pages
[i
];
175 size_t sz
= PAGE_CACHE_SIZE
;
180 if (current_io_offset
> skip_offset
+ sz
) {
185 sz
-= current_io_offset
- skip_offset
;
187 err
= kernel_sendpage(t
->st
->sock
, page
, current_io_offset
- skip_offset
, sz
, MSG_DONTWAIT
);
189 pr_debug("%s: %d/%d: total-size: %llu, io-offset: %llu, rest-size: %zd, current-io: %zd, skip-offset: %zd, sz: %zu: %d\n",
190 pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
),
191 i
, pagevec_count(&ctl
->pvec
),
192 (unsigned long long)le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
),
193 t
->io_offset
, size
, current_io_offset
,
194 skip_offset
, sz
, err
);
202 current_io_offset
+= err
;
203 skip_offset
= current_io_offset
;
219 struct pohmelfs_poll_helper
{
221 struct pohmelfs_state
*st
;
224 static int pohmelfs_queue_wake(wait_queue_t
*wait
, unsigned mode
, int sync
, void *key
)
226 struct pohmelfs_state
*st
= container_of(wait
, struct pohmelfs_state
, wait
);
228 if (!st
->conn
->need_exit
)
229 queue_work(st
->conn
->wq
, &st
->io_work
);
233 static void pohmelfs_queue_func(struct file
*file
, wait_queue_head_t
*whead
, poll_table
*pt
)
235 struct pohmelfs_state
*st
= container_of(pt
, struct pohmelfs_poll_helper
, pt
)->st
;
239 init_waitqueue_func_entry(&st
->wait
, pohmelfs_queue_wake
);
240 add_wait_queue(whead
, &st
->wait
);
243 static void pohmelfs_poll_exit(struct pohmelfs_state
*st
)
246 remove_wait_queue(st
->whead
, &st
->wait
);
251 static int pohmelfs_poll_init(struct pohmelfs_state
*st
)
253 struct pohmelfs_poll_helper ph
;
256 init_poll_funcptr(&ph
.pt
, &pohmelfs_queue_func
);
258 st
->sock
->ops
->poll(NULL
, st
->sock
, &ph
.pt
);
262 static int pohmelfs_revents(struct pohmelfs_state
*st
, unsigned mask
)
266 revents
= st
->sock
->ops
->poll(NULL
, st
->sock
, NULL
);
270 if (revents
& (POLLERR
| POLLHUP
| POLLNVAL
| POLLRDHUP
| POLLREMOVE
)) {
271 pohmelfs_print_addr(&st
->sa
, "error revents: %x\n", revents
);
278 static int pohmelfs_state_send(struct pohmelfs_state
*st
)
280 struct pohmelfs_trans
*t
= NULL
;
285 mutex_lock(&st
->trans_lock
);
286 if (!list_empty(&st
->trans_list
))
287 t
= list_first_entry(&st
->trans_list
, struct pohmelfs_trans
, trans_entry
);
288 mutex_unlock(&st
->trans_lock
);
293 err
= pohmelfs_revents(st
, POLLOUT
);
297 size
= le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
);
298 pr_debug("%s: starting sending: %llu/%zd\n",
299 pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
),
303 err
= pohmelfs_page_send(t
);
305 err
= pohmelfs_data_send(t
);
307 pr_debug("%s: sent: %llu/%zd: %d\n",
308 pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
),
309 t
->io_offset
, size
, err
);
310 if (!err
&& (t
->io_offset
== size
)) {
311 mutex_lock(&st
->trans_lock
);
312 list_del_init(&t
->trans_entry
);
313 err
= pohmelfs_trans_insert_tree(st
, t
);
317 mutex_unlock(&st
->trans_lock
);
320 BUG_ON(t
->io_offset
> size
);
323 pohmelfs_trans_put(t
);
325 if ((err
< 0) && (err
!= -EAGAIN
))
332 static void pohmelfs_suck_scratch(struct pohmelfs_state
*st
)
334 struct dnet_cmd
*cmd
= &st
->cmd
;
337 pr_debug("%llu\n", (unsigned long long)cmd
->size
);
340 int sz
= pohmelfs_scratch_buf_size
;
345 err
= pohmelfs_data_recv(st
, pohmelfs_scratch_buf
, sz
, MSG_WAITALL
);
347 pohmelfs_print_addr(&st
->sa
, "recv-scratch err: %d\n", err
);
358 static int pohmelfs_state_recv(struct pohmelfs_state
*st
)
360 struct dnet_cmd
*cmd
= &st
->cmd
;
361 struct pohmelfs_trans
*t
;
362 unsigned long long trans
;
365 err
= pohmelfs_revents(st
, POLLIN
);
370 err
= pohmelfs_data_recv(st
, cmd
, sizeof(struct dnet_cmd
), MSG_WAITALL
);
375 pohmelfs_print_addr(&st
->sa
, "recv error: %d\n", err
);
379 dnet_convert_cmd(cmd
);
381 trans
= cmd
->trans
& ~DNET_TRANS_REPLY
;
385 t
= pohmelfs_trans_lookup(st
, cmd
);
387 pohmelfs_suck_scratch(st
);
392 if (cmd
->size
&& (t
->io_offset
!= cmd
->size
)) {
393 err
= t
->cb
.recv_reply(t
, st
);
394 if (err
&& (err
!= -EAGAIN
)) {
395 pohmelfs_print_addr(&st
->sa
, "recv-reply error: %d\n", err
);
399 if (t
->io_offset
!= cmd
->size
)
403 err
= t
->cb
.complete(t
, st
);
405 pohmelfs_print_addr(&st
->sa
, "recv-complete err: %d\n", err
);
413 /* only remove and free transaction if there is error or there will be no more replies */
414 if (!(cmd
->flags
& DNET_FLAGS_MORE
) || err
) {
415 pohmelfs_trans_remove(t
);
418 * refcnt was grabbed twice:
419 * in pohmelfs_trans_lookup()
420 * and at transaction creation
422 pohmelfs_trans_put(t
);
426 cmd
->size
-= t
->io_offset
;
431 pohmelfs_trans_put(t
);
436 static void pohmelfs_state_io_work(struct work_struct
*work
)
438 struct pohmelfs_state
*st
= container_of(work
, struct pohmelfs_state
, io_work
);
439 int send_err
, recv_err
;
441 send_err
= recv_err
= -EAGAIN
;
442 while (!st
->conn
->psb
->need_exit
) {
443 send_err
= pohmelfs_state_send(st
);
444 if (send_err
&& (send_err
!= -EAGAIN
)) {
445 pohmelfs_print_addr(&st
->sa
, "state send error: %d\n", send_err
);
449 recv_err
= pohmelfs_state_recv(st
);
450 if (recv_err
&& (recv_err
!= -EAGAIN
)) {
451 pohmelfs_print_addr(&st
->sa
, "state recv error: %d\n", recv_err
);
455 if ((send_err
== -EAGAIN
) && (recv_err
== -EAGAIN
))
460 if ((send_err
&& (send_err
!= -EAGAIN
)) || (recv_err
&& (recv_err
!= -EAGAIN
))) {
461 pohmelfs_state_add_reconnect(st
);
466 struct pohmelfs_state
*pohmelfs_addr_exist(struct pohmelfs_connection
*conn
, struct sockaddr_storage
*sa
, int addrlen
)
468 struct pohmelfs_state
*st
;
470 list_for_each_entry(st
, &conn
->state_list
, state_entry
) {
471 if (st
->addrlen
!= addrlen
)
474 if (!memcmp(&st
->sa
, sa
, addrlen
)) {
482 struct pohmelfs_state
*pohmelfs_state_create(struct pohmelfs_connection
*conn
, struct sockaddr_storage
*sa
, int addrlen
,
483 int ask_route
, int group_id
)
486 struct pohmelfs_state
*st
;
487 struct sockaddr
*addr
= (struct sockaddr
*)sa
;
489 /* early check - this state can be inserted into route table, no need to create state and check again */
490 spin_lock(&conn
->state_lock
);
491 if (pohmelfs_addr_exist(conn
, sa
, addrlen
))
493 spin_unlock(&conn
->state_lock
);
498 st
= kzalloc(sizeof(struct pohmelfs_state
), GFP_KERNEL
);
505 mutex_init(&st
->trans_lock
);
506 INIT_LIST_HEAD(&st
->trans_list
);
507 st
->trans_root
= RB_ROOT
;
509 st
->group_id
= group_id
;
511 kref_init(&st
->refcnt
);
513 INIT_WORK(&st
->io_work
, pohmelfs_state_io_work
);
517 err
= sock_create_kern(addr
->sa_family
, SOCK_STREAM
, IPPROTO_TCP
, &st
->sock
);
519 pohmelfs_print_addr(sa
, "sock_create: failed family: %d, err: %d\n", addr
->sa_family
, err
);
523 st
->sock
->sk
->sk_allocation
= GFP_NOIO
;
524 st
->sock
->sk
->sk_sndtimeo
= st
->sock
->sk
->sk_rcvtimeo
= msecs_to_jiffies(60000);
527 sock_setsockopt(st
->sock
, SOL_SOCKET
, SO_KEEPALIVE
, (char *)&err
, 4);
529 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPIDLE
, (char *)&conn
->psb
->keepalive_idle
, 4);
530 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPINTVL
, (char *)&conn
->psb
->keepalive_interval
, 4);
531 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPCNT
, (char *)&conn
->psb
->keepalive_cnt
, 4);
533 err
= kernel_connect(st
->sock
, (struct sockaddr
*)addr
, addrlen
, 0);
535 pohmelfs_print_addr(sa
, "kernel_connect: failed family: %d, err: %d\n", addr
->sa_family
, err
);
536 goto err_out_release
;
538 st
->sock
->sk
->sk_sndtimeo
= st
->sock
->sk
->sk_rcvtimeo
= msecs_to_jiffies(60000);
540 memcpy(&st
->sa
, sa
, sizeof(struct sockaddr_storage
));
541 st
->addrlen
= addrlen
;
543 err
= pohmelfs_poll_init(st
);
545 goto err_out_shutdown
;
548 spin_lock(&conn
->state_lock
);
550 if (!pohmelfs_addr_exist(conn
, sa
, addrlen
)) {
551 list_add_tail(&st
->state_entry
, &conn
->state_list
);
554 spin_unlock(&conn
->state_lock
);
557 goto err_out_poll_exit
;
560 err
= pohmelfs_route_request(st
);
562 goto err_out_poll_exit
;
565 pohmelfs_print_addr(sa
, "%d: connected\n", st
->conn
->idx
);
570 pohmelfs_poll_exit(st
);
572 st
->sock
->ops
->shutdown(st
->sock
, 2);
574 sock_release(st
->sock
);
578 if (err
!= -EEXIST
) {
579 pohmelfs_print_addr(sa
, "state creation failed: %d\n", err
);
584 static void pohmelfs_state_exit(struct pohmelfs_state
*st
)
589 pohmelfs_poll_exit(st
);
590 st
->sock
->ops
->shutdown(st
->sock
, 2);
592 pohmelfs_print_addr(&st
->sa
, "disconnected\n");
593 sock_release(st
->sock
);
596 static void pohmelfs_state_release(struct kref
*kref
)
598 struct pohmelfs_state
*st
= container_of(kref
, struct pohmelfs_state
, refcnt
);
599 pohmelfs_state_exit(st
);
602 void pohmelfs_state_put(struct pohmelfs_state
*st
)
604 kref_put(&st
->refcnt
, pohmelfs_state_release
);
607 static void pohmelfs_state_clean(struct pohmelfs_state
*st
)
609 struct pohmelfs_trans
*t
, *tmp
;
611 pohmelfs_route_remove_all(st
);
613 mutex_lock(&st
->trans_lock
);
614 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_entry
) {
615 list_del(&t
->trans_entry
);
617 pohmelfs_trans_put(t
);
621 struct rb_node
*n
= rb_first(&st
->trans_root
);
625 t
= rb_entry(n
, struct pohmelfs_trans
, trans_node
);
627 rb_erase(&t
->trans_node
, &st
->trans_root
);
628 pohmelfs_trans_put(t
);
630 mutex_unlock(&st
->trans_lock
);
632 cancel_work_sync(&st
->io_work
);
635 void pohmelfs_state_kill(struct pohmelfs_state
*st
)
637 BUG_ON(!list_empty(&st
->state_entry
));
639 pohmelfs_state_clean(st
);
640 pohmelfs_state_put(st
);
643 void pohmelfs_state_schedule(struct pohmelfs_state
*st
)
645 if (!st
->conn
->need_exit
)
646 queue_work(st
->conn
->wq
, &st
->io_work
);
649 int pohmelfs_state_add_reconnect(struct pohmelfs_state
*st
)
651 struct pohmelfs_connection
*conn
= st
->conn
;
652 struct pohmelfs_reconnect
*r
, *tmp
;
655 pohmelfs_route_remove_all(st
);
657 r
= kzalloc(sizeof(struct pohmelfs_reconnect
), GFP_NOIO
);
663 memcpy(&r
->sa
, &st
->sa
, sizeof(struct sockaddr_storage
));
664 r
->addrlen
= st
->addrlen
;
665 r
->group_id
= st
->group_id
;
667 mutex_lock(&conn
->reconnect_lock
);
668 list_for_each_entry(tmp
, &conn
->reconnect_list
, reconnect_entry
) {
669 if (tmp
->addrlen
!= r
->addrlen
)
672 if (memcmp(&tmp
->sa
, &r
->sa
, r
->addrlen
))
680 list_add_tail(&r
->reconnect_entry
, &conn
->reconnect_list
);
682 mutex_unlock(&conn
->reconnect_lock
);
687 pohmelfs_print_addr(&st
->sa
, "reconnection added\n");
695 spin_lock(&conn
->state_lock
);
696 list_move(&st
->state_entry
, &conn
->kill_state_list
);
697 spin_unlock(&conn
->state_lock
);
699 /* we do not really care if this work will not be processed immediately */
700 queue_delayed_work(conn
->wq
, &conn
->reconnect_work
, 0);