/*
 * Connection oriented routing
 * Copyright (C) 2007-2008 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"
struct kmem_cache *connretrans_slab;

struct conn_retrans {
	/* timeout_list and conn_list share a single ref */
	struct kref ref;
	struct list_head timeout_list;
	struct list_head conn_list;
	struct htab_entry htab_entry;
	struct conn *rconn;
	__u32 seqno;
	__u32 length;
	__u8 ackrcvd;
	unsigned long timeout;
};
static void free_connretrans(struct kref *ref)
{
	struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);

	/* drop the conn reference before cr is freed, rconn is reached
	 * through cr */
	kref_put(&(cr->rconn->ref), free_conn);
	kmem_cache_free(connretrans_slab, cr);
}
DEFINE_MUTEX(queues_lock);
LIST_HEAD(queues);
struct delayed_work qos_resume_work;

struct qos_queue {
	struct list_head queue_list;

	struct net_device *dev;

	struct list_head kpackets_waiting;
	struct list_head conn_retrans_waiting;
	struct list_head announce_waiting;
	struct list_head conns_waiting;
};

#define RC_FLUSH_CONN_OK 0
#define RC_FLUSH_CONN_CONG 1
#define RC_FLUSH_CONN_CREDITS 2

static int _flush_out(struct conn *rconn, int fromqos, __u32 creditsperbyte);
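
/*
 * Scans conns_waiting and flushes only the connection with the highest
 * credits-per-byte ratio, passing the second highest ratio to _flush_out()
 * as the per-byte credit limit. resume_conns() repeats this until the list
 * is empty or a flush fails.
 */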
static int _resume_conns(struct qos_queue *q)
{
	struct conn *best = 0;
	__u64 bestcredit = 0;
	__u64 secondcredit = 0;

	int rc;

	struct list_head *lh = q->conns_waiting.next;

	while (lh != &(q->conns_waiting)) {
		struct conn *rconn = container_of(lh, struct conn,
				target.out.rb.lh);
		__u64 credits;

		BUG_ON(rconn->targettype != TARGET_OUT);

		mutex_lock(&(rconn->rcv_lock));
		/* credits per buffered byte, in 32.32 fixed point */
		credits = (((__u64) rconn->target.out.credits) << 32) /
				rconn->buf.read_remaining;
		mutex_unlock(&(rconn->rcv_lock));

		if (best == 0 || bestcredit < credits) {
			secondcredit = bestcredit;
			best = rconn;
			bestcredit = credits;
		} else if (secondcredit < credits) {
			secondcredit = credits;
		}

		lh = lh->next;
	}

	mutex_lock(&(best->rcv_lock));
	rc = _flush_out(best, 1, (__u32) (secondcredit >> 32));

	if (rc == 0) {
		best->target.out.rb.in_queue = 0;
		list_del(&(best->target.out.rb.lh));
	}

	mutex_unlock(&(best->rcv_lock));

	return rc;
}
static int resume_conns(struct qos_queue *q)
{
	while (list_empty(&(q->conns_waiting)) == 0) {
		int rc = _resume_conns(q);
		if (rc != 0)
			return rc;
	}
	return 0;
}
static int send_retrans(struct neighbor *nb);
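
/*
 * Drains the waiting list selected by "caller". Each entry is unlinked with
 * queues_lock held, its send function is called with the lock dropped, and
 * it is put back on the list if sending still fails (rc != 0); otherwise the
 * reference taken at enqueue time is released.
 */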
static int _qos_resume(struct qos_queue *q, int caller)
{
	int rc = 0;

	struct list_head *lh;

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct resume_block *rb = container_of(curr,
				struct resume_block, lh);
		rb->in_queue = 0;
		list_del(curr);

		mutex_unlock(&(queues_lock));

		if (caller == QOS_CALLER_KPACKET) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_kp);
			rc = resume_send_messages(nb);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_cr);
			rc = send_retrans(nb);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			struct announce_data *ann = container_of(rb,
					struct announce_data, rb);
			rc = send_announce_qos(ann);
		} else {
			BUG();
		}

		mutex_lock(&(queues_lock));

		if (rc != 0 && rb->in_queue == 0) {
			rb->in_queue = 1;
			list_add(curr, lh);
		} else {
			if (caller == QOS_CALLER_KPACKET) {
				kref_put(&(container_of(rb, struct neighbor,
						rb_kp)->ref), neighbor_free);
			} else if (caller == QOS_CALLER_CONN_RETRANS) {
				kref_put(&(container_of(rb, struct neighbor,
						rb_cr)->ref), neighbor_free);
			} else if (caller == QOS_CALLER_ANNOUNCE) {
				kref_put(&(container_of(rb,
						struct announce_data, rb)->ref),
						announce_data_free);
			} else {
				BUG();
			}
		}

		if (rc != 0)
			break;
	}

	return rc;
}
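
/*
 * Work function behind qos_resume_work: for every device queue, retry
 * callers 0..2 via _qos_resume() and the data connections via
 * resume_conns(). On any failure the work is rescheduled one jiffy later;
 * queues whose device has been destroyed are freed once they are drained.
 */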
static void qos_resume(struct work_struct *work)
{
	struct list_head *curr;

	mutex_lock(&(queues_lock));

	curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		int i;

		for (i = 0; i < 4; i++) {
			int rc;
			if (i == 3)
				rc = resume_conns(q);
			else
				rc = _qos_resume(q, i);

			if (rc != 0)
				goto congested;
		}

		curr = curr->next;

		if (i == 4 && unlikely(q->dev == 0)) {
			list_del(&(q->queue_list));
			kfree(q);
		}
	}

	if (0) {
congested:
		schedule_delayed_work(&(qos_resume_work), 1);
	}

	mutex_unlock(&(queues_lock));
}
static struct qos_queue *get_queue(struct net_device *dev)
{
	struct list_head *curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		if (q->dev == dev)
			return q;
		curr = curr->next;
	}
	return 0;
}
int destroy_queue(struct net_device *dev)
{
	struct qos_queue *q;

	mutex_lock(&(queues_lock));

	q = get_queue(dev);

	if (q == 0) {
		mutex_unlock(&(queues_lock));
		return 1;
	}

	q->dev = 0;

	dev_put(dev);

	mutex_unlock(&(queues_lock));

	return 0;
}

int create_queue(struct net_device *dev)
{
	struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);

	if (q == 0) {
		printk(KERN_ERR "cor: unable to allocate memory for device "
				"queue, not enabling device");
		return 1;
	}

	q->dev = dev;
	dev_hold(dev);

	INIT_LIST_HEAD(&(q->kpackets_waiting));
	INIT_LIST_HEAD(&(q->conn_retrans_waiting));
	INIT_LIST_HEAD(&(q->announce_waiting));
	INIT_LIST_HEAD(&(q->conns_waiting));

	mutex_lock(&(queues_lock));
	list_add(&(q->queue_list), &queues);
	mutex_unlock(&(queues_lock));

	return 0;
}
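
/*
 * Puts a resume_block on the per-device list matching "caller" so that
 * qos_resume() retries the send later. rb->in_queue guards against double
 * insertion; a reference on the owning neighbor or announce_data is held
 * while the entry is queued.
 */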
void qos_enqueue(struct net_device *dev, struct resume_block *rb, int caller)
{
	struct qos_queue *q;

	mutex_lock(&(queues_lock));

	if (rb->in_queue)
		goto out;

	q = get_queue(dev);
	if (unlikely(q == 0))
		goto out;

	rb->in_queue = 1;

	if (caller == QOS_CALLER_KPACKET) {
		list_add(&(rb->lh), &(q->conn_retrans_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
	} else if (caller == QOS_CALLER_CONN_RETRANS) {
		list_add(&(rb->lh), &(q->kpackets_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
	} else if (caller == QOS_CALLER_ANNOUNCE) {
		list_add(&(rb->lh), &(q->announce_waiting));
		kref_get(&(container_of(rb, struct announce_data, rb)->ref));
	} else if (caller == QOS_CALLER_CONN) {
		/* drained by _resume_conns(), which takes no extra reference */
		list_add(&(rb->lh), &(q->conns_waiting));
	} else {
		BUG();
	}

out:
	mutex_unlock(&(queues_lock));
}

void qos_enqueue_kpacket(struct neighbor *nb)
{
	qos_enqueue(nb->dev, &(nb->rb_kp), QOS_CALLER_KPACKET);
}

static void qos_enqueue_conn_retrans(struct neighbor *nb)
{
	qos_enqueue(nb->dev, &(nb->rb_cr), QOS_CALLER_CONN_RETRANS);
}

static void qos_enqueue_conn(struct conn *rconn)
{
	BUG_ON(rconn->targettype != TARGET_OUT);
	qos_enqueue(rconn->target.out.nb->dev, &(rconn->target.out.rb),
			QOS_CALLER_CONN);
}
static int may_send_conn_retrans(struct neighbor *nb)
{
	struct qos_queue *q;
	int rc = 0;

	mutex_lock(&(queues_lock));

	q = get_queue(nb->dev);
	if (unlikely(q == 0))
		goto out;

	rc = (list_empty(&(q->kpackets_waiting)));

out:
	mutex_unlock(&(queues_lock));

	return rc;
}

static int may_send_conn(struct conn *rconn)
{
	struct qos_queue *q;
	int rc = 0;

	mutex_lock(&(queues_lock));

	q = get_queue(rconn->target.out.nb->dev);
	if (unlikely(q == 0))
		goto out;

	rc = (list_empty(&(q->kpackets_waiting)) &&
			list_empty(&(q->conn_retrans_waiting)) &&
			list_empty(&(q->announce_waiting)) &&
			list_empty(&(q->conns_waiting)));

out:
	mutex_unlock(&(queues_lock));

	return rc;
}
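
/*
 * Allocates an skb with the link layer header already built and a 9 byte
 * COR header: PACKET_TYPE_DATA (1 byte), the connection id (4 bytes) and
 * the sequence number (4 bytes). The caller appends "size" bytes of payload
 * with skb_put().
 */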
struct sk_buff *create_packet(struct neighbor *nb, int size,
		gfp_t alloc_flags, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *ret;
	char *dest;

	ret = alloc_skb(size + 9 + LL_ALLOCATED_SPACE(nb->dev), alloc_flags);
	if (unlikely(0 == ret))
		return 0;

	ret->protocol = htons(ETH_P_COR);
	ret->dev = nb->dev;

	skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
	if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
			nb->dev->dev_addr, ret->len) < 0)) {
		kfree_skb(ret);
		return 0;
	}
	skb_reset_network_header(ret);

	dest = skb_put(ret, 9);
	BUG_ON(0 == dest);

	dest[0] = PACKET_TYPE_DATA;
	dest += 1;

	put_u32(dest, conn_id, 1);
	dest += 4;
	put_u32(dest, seqno, 1);
	dest += 4;

	return ret;
}
static void set_conn_retrans_timeout(struct conn_retrans *cr)
{
	struct neighbor *nb = cr->rconn->target.out.nb;
	cr->timeout = jiffies + usecs_to_jiffies(100000 +
			((__u32) atomic_read(&(nb->latency))) +
			((__u32) atomic_read(&(nb->max_remote_cmsg_delay))));
}
static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr,
		struct neighbor *nb, __u32 length, int *dontsend)
{
	unsigned long iflags;

	struct conn_retrans *ret = 0;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	if (unlikely(cr->ackrcvd)) {
		*dontsend = 1;
		goto out;
	} else
		*dontsend = 0;

	if (unlikely(cr->length > length)) {
		ret = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC);
		if (unlikely(ret == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		memset(ret, 0, sizeof(struct conn_retrans));
		ret->rconn = cr->rconn;
		kref_get(&(cr->rconn->ref));
		ret->seqno = cr->seqno + length;
		ret->length = cr->length - length;
		kref_init(&(ret->ref));

		list_add(&(ret->timeout_list), &(nb->retrans_list_conn));
		list_add(&(ret->conn_list), &(cr->conn_list));

		cr->length = length;
	} else {
		list_del(&(cr->timeout_list));
		list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));
		set_conn_retrans_timeout(cr);

		BUG_ON(cr->length != length);
	}

out:
	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

	return ret;
}
/* rcvlock *must* be held while calling this */
void cancel_retrans(struct conn *rconn)
{
	unsigned long iflags;
	struct neighbor *nb = rconn->target.out.nb;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	while (list_empty(&(rconn->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				rconn->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->rconn != rconn);

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
}
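
/*
 * Retransmits the range described by cr: full-mss chunks are resent as data
 * packets pulled from the connection buffer with databuf_pullold(), a
 * remaining tail is resent through a control message. readd_conn_retrans()
 * re-arms the timeout and splits off a new entry if only part of cr is sent.
 * Returns 1 if the device queue was full and the work was re-queued via qos.
 */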
static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr)
{
	/* cr may be replaced by the split-off remainder (and become 0) below,
	 * keep the conn for the unlock/put at "out" */
	struct conn *rconn = cr->rconn;
	int targetmss = mss(nb);
	int dontsend;
	int queuefull = 0;

	mutex_lock(&(rconn->rcv_lock));

	BUG_ON(rconn->targettype != TARGET_OUT);
	BUG_ON(rconn->target.out.nb != nb);

	kref_get(&(rconn->ref));

	if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
		cancel_retrans(rconn);
		goto out;
	}

	while (cr->length >= targetmss) {
		struct sk_buff *skb;
		char *dst;
		struct conn_retrans *cr2;
		int rc;

		if (may_send_conn_retrans(nb) == 0)
			goto qos_enqueue;

		skb = create_packet(nb, targetmss, GFP_KERNEL,
				rconn->target.out.conn_id, cr->seqno);
		if (unlikely(skb == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cr2 = readd_conn_retrans(cr, nb, targetmss, &dontsend);
		if (unlikely(unlikely(dontsend) || unlikely(cr2 == 0 &&
				unlikely(cr->length > targetmss)))) {
			kfree_skb(skb);
			goto out;
		}

		dst = skb_put(skb, targetmss);

		databuf_pullold(&(rconn->buf), cr->seqno, dst, targetmss);
		rc = dev_queue_xmit(skb);

		if (rc != 0) {
			unsigned long iflags;

			spin_lock_irqsave(&(nb->retrans_lock), iflags);
			if (unlikely(cr->ackrcvd)) {
				dontsend = 1;
			} else {
				list_del(&(cr->timeout_list));
				list_add(&(cr->timeout_list),
						&(nb->retrans_list_conn));
			}
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			if (dontsend == 0)
				goto qos_enqueue;
		}

		cr = cr2;

		if (likely(cr == 0))
			goto out;
	}

	if (unlikely(cr->length <= 0)) {
		BUG();
	} else {
		struct control_msg_out *cm;
		char *buf = kmalloc(cr->length, GFP_KERNEL);

		if (unlikely(buf == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cm = alloc_control_msg(nb, ACM_PRIORITY_MED);
		if (unlikely(cm == 0)) {
			cr->timeout = jiffies + 1;
			kfree(buf);
			goto out;
		}

		databuf_pullold(&(rconn->buf), cr->seqno, buf, cr->length);

		if (unlikely(readd_conn_retrans(cr, nb, cr->length, &dontsend)
				!= 0))
			BUG();

		if (likely(dontsend == 0)) {
			send_conndata(cm, rconn->target.out.conn_id,
					cr->seqno, buf, buf, cr->length);
		}
	}

	if (0) {
qos_enqueue:
		qos_enqueue_conn_retrans(nb);
		queuefull = 1;
	}
out:
	mutex_unlock(&(rconn->rcv_lock));

	kref_put(&(rconn->ref), free_conn);

	return queuefull;
}
static int send_retrans(struct neighbor *nb)
{
	unsigned long iflags;

	struct conn_retrans *cr = 0;

	int nbstate;
	int nbput = 1;
	int queuefull = 0;

	spin_lock_irqsave(&(nb->state_lock), iflags);
	nbstate = nb->state;
	spin_unlock_irqrestore(&(nb->state_lock), iflags);

	while (1) {
		spin_lock_irqsave(&(nb->retrans_lock), iflags);

		if (list_empty(&(nb->retrans_list_conn))) {
			nb->retrans_timer_conn_running = 0;
			break;
		}

		cr = container_of(nb->retrans_list_conn.next,
				struct conn_retrans, timeout_list);

		BUG_ON(cr->rconn->targettype != TARGET_OUT);

		if (unlikely(unlikely(nbstate == NEIGHBOR_STATE_KILLED) ||
				unlikely(atomic_read(
				&(cr->rconn->isreset)) != 0))) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

			kref_put(&(cr->ref), free_connretrans);
			continue;
		}

		BUG_ON(nb != cr->rconn->target.out.nb);

		if (time_after(cr->timeout, jiffies)) {
			schedule_delayed_work(&(nb->retrans_timer_conn),
					cr->timeout - jiffies);
			nbput = 0;
			break;
		}

		kref_get(&(cr->ref));
		spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
		queuefull = _send_retrans(nb, cr);
		kref_put(&(cr->ref), free_connretrans);
		if (queuefull)
			goto out;
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

out:
	if (nbput)
		kref_put(&(nb->ref), neighbor_free);

	return queuefull;
}
void retransmit_conn_timerfunc(struct work_struct *work)
{
	struct neighbor *nb = container_of(to_delayed_work(work),
			struct neighbor, retrans_timer_conn);

	send_retrans(nb);
}

static struct conn_retrans *search_seqno(struct conn *rconn, __u32 seqno)
{
	struct list_head *next = rconn->target.out.retrans_list.next;

	while (next != &(rconn->target.out.retrans_list)) {
		struct conn_retrans *cr = container_of(next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->rconn != rconn);
		if (cr->seqno + cr->length > seqno)
			return cr;
		next = next->next;
	}
	return 0;
}
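
/*
 * Handles an ack for outgoing connection data. A reported out-of-order
 * range (seqno_ooo/length) trims or removes the matching retransmit
 * entries; an in-order ack advances seqno_acked, drops the acked data from
 * the buffer via databuf_ack() and, if the ack carries newer window
 * information (kpacket_seqno), updates seqno_windowlimit.
 */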
void conn_ack_rcvd(__u32 kpacket_seqno, struct conn *rconn, __u32 seqno,
		__u8 window, __u32 seqno_ooo, __u32 length)
{
	unsigned long iflags;
	struct neighbor *nb = rconn->target.out.nb;
	struct conn_retrans *cr = 0;

	int setwindow = 0;

	BUG_ON(rconn->targettype != TARGET_OUT);

	mutex_lock(&(rconn->rcv_lock));

	if (unlikely(((__s32) (seqno - rconn->target.out.seqno_nextsend)) > 0))
		goto out;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	if (likely(length == 0))
		goto in_order;

	cr = search_seqno(rconn, seqno_ooo);

	while (cr != 0) {
		struct list_head *next = cr->conn_list.next;
		struct conn_retrans *nextcr = 0;
		if (next != &(rconn->target.out.retrans_list)) {
			nextcr = container_of(next, struct conn_retrans,
					conn_list);
		}

		if (((__s32) (cr->seqno + cr->length - seqno_ooo - length)) >
				0) {
			__u32 newseqno = seqno_ooo + length;
			cr->length -= (newseqno - cr->seqno);
			cr->seqno = newseqno;
			break;
		} else {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

		cr = nextcr;
	}

	if (unlikely(list_empty(&(rconn->target.out.retrans_list))) == 0) {
		struct conn_retrans *cr = container_of(
				rconn->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		if (unlikely(((__s32) (cr->seqno -
				rconn->target.out.seqno_acked)) > 0)) {
			rconn->target.out.seqno_acked = cr->seqno;
		}
	}

in_order:
	if (likely(((__s32) (seqno - rconn->target.out.seqno_acked)) > 0)) {
		rconn->target.out.seqno_acked = seqno;
		setwindow = 1;
	}

	cr = search_seqno(rconn, seqno_ooo);

	while (cr != 0) {
		struct list_head *next = cr->conn_list.next;
		struct conn_retrans *nextcr = 0;
		if (next != &(rconn->target.out.retrans_list)) {
			nextcr = container_of(next, struct conn_retrans,
					conn_list);
		}

		if (((__s32) (cr->seqno + cr->length -
				rconn->target.out.seqno_acked)) <= 0) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

		cr = nextcr;
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
	databuf_ack(&(rconn->buf), rconn->target.out.seqno_acked);

	setwindow = setwindow || (seqno == rconn->target.out.seqno_acked &&
			((__s32) (kpacket_seqno -
			rconn->target.out.kp_windowsetseqno)) > 0);
	if (setwindow) {
		rconn->target.out.kp_windowsetseqno = kpacket_seqno;
		rconn->target.out.seqno_windowlimit = seqno +
				dec_window(window);
	}

out:
	mutex_unlock(&(rconn->rcv_lock));
}
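
/*
 * Fills in cr for the just sent range [seqno, seqno+len) and links it into
 * the neighbor's timeout list and the connection's retrans_list. If it is
 * the first pending entry and the retransmit timer is not running, the
 * delayed work is scheduled and a neighbor reference is taken for it.
 */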
static void schedule_retransmit_conn(struct conn_retrans *cr, struct conn *rconn,
		__u32 seqno, __u32 len)
{
	unsigned long iflags;

	struct neighbor *nb = rconn->target.out.nb;

	int first;

	BUG_ON(rconn->targettype != TARGET_OUT);

	memset(cr, 0, sizeof(struct conn_retrans));
	cr->rconn = rconn;
	kref_get(&(rconn->ref));
	cr->seqno = seqno;
	cr->length = len;
	kref_init(&(cr->ref));
	set_conn_retrans_timeout(cr);

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	first = unlikely(list_empty(&(nb->retrans_list_conn)));
	list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));

	list_add_tail(&(cr->conn_list), &(rconn->target.out.retrans_list));

	if (unlikely(unlikely(first) &&
			unlikely(nb->retrans_timer_conn_running == 0))) {
		schedule_delayed_work(&(nb->retrans_timer_conn),
				cr->timeout - jiffies);
		nb->retrans_timer_conn_running = 1;
		kref_get(&(nb->ref));
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
}
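
/*
 * Sends buffered connection data: full-mss chunks as data packets, the
 * remainder through a control message. Every chunk must be covered by
 * creditsperbyte * length credits or RC_FLUSH_CONN_CREDITS is returned; a
 * full device queue yields RC_FLUSH_CONN_CONG and, unless called from the
 * qos path, re-queues the connection.
 */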
static int _flush_out(struct conn *rconn, int fromqos, __u32 creditsperbyte)
{
	int targetmss = mss(rconn->target.out.nb);
	__u32 seqno;

#warning todo honor window size

	BUG_ON(rconn->targettype != TARGET_OUT);

	if (unlikely(rconn->target.out.conn_id == 0))
		return RC_FLUSH_CONN_OK;

	if (unlikely(atomic_read(&(rconn->isreset)) != 0))
		return RC_FLUSH_CONN_OK;

	if (fromqos == 0 && may_send_conn(rconn) == 0)
		goto qos;

	while (rconn->buf.read_remaining >= targetmss) {
		struct conn_retrans *cr;
		struct sk_buff *skb;
		char *dst;
		int rc;

		if (unlikely(creditsperbyte * targetmss >
				rconn->target.out.credits))
			return RC_FLUSH_CONN_CREDITS;

		seqno = rconn->target.out.seqno_nextsend;
		skb = create_packet(rconn->target.out.nb, targetmss, GFP_ATOMIC,
				rconn->target.out.conn_id, seqno);
		if (unlikely(skb == 0))
			goto oom;

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree_skb(skb);
			goto oom;
		}

		dst = skb_put(skb, targetmss);

		databuf_pull(&(rconn->buf), dst, targetmss);

		rc = dev_queue_xmit(skb);
		if (rc != 0) {
			databuf_unpull(&(rconn->buf), targetmss);
			kmem_cache_free(connretrans_slab, cr);
			goto qos;
		}

		rconn->target.out.credits -= creditsperbyte * targetmss;
		rconn->target.out.seqno_nextsend += targetmss;
		schedule_retransmit_conn(cr, rconn, seqno, targetmss);
	}

	if (rconn->buf.read_remaining > 0) {
		struct control_msg_out *cm;
		struct conn_retrans *cr;
		__u32 len = rconn->buf.read_remaining;
		char *buf = kmalloc(len, GFP_KERNEL);

		if (unlikely(creditsperbyte * len >
				rconn->target.out.credits)) {
			kfree(buf);
			return RC_FLUSH_CONN_CREDITS;
		}

		if (unlikely(buf == 0))
			goto oom;

		cm = alloc_control_msg(rconn->target.out.nb, ACM_PRIORITY_MED);
		if (unlikely(cm == 0)) {
			kfree(buf);
			goto oom;
		}

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree(buf);
			free_control_msg(cm);
			goto oom;
		}

		databuf_pull(&(rconn->buf), buf, len);

		seqno = rconn->target.out.seqno_nextsend;
		rconn->target.out.credits -= creditsperbyte * len;
		rconn->target.out.seqno_nextsend += len;

		schedule_retransmit_conn(cr, rconn, seqno, len);

		send_conndata(cm, rconn->target.out.conn_id, seqno, buf, buf,
				len);
	}

	wake_sender(rconn);

	if (0) {
qos:
		printk(KERN_ERR "qos");
		if (fromqos == 0)
			qos_enqueue_conn(rconn);
		return RC_FLUSH_CONN_CONG;
	}

	if (0) {
oom:
#warning todo flush later
		printk(KERN_ERR "oom");
	}

	return RC_FLUSH_CONN_OK;
}
void flush_out(struct conn *rconn)
{
	_flush_out(rconn, 0, 0);
}

int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);

	if (unlikely(connretrans_slab == 0))
		return 1;

	INIT_DELAYED_WORK(&(qos_resume_work), qos_resume);

	return 0;
}

MODULE_LICENSE("GPL");