2 * Copyright (C) 2011+ Evgeniy Polyakov <zbr@ioremap.net>
/*
 * Scratch buffer pointer and its size (initialised to 4096).
 * pohmelfs_suck_scratch() passes both to pohmelfs_data_recv().
 * NOTE(review): the allocation of the buffer is not visible in this listing.
 */
14 void *pohmelfs_scratch_buf
;
15 int pohmelfs_scratch_buf_size
= 4096;
/*
 * Log a printf-style message prefixed with the peer address and port.
 *
 * @addr: socket address; only AF_INET and AF_INET6 branches are visible here.
 * @fmt:  printf-style format for the message body, followed by its arguments.
 *
 * The message body is formatted with kvasprintf(GFP_NOIO) and then emitted
 * through pr_info() together with the address (%pI4 or %pI6) and the port
 * converted with ntohs().
 * NOTE(review): this listing omits some original lines (the embedded
 * numbering jumps), so the va_list handling, the check of the kvasprintf()
 * result and the release of the formatted string are not visible here.
 */
17 void pohmelfs_print_addr(struct sockaddr_storage
*addr
, const char *fmt
, ...)
19 struct sockaddr
*sa
= (struct sockaddr
*)addr
;
/* Format the caller's message into a freshly allocated string. */
24 ptr
= kvasprintf(GFP_NOIO
, fmt
, args
);
/* IPv4 peer: print dotted-quad address and host-order port. */
28 if (sa
->sa_family
== AF_INET
) {
29 struct sockaddr_in
*sin
= (struct sockaddr_in
*)addr
;
30 pr_info("pohmelfs: %pI4:%d: %s", &sin
->sin_addr
.s_addr
, ntohs(sin
->sin_port
), ptr
);
/* IPv6 peer: same output, using the %pI6 address format. */
31 } else if (sa
->sa_family
== AF_INET6
) {
32 struct sockaddr_in6
*sin
= (struct sockaddr_in6
*)addr
;
33 pr_info("pohmelfs: %pI6:%d: %s", &sin
->sin6_addr
, ntohs(sin
->sin6_port
), ptr
);
42 * Basic network sending/receiving functions.
43 * Blocked mode is used.
/*
 * Receive data from the state's socket with kernel_recvmsg().
 *
 * @st:    state whose st->sock is read from.
 * @buf:   destination buffer.
 * @size:  requested length.
 * @flags: message flags, stored in msg.msg_flags and passed to the receive call.
 *
 * A single vector entry (iov) is used and no control data is attached.
 * NOTE(review): this listing omits some original lines, so how iov is filled
 * from @buf/@size and how the result is translated into the return value are
 * not visible here.
 */
45 int pohmelfs_data_recv(struct pohmelfs_state
*st
, void *buf
, u64 size
, unsigned int flags
)
56 msg
.msg_iov
= (struct iovec
*)&iov
;
/* No ancillary (control) data is used for this receive. */
60 msg
.msg_control
= NULL
;
61 msg
.msg_controllen
= 0;
62 msg
.msg_flags
= flags
;
/* One vector entry, asking for iov.iov_len bytes. */
64 err
= kernel_recvmsg(st
->sock
, &msg
, &iov
, 1, iov
.iov_len
, msg
.msg_flags
);
/*
 * Receive @size bytes into @data from state @recv using MSG_DONTWAIT.
 *
 * This is a wrapper around pohmelfs_data_recv().
 * NOTE(review): this listing omits some original lines, so what is done with
 * transaction @t and with the result after the call is not visible here.
 */
72 int pohmelfs_recv(struct pohmelfs_trans
*t
, struct pohmelfs_state
*recv
, void *data
, int size
)
76 err
= pohmelfs_data_recv(recv
, data
, size
, MSG_DONTWAIT
);
/*
 * Send a transaction whose payload lives in t->data.
 *
 * @t: transaction; t->io_offset tracks how much has been sent so far.
 *
 * While t->io_offset is below t->header_size the unsent part of the header
 * (starting at &t->cmd) is sent. Once the offset has reached the header size
 * and t->data is set, the unsent part of t->data is sent. Both sends use
 * kernel_sendmsg() on t->st->sock with MSG_DONTWAIT.
 * NOTE(review): this listing omits some original lines, so the code that
 * advances t->io_offset and handles the send results is not visible here.
 */
84 static int pohmelfs_data_send(struct pohmelfs_trans
*t
)
92 msg
.msg_control
= NULL
;
93 msg
.msg_controllen
= 0;
94 msg
.msg_flags
= MSG_DONTWAIT
;
/* Header phase: resume from the current offset inside the command header. */
100 if (t
->io_offset
< t
->header_size
) {
101 io
.iov_base
= (void *)(&t
->cmd
) + t
->io_offset
;
102 io
.iov_len
= t
->header_size
- t
->io_offset
;
104 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
/* Data phase: only entered when the header is done and a data buffer exists. */
111 if ((t
->io_offset
>= t
->header_size
) && t
->data
) {
/* Number of payload bytes already accounted for by io_offset. */
112 size_t sent_size
= t
->io_offset
- t
->header_size
;
113 io
.iov_base
= t
->data
+ sent_size
;
114 io
.iov_len
= t
->data_size
- sent_size
;
116 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
/*
 * Send a transaction whose payload is the set of pages in t->wctl->pvec.
 *
 * @t: transaction; t->io_offset tracks how much has been sent so far.
 *
 * The unsent part of the header is sent with kernel_sendmsg(); after that the
 * pages of the page vector are walked and sent with kernel_sendpage(). Both
 * calls use MSG_DONTWAIT. skip_offset and current_io_offset are used to find
 * the position inside the page vector that corresponds to t->io_offset.
 * NOTE(review): this listing omits some original lines, so the updates of
 * skip_offset, size and t->io_offset, the error handling and the body of the
 * "current_io_offset > skip_offset + sz" branch are not visible here.
 */
130 static int pohmelfs_page_send(struct pohmelfs_trans
*t
)
132 struct pohmelfs_write_ctl
*ctl
= t
->wctl
;
/* Header phase: resume from the current offset inside the command header. */
138 if (t
->io_offset
< t
->header_size
) {
139 io
.iov_base
= (void *)(&t
->cmd
) + t
->io_offset
;
140 io
.iov_len
= t
->header_size
- t
->io_offset
;
144 msg
.msg_control
= NULL
;
145 msg
.msg_controllen
= 0;
146 msg
.msg_flags
= MSG_DONTWAIT
;
151 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
/* Page phase: entered once io_offset has reached the header size. */
158 if (t
->io_offset
>= t
->header_size
) {
159 size_t skip_offset
= 0;
/* Bytes of the whole command (payload size + struct dnet_cmd) still unsent. */
160 size_t size
= le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
) - t
->io_offset
;
/* Position already reached, counted from the end of the header. */
161 size_t current_io_offset
= t
->io_offset
- t
->header_size
;
163 for (i
= 0; i
< pagevec_count(&ctl
->pvec
); ++i
) {
164 struct page
*page
= ctl
->pvec
.pages
[i
];
/* off/sz: presumably the in-page start offset and the bytes up to the page end — TODO confirm. */
165 pgoff_t off
= page_offset(page
) & ~PAGE_CACHE_MASK
;
166 size_t sz
= PAGE_CACHE_SIZE
- off
;
171 if (current_io_offset
> skip_offset
+ sz
) {
176 pr_debug("pohmelfs: %s: %d/%d: total-size: %llu, io-offset: %llu, rest-size: %zd, current-io: %zd, "
177 "skip-offset: %zd, page-offset: %ld, off: %ld, sz: %zu\n",
178 pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
), i
, pagevec_count(&ctl
->pvec
),
179 (unsigned long long)le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
),
180 t
->io_offset
, size
, current_io_offset
, skip_offset
, (unsigned long)page_offset(page
),
181 (unsigned long)off
, sz
);
/* Trim the part of this page that lies before the current send position. */
183 sz
-= current_io_offset
- skip_offset
;
184 off
+= current_io_offset
- skip_offset
;
186 err
= kernel_sendpage(t
->st
->sock
, page
, off
, sz
, MSG_DONTWAIT
);
/* err is added to the running offset; the check for err < 0 is not visible here. */
190 current_io_offset
+= err
;
/*
 * Helper that lets pohmelfs_queue_func() get from a poll_table pointer back
 * to the owning state: the callback uses container_of(pt, ..., pt)->st.
 * NOTE(review): the 'pt' member referenced by that container_of() is on a
 * line that this listing omits.
 */
207 struct pohmelfs_poll_helper
{
209 struct pohmelfs_state
*st
;
/*
 * Wait-queue wake callback installed by pohmelfs_queue_func().
 *
 * Recovers the state from its embedded 'wait' entry and queues st->io_work
 * on st->conn->wq, unless st->conn->need_exit is set.
 * @mode, @sync and @key are not used in the visible lines.
 * NOTE(review): the return statement is on a line this listing omits.
 */
212 static int pohmelfs_queue_wake(wait_queue_t
*wait
, unsigned mode
, int sync
, void *key
)
214 struct pohmelfs_state
*st
= container_of(wait
, struct pohmelfs_state
, wait
);
216 if (!st
->conn
->need_exit
)
217 queue_work(st
->conn
->wq
, &st
->io_work
);
/*
 * Poll-table callback registered by pohmelfs_poll_init().
 *
 * Finds the state through the enclosing struct pohmelfs_poll_helper, sets
 * pohmelfs_queue_wake() as the wake function of st->wait and adds that entry
 * to the wait queue head @whead. @file is not used in the visible lines.
 * NOTE(review): pohmelfs_poll_exit() reads st->whead; the line that stores
 * @whead there is presumably among the lines this listing omits — confirm.
 */
221 static void pohmelfs_queue_func(struct file
*file
, wait_queue_head_t
*whead
, poll_table
*pt
)
223 struct pohmelfs_state
*st
= container_of(pt
, struct pohmelfs_poll_helper
, pt
)->st
;
227 init_waitqueue_func_entry(&st
->wait
, pohmelfs_queue_wake
);
228 add_wait_queue(whead
, &st
->wait
);
/*
 * Undo the wait-queue registration: remove st->wait from st->whead.
 * NOTE(review): this listing omits some original lines of this function.
 */
231 static void pohmelfs_poll_exit(struct pohmelfs_state
*st
)
234 remove_wait_queue(st
->whead
, &st
->wait
);
/*
 * Hook the state into its socket's wait queue.
 *
 * An on-stack poll helper is set up with pohmelfs_queue_func() as the poll
 * table callback and is handed to the socket's poll operation; presumably
 * that call is what invokes the callback — confirm.
 * NOTE(review): the assignment of ph.st and the return statement are on
 * lines this listing omits.
 */
239 static int pohmelfs_poll_init(struct pohmelfs_state
*st
)
241 struct pohmelfs_poll_helper ph
;
244 init_poll_funcptr(&ph
.pt
, &pohmelfs_queue_func
);
246 st
->sock
->ops
->poll(NULL
, st
->sock
, &ph
.pt
);
/*
 * Query the socket's current poll events.
 *
 * The socket's poll operation is called with a NULL poll table. If any of
 * POLLERR, POLLHUP, POLLNVAL, POLLRDHUP or POLLREMOVE is reported, the events
 * are logged with pohmelfs_print_addr().
 * NOTE(review): this listing omits some original lines, so how @mask is
 * checked and which values are returned are not visible here.
 */
250 static int pohmelfs_revents(struct pohmelfs_state
*st
, unsigned mask
)
254 revents
= st
->sock
->ops
->poll(NULL
, st
->sock
, NULL
);
258 if (revents
& (POLLERR
| POLLHUP
| POLLNVAL
| POLLRDHUP
| POLLREMOVE
)) {
259 pohmelfs_print_addr(&st
->sa
, "error revents: %x\n", revents
);
/*
 * Push the first queued transaction of a state onto the wire.
 *
 * Visible steps:
 *  - take the first entry of st->trans_list under st->trans_lock;
 *  - check the socket with pohmelfs_revents(st, POLLOUT);
 *  - compute the full size (payload size + struct dnet_cmd);
 *  - send with pohmelfs_page_send() or pohmelfs_data_send();
 *  - when there was no error and io_offset equals the full size, unlink the
 *    transaction from the list and insert it with
 *    pohmelfs_trans_insert_tree(), both under st->trans_lock;
 *  - BUG_ON() if io_offset went past the full size;
 *  - drop a transaction reference with pohmelfs_trans_put().
 * NOTE(review): this listing omits some original lines, so the handling of an
 * empty list, the matching reference grab, the condition choosing between the
 * two send helpers and the final error handling are not visible here.
 */
266 static int pohmelfs_state_send(struct pohmelfs_state
*st
)
268 struct pohmelfs_trans
*t
= NULL
;
273 mutex_lock(&st
->trans_lock
);
274 if (!list_empty(&st
->trans_list
))
275 t
= list_first_entry(&st
->trans_list
, struct pohmelfs_trans
, trans_entry
);
276 mutex_unlock(&st
->trans_lock
);
281 err
= pohmelfs_revents(st
, POLLOUT
);
285 size
= le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
);
286 pr_debug("pohmelfs: %s: starting sending: %llu/%zd\n", pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
), t
->io_offset
, size
);
289 err
= pohmelfs_page_send(t
);
291 err
= pohmelfs_data_send(t
);
293 pr_debug("pohmelfs: %s: sent: %llu/%zd: %d\n", pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
), t
->io_offset
, size
, err
);
/* Fully sent: move the transaction from the send list into the tree. */
294 if (!err
&& (t
->io_offset
== size
)) {
295 mutex_lock(&st
->trans_lock
);
296 list_del_init(&t
->trans_entry
);
297 err
= pohmelfs_trans_insert_tree(st
, t
);
301 mutex_unlock(&st
->trans_lock
);
304 BUG_ON(t
->io_offset
> size
);
307 pohmelfs_trans_put(t
);
309 if ((err
< 0) && (err
!= -EAGAIN
))
/*
 * Read data for the current command (st->cmd) into the shared scratch buffer.
 *
 * Data is received with pohmelfs_data_recv() using MSG_WAITALL, with the
 * request size starting from pohmelfs_scratch_buf_size; a failure is logged
 * as "recv-scratch err".
 * NOTE(review): this listing omits some original lines, so the loop around
 * the receive and the adjustment of sz against cmd->size are not visible.
 */
316 static void pohmelfs_suck_scratch(struct pohmelfs_state
*st
)
318 struct dnet_cmd
*cmd
= &st
->cmd
;
321 pr_debug("pohmelfs_suck_scratch: %llu\n", (unsigned long long)cmd
->size
);
324 int sz
= pohmelfs_scratch_buf_size
;
329 err
= pohmelfs_data_recv(st
, pohmelfs_scratch_buf
, sz
, MSG_WAITALL
);
331 pohmelfs_print_addr(&st
->sa
, "recv-scratch err: %d\n", err
);
/*
 * Receive and dispatch one reply on a state.
 *
 * Visible steps:
 *  - check the socket with pohmelfs_revents(st, POLLIN);
 *  - read a struct dnet_cmd header into st->cmd with MSG_WAITALL and convert
 *    it with dnet_convert_cmd();
 *  - compute the transaction number without the DNET_TRANS_REPLY bit;
 *  - look the transaction up with pohmelfs_trans_lookup();
 *  - call pohmelfs_suck_scratch() (presumably when no transaction was found
 *    — the condition is not visible, confirm);
 *  - call the transaction's recv_reply callback while cmd->size is non-zero
 *    and t->io_offset differs from it, then its complete callback;
 *  - remove the transaction when DNET_FLAGS_MORE is not set or on error.
 * NOTE(review): this listing omits some original lines, so the branch
 * conditions, labels and return statements are not visible here.
 */
342 static int pohmelfs_state_recv(struct pohmelfs_state
*st
)
344 struct dnet_cmd
*cmd
= &st
->cmd
;
345 struct pohmelfs_trans
*t
;
346 unsigned long long trans
;
349 err
= pohmelfs_revents(st
, POLLIN
);
354 err
= pohmelfs_data_recv(st
, cmd
, sizeof(struct dnet_cmd
), MSG_WAITALL
);
359 pohmelfs_print_addr(&st
->sa
, "recv error: %d\n", err
);
363 dnet_convert_cmd(cmd
);
365 trans
= cmd
->trans
& ~DNET_TRANS_REPLY
;
369 t
= pohmelfs_trans_lookup(st
, cmd
);
371 pohmelfs_suck_scratch(st
);
376 if (cmd
->size
&& (t
->io_offset
!= cmd
->size
)) {
377 err
= t
->cb
.recv_reply(t
, st
);
378 if (err
&& (err
!= -EAGAIN
)) {
379 pohmelfs_print_addr(&st
->sa
, "recv-reply error: %d\n", err
);
383 if (t
->io_offset
!= cmd
->size
)
387 err
= t
->cb
.complete(t
, st
);
389 pohmelfs_print_addr(&st
->sa
, "recv-complete err: %d\n", err
);
397 /* only remove and free transaction if there is error or there will be no more replies */
398 if (!(cmd
->flags
& DNET_FLAGS_MORE
) || err
) {
399 pohmelfs_trans_remove(t
);
402 * refcnt was grabbed twice:
403 * in pohmelfs_trans_lookup()
404 * and at transaction creation
406 pohmelfs_trans_put(t
);
410 cmd
->size
-= t
->io_offset
;
415 pohmelfs_trans_put(t
);
/*
 * Work function for st->io_work: drives sending and receiving for one state.
 *
 * While st->conn->psb->need_exit is not set, it calls pohmelfs_state_send()
 * and pohmelfs_state_recv() and logs any result other than 0 or -EAGAIN.
 * After the loop, if either side failed with something other than -EAGAIN,
 * the state is handed to pohmelfs_state_add_reconnect().
 * NOTE(review): this listing omits some original lines, so the statements
 * that leave the loop (after an error, or when both sides return -EAGAIN)
 * are not visible here.
 */
420 static void pohmelfs_state_io_work(struct work_struct
*work
)
422 struct pohmelfs_state
*st
= container_of(work
, struct pohmelfs_state
, io_work
);
423 int send_err
, recv_err
;
425 send_err
= recv_err
= -EAGAIN
;
426 while (!st
->conn
->psb
->need_exit
) {
427 send_err
= pohmelfs_state_send(st
);
428 if (send_err
&& (send_err
!= -EAGAIN
)) {
429 pohmelfs_print_addr(&st
->sa
, "state send error: %d\n", send_err
);
433 recv_err
= pohmelfs_state_recv(st
);
434 if (recv_err
&& (recv_err
!= -EAGAIN
)) {
435 pohmelfs_print_addr(&st
->sa
, "state recv error: %d\n", recv_err
);
/* Neither direction could make progress. */
439 if ((send_err
== -EAGAIN
) && (recv_err
== -EAGAIN
))
/* A real (non -EAGAIN) failure on either side schedules a reconnect. */
444 if ((send_err
&& (send_err
!= -EAGAIN
)) || (recv_err
&& (recv_err
!= -EAGAIN
))) {
445 pohmelfs_state_add_reconnect(st
);
/*
 * Search conn->state_list for a state with the given address.
 *
 * Entries whose addrlen differs from @addrlen are skipped by the first check;
 * the remaining ones are compared with memcmp() over @addrlen bytes.
 * pohmelfs_state_create() calls this with conn->state_lock held.
 * NOTE(review): the return statements are on lines this listing omits;
 * presumably the matching state is returned, or NULL when none matches.
 */
450 struct pohmelfs_state
*pohmelfs_addr_exist(struct pohmelfs_connection
*conn
, struct sockaddr_storage
*sa
, int addrlen
)
452 struct pohmelfs_state
*st
;
454 list_for_each_entry(st
, &conn
->state_list
, state_entry
) {
455 if (st
->addrlen
!= addrlen
)
458 if (!memcmp(&st
->sa
, sa
, addrlen
)) {
/*
 * Create a state (one TCP connection to @sa) and attach it to @conn.
 *
 * Visible steps:
 *  - early duplicate check with pohmelfs_addr_exist() under conn->state_lock;
 *  - allocate the state with kzalloc(GFP_KERNEL) and initialise its
 *    transaction lock, list and tree, reference count and io_work;
 *  - create a kernel TCP socket, set its allocation mode to GFP_NOIO and its
 *    send/receive timeouts to 60000 ms, and set the keepalive options;
 *  - connect, store the address in the state, register with the socket's
 *    wait queue through pohmelfs_poll_init();
 *  - re-check for a duplicate under conn->state_lock and add the state to
 *    conn->state_list;
 *  - issue pohmelfs_route_request() and log "connected".
 * The error path undoes these steps in reverse (poll exit, shutdown, socket
 * release) and logs the failure unless the error is -EEXIST.
 * NOTE(review): this listing omits some original lines, so the error-path
 * labels, the conditions around the goto statements, the use of @ask_route,
 * the assignment of st->conn and the return statements are not visible here.
 */
466 struct pohmelfs_state
*pohmelfs_state_create(struct pohmelfs_connection
*conn
, struct sockaddr_storage
*sa
, int addrlen
,
467 int ask_route
, int group_id
)
470 struct pohmelfs_state
*st
;
471 struct sockaddr
*addr
= (struct sockaddr
*)sa
;
473 /* early check - this state can be inserted into route table, no need to create state and check again */
474 spin_lock(&conn
->state_lock
);
475 if (pohmelfs_addr_exist(conn
, sa
, addrlen
))
477 spin_unlock(&conn
->state_lock
);
482 st
= kzalloc(sizeof(struct pohmelfs_state
), GFP_KERNEL
);
489 mutex_init(&st
->trans_lock
);
490 INIT_LIST_HEAD(&st
->trans_list
);
491 st
->trans_root
= RB_ROOT
;
493 st
->group_id
= group_id
;
495 kref_init(&st
->refcnt
);
497 INIT_WORK(&st
->io_work
, pohmelfs_state_io_work
);
501 err
= sock_create_kern(addr
->sa_family
, SOCK_STREAM
, IPPROTO_TCP
, &st
->sock
);
503 pohmelfs_print_addr(sa
, "sock_create: failed family: %d, err: %d\n", addr
->sa_family
, err
);
507 st
->sock
->sk
->sk_allocation
= GFP_NOIO
;
508 st
->sock
->sk
->sk_sndtimeo
= st
->sock
->sk
->sk_rcvtimeo
= msecs_to_jiffies(60000);
/* NOTE(review): err is passed as the option value; its assignment is not visible in this listing. */
511 sock_setsockopt(st
->sock
, SOL_SOCKET
, SO_KEEPALIVE
, (char *)&err
, 4);
/* Keepalive tuning values come from the superblock-level settings in conn->psb. */
513 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPIDLE
, (char *)&conn
->psb
->keepalive_idle
, 4);
514 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPINTVL
, (char *)&conn
->psb
->keepalive_interval
, 4);
515 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPCNT
, (char *)&conn
->psb
->keepalive_cnt
, 4);
517 err
= kernel_connect(st
->sock
, (struct sockaddr
*)addr
, addrlen
, 0);
519 pohmelfs_print_addr(sa
, "kernel_connect: failed family: %d, err: %d\n", addr
->sa_family
, err
);
520 goto err_out_release
;
/* Same timeout assignment as before the connect, applied again here. */
522 st
->sock
->sk
->sk_sndtimeo
= st
->sock
->sk
->sk_rcvtimeo
= msecs_to_jiffies(60000);
524 memcpy(&st
->sa
, sa
, sizeof(struct sockaddr_storage
));
525 st
->addrlen
= addrlen
;
527 err
= pohmelfs_poll_init(st
);
529 goto err_out_shutdown
;
/* Second duplicate check, now with the connection established. */
532 spin_lock(&conn
->state_lock
);
534 if (!pohmelfs_addr_exist(conn
, sa
, addrlen
)) {
535 list_add_tail(&st
->state_entry
, &conn
->state_list
);
538 spin_unlock(&conn
->state_lock
);
541 goto err_out_poll_exit
;
544 err
= pohmelfs_route_request(st
);
546 goto err_out_poll_exit
;
549 pohmelfs_print_addr(sa
, "%d: connected\n", st
->conn
->idx
);
/* Error unwinding starts here; the labels are on lines this listing omits. */
554 pohmelfs_poll_exit(st
);
556 st
->sock
->ops
->shutdown(st
->sock
, 2);
558 sock_release(st
->sock
);
562 if (err
!= -EEXIST
) {
563 pohmelfs_print_addr(sa
, "state creation failed: %d\n", err
);
/*
 * Tear down a state's socket: unregister from the wait queue, shut the
 * socket down (how = 2), log "disconnected" and release the socket.
 * NOTE(review): this listing omits some original lines of this function
 * (before the first call and after the last one), so any early return or
 * freeing of the state itself is not visible here.
 */
568 static void pohmelfs_state_exit(struct pohmelfs_state
*st
)
573 pohmelfs_poll_exit(st
);
574 st
->sock
->ops
->shutdown(st
->sock
, 2);
576 pohmelfs_print_addr(&st
->sa
, "disconnected\n");
577 sock_release(st
->sock
);
/*
 * kref release callback passed to kref_put() by pohmelfs_state_put():
 * maps the embedded refcnt back to its state and calls pohmelfs_state_exit().
 */
580 static void pohmelfs_state_release(struct kref
*kref
)
582 struct pohmelfs_state
*st
= container_of(kref
, struct pohmelfs_state
, refcnt
);
583 pohmelfs_state_exit(st
);
/*
 * Drop one reference on the state through kref_put(), with
 * pohmelfs_state_release() as the release callback.
 */
586 void pohmelfs_state_put(struct pohmelfs_state
*st
)
588 kref_put(&st
->refcnt
, pohmelfs_state_release
);
/*
 * Detach everything that still refers to a state.
 *
 * Routes are removed with pohmelfs_route_remove_all(). Under st->trans_lock,
 * every transaction on st->trans_list is unlinked and put, and transactions
 * in st->trans_root are erased from the tree and put. Finally any pending
 * io_work is cancelled with cancel_work_sync().
 * NOTE(review): this listing omits some original lines, so the loop that
 * drains the tree and any completion calls on the transactions are not
 * visible here.
 */
591 static void pohmelfs_state_clean(struct pohmelfs_state
*st
)
593 struct pohmelfs_trans
*t
, *tmp
;
595 pohmelfs_route_remove_all(st
);
597 mutex_lock(&st
->trans_lock
);
/* Transactions still waiting on the list. */
598 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_entry
) {
599 list_del(&t
->trans_entry
);
601 pohmelfs_trans_put(t
);
/* Transactions held in the tree. */
605 struct rb_node
*n
= rb_first(&st
->trans_root
);
609 t
= rb_entry(n
, struct pohmelfs_trans
, trans_node
);
611 rb_erase(&t
->trans_node
, &st
->trans_root
);
612 pohmelfs_trans_put(t
);
614 mutex_unlock(&st
->trans_lock
);
616 cancel_work_sync(&st
->io_work
);
/*
 * Destroy a state that has already been unlinked from its list.
 *
 * BUG_ON() fires if st->state_entry is still linked. The state is then
 * cleaned with pohmelfs_state_clean() and one reference is dropped with
 * pohmelfs_state_put().
 */
619 void pohmelfs_state_kill(struct pohmelfs_state
*st
)
621 BUG_ON(!list_empty(&st
->state_entry
));
623 pohmelfs_state_clean(st
);
624 pohmelfs_state_put(st
);
/*
 * Queue the state's io_work on the connection workqueue (st->conn->wq),
 * unless st->conn->need_exit is set.
 */
627 void pohmelfs_state_schedule(struct pohmelfs_state
*st
)
629 if (!st
->conn
->need_exit
)
630 queue_work(st
->conn
->wq
, &st
->io_work
);
633 int pohmelfs_state_add_reconnect(struct pohmelfs_state
*st
)
635 struct pohmelfs_connection
*conn
= st
->conn
;
636 struct pohmelfs_reconnect
*r
, *tmp
;
639 pohmelfs_route_remove_all(st
);
641 r
= kzalloc(sizeof(struct pohmelfs_reconnect
), GFP_NOIO
);
647 memcpy(&r
->sa
, &st
->sa
, sizeof(struct sockaddr_storage
));
648 r
->addrlen
= st
->addrlen
;
649 r
->group_id
= st
->group_id
;
651 mutex_lock(&conn
->reconnect_lock
);
652 list_for_each_entry(tmp
, &conn
->reconnect_list
, reconnect_entry
) {
653 if (tmp
->addrlen
!= r
->addrlen
)
656 if (memcmp(&tmp
->sa
, &r
->sa
, r
->addrlen
))
664 list_add_tail(&r
->reconnect_entry
, &conn
->reconnect_list
);
666 mutex_unlock(&conn
->reconnect_lock
);
671 pohmelfs_print_addr(&st
->sa
, "reconnection added\n");
679 spin_lock(&conn
->state_lock
);
680 list_move(&st
->state_entry
, &conn
->kill_state_list
);
681 spin_unlock(&conn
->state_lock
);
683 /* we do not really care if this work will not be processed immediately */
684 queue_delayed_work(conn
->wq
, &conn
->reconnect_work
, 0);