/* net/cor/snd.c - from the cor_2_6_31.git tree ("credit system" commit) */

/*
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"

struct kmem_cache *connretrans_slab;

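/*
 * One pending (not yet acked) range of outgoing connection data. It is linked
 * into the owning conn via conn_list and into the neighbor's retransmission
 * timeout list via timeout_list.
 */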
struct conn_retrans {
	/* timeout_list and conn_list share a single ref */
	struct kref ref;
	struct list_head timeout_list;
	struct list_head conn_list;
	struct htab_entry htab_entry;
	struct conn *rconn;
	__u32 seqno;
	__u32 length;
	__u8 ackrcvd;
	unsigned long timeout;
};

static void free_connretrans(struct kref *ref)
{
	struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);
	kref_put(&(cr->rconn->ref), free_conn);
	kmem_cache_free(connretrans_slab, cr);
}

DEFINE_MUTEX(queues_lock);
LIST_HEAD(queues);
struct delayed_work qos_resume_work;
int qos_resume_scheduled;

struct qos_queue {
	struct list_head queue_list;

	struct net_device *dev;

	struct list_head kpackets_waiting;
	struct list_head conn_retrans_waiting;
	struct list_head announce_waiting;
	struct list_head conns_waiting;
};

/* Highest bidder "pays" the credits the second highest has bid */
static int _resume_conns(struct qos_queue *q)
{
	struct conn *best = 0;
	__u64 bestcredit = 0;
	__u64 secondcredit = 0;

	int rc;

	struct list_head *lh = q->conns_waiting.next;

	while (lh != &(q->conns_waiting)) {
		struct conn *rconn = container_of(lh, struct conn,
				target.out.rb.lh);
		__u64 credits;

		lh = lh->next;

		refresh_conn_credits(rconn, 0, 0);

		mutex_lock(&(rconn->rcv_lock));

		BUG_ON(rconn->targettype != TARGET_OUT);

		if (atomic_read(&(rconn->isreset)) != 0) {
			rconn->target.out.rb.in_queue = 0;
			list_del(&(rconn->target.out.rb.lh));
			mutex_unlock(&(rconn->rcv_lock));
			kref_put(&(rconn->ref), free_conn);

			continue;
		}

		BUG_ON(rconn->data_buf.read_remaining == 0);

		if (may_alloc_control_msg(rconn->target.out.nb,
				ACM_PRIORITY_MED) == 0) {
			mutex_unlock(&(rconn->rcv_lock));
			continue;
		}

		if (rconn->credits <= 0)
			credits = 0;
		else
			credits = multiply_div(rconn->credits,
					((__u64) 1) << 32,
					rconn->data_buf.read_remaining);
		mutex_unlock(&(rconn->rcv_lock));

		if (best == 0 || bestcredit < credits) {
			secondcredit = bestcredit;
			best = rconn;
			bestcredit = credits;
		} else if (secondcredit < credits) {
			secondcredit = credits;
		}
	}

	if (best == 0)
		return RC_FLUSH_CONN_OUT_OK;

	mutex_lock(&(best->rcv_lock));
	rc = flush_out(best, 1, (__u32) (secondcredit >> 32));

	if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT) {
		best->target.out.rb.in_queue = 0;
		list_del(&(best->target.out.rb.lh));
	}

	mutex_unlock(&(best->rcv_lock));

	refresh_conn_credits(best, 0, 0);
	unreserve_sock_buffer(best);

	if (rc == RC_FLUSH_CONN_OUT_OK_SENT)
		wake_sender(best);

	if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT)
		kref_put(&(best->ref), free_conn);

	return rc;
}

static int resume_conns(struct qos_queue *q)
{
	while (list_empty(&(q->conns_waiting)) == 0) {
		int rc = _resume_conns(q);
		if (rc != RC_FLUSH_CONN_OUT_OK &&
				rc != RC_FLUSH_CONN_OUT_OK_SENT)
			return 1;
	}
	return 0;
}

static int send_retrans(struct neighbor *nb, int fromqos);

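/*
 * Retry the deferred sends on the wait list selected by "caller"; stops and
 * returns nonzero as soon as one of them reports congestion again.
 */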
static int _qos_resume(struct qos_queue *q, int caller)
{
	int rc = 0;

	struct list_head *lh;

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct resume_block *rb = container_of(curr,
				struct resume_block, lh);
		rb->in_queue = 0;
		list_del(curr);

		if (caller == QOS_CALLER_KPACKET) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_kp);
			rc = send_messages(nb, 0, 1);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_cr);
#warning todo do not send if neighbor is stalled
			rc = send_retrans(nb, 1);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			struct announce_data *ann = container_of(rb,
					struct announce_data, rb);
			rc = send_announce_qos(ann);
		} else {
			BUG();
		}

		if (rc != 0 && rb->in_queue == 0) {
			rb->in_queue = 1;
			list_add(curr, lh);
		} else {
			if (caller == QOS_CALLER_KPACKET) {
				kref_put(&(container_of(rb, struct neighbor,
						rb_kp)->ref), neighbor_free);
			} else if (caller == QOS_CALLER_CONN_RETRANS) {
				kref_put(&(container_of(rb, struct neighbor,
						rb_cr)->ref), neighbor_free);
			} else if (caller == QOS_CALLER_ANNOUNCE) {
				kref_put(&(container_of(rb,
						struct announce_data, rb)->ref),
						announce_data_free);
			} else {
				BUG();
			}
		}

		if (rc != 0)
			break;
	}
	return rc;
}

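/*
 * Work function: retry the deferred sends of every queued device; if a device
 * is still congested, reschedule ourselves one jiffy later.
 */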
static void qos_resume(struct work_struct *work)
{
	struct list_head *curr;

	mutex_lock(&(queues_lock));

	curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		int i;

		for (i = 0; i < 4; i++) {
			int rc;
			if (i == 3)
				rc = resume_conns(q);
			else
				rc = _qos_resume(q, i);

			if (rc != 0)
				goto congested;
		}

		curr = curr->next;

		if (i == 4 && unlikely(q->dev == 0)) {
			list_del(&(q->queue_list));
			kfree(q);
		}
	}

	qos_resume_scheduled = 0;

	if (0) {
congested:
		schedule_delayed_work(&(qos_resume_work), 1);
	}

	mutex_unlock(&(queues_lock));
}

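/* queues_lock must be held */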
static struct qos_queue *get_queue(struct net_device *dev)
{
	struct list_head *curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		if (q->dev == dev)
			return q;
		curr = curr->next;
	}
	return 0;
}

int destroy_queue(struct net_device *dev)
{
	struct qos_queue *q;

	mutex_lock(&(queues_lock));

	q = get_queue(dev);

	if (q == 0) {
		mutex_unlock(&(queues_lock));
		return 1;
	}

	q->dev = 0;

	dev_put(dev);

	mutex_unlock(&(queues_lock));

	return 0;
}

int create_queue(struct net_device *dev)
{
	struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);

	if (q == 0) {
		printk(KERN_ERR "cor: unable to allocate memory for device "
				"queue, not enabling device");
		return 1;
	}

	q->dev = dev;
	dev_hold(dev);

	INIT_LIST_HEAD(&(q->kpackets_waiting));
	INIT_LIST_HEAD(&(q->conn_retrans_waiting));
	INIT_LIST_HEAD(&(q->announce_waiting));
	INIT_LIST_HEAD(&(q->conns_waiting));

	mutex_lock(&(queues_lock));
	list_add(&(q->queue_list), &queues);
	mutex_unlock(&(queues_lock));

	return 0;
}

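/*
 * Remember that rb could not be sent because the device was busy and make
 * sure qos_resume is scheduled; a reference on the owning object is held
 * while it sits on the wait list.
 */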
void qos_enqueue(struct net_device *dev, struct resume_block *rb, int caller)
{
	struct qos_queue *q;

	mutex_lock(&(queues_lock));

	if (rb->in_queue)
		goto out;

	q = get_queue(dev);
	if (unlikely(q == 0))
		goto out;

	rb->in_queue = 1;

	if (caller == QOS_CALLER_KPACKET) {
		list_add(&(rb->lh), &(q->conn_retrans_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
	} else if (caller == QOS_CALLER_CONN_RETRANS) {
		list_add(&(rb->lh), &(q->kpackets_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
	} else if (caller == QOS_CALLER_ANNOUNCE) {
		list_add(&(rb->lh), &(q->announce_waiting));
		kref_get(&(container_of(rb, struct announce_data, rb)->ref));
	} else if (caller == QOS_CALLER_CONN) {
		struct conn *rconn = container_of(rb, struct conn,
				target.out.rb);
		mutex_lock(&(rconn->rcv_lock));
		BUG_ON(rconn->targettype != TARGET_OUT);
		list_add(&(rb->lh), &(q->conns_waiting));
		kref_get(&(rconn->ref));
		mutex_unlock(&(rconn->rcv_lock));
	} else {
		BUG();
	}

	if (qos_resume_scheduled == 0) {
		schedule_delayed_work(&(qos_resume_work), 1);
		qos_resume_scheduled = 1;
	}

out:
	mutex_unlock(&(queues_lock));
}

void qos_remove_conn(struct conn *rconn)
{
	int kref = 0;
	mutex_lock(&(queues_lock));
	if (rconn->targettype != TARGET_OUT)
		goto out;

	if (rconn->target.out.rb.in_queue == 0)
		goto out;

	rconn->target.out.rb.in_queue = 0;
	list_del(&(rconn->target.out.rb.lh));
	kref = 1;

out:
	mutex_unlock(&(queues_lock));

	if (kref)
		kref_put(&(rconn->ref), free_conn);
}

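/*
 * decides whether a conn data retransmit may go out directly or has to wait
 * in the qos queue
 */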
static int may_send_conn_retrans(struct neighbor *nb)
{
	struct qos_queue *q;
	int rc = 0;

	mutex_lock(&(queues_lock));

	q = get_queue(nb->dev);
	if (unlikely(q == 0))
		goto out;

	rc = (list_empty(&(q->kpackets_waiting)));

out:
	mutex_unlock(&(queues_lock));

	return rc;
}

static int may_send_conn(struct conn *rconn)
{
	struct qos_queue *q;
	int rc = 0;

	mutex_lock(&(queues_lock));

	q = get_queue(rconn->target.out.nb->dev);
	if (unlikely(q == 0))
		goto out;

	rc = (list_empty(&(q->kpackets_waiting)) &&
			list_empty(&(q->conn_retrans_waiting)) &&
			list_empty(&(q->announce_waiting)) &&
			list_empty(&(q->conns_waiting)));

out:
	mutex_unlock(&(queues_lock));

	return rc;
}

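/*
 * Allocate an skb for a PACKET_TYPE_DATA frame to nb, fill in the link layer
 * header and the 9 byte cor header (type, conn_id, seqno); the caller appends
 * up to "size" bytes of payload with skb_put.
 */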
struct sk_buff *create_packet(struct neighbor *nb, int size,
		gfp_t alloc_flags, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *ret;
	char *dest;

	ret = alloc_skb(size + 9 + LL_ALLOCATED_SPACE(nb->dev), alloc_flags);
	if (unlikely(0 == ret))
		return 0;

	ret->protocol = htons(ETH_P_COR);
	ret->dev = nb->dev;

	skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
	if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
			nb->dev->dev_addr, ret->len) < 0)) {
		kfree_skb(ret);
		return 0;
	}
	skb_reset_network_header(ret);

	dest = skb_put(ret, 9);
	BUG_ON(0 == dest);

	dest[0] = PACKET_TYPE_DATA;
	dest += 1;

	put_u32(dest, conn_id, 1);
	dest += 4;
	put_u32(dest, seqno, 1);
	dest += 4;

	return ret;
}

static void set_conn_retrans_timeout(struct conn_retrans *cr)
{
	struct neighbor *nb = cr->rconn->target.out.nb;
	cr->timeout = jiffies + usecs_to_jiffies(100000 +
			((__u32) atomic_read(&(nb->latency))) +
			((__u32) atomic_read(&(nb->max_remote_cmsg_delay))));
}

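/*
 * Requeue cr on the neighbor's timeout list for another retransmission of
 * "length" bytes. If cr covers more than "length" bytes, the rest is split
 * off into a newly allocated conn_retrans which is returned. *dontsend is
 * set if an ack arrived in the meantime.
 */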
static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr,
		struct neighbor *nb, __u32 length, int *dontsend)
{
	unsigned long iflags;

	struct conn_retrans *ret = 0;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	if (unlikely(cr->ackrcvd)) {
		*dontsend = 1;
		goto out;
	} else
		*dontsend = 0;

	if (unlikely(cr->length > length)) {
		ret = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC);
		if (unlikely(ret == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		memset(ret, 0, sizeof (struct conn_retrans));
		ret->rconn = cr->rconn;
		kref_get(&(cr->rconn->ref));
		ret->seqno = cr->seqno + length;
		ret->length = cr->length - length;
		kref_init(&(ret->ref));

		list_add(&(ret->timeout_list), &(nb->retrans_list_conn));
		list_add(&(ret->conn_list), &(cr->conn_list));

		cr->length = length;
	} else {
		list_del(&(cr->timeout_list));
		list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));
		set_conn_retrans_timeout(cr);

		BUG_ON(cr->length != length);
	}

out:
	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

	return ret;
}

/* rcvlock *must* be held while calling this */
void cancel_retrans(struct conn *rconn)
{
	unsigned long iflags;
	struct neighbor *nb = rconn->target.out.nb;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	while (list_empty(&(rconn->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				rconn->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->rconn != rconn);

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
}

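/*
 * Retransmit the range described by cr: full mss chunks are sent as data
 * packets, a remaining tail is sent as conndata via a control message.
 * Returns nonzero if the device queue was full.
 */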
static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr)
{
	/* cr may become 0 below (cr = cr2), keep the conn for unlock/put */
	struct conn *rconn = cr->rconn;
	int targetmss = mss(nb);
	int dontsend;
	int queuefull = 0;

	mutex_lock(&(rconn->rcv_lock));

	BUG_ON(rconn->targettype != TARGET_OUT);
	BUG_ON(rconn->target.out.nb != nb);

	kref_get(&(rconn->ref));

	if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
		cancel_retrans(rconn);
		goto out;
	}

	while (cr->length >= targetmss) {
		struct sk_buff *skb;
		char *dst;
		struct conn_retrans *cr2;
		int rc;

		if (may_send_conn_retrans(nb) == 0)
			goto qos_enqueue;

		skb = create_packet(nb, targetmss, GFP_KERNEL,
				rconn->target.out.conn_id, cr->seqno);
		if (unlikely(skb == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cr2 = readd_conn_retrans(cr, nb, targetmss, &dontsend);
		if (unlikely(unlikely(dontsend) || unlikely(cr2 == 0 &&
				unlikely(cr->length > targetmss)))) {
			kfree_skb(skb);
			goto out;
		}

		dst = skb_put(skb, targetmss);

		databuf_pullold(rconn, cr->seqno, dst, targetmss);
		rc = dev_queue_xmit(skb);

		if (rc != 0) {
			unsigned long iflags;

			spin_lock_irqsave(&(nb->retrans_lock), iflags);
			if (unlikely(cr->ackrcvd)) {
				dontsend = 1;
			} else {
				list_del(&(cr->timeout_list));
				list_add(&(cr->timeout_list),
						&(nb->retrans_list_conn));
			}
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			if (dontsend == 0)
				goto qos_enqueue;
		}

		cr = cr2;

		if (likely(cr == 0))
			goto out;
	}

	if (unlikely(cr->length <= 0)) {
		BUG();
	} else {
		struct control_msg_out *cm;
		char *buf = kmalloc(cr->length, GFP_KERNEL);

		if (unlikely(buf == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cm = alloc_control_msg(nb, ACM_PRIORITY_MED);
		if (unlikely(cm == 0)) {
			cr->timeout = jiffies + 1;
			kfree(buf);
			goto out;
		}

		databuf_pullold(rconn, cr->seqno, buf, cr->length);

		if (unlikely(readd_conn_retrans(cr, nb, cr->length, &dontsend)
				!= 0))
			BUG();

		if (likely(dontsend == 0)) {
			send_conndata(cm, rconn->target.out.conn_id,
					cr->seqno, buf, buf, cr->length);
		}
	}

	if (0) {
qos_enqueue:
		queuefull = 1;
	}
out:
	mutex_unlock(&(rconn->rcv_lock));

	kref_put(&(rconn->ref), free_conn);

	return queuefull;
}

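/*
 * Walk the neighbor's conn retransmission list and resend everything whose
 * timeout has expired; reschedules the timer for the first entry whose
 * timeout still lies in the future.
 */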
static int send_retrans(struct neighbor *nb, int fromqos)
{
	unsigned long iflags;

	struct conn_retrans *cr = 0;

	int nbstate;
	int rescheduled = 0;
	int queuefull = 0;

	spin_lock_irqsave(&(nb->state_lock), iflags);
	nbstate = nb->state;
	spin_unlock_irqrestore(&(nb->state_lock), iflags);

	while (1) {
		spin_lock_irqsave(&(nb->retrans_lock), iflags);

		if (list_empty(&(nb->retrans_list_conn))) {
			nb->retrans_timer_conn_running = 0;
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			break;
		}

		cr = container_of(nb->retrans_list_conn.next,
				struct conn_retrans, timeout_list);

		BUG_ON(cr->rconn->targettype != TARGET_OUT);

		if (unlikely(unlikely(nbstate == NEIGHBOR_STATE_KILLED) ||
				unlikely(atomic_read(
				&(cr->rconn->isreset)) != 0))) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

			kref_put(&(cr->ref), free_connretrans);
			continue;
		}

		BUG_ON(nb != cr->rconn->target.out.nb);

#warning todo check window limit

		if (time_after(cr->timeout, jiffies)) {
			schedule_delayed_work(&(nb->retrans_timer_conn),
					cr->timeout - jiffies);
			rescheduled = 1;
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			break;
		}

		kref_get(&(cr->ref));
		spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
		queuefull = _send_retrans(nb, cr);
		kref_put(&(cr->ref), free_connretrans);
		if (queuefull) {
			rescheduled = 1;
			if (fromqos == 0)
				qos_enqueue(nb->dev, &(nb->rb_cr),
						QOS_CALLER_CONN_RETRANS);
			break;
		}
	}

	if (rescheduled == 0)
		kref_put(&(nb->ref), neighbor_free);

	return queuefull;
}

void retransmit_conn_timerfunc(struct work_struct *work)
{
	struct neighbor *nb = container_of(to_delayed_work(work),
			struct neighbor, retrans_timer_conn);

	send_retrans(nb, 0);
}

static struct conn_retrans *search_seqno(struct conn *rconn, __u32 seqno)
{
	struct list_head *next = rconn->target.out.retrans_list.next;

	while (next != &(rconn->target.out.retrans_list)) {
		struct conn_retrans *cr = container_of(next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->rconn != rconn);
		if (cr->seqno + cr->length > seqno)
			return cr;
		next = next->next;
	}
	return 0;
}

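/*
 * Process an ack for conn data: drop acked ranges from the retransmission
 * list, advance seqno_acked and, if the ack carries newer window information,
 * update the send window.
 */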
void conn_ack_rcvd(__u32 kpacket_seqno, struct conn *rconn, __u32 seqno,
		__u8 window, __u32 seqno_ooo, __u32 length)
{
	unsigned long iflags;
	struct neighbor *nb = rconn->target.out.nb;
	struct conn_retrans *cr = 0;

	int setwindow = 0;

	BUG_ON(rconn->targettype != TARGET_OUT);

	mutex_lock(&(rconn->rcv_lock));

	if (unlikely(((__s32) (seqno - rconn->target.out.seqno_nextsend)) > 0))
		goto out;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	if (likely(length == 0))
		goto in_order;

	cr = search_seqno(rconn, seqno_ooo);

	while (cr != 0) {
		struct list_head *next = cr->conn_list.next;
		struct conn_retrans *nextcr = 0;
		if (next != &(rconn->target.out.retrans_list)) {
			nextcr = container_of(next, struct conn_retrans,
					conn_list);
		}

		if (((__s32)(cr->seqno + cr->length - seqno_ooo - length)) > 0) {
			__u32 newseqno = seqno_ooo + length;
			cr->length -= (newseqno - cr->seqno);
			cr->seqno = newseqno;
			break;
		} else {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

		cr = nextcr;
	}

	if (unlikely(list_empty(&(rconn->target.out.retrans_list))) == 0) {
		struct conn_retrans *cr = container_of(
				rconn->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		if (unlikely(((__s32) (cr->seqno -
				rconn->target.out.seqno_acked)) > 0)) {
			rconn->target.out.seqno_acked = cr->seqno;
		}
	}

in_order:
	if (likely(((__s32) (seqno - rconn->target.out.seqno_acked)) > 0)) {
		rconn->target.out.seqno_acked = seqno;
		setwindow = 1;
	}

	cr = search_seqno(rconn, seqno_ooo);

	while (cr != 0) {
		struct list_head *next = cr->conn_list.next;
		struct conn_retrans *nextcr = 0;
		if (next != &(rconn->target.out.retrans_list)) {
			nextcr = container_of(next, struct conn_retrans,
					conn_list);
		}

		if (((__s32)(cr->seqno + cr->length -
				rconn->target.out.seqno_acked)) <= 0) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

		cr = nextcr;
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
	databuf_ack(rconn, rconn->target.out.seqno_acked);

	setwindow = setwindow || (seqno == rconn->target.out.seqno_acked &&
			(kpacket_seqno - rconn->target.out.kp_windowsetseqno >
			0));
	if (setwindow) {
		rconn->target.out.kp_windowsetseqno = kpacket_seqno;
		rconn->target.out.seqno_windowlimit = seqno +
				dec_log_64_11(window);
	}

out:
	mutex_unlock(&(rconn->rcv_lock));

	flush_buf(rconn);
}

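/*
 * Initialise cr for the freshly sent range [seqno, seqno+len) and put it on
 * the retransmission lists; starts the retransmission timer if it was idle.
 */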
static void schedule_retransmit_conn(struct conn_retrans *cr,
		struct conn *rconn, __u32 seqno, __u32 len)
{
	unsigned long iflags;

	struct neighbor *nb = rconn->target.out.nb;

	int first;

	BUG_ON(rconn->targettype != TARGET_OUT);

	memset(cr, 0, sizeof (struct conn_retrans));
	cr->rconn = rconn;
	kref_get(&(rconn->ref));
	cr->seqno = seqno;
	cr->length = len;
	kref_init(&(cr->ref));
	set_conn_retrans_timeout(cr);

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	first = unlikely(list_empty(&(nb->retrans_list_conn)));
	list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));

	list_add_tail(&(cr->conn_list), &(rconn->target.out.retrans_list));

	if (unlikely(unlikely(first) &&
			unlikely(nb->retrans_timer_conn_running == 0))) {
		schedule_delayed_work(&(nb->retrans_timer_conn),
				cr->timeout - jiffies);
		nb->retrans_timer_conn_running = 1;
		kref_get(&(nb->ref));
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
}

static __u32 get_windowlimit(struct conn *rconn)
{
	__s32 windowlimit = (__s32)(rconn->target.out.seqno_windowlimit -
			rconn->target.out.seqno_nextsend);
	if (unlikely(windowlimit < 0))
		return 0;
	return windowlimit;
}

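/*
 * Send as much buffered data of rconn as window and credits allow; full mss
 * packets are sent directly, a remaining tail may be sent as conndata if low
 * latency is requested or nothing else is in flight.
 */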
#warning todo reset connections which are SOURCE_NONE and are stuck for too long
int flush_out(struct conn *rconn, int fromqos, __u32 creditsperbyte)
{
	int targetmss = mss(rconn->target.out.nb);
	__u32 seqno;
	int sent = 0;

	BUG_ON(rconn->targettype != TARGET_OUT);

	if (unlikely(rconn->target.out.conn_id == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(atomic_read(&(rconn->isreset)) != 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(rconn->sourcetype == SOURCE_SOCK &&
			rconn->source.sock.delay_flush != 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (fromqos == 0 && may_send_conn(rconn) == 0)
		return RC_FLUSH_CONN_OUT_CONG;

	while (rconn->data_buf.read_remaining >= targetmss &&
			get_windowlimit(rconn) >= targetmss) {
		struct conn_retrans *cr;
		struct sk_buff *skb;
		char *dst;
		int rc;

		if (unlikely(creditsperbyte * targetmss >
				rconn->credits))
			return RC_FLUSH_CONN_OUT_CREDITS;

		seqno = rconn->target.out.seqno_nextsend;
		skb = create_packet(rconn->target.out.nb, targetmss, GFP_ATOMIC,
				rconn->target.out.conn_id, seqno);
		if (unlikely(skb == 0))
			return RC_FLUSH_CONN_OUT_OOM;

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree_skb(skb);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		dst = skb_put(skb, targetmss);

		databuf_pull(rconn, dst, targetmss);

		rc = dev_queue_xmit(skb);
		if (rc != 0) {
			databuf_unpull(rconn, targetmss);
			kmem_cache_free(connretrans_slab, cr);
			return RC_FLUSH_CONN_OUT_CONG;
		}

		rconn->credits -= creditsperbyte * targetmss;
		rconn->target.out.seqno_nextsend += targetmss;
		schedule_retransmit_conn(cr, rconn, seqno, targetmss);
		sent = 1;
	}

	if (rconn->data_buf.read_remaining > 0 && (rconn->tos == TOS_LATENCY ||
			rconn->target.out.seqno_nextsend ==
			rconn->target.out.seqno_acked)) {
		struct control_msg_out *cm;
		struct conn_retrans *cr;
		__u32 len = rconn->data_buf.read_remaining;
		__s32 windowlimit = get_windowlimit(rconn);
		char *buf;

		if (windowlimit == 0)
			goto out;

		if (windowlimit < len/2 && rconn->target.out.seqno_nextsend !=
				rconn->target.out.seqno_acked)
			goto out;

		if (len > windowlimit)
			len = windowlimit;

		if (unlikely(creditsperbyte * len > rconn->credits))
			return RC_FLUSH_CONN_OUT_CREDITS;

		buf = kmalloc(len, GFP_KERNEL);
		if (unlikely(buf == 0))
			return RC_FLUSH_CONN_OUT_OOM;

		cm = alloc_control_msg(rconn->target.out.nb, ACM_PRIORITY_MED);
		if (unlikely(cm == 0)) {
			kfree(buf);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree(buf);
			free_control_msg(cm);
			return RC_FLUSH_CONN_OUT_CONG;
		}

		databuf_pull(rconn, buf, len);

		seqno = rconn->target.out.seqno_nextsend;
		rconn->credits -= creditsperbyte * len;
		rconn->target.out.seqno_nextsend += len;

		schedule_retransmit_conn(cr, rconn, seqno, len);

		send_conndata(cm, rconn->target.out.conn_id, seqno, buf, buf,
				len);
		sent = 1;
	}

out:
	if (sent)
		return RC_FLUSH_CONN_OUT_OK_SENT;

	return RC_FLUSH_CONN_OUT_OK;
}

int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);

	if (unlikely(connretrans_slab == 0))
		return 1;

	INIT_DELAYED_WORK(&(qos_resume_work), qos_resume);
	qos_resume_scheduled = 0;

	return 0;
}

MODULE_LICENSE("GPL");