qos_queue locking+content free, conn_ack_rcvd reset check, neighbor_operations lock...
[cor_2_6_31.git] net/cor/snd.c
/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"

struct kmem_cache *connretrans_slab;

struct conn_retrans {
	/* timeout_list and conn_list share a single ref */
	struct kref ref;
	struct list_head timeout_list;
	struct list_head conn_list;
	struct htab_entry htab_entry;
	struct conn *trgt_out_o;
	__u32 seqno;
	__u32 length;
	__u8 ackrcvd;
	unsigned long timeout;
};

static void free_connretrans(struct kref *ref)
{
	struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);

	/* drop the conn reference before freeing cr; cr must not be
	 * dereferenced after kmem_cache_free() */
	kref_put(&(cr->trgt_out_o->ref), free_conn);
	kmem_cache_free(connretrans_slab, cr);
}

DEFINE_MUTEX(queues_lock);
LIST_HEAD(queues);
struct delayed_work qos_resume_work;
int qos_resume_scheduled;

void free_qos(struct kref *ref)
{
	struct qos_queue *q = container_of(ref, struct qos_queue, ref);
	kfree(q);
}

/* Highest bidder "pays" the credits the second has bid */
static int _resume_conns(struct qos_queue *q)
{
	struct conn *best = 0;
	__u64 bestcredit = 0;
	__u64 secondcredit = 0;

	int rc;

	struct list_head *lh = q->conns_waiting.next;

	while (lh != &(q->conns_waiting)) {
		struct conn *trgt_out_o = container_of(lh, struct conn,
				target.out.rb.lh);
		__u64 credits;

		lh = lh->next;

		refresh_conn_credits(trgt_out_o, 0, 0);

		mutex_lock(&(trgt_out_o->rcv_lock));

		BUG_ON(trgt_out_o->targettype != TARGET_OUT);

		if (unlikely(trgt_out_o->isreset != 0)) {
			trgt_out_o->target.out.rb.in_queue = 0;
			list_del(&(trgt_out_o->target.out.rb.lh));
			mutex_unlock(&(trgt_out_o->rcv_lock));
			kref_put(&(trgt_out_o->ref), free_conn);

			continue;
		}

		BUG_ON(trgt_out_o->data_buf.read_remaining == 0);

		if (may_alloc_control_msg(trgt_out_o->target.out.nb,
				ACM_PRIORITY_LOW) == 0) {
			/* do not keep rcv_lock while skipping this conn */
			mutex_unlock(&(trgt_out_o->rcv_lock));
			continue;
		}

		if (trgt_out_o->credits <= 0)
			credits = 0;
		else
			credits = multiply_div(trgt_out_o->credits, 1LL << 24,
					trgt_out_o->data_buf.read_remaining);
		mutex_unlock(&(trgt_out_o->rcv_lock));

		if (best == 0 || bestcredit < credits) {
			secondcredit = bestcredit;
			best = trgt_out_o;
			bestcredit = credits;
		} else if (secondcredit < credits) {
			secondcredit = credits;
		}
	}

	if (best == 0)
		return RC_FLUSH_CONN_OUT_OK;

	mutex_lock(&(best->rcv_lock));
	rc = flush_out(best, 1, (__u32) (secondcredit >> 32));

	if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT) {
		best->target.out.rb.in_queue = 0;
		list_del(&(best->target.out.rb.lh));
	}

	mutex_unlock(&(best->rcv_lock));

	refresh_conn_credits(best, 0, 0);

	if (rc == RC_FLUSH_CONN_OUT_OK_SENT)
		wake_sender(best);

	if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT)
		kref_put(&(best->ref), free_conn);

	return rc;
}

static int resume_conns(struct qos_queue *q)
{
	while (list_empty(&(q->conns_waiting)) == 0) {
		int rc = _resume_conns(q);
		if (rc != RC_FLUSH_CONN_OUT_OK &&
				rc != RC_FLUSH_CONN_OUT_OK_SENT)
			return 1;
	}
	return 0;
}

static int send_retrans(struct neighbor *nb, int fromqos);

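/* Drain one of the non-conn waiting lists of q. For each queued resume_block
 * the queue lock is dropped, the matching send function is called with a
 * temporary reference held, and the lock is retaken. On failure (rc != 0) the
 * entry is put back and the loop stops. Note that QOS_CALLER_KPACKET entries
 * are kept on conn_retrans_waiting and QOS_CALLER_CONN_RETRANS entries on
 * kpackets_waiting, matching qos_enqueue(). */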
static int __qos_resume(struct qos_queue *q, int caller)
{
	unsigned long iflags;
	int rc = 0;
	struct list_head *lh;

	spin_lock_irqsave(&(q->qlock), iflags);

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct resume_block *rb = container_of(curr,
				struct resume_block, lh);
		rb->in_queue = 0;
		list_del(curr);

		if (caller == QOS_CALLER_KPACKET) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_kp);
			/* hold a temporary ref while q->qlock is dropped */
			kref_get(&(nb->ref));
			spin_unlock_irqrestore(&(q->qlock), iflags);
			rc = send_messages(nb, 1);
			kref_put(&(nb->ref), neighbor_free);
			spin_lock_irqsave(&(q->qlock), iflags);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_cr);
#warning todo do not send if neighbor is stalled
			kref_get(&(nb->ref));
			spin_unlock_irqrestore(&(q->qlock), iflags);
			rc = send_retrans(nb, 1);
			kref_put(&(nb->ref), neighbor_free);
			spin_lock_irqsave(&(q->qlock), iflags);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			struct announce_data *ann = container_of(rb,
					struct announce_data, rb);
			kref_get(&(ann->ref));
			spin_unlock_irqrestore(&(q->qlock), iflags);
			rc = send_announce_qos(ann);
			kref_put(&(ann->ref), announce_data_free);
			spin_lock_irqsave(&(q->qlock), iflags);
		} else {
			BUG();
		}

		if (rc != 0) {
			if (rb->in_queue == 0) {
				rb->in_queue = 1;
				list_add(curr, lh);
			}
			break;
		}

		if (caller == QOS_CALLER_KPACKET) {
			kref_put(&(container_of(rb, struct neighbor,
					rb_kp)->ref), neighbor_free);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			kref_put(&(container_of(rb, struct neighbor,
					rb_cr)->ref), neighbor_free);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			kref_put(&(container_of(rb,
					struct announce_data, rb)->ref),
					announce_data_free);
		} else {
			BUG();
		}
	}

	spin_unlock_irqrestore(&(q->qlock), iflags);

	return rc;
}

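/* Drain all four waiting lists of a queue in caller-id order, rescanning
 * after a non-empty list has been processed. Returns nonzero if a sender
 * reported congestion, so that qos_resume() reschedules itself. */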
static int _qos_resume(struct qos_queue *q)
{
	int rc = 0; /* shared across iterations; nonzero means congested */
	unsigned long iflags;
	int i;

	spin_lock_irqsave(&(q->qlock), iflags);

	for (i = 0; i < 4 && rc == 0; i++) {
		struct list_head *lh;

		if (i == QOS_CALLER_KPACKET)
			lh = &(q->conn_retrans_waiting);
		else if (i == QOS_CALLER_CONN_RETRANS)
			lh = &(q->kpackets_waiting);
		else if (i == QOS_CALLER_ANNOUNCE)
			lh = &(q->announce_waiting);
		else if (i == QOS_CALLER_CONN)
			lh = &(q->conns_waiting);
		else
			BUG();

		if (list_empty(lh))
			continue;

		spin_unlock_irqrestore(&(q->qlock), iflags);
		if (i == QOS_CALLER_CONN)
			rc = resume_conns(q);
		else
			rc = __qos_resume(q, i);
		spin_lock_irqsave(&(q->qlock), iflags);

		i = 0;
	}

	spin_unlock_irqrestore(&(q->qlock), iflags);

	return rc;
}

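/* Delayed work: walk all registered queues and try to resume them. If any
 * queue is still congested, reschedule; otherwise clear qos_resume_scheduled
 * so that the next qos_enqueue() schedules the work again. */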
static void qos_resume(struct work_struct *work)
{
	struct list_head *curr;

	int congested = 0;

	mutex_lock(&(queues_lock));

	curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);

		if (_qos_resume(q))
			congested = 1;

		curr = curr->next;
	}

	if (congested) {
		schedule_delayed_work(&(qos_resume_work), 1);
	} else {
		qos_resume_scheduled = 0;
	}

	mutex_unlock(&(queues_lock));
}

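/* Look up the qos_queue belonging to a network device, or return 0 if the
 * device has no queue registered. */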
struct qos_queue *get_queue(struct net_device *dev)
{
	struct qos_queue *ret = 0;
	struct list_head *curr;

	mutex_lock(&(queues_lock));
	curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		if (q->dev == dev) {
			ret = q;
			break;
		}
		curr = curr->next;
	}
	mutex_unlock(&(queues_lock));
	return ret;
}

static void _destroy_queue(struct qos_queue *q, int caller)
{
	struct list_head *lh;

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct resume_block *rb = container_of(curr,
				struct resume_block, lh);
		rb->in_queue = 0;
		list_del(curr);

		if (caller == QOS_CALLER_KPACKET) {
			kref_put(&(container_of(rb, struct neighbor,
					rb_kp)->ref), neighbor_free);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			kref_put(&(container_of(rb, struct neighbor,
					rb_cr)->ref), neighbor_free);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			kref_put(&(container_of(rb,
					struct announce_data, rb)->ref),
					announce_data_free);
		} else {
			BUG();
		}
	}
}

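/* Detach the queue from its device, drop all pending resume entries and
 * release the queue references. Returns 1 if no queue exists for dev. */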
int destroy_queue(struct net_device *dev)
{
	int unlink;
	unsigned long iflags;
	struct qos_queue *q = get_queue(dev);
	if (q == 0)
		return 1;

	spin_lock_irqsave(&(q->qlock), iflags);
	unlink = (q->dev != 0);
	q->dev = 0;
	_destroy_queue(q, QOS_CALLER_KPACKET);
	_destroy_queue(q, QOS_CALLER_CONN_RETRANS);
	_destroy_queue(q, QOS_CALLER_ANNOUNCE);
	spin_unlock_irqrestore(&(q->qlock), iflags);

	if (unlink) {
		dev_put(dev);
		mutex_lock(&(queues_lock));
		list_del(&(q->queue_list));
		mutex_unlock(&(queues_lock));
		kref_put(&(q->ref), free_qos);
	}

	kref_put(&(q->ref), free_qos);

	return 0;
}

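/* Allocate and register a qos_queue for a newly enabled network device. */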
int create_queue(struct net_device *dev)
{
	struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);

	if (q == 0) {
		printk(KERN_ERR "cor: unable to allocate memory for device "
				"queue, not enabling device\n");
		return 1;
	}

	/* q is refcounted and released via kref_put(..., free_qos) */
	kref_init(&(q->ref));

	spin_lock_init(&(q->qlock));

	q->dev = dev;
	dev_hold(dev);

	INIT_LIST_HEAD(&(q->kpackets_waiting));
	INIT_LIST_HEAD(&(q->conn_retrans_waiting));
	INIT_LIST_HEAD(&(q->announce_waiting));
	INIT_LIST_HEAD(&(q->conns_waiting));

	mutex_lock(&(queues_lock));
	list_add(&(q->queue_list), &queues);
	mutex_unlock(&(queues_lock));

	return 0;
}

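/* Add a resume_block to the waiting list selected by caller, taking a
 * reference on its owner (neighbor, announce_data or conn), and make sure
 * the qos_resume work is scheduled. */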
void qos_enqueue(struct qos_queue *q, struct resume_block *rb, int caller)
{
	unsigned long iflags;

	spin_lock_irqsave(&(q->qlock), iflags);

	if (rb->in_queue)
		goto out;

	rb->in_queue = 1;

	if (caller == QOS_CALLER_KPACKET) {
		list_add(&(rb->lh), &(q->conn_retrans_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
	} else if (caller == QOS_CALLER_CONN_RETRANS) {
		list_add(&(rb->lh), &(q->kpackets_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
	} else if (caller == QOS_CALLER_ANNOUNCE) {
		list_add(&(rb->lh), &(q->announce_waiting));
		kref_get(&(container_of(rb, struct announce_data, rb)->ref));
	} else if (caller == QOS_CALLER_CONN) {
		list_add(&(rb->lh), &(q->conns_waiting));
		kref_get(&(container_of(rb, struct conn, target.out.rb)->ref));
	} else {
		BUG();
	}

	if (qos_resume_scheduled == 0) {
		schedule_delayed_work(&(qos_resume_work), 1);
		qos_resume_scheduled = 1;
	}

out:
	spin_unlock_irqrestore(&(q->qlock), iflags);
}

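/* Remove a conn (locked by the caller) from its queue's conns_waiting list
 * and drop the reference taken by qos_enqueue(). */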
void qos_remove_conn(struct conn *trgt_out_l)
{
	unsigned long iflags;
	struct qos_queue *q;

	BUG_ON(trgt_out_l->targettype != TARGET_OUT);

	if (trgt_out_l->target.out.rb.in_queue == 0)
		return;

	q = trgt_out_l->target.out.nb->queue;
	BUG_ON(q == 0);

	trgt_out_l->target.out.rb.in_queue = 0;
	spin_lock_irqsave(&(q->qlock), iflags);
	list_del(&(trgt_out_l->target.out.rb.lh));
	spin_unlock_irqrestore(&(q->qlock), iflags);

	kref_put(&(trgt_out_l->ref), free_conn);
}

static void qos_enqueue_conn(struct conn *trgt_out_l)
{
	qos_enqueue(trgt_out_l->target.out.nb->queue,
			&(trgt_out_l->target.out.rb), QOS_CALLER_CONN);
}

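/* Check the neighbor's qos queue before sending: conn retransmits only
 * require kpackets_waiting to be empty, new conn data requires all four
 * waiting lists to be empty. */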
static int may_send_conn_retrans(struct neighbor *nb)
{
	unsigned long iflags;
	int rc;

	BUG_ON(nb->queue == 0);

	spin_lock_irqsave(&(nb->queue->qlock), iflags);
	rc = (list_empty(&(nb->queue->kpackets_waiting)));
	spin_unlock_irqrestore(&(nb->queue->qlock), iflags);

	return rc;
}

static int may_send_conn(struct conn *trgt_out_l)
{
	unsigned long iflags;
	struct qos_queue *q = trgt_out_l->target.out.nb->queue;
	int rc;

	BUG_ON(q == 0);

	spin_lock_irqsave(&(q->qlock), iflags);
	rc = (list_empty(&(q->kpackets_waiting)) &&
			list_empty(&(q->conn_retrans_waiting)) &&
			list_empty(&(q->announce_waiting)) &&
			list_empty(&(q->conns_waiting)));
	spin_unlock_irqrestore(&(q->qlock), iflags);

	return rc;
}

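/* Allocate an skb for a PACKET_TYPE_DATA frame to nb, build the link layer
 * header and write the 9 byte cor header (type, conn_id, seqno); the caller
 * appends the payload with skb_put(). */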
struct sk_buff *create_packet(struct neighbor *nb, int size,
		gfp_t alloc_flags, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *ret;
	char *dest;

	ret = alloc_skb(size + 9 + LL_ALLOCATED_SPACE(nb->dev), alloc_flags);
	if (unlikely(0 == ret))
		return 0;

	ret->protocol = htons(ETH_P_COR);
	ret->dev = nb->dev;

	skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
	if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
			nb->dev->dev_addr, ret->len) < 0)) {
		/* free the skb if the link layer header cannot be built */
		kfree_skb(ret);
		return 0;
	}
	skb_reset_network_header(ret);

	dest = skb_put(ret, 9);
	BUG_ON(0 == dest);

	dest[0] = PACKET_TYPE_DATA;
	dest += 1;

	put_u32(dest, conn_id, 1);
	dest += 4;
	put_u32(dest, seqno, 1);
	dest += 4;

	return ret;
}

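/* Retransmit timeout: 100ms plus the neighbor's measured latency and its
 * maximum remote control message delay. */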
static void set_conn_retrans_timeout(struct conn_retrans *cr)
{
	struct neighbor *nb = cr->trgt_out_o->target.out.nb;
	cr->timeout = jiffies + usecs_to_jiffies(100000 +
			((__u32) atomic_read(&(nb->latency))) +
			((__u32) atomic_read(&(nb->max_remote_cmsg_delay))));
}

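/* Requeue a retransmit entry onto the neighbor's timeout list before it is
 * sent again. If only a prefix of length bytes is sent, the remainder is
 * split off into a newly allocated conn_retrans, which is returned;
 * *dontsend is set if the range was acked in the meantime. */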
static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr,
		struct neighbor *nb, __u32 length, int *dontsend)
{
	unsigned long iflags;

	struct conn_retrans *ret = 0;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	if (unlikely(cr->ackrcvd)) {
		*dontsend = 1;
		goto out;
	} else
		*dontsend = 0;

	if (unlikely(cr->length > length)) {
		ret = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC);
		if (unlikely(ret == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		memset(ret, 0, sizeof (struct conn_retrans));
		ret->trgt_out_o = cr->trgt_out_o;
		kref_get(&(cr->trgt_out_o->ref));
		ret->seqno = cr->seqno + length;
		ret->length = cr->length - length;
		kref_init(&(ret->ref));

		list_add(&(ret->timeout_list), &(nb->retrans_list_conn));
		list_add(&(ret->conn_list), &(cr->conn_list));

		cr->length = length;
	} else {
		list_del(&(cr->timeout_list));
		list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));
		set_conn_retrans_timeout(cr);

		BUG_ON(cr->length != length);
	}

out:
	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

	return ret;
}

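/* Drop all pending retransmits of a conn that is being reset; the entries
 * are marked acked so that concurrent senders skip them. */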
void cancel_retrans(struct conn *trgt_out_l)
{
	unsigned long iflags;
	struct neighbor *nb = trgt_out_l->target.out.nb;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out_l->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->trgt_out_o != trgt_out_l);

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}

#warning reschedule timer

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
}

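/* Retransmit one conn_retrans entry. Full-mss chunks are sent as raw data
 * packets; a remaining short tail is sent as conn data attached to a control
 * message. Returns nonzero if the device queue was full and the retransmit
 * has to be retried via the qos queue. */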
static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr)
{
	/* cr may be replaced by the split-off remainder (and become 0) below,
	 * so keep a stable conn pointer for unlocking and kref_put */
	struct conn *trgt_out_o = cr->trgt_out_o;
	int targetmss = mss(nb);
	int dontsend;
	int queuefull = 0;

	mutex_lock(&(trgt_out_o->rcv_lock));

	BUG_ON(trgt_out_o->targettype != TARGET_OUT);
	BUG_ON(trgt_out_o->target.out.nb != nb);

	kref_get(&(trgt_out_o->ref));

	if (unlikely(trgt_out_o->isreset != 0)) {
		cancel_retrans(trgt_out_o);
		goto out;
	}

	while (cr->length >= targetmss) {
		struct sk_buff *skb;
		char *dst;
		struct conn_retrans *cr2;
		int rc;

		if (may_send_conn_retrans(nb) == 0)
			goto qos_enqueue;

		skb = create_packet(nb, targetmss, GFP_KERNEL,
				trgt_out_o->target.out.conn_id, cr->seqno);
		if (unlikely(skb == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cr2 = readd_conn_retrans(cr, nb, targetmss, &dontsend);
		if (unlikely(unlikely(dontsend) || unlikely(cr2 == 0 &&
				unlikely(cr->length > targetmss)))) {
			kfree_skb(skb);
			goto out;
		}

		dst = skb_put(skb, targetmss);

		databuf_pullold(trgt_out_o, cr->seqno, dst, targetmss);
		rc = dev_queue_xmit(skb);

		if (rc != 0) {
			unsigned long iflags;

			spin_lock_irqsave(&(nb->retrans_lock), iflags);
			if (unlikely(cr->ackrcvd)) {
				dontsend = 1;
			} else {
				list_del(&(cr->timeout_list));
				list_add(&(cr->timeout_list),
						&(nb->retrans_list_conn));
			}
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			if (dontsend == 0)
				goto qos_enqueue;
		}

		cr = cr2;

		if (likely(cr == 0))
			goto out;
	}

	if (unlikely(cr->length <= 0)) {
		BUG();
	} else {
		struct control_msg_out *cm;
		char *buf = kmalloc(cr->length, GFP_KERNEL);

		if (unlikely(buf == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cm = alloc_control_msg(nb, ACM_PRIORITY_LOW);
		if (unlikely(cm == 0)) {
			cr->timeout = jiffies + 1;
			kfree(buf);
			goto out;
		}

		databuf_pullold(trgt_out_o, cr->seqno, buf, cr->length);

		if (unlikely(readd_conn_retrans(cr, nb, cr->length, &dontsend)
				!= 0))
			BUG();

		if (likely(dontsend == 0)) {
			send_conndata(cm, trgt_out_o->target.out.conn_id,
					cr->seqno, buf, buf, cr->length);
		}
	}

	if (0) {
qos_enqueue:
		queuefull = 1;
	}
out:
	mutex_unlock(&(trgt_out_o->rcv_lock));

	kref_put(&(trgt_out_o->ref), free_conn);

	return queuefull;
}

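/* Walk the neighbor's conn retransmit list and resend every entry whose
 * timeout has expired. Called from the retransmit timer (fromqos == 0) and
 * from the qos queue (fromqos == 1); returns nonzero if the device queue
 * was full. */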
static int send_retrans(struct neighbor *nb, int fromqos)
{
	unsigned long iflags;

	struct conn_retrans *cr = 0;

	int nbstate;
	int rescheduled = 0;
	int queuefull = 0;

	spin_lock_irqsave(&(nb->state_lock), iflags);
	nbstate = nb->state;
	spin_unlock_irqrestore(&(nb->state_lock), iflags);

	while (1) {
		spin_lock_irqsave(&(nb->retrans_lock), iflags);

		if (list_empty(&(nb->retrans_list_conn))) {
			nb->retrans_timer_conn_running = 0;
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			break;
		}

		cr = container_of(nb->retrans_list_conn.next,
				struct conn_retrans, timeout_list);

		if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

			kref_put(&(cr->ref), free_connretrans);
			continue;
		}

#warning todo check window limit
		if (time_after(cr->timeout, jiffies)) {
			schedule_delayed_work(&(nb->retrans_timer_conn),
					cr->timeout - jiffies);
			if (fromqos)
				kref_get(&(nb->ref));
			rescheduled = 1;
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			break;
		}

		kref_get(&(cr->ref));
		spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
		queuefull = _send_retrans(nb, cr);
		kref_put(&(cr->ref), free_connretrans);
		if (queuefull) {
			if (fromqos == 0)
				qos_enqueue(nb->queue, &(nb->rb_cr),
						QOS_CALLER_CONN_RETRANS);
			break;
		}
	}

	if (rescheduled == 0 && fromqos == 0)
		kref_put(&(nb->ref), neighbor_free);

	return queuefull;
}

void retransmit_conn_timerfunc(struct work_struct *work)
{
	struct neighbor *nb = container_of(to_delayed_work(work),
			struct neighbor, retrans_timer_conn);

	send_retrans(nb, 0);
}

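/* Process an out-of-order ack covering [seqno_ooo, seqno_ooo + length):
 * trim or drop retransmit entries covered by the acked range and advance
 * seqno_acked if the oldest remaining entry starts later. */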
void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id,
		struct conn *trgt_out, __u32 seqno_ooo, __u32 length)
{
	unsigned long iflags;
	struct list_head *curr;

	if (unlikely(length == 0))
		return;

	mutex_lock(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->target.out.nb != nb))
		goto out;
	if (unlikely(trgt_out->target.out.conn_id != conn_id))
		goto out;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	curr = trgt_out->target.out.retrans_list.next;

	while (curr != &(trgt_out->target.out.retrans_list)) {
		struct conn_retrans *cr = container_of(curr,
				struct conn_retrans, conn_list);

		if (((__s32)(cr->seqno + cr->length - seqno_ooo)) > 0)
			goto cont;

		if (((__s32)(cr->seqno + cr->length - seqno_ooo - length)) > 0) {
			if (((__s32)(cr->seqno - seqno_ooo - length)) < 0) {
				__u32 newseqno = seqno_ooo + length;
				cr->length -= (newseqno - cr->seqno);
				cr->seqno = newseqno;
			}

			break;
		}

		if (((__s32)(cr->seqno - seqno_ooo)) < 0 &&
				((__s32)(cr->seqno + cr->length - seqno_ooo -
				length)) <= 0) {
			__u32 diff = seqno_ooo + length - cr->seqno -
					cr->length;
			cr->seqno += diff;
			cr->length -= diff;
		} else {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

cont:
		curr = curr->next;
	}

	if (unlikely(list_empty(&(trgt_out->target.out.retrans_list))) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		if (unlikely(((__s32) (cr->seqno -
				trgt_out->target.out.seqno_acked)) > 0))
			trgt_out->target.out.seqno_acked = cr->seqno;
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

out:
	mutex_unlock(&(trgt_out->rcv_lock));
}

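/* Process a cumulative ack up to seqno (optionally carrying a new window):
 * update seqno_windowlimit, drop fully acked retransmit entries, trim a
 * partially acked one, ack the data buffer and flush if the window grew. */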
void conn_ack_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out,
		__u32 seqno, int setwindow, __u8 window)
{
	int flush = 0;
	unsigned long iflags;

	mutex_lock(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->isreset != 0))
		goto out;
	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->target.out.nb != nb))
		goto out;
	if (unlikely(trgt_out->reversedir->source.in.conn_id != conn_id))
		goto out;

	if (unlikely(((__s32)(seqno - trgt_out->target.out.seqno_nextsend)) > 0) ||
			((__s32)(seqno - trgt_out->target.out.seqno_acked)) < 0)
		goto out;

	if (setwindow) {
		__u32 windowdec = dec_log_64_11(window);
		if (unlikely(seqno == trgt_out->target.out.seqno_acked &&
				((__s32) (seqno + windowdec -
				trgt_out->target.out.seqno_windowlimit)) <= 0))
			goto skipwindow;

		trgt_out->target.out.seqno_windowlimit = seqno + windowdec;
		flush = 1;
	}

skipwindow:
	if (seqno == trgt_out->target.out.seqno_acked)
		goto out;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	trgt_out->target.out.seqno_acked = seqno;

	while (list_empty(&(trgt_out->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out->target.out.retrans_list.next,
				struct conn_retrans, conn_list);

		if (((__s32)(cr->seqno + cr->length - seqno)) > 0) {
			if (((__s32)(cr->seqno - seqno)) < 0) {
				cr->length -= (seqno - cr->seqno);
				cr->seqno = seqno;
			}
			break;
		}

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
	databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);

out:
	mutex_unlock(&(trgt_out->rcv_lock));

	if (flush)
		flush_buf(trgt_out);
}

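/* Initialize a conn_retrans for freshly sent data and queue it on the conn
 * and on the neighbor's timeout list; start the retransmit timer if it is
 * not already running. */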
static void schedule_retransmit_conn(struct conn_retrans *cr,
		struct conn *trgt_out_l, __u32 seqno, __u32 len)
{
	unsigned long iflags;

	struct neighbor *nb = trgt_out_l->target.out.nb;

	int first;

	BUG_ON(trgt_out_l->targettype != TARGET_OUT);

	memset(cr, 0, sizeof (struct conn_retrans));
	cr->trgt_out_o = trgt_out_l;
	kref_get(&(trgt_out_l->ref));
	cr->seqno = seqno;
	cr->length = len;
	kref_init(&(cr->ref));
	set_conn_retrans_timeout(cr);

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	first = unlikely(list_empty(&(nb->retrans_list_conn)));
	list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));

	list_add_tail(&(cr->conn_list), &(trgt_out_l->target.out.retrans_list));

	if (unlikely(unlikely(first) &&
			unlikely(nb->retrans_timer_conn_running == 0))) {
		schedule_delayed_work(&(nb->retrans_timer_conn),
				cr->timeout - jiffies);
		nb->retrans_timer_conn_running = 1;
		kref_get(&(nb->ref));
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
}

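/* Number of bytes that may still be sent before hitting the receiver's
 * announced window limit. */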
static __u32 get_windowlimit(struct conn *trgt_out_l)
{
	__s32 windowlimit = (__s32)(trgt_out_l->target.out.seqno_windowlimit -
			trgt_out_l->target.out.seqno_nextsend);
	if (unlikely(windowlimit < 0))
		return 0;
	return windowlimit;
}

#warning todo reset connections which are SOURCE_NONE and are stuck for too long

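/* Send as much buffered data of trgt_out_l (locked by the caller) as the
 * window, the credits and the device queue allow. Full-mss chunks go out as
 * data packets; a remaining tail may be sent as conn data attached to a
 * control message. */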
int flush_out(struct conn *trgt_out_l, int fromqos, __u32 creditsperbyte)
{
	int targetmss;
	__u32 seqno;
	int sent = 0;

	BUG_ON(trgt_out_l->targettype != TARGET_OUT);

	targetmss = mss(trgt_out_l->target.out.nb);

	if (unlikely(trgt_out_l->target.out.conn_id == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_l->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_l->sourcetype == SOURCE_SOCK &&
			trgt_out_l->source.sock.delay_flush != 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (fromqos == 0 && may_send_conn(trgt_out_l) == 0) {
		qos_enqueue_conn(trgt_out_l);
		return RC_FLUSH_CONN_OUT_CONG;
	}

	while (trgt_out_l->data_buf.read_remaining >= targetmss &&
			get_windowlimit(trgt_out_l) >= targetmss) {
		struct conn_retrans *cr;
		struct sk_buff *skb;
		char *dst;
		int rc;

		if (unlikely(creditsperbyte * targetmss >
				trgt_out_l->credits))
			return RC_FLUSH_CONN_OUT_CREDITS;

		seqno = trgt_out_l->target.out.seqno_nextsend;
		skb = create_packet(trgt_out_l->target.out.nb, targetmss,
				GFP_ATOMIC, trgt_out_l->target.out.conn_id,
				seqno);
		if (unlikely(skb == 0)) {
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree_skb(skb);
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		dst = skb_put(skb, targetmss);

		databuf_pull(trgt_out_l, dst, targetmss);

		rc = dev_queue_xmit(skb);
		if (rc != 0) {
			databuf_unpull(trgt_out_l, targetmss);
			kmem_cache_free(connretrans_slab, cr);
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_CONG;
		}

		trgt_out_l->credits -= creditsperbyte * targetmss;
		trgt_out_l->target.out.seqno_nextsend += targetmss;
		schedule_retransmit_conn(cr, trgt_out_l, seqno, targetmss);
		sent = 1;
	}

	if (trgt_out_l->data_buf.read_remaining > 0 && (trgt_out_l->tos ==
			TOS_LATENCY || trgt_out_l->target.out.seqno_nextsend ==
			trgt_out_l->target.out.seqno_acked)) {
		struct control_msg_out *cm;
		struct conn_retrans *cr;
		__u32 len = trgt_out_l->data_buf.read_remaining;
		__s32 windowlimit = get_windowlimit(trgt_out_l);
		char *buf;

		if (windowlimit == 0)
			goto out;

		if (windowlimit < len/2 &&
				trgt_out_l->target.out.seqno_nextsend !=
				trgt_out_l->target.out.seqno_acked)
			goto out;

		if (len > windowlimit)
			len = windowlimit;

		buf = kmalloc(len, GFP_KERNEL);

		if (unlikely(creditsperbyte * len > trgt_out_l->credits)) {
			/* do not leak buf on the credits error path */
			kfree(buf);
			return RC_FLUSH_CONN_OUT_CREDITS;
		}

		if (unlikely(buf == 0)) {
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		cm = alloc_control_msg(trgt_out_l->target.out.nb,
				ACM_PRIORITY_LOW);
		if (unlikely(cm == 0)) {
			kfree(buf);
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree(buf);
			free_control_msg(cm);
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_CONG;
		}

		databuf_pull(trgt_out_l, buf, len);

		seqno = trgt_out_l->target.out.seqno_nextsend;
		trgt_out_l->credits -= creditsperbyte * len;
		trgt_out_l->target.out.seqno_nextsend += len;

		schedule_retransmit_conn(cr, trgt_out_l, seqno, len);

		send_conndata(cm, trgt_out_l->target.out.conn_id, seqno, buf,
				buf, len);
		sent = 1;
	}

out:
	if (sent)
		return RC_FLUSH_CONN_OUT_OK_SENT;

	return RC_FLUSH_CONN_OUT_OK;
}

int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);

	if (unlikely(connretrans_slab == 0))
		return 1;

	INIT_DELAYED_WORK(&(qos_resume_work), qos_resume);
	qos_resume_scheduled = 0;

	return 0;
}

MODULE_LICENSE("GPL");