qos fixes, reset bufferusage on conn reset, conn refcnt fix
cor_2_6_31.git: net/cor/rcv.c
/*
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/in.h>

#include "cor.h"

atomic_t packets_in_workqueue = ATOMIC_INIT(0);

atomic_t ooo_packets = ATOMIC_INIT(0);

static struct workqueue_struct *packet_wq;

static struct workqueue_struct *outofbufferspace_wq;
struct work_struct outofbufferspace_work;
spinlock_t oobss_lock;
int outofbufferspace_scheduled;

/**
 * buffering space is divided into 4 areas:
 *
 * buffer_init:
 * distributed equally among all conns; shrinks and grows immediately when
 * there are new connections or connections are reset
 *
 * buffer_speed:
 * distributed proportionally to speed; shrinks and grows constantly
 *
 * buffer_ata:
 * used to make noise to make traffic analysis harder; may only shrink if data
 * is either sent on or "stuck" for a long time
 *
 * buffer_reserve:
 * reserve for the case where sudden shrinking causes some connections to
 * contain more old data than allowed by the first 3 buffers. If this area is
 * full, the out-of-memory conn-resetter is triggered.
 *
 * Each area commits a certain amount of space to each connection. This is the
 * maximum buffer space a connection is allowed to use. The space of a
 * specific connection is first accounted to buffer_ata. If the buffer space
 * it is allowed to use there is exceeded, the rest is accounted to
 * buffer_speed and then to buffer_init. The reserve area is used last. This
 * should only happen if the assigned buffer space of the first 3 areas
 * shrinks suddenly. If this area is used up as well, connections are reset.
 */
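/*
 * Illustrative example (hypothetical numbers, chosen only to show the
 * accounting order implemented in refresh_bufferusage()): suppose a conn is
 * committed buffer_ata = 8192, buffer_speed = 4096 and buffer_init = 2048
 * bytes, and its buffer currently holds buf.totalsize = 15000 bytes. The
 * usage is then split as
 *	usage_ata     = 8192	(buffer_ata is filled first)
 *	usage_speed   = 4096	(next 4096 bytes)
 *	usage_init    = 2048	(next 2048 bytes)
 *	usage_reserve = 664	(15000 - 8192 - 4096 - 2048)
 * Only the part that spills into usage_reserve counts against
 * BUFFERSPACE_RESERVE and can trigger the out-of-buffer-space resetter.
 */
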
DEFINE_MUTEX(buffer_conn_list_lock);
LIST_HEAD(buffer_conn_list);

/**
 * used to buffer inserts while the main list is locked; entries are moved to
 * the main list after processing of the main list finishes
 */
LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */


DEFINE_MUTEX(bufferlimits_lock);

static __u64 bufferassigned_init;
static __u64 bufferassigned_speed;
static __u64 bufferassigned_ata;

static __u64 bufferusage_init;
static __u64 bufferusage_speed;
static __u64 bufferusage_ata;
static __u64 bufferusage_reserve;

spinlock_t st_lock;
struct speedtracker st;

static __u64 desired_bufferusage(__u64 assigned, __u64 usage, __u64 assignlimit,
		__u64 usagelimit)
{
	__u64 ret;
	__u64 load;

	if (unlikely(assignlimit < usagelimit))
		assignlimit = usagelimit;

	if (multiply_div(usage, 9, 10) > usagelimit)
		return multiply_div(usagelimit, 9, 10);

	load = multiply_div(usage, 192, usagelimit);

	/* slow limit increase, fast decrease */
	if (load == 128) {
		ret = assigned;
	} else if (load < 128) {
		if (load == 0)
			return multiply_div(usagelimit, 9, 10);

		if (load < 96)
			load = 96;
		ret = multiply_div(assigned, 128, load);
	} else {
		ret = multiply_div(assigned, 128, load + load - 128);
	}

	if (ret > assignlimit)
		return assignlimit;
	return ret;
}

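/*
 * One possible reading of the arithmetic above (added commentary): load is
 * usage * 192 / usagelimit, so load == 128 corresponds to usage at 2/3 of
 * usagelimit; there the assignment is left unchanged. Below that point the
 * assignment grows by a factor of 128/load (at most 4/3, since load is
 * clamped to >= 96); above it the assignment shrinks by the steeper factor
 * 128/(2*load - 128). Example with usagelimit = 192: usage = 96 gives
 * load = 96 and ret = assigned*4/3, while usage = 160 gives load = 160 and
 * ret = assigned*128/192 = assigned*2/3.
 */
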
static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
{
	if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
			time_before(st->jiffies_last_update + HZ*10,
			jiffies_tmp))) {
		st->jiffies_last_update = jiffies_tmp;
		st->speed = 0;
		st->bytes_curr = 0;
		return 0;
	}

	for (;time_before(st->jiffies_last_update, jiffies_tmp);
			st->jiffies_last_update++) {
		__u32 bytes_curr = 0;
		if ((st->jiffies_last_update + 1) == jiffies_tmp) {
			bytes_curr = st->bytes_curr;
			st->bytes_curr = 0;
		}
		st->speed = (st->speed * (HZ-1) + (((__u64)bytes_curr)<<16))/HZ;
	}

	if ((st->jiffies_last_update + 1) == jiffies_tmp) {
		st->jiffies_last_update++;
	}

	return (st->speed * (HZ*2 - 1) + (((__u64)st->bytes_curr) << 17))/HZ/2;
}

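/*
 * Added note on the fixed-point averaging above: speed is kept as bytes per
 * jiffy shifted left by 16. For every jiffy that has passed the tracker
 * applies
 *	speed = (speed * (HZ-1) + (bytes_in_that_jiffy << 16)) / HZ
 * i.e. an exponential moving average with weight 1/HZ per jiffy. The return
 * value additionally blends the bytes of the still-running jiffy into the
 * average as (speed * (2*HZ-1) + (bytes_curr << 17)) / (2*HZ).
 */
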
/**
 * val1[0], val2[0], res[0] ... least significant
 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
 */
static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
		unsigned int val2len, __u32 *res, int reslen)
{
	int digits = val1len + val2len;
	__u64 overflow = 0;
	int i;

	BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);

	memset(res, 0, reslen * sizeof(__u32));

	if (val1len == 0 || val2len == 0)
		return;

	for (i=0;i<digits;i++) {
		int idx1;
		res[i] = (__u32) overflow;
		overflow = overflow >> 32;
		for (idx1=0;idx1<val1len && idx1<=i;idx1++) {
			int idx2 = i - idx1;
			__u64 tmpres;

			if (idx2 >= val2len)
				continue;

			tmpres = ((__u64) (val1[idx1])) *
					((__u64) (val2[idx2]));
			overflow += tmpres >> 32;
			tmpres = (tmpres << 32) >> 32;
			if (((__u32) (res[i] + tmpres)) < res[i])
				overflow++;
			res[i] += tmpres;
		}
	}

	BUG_ON(overflow != 0);
}

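/*
 * Usage sketch (added, hypothetical values): a __u64 x is passed to mul() as
 * two little-endian __u32 digits, e.g.
 *	__u32 a[2] = { (__u32) (x & 0xffffffff), (__u32) (x >> 32) };
 *	__u32 sq[4];
 *	mul(a, 2, a, 2, sq, 4);	// sq now holds x*x as four 32-bit digits
 * which is how compare_scores() below squares the two speed values.
 */
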
/**
 * return values:
 * 1 == usage1/speed1 offends more
 * 0 == both offend the same
 * -1 == usage2/speed2 offends more
 */
static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
		__u64 speed2)
{
	int i;

	__u32 speed1squared[4];
	__u32 speed2squared[4];

	__u32 speed1squared_usage2[5];
	__u32 speed2squared_usage1[5];


	__u32 speed1_tmp[2];
	__u32 speed2_tmp[2];

	speed1_tmp[0] = (speed1 << 32) >> 32;
	speed1_tmp[1] = (speed1 >> 32);
	speed2_tmp[0] = (speed2 << 32) >> 32;
	speed2_tmp[1] = (speed2 >> 32);

	mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared, 4);
	mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);

	mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
	mul(speed2squared, 4, &usage1, 1, speed2squared_usage1, 5);

	for (i=4;i>=0;i--) {
		if (speed1squared_usage2[i] > speed2squared_usage1[i])
			return -1;
		if (speed1squared_usage2[i] < speed2squared_usage1[i])
			return 1;
	}

	return 0;
}

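/*
 * Added rationale (my reading of the comparison above, not original
 * commentary): the score of a conn is effectively usage/speed^2 - a lot of
 * buffered data at low speed offends most. To avoid wide divisions the two
 * scores are cross-multiplied: usage1/speed1^2 > usage2/speed2^2 exactly
 * when usage1*speed2^2 > usage2*speed1^2, and the two 160-bit products are
 * compared digit by digit from the most significant end.
 */
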
#define OOBS_SIZE 10
static void _outofbufferspace(void)
{
	int i;

	struct list_head *curr;
	struct conn *offendingconns[OOBS_SIZE];
	__u32 offendingusage[OOBS_SIZE];
	__u64 offendingspeed[OOBS_SIZE];

	memset(&offendingconns, 0, sizeof(offendingconns));

	mutex_lock(&buffer_conn_list_lock);

	curr = buffer_conn_list.next;
	while (curr != &buffer_conn_list) {
		unsigned long iflags;

		struct conn *conn = container_of(curr, struct conn,
				source.in.buffer_list);

		__u32 usage;
		__u64 speed;

		int i;

		curr = curr->next;

		mutex_lock(&(conn->rcv_lock));

		BUG_ON(conn->sourcetype != SOURCE_IN);

		usage = conn->source.in.usage_reserve;

		spin_lock_irqsave(&st_lock, iflags);
		speed = get_speed(&(conn->source.in.st), jiffies);
		spin_unlock_irqrestore(&st_lock, iflags);

		mutex_unlock(&(conn->rcv_lock));

		if (offendingconns[OOBS_SIZE-1] != 0 &&
				compare_scores(
				offendingusage[OOBS_SIZE-1],
				offendingspeed[OOBS_SIZE-1],
				usage, speed) >= 0)
			continue;

		if (offendingconns[OOBS_SIZE-1] != 0)
			kref_put(&(offendingconns[OOBS_SIZE-1]->ref),
					free_conn);

		kref_get(&(conn->ref));
		offendingconns[OOBS_SIZE-1] = conn;
		offendingusage[OOBS_SIZE-1] = usage;
		offendingspeed[OOBS_SIZE-1] = speed;

		for (i=OOBS_SIZE-2;i>=0;i--) {
			struct conn *tmpconn;
			__u32 usage_tmp;
			__u64 speed_tmp;

			if (offendingconns[i] != 0 && compare_scores(
					offendingusage[i], offendingspeed[i],
					offendingusage[i+1],
					offendingspeed[i+1]) >= 0)
				break;

			tmpconn = offendingconns[i];
			usage_tmp = offendingusage[i];
			speed_tmp = offendingspeed[i];

			offendingconns[i] = offendingconns[i+1];
			offendingusage[i] = offendingusage[i+1];
			offendingspeed[i] = offendingspeed[i+1];

			offendingconns[i+1] = tmpconn;
			offendingusage[i+1] = usage_tmp;
			offendingspeed[i+1] = speed_tmp;
		}
	}

	for (i=0;i<OOBS_SIZE;i++) {
		if (offendingconns[i] == 0)
			break;
		kref_get(&(offendingconns[i]->ref));
	}

	mutex_unlock(&buffer_conn_list_lock);

	for (i=0;i<OOBS_SIZE;i++) {
		int resetneeded;

		if (offendingconns[i] == 0)
			break;

		mutex_lock(&bufferlimits_lock);
		resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded)
			reset_conn(offendingconns[i]);
		kref_put(&(offendingconns[i]->ref), free_conn);
	}

	mutex_lock(&bufferlimits_lock);
	mutex_lock(&buffer_conn_list_lock);
	while (list_empty(&buffer_conn_tmp_list) == 0) {
		curr = buffer_conn_tmp_list.next;
		list_del(curr);
		list_add(curr, &buffer_conn_list);
	}
	mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}

static void outofbufferspace(struct work_struct *work)
{
	while (1) {
		unsigned long iflags;
		int resetneeded;

		mutex_lock(&bufferlimits_lock);
		spin_lock_irqsave(&oobss_lock, iflags);
		resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);

		if (resetneeded == 0)
			outofbufferspace_scheduled = 0;

		spin_unlock_irqrestore(&oobss_lock, iflags);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded == 0)
			return;

		_outofbufferspace();
	}
}

static void refresh_bufferusage(struct conn *rconn)
{
	BUG_ON(rconn->sourcetype != SOURCE_IN);

	bufferusage_init -= rconn->source.in.usage_init;
	bufferusage_speed -= rconn->source.in.usage_speed;
	bufferusage_ata -= rconn->source.in.usage_ata;
	bufferusage_reserve -= rconn->source.in.usage_reserve;

	rconn->source.in.usage_ata = rconn->buf.totalsize;
	if (rconn->source.in.usage_ata > rconn->source.in.buffer_ata)
		rconn->source.in.usage_ata = rconn->source.in.buffer_ata;


	if (rconn->source.in.usage_ata == rconn->buf.totalsize)
		rconn->source.in.usage_speed = 0;
	else
		rconn->source.in.usage_speed = rconn->buf.totalsize -
				rconn->source.in.usage_ata;

	if (rconn->source.in.usage_speed > rconn->source.in.buffer_speed)
		rconn->source.in.usage_speed = rconn->source.in.buffer_speed;


	if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed) ==
			rconn->buf.totalsize)
		rconn->source.in.usage_init = 0;
	else
		rconn->source.in.usage_init = rconn->buf.totalsize -
				rconn->source.in.usage_ata -
				rconn->source.in.usage_speed;

	if (rconn->source.in.usage_init > rconn->source.in.buffer_init)
		rconn->source.in.usage_init = rconn->source.in.buffer_init;


	if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed +
			rconn->source.in.usage_init) == rconn->buf.totalsize)
		rconn->source.in.usage_reserve = 0;
	else
		rconn->source.in.usage_reserve = rconn->buf.totalsize -
				rconn->source.in.usage_ata -
				rconn->source.in.usage_speed -
				rconn->source.in.usage_init;

	bufferusage_init += rconn->source.in.usage_init;
	bufferusage_speed += rconn->source.in.usage_speed;
	bufferusage_ata += rconn->source.in.usage_ata;
	bufferusage_reserve += rconn->source.in.usage_reserve;

	if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
		unsigned long iflags;
		spin_lock_irqsave(&oobss_lock, iflags);
		if (outofbufferspace_scheduled == 0) {
			INIT_WORK(&outofbufferspace_work, outofbufferspace);
			queue_work(outofbufferspace_wq, &outofbufferspace_work);
			outofbufferspace_scheduled = 1;
		}
		spin_unlock_irqrestore(&oobss_lock, iflags);
	}
}

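/*
 * Added summary of the window computed below: it is the buffer space that is
 * still committed but unused across the init, speed and ata areas. As soon
 * as a conn has to dip into the reserve area the window drops to 0, and it
 * is never announced larger than MAX_ANNOUNCE_WINDOW.
 */
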
static __u32 __get_window(struct conn *rconn)
{
	__u64 window = 0;

	if (rconn->source.in.usage_reserve != 0)
		return 0;

	BUG_ON(rconn->source.in.usage_init > rconn->source.in.buffer_init);
	BUG_ON(rconn->source.in.usage_speed > rconn->source.in.buffer_speed);
	BUG_ON(rconn->source.in.usage_ata > rconn->source.in.buffer_ata);

	window += rconn->source.in.buffer_init;
	window += rconn->source.in.buffer_speed;
	window += rconn->source.in.buffer_ata;

	window -= rconn->source.in.usage_init;
	window -= rconn->source.in.usage_speed;
	window -= rconn->source.in.usage_ata;

	if (window > MAX_ANNOUNCE_WINDOW)
		window = MAX_ANNOUNCE_WINDOW;

	return window;
}

static __u32 _get_window(struct conn *rconn, int listlocked)
{
	unsigned long iflags;

	unsigned long jiffies_tmp;

	__u32 window = 0;

	__u32 conns;
	__u64 bufferlimit_init;
	__u64 connlimit_init;

	__u64 totalspeed;
	__u64 bufferlimit_speed;
	__u64 connlimit_speed;

	mutex_lock(&(rconn->rcv_lock));

	BUG_ON(rconn->sourcetype != SOURCE_IN);

	if (atomic_read(&(rconn->isreset)) != 0) {
		if (listlocked && (rconn->source.in.buffer_list.next != 0 ||
				rconn->source.in.buffer_list.prev != 0)) {
			list_del(&(rconn->source.in.buffer_list));
			rconn->source.in.buffer_list.next = 0;
			rconn->source.in.buffer_list.prev = 0;
			kref_put(&(rconn->ref), free_conn);
		}
		goto out;
	}

	if (listlocked) {
		if (rconn->source.in.buffer_list.next != 0 ||
				rconn->source.in.buffer_list.prev != 0) {
			list_del(&(rconn->source.in.buffer_list));
		} else {
			kref_get(&(rconn->ref));
		}
		list_add_tail(&(rconn->source.in.buffer_list),
				&buffer_conn_list);
	} else if (rconn->source.in.buffer_list.next == 0 &&
			rconn->source.in.buffer_list.prev == 0) {
		kref_get(&(rconn->ref));
		list_add_tail(&(rconn->source.in.buffer_list),
				&buffer_conn_tmp_list);
	}


	conns = atomic_read(&num_conns);
	bufferlimit_init = desired_bufferusage(bufferassigned_init,
			bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
	connlimit_init = (bufferlimit_init + conns - 1) / conns;

	bufferassigned_init -= rconn->source.in.buffer_init;
	if (((__u32) connlimit_init) != connlimit_init)
		rconn->source.in.buffer_init = -1;
	else
		rconn->source.in.buffer_init = (__u32) connlimit_init;
	bufferassigned_init += rconn->source.in.buffer_init;


	spin_lock_irqsave(&st_lock, iflags);
	jiffies_tmp = jiffies;
	totalspeed = get_speed(&st, jiffies_tmp);
	bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
			bufferusage_speed, BUFFERASSIGN_SPEED,
			BUFFERSPACE_SPEED);
	connlimit_speed = multiply_div(bufferlimit_speed,
			get_speed(&(rconn->source.in.st), jiffies_tmp),
			totalspeed);
	spin_unlock_irqrestore(&st_lock, iflags);

	bufferassigned_speed -= rconn->source.in.buffer_speed;
	if (((__u32) connlimit_speed) != connlimit_speed)
		rconn->source.in.buffer_speed = -1;
	else
		rconn->source.in.buffer_speed = (__u32) connlimit_speed;
	bufferassigned_speed += rconn->source.in.buffer_speed;

	refresh_bufferusage(rconn);

	window = __get_window(rconn);
out:
	mutex_unlock(&(rconn->rcv_lock));

	return window;
}

/* do not hold rcv_lock while calling this */
__u32 get_window(struct conn *rconn)
{
	struct conn *rconn2;
	int listlocked;

	__u32 window;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);

	window = _get_window(rconn, listlocked);

	if (listlocked) {
		/*
		 * refresh window of idle conns as well to keep global
		 * counters accurate
		 */

		rconn2 = container_of(buffer_conn_list.next, struct conn,
				source.in.buffer_list);

		if (list_empty(&buffer_conn_list) == 0 && rconn2 != rconn)
			_get_window(rconn2, listlocked);


		if (list_empty(&buffer_conn_tmp_list) == 0) {
			rconn2 = container_of(buffer_conn_tmp_list.next,
					struct conn, source.in.buffer_list);
			BUG_ON(rconn2 == rconn);
			_get_window(rconn2, listlocked);
		}

		mutex_unlock(&buffer_conn_list_lock);
	}

	mutex_unlock(&bufferlimits_lock);

	return window;
}

void reset_bufferusage(struct conn *conn)
{
	int listlocked;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);
	mutex_lock(&(conn->rcv_lock));

	if (conn->sourcetype != SOURCE_IN)
		goto out;

	bufferusage_init -= conn->source.in.usage_init;
	bufferusage_speed -= conn->source.in.usage_speed;
	bufferusage_ata -= conn->source.in.usage_ata;
	bufferusage_reserve -= conn->source.in.usage_reserve;

	bufferassigned_init -= conn->source.in.buffer_init;
	bufferassigned_speed -= conn->source.in.buffer_speed;
	bufferassigned_ata -= conn->source.in.buffer_ata;

	if (listlocked && (conn->source.in.buffer_list.next != 0 ||
			conn->source.in.buffer_list.prev != 0)) {
		list_del(&(conn->source.in.buffer_list));
		conn->source.in.buffer_list.next = 0;
		conn->source.in.buffer_list.prev = 0;
		kref_put(&(conn->ref), free_conn);
	}

out:
	mutex_unlock(&(conn->rcv_lock));
	if (listlocked)
		mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}

void refresh_speedstat(struct conn *rconn, __u32 written)
{
	unsigned long iflags;
	unsigned long jiffies_tmp;

	spin_lock_irqsave(&st_lock, iflags);

	jiffies_tmp = jiffies;

	if (rconn->source.in.st.jiffies_last_update != jiffies_tmp)
		get_speed(&(rconn->source.in.st), jiffies_tmp);
	if (rconn->source.in.st.bytes_curr + written < written)
		rconn->source.in.st.bytes_curr = -1;
	else
		rconn->source.in.st.bytes_curr += written;

	if (st.jiffies_last_update != jiffies_tmp)
		get_speed(&st, jiffies_tmp);
	if (st.bytes_curr + written < written)
		st.bytes_curr = -1;
	else
		st.bytes_curr += written;

	spin_unlock_irqrestore(&st_lock, iflags);
}

void drain_ooo_queue(struct conn *rconn)
{
	struct sk_buff *skb;

	BUG_ON(SOURCE_IN != rconn->sourcetype);

	skb = rconn->source.in.reorder_queue.next;

	while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) {
		struct skb_procstate *ps = skb_pstate(skb);
		int drop;

		if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno)
			break;

		drop = receive_skb(rconn, skb);
		if (drop)
			break;

		skb_unlink(skb, &(rconn->source.in.reorder_queue));
		rconn->source.in.ooo_packets--;
		atomic_dec(&(rconn->source.in.nb->ooo_packets));
		atomic_dec(&ooo_packets);

		rconn->source.in.next_seqno += skb->len;

		/* continue with the new head of the reorder queue */
		skb = rconn->source.in.reorder_queue.next;
	}
}

static int _conn_rcv_ooo(struct conn *rconn, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue);
	struct sk_buff *curr = reorder_queue->next;

	long ooo;

	rconn->source.in.ooo_packets++;
	if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
		goto drop_ooo3;

	ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets));
	if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
		goto drop_ooo2;

	ooo = atomic_inc_return(&ooo_packets);
	if (ooo > MAX_TOTAL_OOO_PACKETS)
		goto drop_ooo1;


	while (1) {
		struct skb_procstate *ps2 = skb_pstate(curr);

		if ((void *) curr == (void *) reorder_queue) {
			skb_queue_tail(reorder_queue, skb);
			break;
		}

		/* keep the reorder queue sorted by ascending seqno */
		if (ps->funcstate.rcv2.seqno < ps2->funcstate.rcv2.seqno) {
			skb_insert(curr, skb, reorder_queue);
			break;
		}

		curr = curr->next;
	}

	if (0) {
drop_ooo1:
		atomic_dec(&ooo_packets);
drop_ooo2:
		atomic_dec(&(rconn->source.in.nb->ooo_packets));
drop_ooo3:
		rconn->source.in.ooo_packets--;

		return 1;
	}

	return 0;
}

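/*
 * Added overview of the receive path below (my summary): an in-sequence
 * packet advances next_seqno, is handed to receive_skb() and may allow
 * queued out-of-order packets to be drained afterwards; it is acknowledged
 * with send_ack_conn(). An out-of-sequence packet is inserted into the
 * reorder queue, subject to the per-conn, per-neighbor and global
 * out-of-order limits, and acknowledged with send_ack_conn_ooo(), which
 * carries both the next expected seqno and the seqno/length of the buffered
 * out-of-order data.
 */
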
static void _conn_rcv(struct conn *rconn, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct control_msg_out *cm = alloc_control_msg(rconn->source.in.nb,
			ACM_PRIORITY_MED);

	int in_order;
	int drop = 1;
	int flush = 0;

	__u32 len = skb->len;

	BUG_ON(rconn->sourcetype != SOURCE_IN);

	if (unlikely(cm == 0)) {
		kfree_skb(skb);
		return;
	}

	mutex_lock(&(rconn->rcv_lock));

	in_order = (rconn->source.in.next_seqno == ps->funcstate.rcv2.seqno);

	if (in_order == 0) {
		drop = _conn_rcv_ooo(rconn, skb);
	} else {
		rconn->source.in.next_seqno += skb->len;
		drop = receive_skb(rconn, skb);
	}

	if (drop) {
		kfree_skb(skb);
		free_control_msg(cm);
		goto out;
	}

#warning todo balance credits
	if (in_order == 0) {
		flush = 1;
		send_ack_conn_ooo(cm, rconn,
				rconn->reversedir->target.out.conn_id,
				rconn->source.in.next_seqno,
				ps->funcstate.rcv2.seqno, len);
	} else {
		drain_ooo_queue(rconn);
		send_ack_conn(cm, rconn, rconn->reversedir->target.out.conn_id,
				rconn->source.in.next_seqno);
	}

out:
	mutex_unlock(&(rconn->rcv_lock));

	flush_buf(rconn);
}

static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
{
	struct conn *rconn;
	struct skb_procstate *ps = skb_pstate(skb);

	ps->funcstate.rcv2.seqno = seqno;

	rconn = get_conn(conn_id);

	if (unlikely(rconn == 0)) {
		printk(KERN_DEBUG "unknown conn_id when receiving: %d\n",
				conn_id);
		kfree_skb(skb);
		return;
	}
	_conn_rcv(rconn, skb);
	kref_put(&(rconn->ref), free_conn);
}

void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
	char *dst;

	if (unlikely(skb == 0))
		return;

	dst = skb_put(skb, datalen);
	memcpy(dst, data, datalen);
	conn_rcv(skb, conn_id, seqno);
}

static void rcv_data(struct sk_buff *skb)
{
	__u32 conn_id;
	__u32 seqno;

	char *connid_p = cor_pull_skb(skb, 4);
	char *seqno_p = cor_pull_skb(skb, 4);

	/* __u8 rand; */

	if (unlikely(connid_p == 0 || seqno_p == 0))
		goto drop;

	((char *)&conn_id)[0] = connid_p[0];
	((char *)&conn_id)[1] = connid_p[1];
	((char *)&conn_id)[2] = connid_p[2];
	((char *)&conn_id)[3] = connid_p[3];

	((char *)&seqno)[0] = seqno_p[0];
	((char *)&seqno)[1] = seqno_p[1];
	((char *)&seqno)[2] = seqno_p[2];
	((char *)&seqno)[3] = seqno_p[3];

	conn_id = be32_to_cpu(conn_id);
	seqno = be32_to_cpu(seqno);

	/* get_random_bytes(&rand, 1);

	if (rand < 64) {
		printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
				seqno_p[1], seqno_p[2], seqno_p[3]);
		goto drop;
	} */

	if (conn_id == 0) {
		struct neighbor *nb = get_neigh_by_mac(skb);
		if (unlikely(nb == 0))
			goto drop;
		kernel_packet(nb, skb, seqno);
		kref_put(&(nb->ref), neighbor_free);
	} else {
		conn_rcv(skb, conn_id, seqno);
	}

	if (0) {
drop:
		kfree_skb(skb);
	}
}

static void rcv(struct work_struct *work)
{
	struct sk_buff *skb = skb_from_pstate(container_of(work,
			struct skb_procstate, funcstate.rcv.work));

	__u8 packet_type;
	char *packet_type_p;

	atomic_dec(&packets_in_workqueue);

	packet_type_p = cor_pull_skb(skb, 1);

	if (unlikely(packet_type_p == 0))
		goto drop;

	packet_type = *packet_type_p;

	if (packet_type == PACKET_TYPE_ANNOUNCE) {
		rcv_announce(skb);
		return;
	}

	if (unlikely(packet_type != PACKET_TYPE_DATA))
		goto drop;

	rcv_data(skb);

	if (0) {
drop:
		kfree_skb(skb);
	}
}

static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
		struct packet_type *pt, struct net_device *orig_dev)
{
	struct skb_procstate *ps = skb_pstate(skb);
	long queuelen;

	if (skb->pkt_type == PACKET_OTHERHOST)
		goto drop;

	BUG_ON(skb->next != 0);

	queuelen = atomic_inc_return(&packets_in_workqueue);

	BUG_ON(queuelen <= 0);

	if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
		atomic_dec(&packets_in_workqueue);
		goto drop;
	}

	INIT_WORK(&(ps->funcstate.rcv.work), rcv);
	queue_work(packet_wq, &(ps->funcstate.rcv.work));
	return NET_RX_SUCCESS;

drop:
	kfree_skb(skb);
	return NET_RX_DROP;
}

static struct packet_type ptype_cor = {
	.type = htons(ETH_P_COR),
	.dev = 0,
	.func = queue_rcv_processing
};

int __init cor_rcv_init(void)
{
	bufferassigned_init = 0;
	bufferassigned_speed = 0;
	bufferassigned_ata = 0;

	bufferusage_init = 0;
	bufferusage_speed = 0;
	bufferusage_ata = 0;
	bufferusage_reserve = 0;

	spin_lock_init(&st_lock);
	memset(&st, 0, sizeof(struct speedtracker));

	BUG_ON(sizeof(struct skb_procstate) > 48);
	packet_wq = create_workqueue("cor_packet");
	outofbufferspace_wq = create_workqueue("cor_outofbufferspace");
	spin_lock_init(&oobss_lock);
	outofbufferspace_scheduled = 0;

	dev_add_pack(&ptype_cor);
	return 0;
}

MODULE_LICENSE("GPL");