window limits
[cor_2_6_31.git] net/cor/snd.c
/*
 * Connection oriented routing
 * Copyright (C) 2007-2008 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"
struct kmem_cache *connretrans_slab;
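
/*
 * One conn_retrans describes a chunk of already sent conn data
 * (seqno/length) that may still have to be retransmitted.  It is linked
 * into the neighbor's timeout-ordered retransmit list (timeout_list) and
 * into the per-connection list (conn_list); as noted below, both list
 * memberships are covered by a single kref.
 */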
struct conn_retrans {
	/* timeout_list and conn_list share a single ref */
	struct kref ref;
	struct list_head timeout_list;
	struct list_head conn_list;
	struct htab_entry htab_entry;
	struct conn *rconn;
	__u32 seqno;
	__u32 length;
	__u8 ackrcvd;
	unsigned long timeout;
};
static void free_connretrans(struct kref *ref)
{
	struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);

	/* drop the conn reference before freeing cr, so cr is not touched
	 * after the free */
	kref_put(&(cr->rconn->ref), free_conn);
	kmem_cache_free(connretrans_slab, cr);
}
DEFINE_MUTEX(queues_lock);
LIST_HEAD(queues);
struct delayed_work qos_resume_work;

struct qos_queue {
	struct list_head queue_list;

	struct net_device *dev;

	struct list_head kpackets_waiting;
	struct list_head conn_retrans_waiting;
	struct list_head announce_waiting;
	struct list_head conns_waiting;
};
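
/*
 * Return codes of _flush_out/_resume_conns: everything was flushed, the
 * device queue was congested (dev_queue_xmit failed or higher priority
 * traffic is still waiting), or the connection ran out of credits.
 */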
#define RC_FLUSH_CONN_OK 0
#define RC_FLUSH_CONN_CONG 1
#define RC_FLUSH_CONN_CREDITS 2

static int _flush_out(struct conn *rconn, int fromqos, __u32 creditsperbyte);
/* Highest bidder "pays" the credits the second has bid */
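/*
 * _resume_conns scans all connections waiting on this device and computes
 * for each one a bid: its available credits per buffered byte, in 32.32
 * fixed point (multiply_div by 2^32).  Only the highest bidder is flushed,
 * and, as in a second-price auction, it is charged the runner-up's rate:
 * the integer part of the second highest bid is passed to _flush_out as
 * creditsperbyte.
 */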
static int _resume_conns(struct qos_queue *q)
{
	struct conn *best = 0;
	__u64 bestcredit = 0;
	__u64 secondcredit = 0;

	int rc;

	struct list_head *lh = q->conns_waiting.next;

	while (lh != &(q->conns_waiting)) {
		struct conn *rconn = container_of(lh, struct conn,
				target.out.rb.lh);
		__u64 credits;

		refresh_conn_credits(rconn);

		mutex_lock(&(rconn->rcv_lock));

		BUG_ON(rconn->targettype != TARGET_OUT);
		BUG_ON(rconn->buf.read_remaining == 0);

		if (rconn->credits <= 0)
			credits = 0;
		else
			credits = multiply_div(rconn->credits,
					((__u64) 1) << 32,
					rconn->buf.read_remaining);
		mutex_unlock(&(rconn->rcv_lock));

		if (best == 0 || bestcredit < credits) {
			secondcredit = bestcredit;
			best = rconn;
			bestcredit = credits;
		} else if (secondcredit < credits) {
			secondcredit = credits;
		}

		lh = lh->next;
	}

	mutex_lock(&(best->rcv_lock));
	rc = _flush_out(best, 1, (__u32) (secondcredit >> 32));

	if (rc == 0) {
		best->target.out.rb.in_queue = 0;
		list_del(&(best->target.out.rb.lh));
	}

	mutex_unlock(&(best->rcv_lock));

	return rc;
}
static int resume_conns(struct qos_queue *q)
{
	/* keep resuming as long as connections are waiting */
	while (list_empty(&(q->conns_waiting)) == 0) {
		int rc = _resume_conns(q);
		if (rc != 0)
			return rc;
	}
	return 0;
}
static int send_retrans(struct neighbor *nb);
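
/*
 * _qos_resume drains one of the per-device waiting lists (kernel packets,
 * conn retransmits or announces, selected by caller).  queues_lock is
 * dropped around the actual send; if sending fails because the device is
 * still congested, the entry is put back on the list and the loop stops,
 * otherwise the reference taken by qos_enqueue is released.
 */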
static int _qos_resume(struct qos_queue *q, int caller)
{
	int rc = 0;

	struct list_head *lh;

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct resume_block *rb = container_of(curr,
				struct resume_block, lh);
		rb->in_queue = 0;
		list_del(curr);

		mutex_unlock(&(queues_lock));

		if (caller == QOS_CALLER_KPACKET) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_kp);
			rc = resume_send_messages(nb);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_cr);
#warning todo do not send if neighbor is stalled
			rc = send_retrans(nb);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			struct announce_data *ann = container_of(rb,
					struct announce_data, rb);
			rc = send_announce_qos(ann);
		} else {
			BUG();
		}

		mutex_lock(&(queues_lock));

		if (rc != 0 && rb->in_queue == 0) {
			rb->in_queue = 1;
			list_add(curr, lh);
		} else {
			if (caller == QOS_CALLER_KPACKET) {
				kref_put(&(container_of(rb, struct neighbor,
						rb_kp)->ref), neighbor_free);
			} else if (caller == QOS_CALLER_CONN_RETRANS) {
				kref_put(&(container_of(rb, struct neighbor,
						rb_cr)->ref), neighbor_free);
			} else if (caller == QOS_CALLER_ANNOUNCE) {
				kref_put(&(container_of(rb,
						struct announce_data, rb)->ref),
						announce_data_free);
			} else {
				BUG();
			}
		}

		if (rc != 0)
			break;
	}
	return rc;
}
static void qos_resume(struct work_struct *work)
{
	struct list_head *curr;

	mutex_lock(&(queues_lock));

	curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		int i;

		for (i = 0; i < 4; i++) {
			int rc;
			if (i == 3)
				rc = resume_conns(q);
			else
				rc = _qos_resume(q, i);

			if (rc != 0)
				goto congested;
		}

		curr = curr->next;

		if (i == 4 && unlikely(q->dev == 0)) {
			list_del(&(q->queue_list));
			kfree(q);
		}
	}

	if (0) {
congested:
		schedule_delayed_work(&(qos_resume_work), 1);
	}

	mutex_unlock(&(queues_lock));
}
static struct qos_queue *get_queue(struct net_device *dev)
{
	struct list_head *curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		if (q->dev == dev)
			return q;
		curr = curr->next;
	}
	return 0;
}
int destroy_queue(struct net_device *dev)
{
	struct qos_queue *q;

	mutex_lock(&(queues_lock));

	q = get_queue(dev);

	if (q == 0) {
		mutex_unlock(&(queues_lock));
		return 1;
	}

	q->dev = 0;

	dev_put(dev);

	mutex_unlock(&(queues_lock));

	return 0;
}
int create_queue(struct net_device *dev)
{
	struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);

	if (q == 0) {
		printk(KERN_ERR "cor: unable to allocate memory for device "
				"queue, not enabling device\n");
		return 1;
	}

	q->dev = dev;
	dev_hold(dev);

	INIT_LIST_HEAD(&(q->kpackets_waiting));
	INIT_LIST_HEAD(&(q->conn_retrans_waiting));
	INIT_LIST_HEAD(&(q->announce_waiting));
	INIT_LIST_HEAD(&(q->conns_waiting));

	mutex_lock(&(queues_lock));
	list_add(&(q->queue_list), &queues);
	mutex_unlock(&(queues_lock));

	return 0;
}
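
/*
 * qos_enqueue registers a resume_block on the per-device queue so that
 * qos_resume() retries the send later; a reference on the owning neighbor
 * or announce_data is taken while the entry is queued and dropped again in
 * _qos_resume() once the send has succeeded.
 */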
void qos_enqueue(struct net_device *dev, struct resume_block *rb, int caller)
{
	struct qos_queue *q;

	mutex_lock(&(queues_lock));

	if (rb->in_queue)
		goto out;

	q = get_queue(dev);
	if (unlikely(q == 0))
		goto out;

	rb->in_queue = 1;

	if (caller == QOS_CALLER_KPACKET) {
		list_add(&(rb->lh), &(q->conn_retrans_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
	} else if (caller == QOS_CALLER_CONN_RETRANS) {
		list_add(&(rb->lh), &(q->kpackets_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
	} else if (caller == QOS_CALLER_ANNOUNCE) {
		list_add(&(rb->lh), &(q->announce_waiting));
		kref_get(&(container_of(rb, struct announce_data, rb)->ref));
	} else {
		BUG();
	}

out:
	mutex_unlock(&(queues_lock));
}
void qos_enqueue_kpacket(struct neighbor *nb)
{
	qos_enqueue(nb->dev, &(nb->rb_kp), QOS_CALLER_KPACKET);
}

static void qos_enqueue_conn_retrans(struct neighbor *nb)
{
	qos_enqueue(nb->dev, &(nb->rb_cr), QOS_CALLER_CONN_RETRANS);
}

static void qos_enqueue_conn(struct conn *rconn)
{
	BUG_ON(rconn->targettype != TARGET_OUT);
	qos_enqueue(rconn->target.out.nb->dev, &(rconn->target.out.rb),
			QOS_CALLER_CONN);
}
static int may_send_conn_retrans(struct neighbor *nb)
{
	struct qos_queue *q;
	int rc = 0;

	mutex_lock(&(queues_lock));

	q = get_queue(nb->dev);
	if (unlikely(q == 0))
		goto out;

	rc = (list_empty(&(q->kpackets_waiting)));

out:
	mutex_unlock(&(queues_lock));

	return rc;
}
static int may_send_conn(struct conn *rconn)
{
	struct qos_queue *q;
	int rc = 0;

	mutex_lock(&(queues_lock));

	q = get_queue(rconn->target.out.nb->dev);
	if (unlikely(q == 0))
		goto out;

	rc = (list_empty(&(q->kpackets_waiting)) &&
			list_empty(&(q->conn_retrans_waiting)) &&
			list_empty(&(q->announce_waiting)) &&
			list_empty(&(q->conns_waiting)));

out:
	mutex_unlock(&(queues_lock));

	return rc;
}
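
/*
 * create_packet builds an skb for one conn data packet: it reserves link
 * layer space, then writes a 9 byte header (1 byte PACKET_TYPE_DATA,
 * 4 byte conn_id, 4 byte seqno via put_u32); the caller appends the
 * payload with skb_put afterwards.
 */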
struct sk_buff *create_packet(struct neighbor *nb, int size,
		gfp_t alloc_flags, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *ret;
	char *dest;

	ret = alloc_skb(size + 9 + LL_ALLOCATED_SPACE(nb->dev), alloc_flags);
	if (unlikely(0 == ret))
		return 0;

	ret->protocol = htons(ETH_P_COR);
	ret->dev = nb->dev;

	skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
	if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
			nb->dev->dev_addr, ret->len) < 0)) {
		kfree_skb(ret);
		return 0;
	}
	skb_reset_network_header(ret);

	dest = skb_put(ret, 9);
	BUG_ON(0 == dest);

	dest[0] = PACKET_TYPE_DATA;
	dest += 1;

	put_u32(dest, conn_id, 1);
	dest += 4;
	put_u32(dest, seqno, 1);
	dest += 4;

	return ret;
}
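
/*
 * Retransmit timeout: a fixed 100ms plus the neighbor's measured latency
 * and the maximum control message delay the remote side may add before
 * acknowledging, both kept in microseconds.
 */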
static void set_conn_retrans_timeout(struct conn_retrans *cr)
{
	struct neighbor *nb = cr->rconn->target.out.nb;
	cr->timeout = jiffies + usecs_to_jiffies(100000 +
			((__u32) atomic_read(&(nb->latency))) +
			((__u32) atomic_read(&(nb->max_remote_cmsg_delay))));
}
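
/*
 * readd_conn_retrans prepares cr for being sent (again): if the chunk was
 * acked in the meantime, *dontsend is set; if only a prefix of length
 * bytes is to be sent now, cr is shrunk to that prefix and a new
 * conn_retrans covering the remainder is allocated, queued and returned;
 * otherwise cr is moved to the tail of the neighbor's retransmit list
 * with a fresh timeout.
 */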
static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr,
		struct neighbor *nb, __u32 length, int *dontsend)
{
	unsigned long iflags;

	struct conn_retrans *ret = 0;

	spin_lock_irqsave( &(nb->retrans_lock), iflags );

	if (unlikely(cr->ackrcvd)) {
		*dontsend = 1;
		goto out;
	} else
		*dontsend = 0;

	if (unlikely(cr->length > length)) {
		ret = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC);
		if (unlikely(ret == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		memset(ret, 0, sizeof (struct conn_retrans));
		ret->rconn = cr->rconn;
		kref_get(&(cr->rconn->ref));
		ret->seqno = cr->seqno + length;
		ret->length = cr->length - length;
		kref_init(&(ret->ref));

		list_add(&(ret->timeout_list), &(nb->retrans_list_conn));
		list_add(&(ret->conn_list), &(cr->conn_list));

		cr->length = length;
	} else {
		list_del(&(cr->timeout_list));
		list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));
		set_conn_retrans_timeout(cr);

		BUG_ON(cr->length != length);
	}

out:
	spin_unlock_irqrestore( &(nb->retrans_lock), iflags );

	return ret;
}
/* rcvlock *must* be held while calling this */
void cancel_retrans(struct conn *rconn)
{
	unsigned long iflags;
	struct neighbor *nb = rconn->target.out.nb;

	spin_lock_irqsave( &(nb->retrans_lock), iflags );

	while (list_empty(&(rconn->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				rconn->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->rconn != rconn);

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}

	spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
}
static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr)
{
	int targetmss = mss(nb);
	int dontsend;
	int queuefull = 0;

	mutex_lock(&(cr->rconn->rcv_lock));

	BUG_ON(cr->rconn->targettype != TARGET_OUT);
	BUG_ON(cr->rconn->target.out.nb != nb);

	kref_get(&(cr->rconn->ref));

	if (unlikely(atomic_read(&(cr->rconn->isreset)) != 0)) {
		cancel_retrans(cr->rconn);
		goto out;
	}

	while (cr->length >= targetmss) {
		struct sk_buff *skb;
		char *dst;
		struct conn_retrans *cr2;
		int rc;

		if (may_send_conn_retrans(nb) == 0)
			goto qos_enqueue;

		skb = create_packet(nb, targetmss, GFP_KERNEL,
				cr->rconn->target.out.conn_id, cr->seqno);
		if (unlikely(skb == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cr2 = readd_conn_retrans(cr, nb, targetmss, &dontsend);
		if (unlikely(unlikely(dontsend) || unlikely(cr2 == 0 &&
				unlikely(cr->length > targetmss)))) {
			kfree_skb(skb);
			goto out;
		}

		dst = skb_put(skb, targetmss);

		databuf_pullold(&(cr->rconn->buf), cr->seqno, dst, targetmss);
		rc = dev_queue_xmit(skb);

		if (rc != 0) {
			unsigned long iflags;

			spin_lock_irqsave( &(nb->retrans_lock), iflags );
			if (unlikely(cr->ackrcvd)) {
				dontsend = 1;
			} else {
				list_del(&(cr->timeout_list));
				list_add(&(cr->timeout_list),
						&(nb->retrans_list_conn));
			}
			spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
			if (dontsend == 0)
				goto qos_enqueue;
		}

		/* keep cr pointing at a valid entry for the unlock below */
		if (likely(cr2 == 0))
			goto out;

		cr = cr2;
	}

	if (unlikely(cr->length <= 0)) {
		BUG();
	} else {
		struct control_msg_out *cm;
		char *buf = kmalloc(cr->length, GFP_KERNEL);

		if (unlikely(buf == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cm = alloc_control_msg(nb, ACM_PRIORITY_MED);
		if (unlikely(cm == 0)) {
			cr->timeout = jiffies + 1;
			kfree(buf);
			goto out;
		}

		databuf_pullold(&(cr->rconn->buf), cr->seqno, buf, cr->length);

		if (unlikely(readd_conn_retrans(cr, nb, cr->length, &dontsend)
				!= 0))
			BUG();

		if (likely(dontsend == 0)) {
			send_conndata(cm, cr->rconn->target.out.conn_id,
					cr->seqno, buf, buf, cr->length);
		} else {
			/* acked in the meantime, do not leak buf and cm */
			free_control_msg(cm);
			kfree(buf);
		}
	}

	if (0) {
qos_enqueue:
		qos_enqueue_conn_retrans(nb);
		queuefull = 1;
	}
out:
	mutex_unlock(&(cr->rconn->rcv_lock));

	kref_put(&(cr->rconn->ref), free_conn);

	return queuefull;
}
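
/*
 * send_retrans walks the neighbor's timeout-ordered retransmit list and
 * resends every entry whose timeout has expired.  Entries of killed
 * neighbors or reset connections are dropped, and when the head of the
 * list has not timed out yet, the retransmit timer is rearmed for it.
 */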
static int send_retrans(struct neighbor *nb)
{
	unsigned long iflags;

	struct conn_retrans *cr = 0;

	int nbstate;
	int nbput = 1;
	int queuefull = 0;

	spin_lock_irqsave( &(nb->state_lock), iflags );
	nbstate = nb->state;
	spin_unlock_irqrestore( &(nb->state_lock), iflags );

	while (1) {
		spin_lock_irqsave( &(nb->retrans_lock), iflags );

		if (list_empty(&(nb->retrans_list_conn))) {
			nb->retrans_timer_conn_running = 0;
			break;
		}

		cr = container_of(nb->retrans_list_conn.next,
				struct conn_retrans, timeout_list);

		BUG_ON(cr->rconn->targettype != TARGET_OUT);

		if (unlikely(unlikely(nbstate == NEIGHBOR_STATE_KILLED) ||
				unlikely(atomic_read(
				&(cr->rconn->isreset)) != 0))) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			spin_unlock_irqrestore( &(nb->retrans_lock), iflags );

			kref_put(&(cr->ref), free_connretrans);
			continue;
		}

		BUG_ON(nb != cr->rconn->target.out.nb);

		if (time_after(cr->timeout, jiffies)) {
			schedule_delayed_work(&(nb->retrans_timer_conn),
					cr->timeout - jiffies);
			nbput = 0;
			break;
		}

		kref_get(&(cr->ref));
		spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
		queuefull = _send_retrans(nb, cr);
		kref_put(&(cr->ref), free_connretrans);
		if (queuefull)
			goto out;
	}

	spin_unlock_irqrestore( &(nb->retrans_lock), iflags );

out:
	if (nbput)
		kref_put(&(nb->ref), neighbor_free);

	return queuefull;
}
void retransmit_conn_timerfunc(struct work_struct *work)
{
	struct neighbor *nb = container_of(to_delayed_work(work),
			struct neighbor, retrans_timer_conn);

	send_retrans(nb);
}
static struct conn_retrans *search_seqno(struct conn *rconn, __u32 seqno)
{
	struct list_head *next = rconn->target.out.retrans_list.next;

	while (next != &(rconn->target.out.retrans_list)) {
		struct conn_retrans *cr = container_of(next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->rconn != rconn);
		if (cr->seqno + cr->length > seqno)
			return cr;
		next = next->next;
	}
	return 0;
}
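
/*
 * conn_ack_rcvd processes an ack for outgoing conn data: an out-of-order
 * ack (length != 0) trims or removes the retransmit entries covering
 * seqno_ooo..seqno_ooo+length, a cumulative ack advances seqno_acked,
 * frees all fully acked entries and releases the acked part of the data
 * buffer.  The window field, decoded with dec_window(), moves
 * seqno_windowlimit forward if this ack carries the newest window
 * information (kpacket_seqno) seen so far.
 */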
void conn_ack_rcvd(__u32 kpacket_seqno, struct conn *rconn, __u32 seqno,
		__u8 window, __u32 seqno_ooo, __u32 length)
{
	unsigned long iflags;
	struct neighbor *nb = rconn->target.out.nb;
	struct conn_retrans *cr = 0;

	int setwindow = 0;

	BUG_ON(rconn->targettype != TARGET_OUT);

	mutex_lock(&(rconn->rcv_lock));

	/* wrap-safe seqno comparison */
	if (unlikely(((__s32) (seqno - rconn->target.out.seqno_nextsend)) > 0))
		goto out;

	spin_lock_irqsave( &(nb->retrans_lock), iflags );

	if (likely(length == 0))
		goto in_order;

	cr = search_seqno(rconn, seqno_ooo);

	while (cr != 0) {
		struct list_head *next = cr->conn_list.next;
		struct conn_retrans *nextcr = 0;
		if (next != &(rconn->target.out.retrans_list)) {
			nextcr = container_of(next, struct conn_retrans,
					conn_list);
		}

		if (((__s32)(cr->seqno + cr->length - seqno_ooo - length)) > 0) {
			__u32 newseqno = seqno_ooo + length;
			cr->length -= (newseqno - cr->seqno);
			cr->seqno = newseqno;
			break;
		} else {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

		cr = nextcr;
	}

	if (unlikely(list_empty(&(rconn->target.out.retrans_list))) == 0) {
		struct conn_retrans *cr = container_of(
				rconn->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		if (unlikely(((__s32) (cr->seqno -
				rconn->target.out.seqno_acked)) > 0)) {
			rconn->target.out.seqno_acked = cr->seqno;
		}
	}

in_order:
	if (likely(((__s32) (seqno - rconn->target.out.seqno_acked)) > 0)) {
		rconn->target.out.seqno_acked = seqno;
		setwindow = 1;
	}

	cr = search_seqno(rconn, seqno_ooo);

	while (cr != 0) {
		struct list_head *next = cr->conn_list.next;
		struct conn_retrans *nextcr = 0;
		if (next != &(rconn->target.out.retrans_list)) {
			nextcr = container_of(next, struct conn_retrans,
					conn_list);
		}

		if (((__s32)(cr->seqno + cr->length -
				rconn->target.out.seqno_acked)) <= 0) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

		cr = nextcr;
	}

	spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
	databuf_ack(rconn, rconn->target.out.seqno_acked);

	setwindow = setwindow || (seqno == rconn->target.out.seqno_acked &&
			((__s32) (kpacket_seqno -
			rconn->target.out.kp_windowsetseqno)) > 0);
	if (setwindow) {
		rconn->target.out.kp_windowsetseqno = kpacket_seqno;
		rconn->target.out.seqno_windowlimit = seqno +
				dec_window(window);
	}

out:
	mutex_unlock(&(rconn->rcv_lock));
}
static void schedule_retransmit_conn(struct conn_retrans *cr, struct conn *rconn,
		__u32 seqno, __u32 len)
{
	unsigned long iflags;

	struct neighbor *nb = rconn->target.out.nb;

	int first;

	BUG_ON(rconn->targettype != TARGET_OUT);

	memset(cr, 0, sizeof (struct conn_retrans));
	cr->rconn = rconn;
	kref_get(&(rconn->ref));
	cr->seqno = seqno;
	cr->length = len;
	kref_init(&(cr->ref));
	set_conn_retrans_timeout(cr);

	spin_lock_irqsave( &(nb->retrans_lock), iflags );

	first = unlikely(list_empty(&(nb->retrans_list_conn)));
	list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));

	list_add_tail(&(cr->conn_list), &(rconn->target.out.retrans_list));

	if (unlikely(unlikely(first) &&
			unlikely(nb->retrans_timer_conn_running == 0))) {
		schedule_delayed_work(&(nb->retrans_timer_conn),
				cr->timeout - jiffies);
		nb->retrans_timer_conn_running = 1;
		kref_get(&(nb->ref));
	}

	spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
}
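
/*
 * _flush_out sends buffered conn data to the neighbor: full-MSS chunks go
 * out directly as data packets built with create_packet(), the remaining
 * tail is handed to the control message code via send_conndata().  Each
 * chunk is charged creditsperbyte credits per byte and registered for
 * retransmission via schedule_retransmit_conn().  On congestion the
 * connection is put on the device's qos queue (unless called from there
 * already).
 */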
static int _flush_out(struct conn *rconn, int fromqos, __u32 creditsperbyte)
{
	int targetmss = mss(rconn->target.out.nb);
	__u32 seqno;

#warning todo honor window size, balance credits

	BUG_ON(rconn->targettype != TARGET_OUT);

	if (unlikely(rconn->target.out.conn_id == 0))
		return RC_FLUSH_CONN_OK;

	if (unlikely(atomic_read(&(rconn->isreset)) != 0))
		return RC_FLUSH_CONN_OK;

	if (fromqos == 0 && may_send_conn(rconn) == 0)
		goto qos;

	while (rconn->buf.read_remaining >= targetmss) {
		struct conn_retrans *cr;
		struct sk_buff *skb;
		char *dst;
		int rc;

		if (unlikely(creditsperbyte * targetmss >
				rconn->credits))
			return RC_FLUSH_CONN_CREDITS;

		seqno = rconn->target.out.seqno_nextsend;
		skb = create_packet(rconn->target.out.nb, targetmss, GFP_ATOMIC,
				rconn->target.out.conn_id, seqno);
		if (unlikely(skb == 0))
			goto oom;

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree_skb(skb);
			goto oom;
		}

		dst = skb_put(skb, targetmss);

		databuf_pull(&(rconn->buf), dst, targetmss);

		rc = dev_queue_xmit(skb);
		if (rc != 0) {
			databuf_unpull(&(rconn->buf), targetmss);
			kmem_cache_free(connretrans_slab, cr);
			goto qos;
		}

		rconn->credits -= creditsperbyte * targetmss;
		rconn->target.out.seqno_nextsend += targetmss;
		schedule_retransmit_conn(cr, rconn, seqno, targetmss);
	}

	if (rconn->buf.read_remaining > 0) {
		struct control_msg_out *cm;
		struct conn_retrans *cr;
		__u32 len = rconn->buf.read_remaining;
		char *buf = kmalloc(len, GFP_KERNEL);

		if (unlikely(creditsperbyte * len > rconn->credits)) {
			kfree(buf);
			return RC_FLUSH_CONN_CREDITS;
		}

		if (unlikely(buf == 0))
			goto oom;

		cm = alloc_control_msg(rconn->target.out.nb, ACM_PRIORITY_MED);
		if (unlikely(cm == 0)) {
			kfree(buf);
			goto oom;
		}

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree(buf);
			free_control_msg(cm);
			goto oom;
		}

		databuf_pull(&(rconn->buf), buf, len);

		seqno = rconn->target.out.seqno_nextsend;
		rconn->credits -= creditsperbyte * len;
		rconn->target.out.seqno_nextsend += len;

		schedule_retransmit_conn(cr, rconn, seqno, len);

		send_conndata(cm, rconn->target.out.conn_id, seqno, buf, buf,
				len);
	}

	wake_sender(rconn);

	if (0) {
qos:
		printk(KERN_ERR "qos\n");
		if (fromqos == 0)
			qos_enqueue_conn(rconn);
		return RC_FLUSH_CONN_CONG;
	}

	if (0) {
oom:
#warning todo flush later
		printk(KERN_ERR "oom\n");
	}

	return RC_FLUSH_CONN_OK;
}
void flush_out(struct conn *rconn)
{
	_flush_out(rconn, 0, 0);
}
int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);

	if (unlikely(connretrans_slab == 0))
		return 1;

	INIT_DELAYED_WORK(&(qos_resume_work), qos_resume);

	return 0;
}
MODULE_LICENSE("GPL");