conn reference renaming and locking logic
[cor_2_6_31.git] / net / cor / rcv.c
/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/in.h>

#include "cor.h"

static atomic_t packets_in_workqueue = ATOMIC_INIT(0);

static atomic_t ooo_packets = ATOMIC_INIT(0);

static struct workqueue_struct *packet_wq;

static struct work_struct outofbufferspace_work;
DEFINE_SPINLOCK(oobss_lock);
static int outofbufferspace_scheduled;
/**
 * Buffering space is divided into 4 areas:
 *
 * buffer_init:
 * distributed equally among all conns; shrinks and grows immediately when
 * there are new connections or connections are reset
 *
 * buffer_speed:
 * distributed proportionally to speed; shrinks and grows constantly
 *
 * buffer_ata:
 * used to make noise to make traffic analysis harder; may only shrink if data
 * is either sent on or "stuck" for a long time
 *
 * buffer_reserve:
 * reserve for cases where sudden shrinking causes some connections to contain
 * more old data than allowed by the first 3 buffers. If this area is full,
 * the out-of-buffer-space conn resetter is triggered.
 *
 * Each area commits a certain amount of space to each connection. This is the
 * maximum buffer space a connection is allowed to use. The space of a
 * specific connection is first accounted to buffer_ata. If the space allowed
 * there is exceeded, the rest is accounted to buffer_speed and then to
 * buffer_init. The reserve area is used last. This should only happen if the
 * assigned buffer space of the first 3 areas shrinks suddenly. If this area
 * is also used up, connections will be reset.
 */
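/*
 * Illustrative example (not part of the original source): suppose a
 * connection has been committed buffer_ata = 2048, buffer_speed = 8192 and
 * buffer_init = 4096 bytes and currently buffers 12288 bytes. The accounting
 * below (see refresh_bufferusage()) charges 2048 bytes to usage_ata, the
 * next 8192 to usage_speed and the remaining 2048 to usage_init;
 * usage_reserve stays 0. Only if the committed areas later shrink below the
 * amount already buffered does usage_reserve become nonzero, and once the
 * global bufferusage_reserve exceeds BUFFERSPACE_RESERVE the
 * outofbufferspace worker is scheduled.
 */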
DEFINE_MUTEX(buffer_conn_list_lock);
LIST_HEAD(buffer_conn_list);

/**
 * used to buffer inserts while the main list is locked; entries are moved to
 * the main list after processing of the main list finishes
 */
LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */


DEFINE_MUTEX(bufferlimits_lock);

static __u64 bufferassigned_init;
static __u64 bufferassigned_speed;
static __u64 bufferassigned_ata;

static __u64 bufferusage_init;
static __u64 bufferusage_speed;
static __u64 bufferusage_ata;
static __u64 bufferusage_reserve;

DEFINE_SPINLOCK(st_lock);
static struct speedtracker st;
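/*
 * Illustrative note (not part of the original source): desired_bufferusage()
 * rescales the currently assigned space based on load = usage * 192 /
 * usagelimit, where load == 128 means the area is at 2/3 of its usage limit
 * and the assignment is kept. For example, at half the usage limit
 * (load == 96) the assignment grows to assigned * 128 / 96 (about 4/3 of the
 * old value), while at the full usage limit (load == 192) it shrinks to
 * assigned * 128 / 256, i.e. half the old value, so decreases are faster
 * than increases.
 */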
static __u64 desired_bufferusage(__u64 assigned, __u64 usage, __u64 assignlimit,
		__u64 usagelimit)
{
	__u64 ret;
	__u64 load;

	if (unlikely(assignlimit < usagelimit))
		assignlimit = usagelimit;

	if (multiply_div(usage, 9, 10) > usagelimit)
		return multiply_div(usagelimit, 9, 10);

	load = multiply_div(usage, 192, usagelimit);

	/* slow limit increase, fast decrease */
	if (load == 128) {
		ret = assigned;
	} else if (load < 128) {
		if (load == 0)
			return multiply_div(usagelimit, 9, 10);

		if (load < 96)
			load = 96;
		ret = multiply_div(assigned, 128, load);
	} else {
		ret = multiply_div(assigned, 128, load + load - 128);
	}

	if (ret > assignlimit)
		return assignlimit;
	return ret;
}
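/*
 * Illustrative note (not part of the original source): struct speedtracker
 * keeps an exponentially weighted moving average of the byte rate. Each
 * elapsed jiffy folds the bytes counted in that jiffy into st->speed with a
 * weight of 1/HZ, so the stored speed is approximately "bytes per jiffy"
 * shifted left by 16 bits; at a steady rate of N bytes per jiffy it
 * converges towards N << 16. get_speed() additionally mixes in the bytes of
 * the not-yet-complete current jiffy for the returned value.
 */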
#warning todo changing speed too fast
static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
{
	if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
			time_before(st->jiffies_last_update + HZ*10,
			jiffies_tmp))) {
		st->jiffies_last_update = jiffies_tmp;
		st->speed = 0;
		st->bytes_curr = 0;
		return 0;
	}

	for (;time_before(st->jiffies_last_update, jiffies_tmp);
			st->jiffies_last_update++) {
		__u32 bytes_curr = 0;
		if ((st->jiffies_last_update + 1) == jiffies_tmp) {
			bytes_curr = st->bytes_curr;
			st->bytes_curr = 0;
		}
		st->speed = (st->speed * (HZ-1) + (((__u64)bytes_curr)<<16))/HZ;
	}

	if ((st->jiffies_last_update + 1) == jiffies_tmp) {
		st->jiffies_last_update++;
	}

	return (st->speed * (HZ*2 - 1) + (((__u64)st->bytes_curr) << 17))/HZ/2;
}
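/*
 * Illustrative example (not part of the original source): mul() performs
 * schoolbook long multiplication with base-2^32 digits. Multiplying
 * val1 = {0xffffffff, 0x1} (the 64-bit value 0x1ffffffff) by val2 = {0x2}
 * yields res = {0xfffffffe, 0x3, 0x0}, i.e. 0x3fffffffe, with carries
 * propagated through the overflow accumulator.
 */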
/**
 * val1[0], val2[0], res[0] ... least significant
 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
 */
static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
		unsigned int val2len, __u32 *res, int reslen)
{
	int digits = val1len + val2len;
	__u64 overflow = 0;
	int i;

	BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);

	memset(res, 0, reslen * sizeof(__u32));

	if (val1len == 0 || val2len == 0)
		return;

	for (i=0;i<digits;i++) {
		int idx1;
		res[i] = (__u32) overflow;
		overflow = overflow >> 32;
		for (idx1=0;idx1<val1len && idx1<=i;idx1++) {
			int idx2 = i - idx1;
			__u64 tmpres;

			if (idx2 >= val2len)
				continue;

			tmpres = ((__u64) (val1[idx1])) *
					((__u64) (val2[idx2]));
			overflow += tmpres >> 32;
			tmpres = (tmpres << 32) >> 32;
			/* detect carry of the 32-bit addition below */
			if (((__u32) (res[i] + tmpres)) < res[i])
				overflow++;
			res[i] += tmpres;
		}
	}
	BUG_ON(overflow != 0);
}
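/*
 * Illustrative note (not part of the original source): compare_scores()
 * effectively compares usage1/speed1^2 with usage2/speed2^2, so connections
 * with much buffered data and little throughput score as the worst
 * offenders. To avoid divisions and 64-bit overflow it cross-multiplies:
 * speed2^2 * usage1 is compared against speed1^2 * usage2 using the
 * multi-precision helper above, most significant digit first.
 */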
/**
 * return values:
 * 1 == usage1/speed1 offends more
 * 0 == both offend the same
 * -1 == usage2/speed2 offends more
 */
static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
		__u64 speed2)
{
	int i;

	__u32 speed1squared[4];
	__u32 speed2squared[4];

	__u32 speed1squared_usage2[5];
	__u32 speed2squared_usage1[5];


	__u32 speed1_tmp[2];
	__u32 speed2_tmp[2];

	speed1_tmp[0] = (speed1 << 32) >> 32;
	speed1_tmp[1] = (speed1 >> 32);
	speed2_tmp[0] = (speed2 << 32) >> 32;
	speed2_tmp[1] = (speed2 >> 32);

	mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared, 4);
	mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);

	mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
	mul(speed2squared, 4, &usage1, 1, speed2squared_usage1, 5);

	for (i=4;i>=0;i--) {
		if (speed1squared_usage2[i] > speed2squared_usage1[i])
			return -1;
		if (speed1squared_usage2[i] < speed2squared_usage1[i])
			return 1;
	}

	return 0;
}
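/*
 * Descriptive note (not part of the original source): _outofbufferspace()
 * walks buffer_conn_list and keeps the OOBS_SIZE worst offenders (by the
 * score above) in a small array sorted by insertion, then drops the list
 * lock and resets offenders one by one for as long as the reserve area is
 * still more than 3/4 full. Connections queued on buffer_conn_tmp_list in
 * the meantime are merged back into the main list at the end.
 */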
#define OOBS_SIZE 10
static void _outofbufferspace(void)
{
	int i;

	struct list_head *curr;
	struct conn *offendingconns[OOBS_SIZE];
	__u32 offendingusage[OOBS_SIZE];
	__u64 offendingspeed[OOBS_SIZE];

	memset(&offendingconns, 0, sizeof(offendingconns));

	mutex_lock(&buffer_conn_list_lock);

	curr = buffer_conn_list.next;
	while (curr != &buffer_conn_list) {
		unsigned long iflags;

		struct conn *src_in_o = container_of(curr, struct conn,
				source.in.buffer_list);

		__u32 usage;
		__u64 speed;

		int i;

		curr = curr->next;

		mutex_lock(&(src_in_o->rcv_lock));

		BUG_ON(src_in_o->sourcetype != SOURCE_IN);

		usage = src_in_o->source.in.usage_reserve;

		spin_lock_irqsave(&st_lock, iflags);
		speed = get_speed(&(src_in_o->source.in.st), jiffies);
		spin_unlock_irqrestore(&st_lock, iflags);

		mutex_unlock(&(src_in_o->rcv_lock));

		if (offendingconns[OOBS_SIZE-1] != 0 &&
				compare_scores(
				offendingusage[OOBS_SIZE-1],
				offendingspeed[OOBS_SIZE-1],
				usage, speed) >= 0)
			continue;

		if (offendingconns[OOBS_SIZE-1] != 0)
			kref_put(&(offendingconns[OOBS_SIZE-1]->ref),
					free_conn);
		kref_get(&(src_in_o->ref));
		offendingconns[OOBS_SIZE-1] = src_in_o;
		offendingusage[OOBS_SIZE-1] = usage;
		offendingspeed[OOBS_SIZE-1] = speed;
		for (i=OOBS_SIZE-2;i>=0;i--) {
			struct conn *tmpconn;
			__u32 usage_tmp;
			__u64 speed_tmp;

			if (offendingconns[i] != 0 && compare_scores(
					offendingusage[i], offendingspeed[i],
					offendingusage[i+1],
					offendingspeed[i+1]) >= 0)
				break;

			tmpconn = offendingconns[i];
			usage_tmp = offendingusage[i];
			speed_tmp = offendingspeed[i];

			offendingconns[i] = offendingconns[i+1];
			offendingusage[i] = offendingusage[i+1];
			offendingspeed[i] = offendingspeed[i+1];

			offendingconns[i+1] = tmpconn;
			offendingusage[i+1] = usage_tmp;
			offendingspeed[i+1] = speed_tmp;
		}
	}
	for (i=0;i<OOBS_SIZE;i++) {
		if (offendingconns[i] == 0)
			break;
		kref_get(&(offendingconns[i]->ref));
	}

	mutex_unlock(&buffer_conn_list_lock);

	for (i=0;i<OOBS_SIZE;i++) {
		int resetneeded;

		if (offendingconns[i] == 0)
			break;

		mutex_lock(&bufferlimits_lock);
		resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded)
			reset_conn(offendingconns[i]);
		kref_put(&(offendingconns[i]->ref), free_conn);
	}

	mutex_lock(&bufferlimits_lock);
	mutex_lock(&buffer_conn_list_lock);
	while (list_empty(&buffer_conn_tmp_list) == 0) {
		curr = buffer_conn_tmp_list.next;
		list_del(curr);
		list_add(curr, &buffer_conn_list);
	}
	mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}
static void outofbufferspace(struct work_struct *work)
{
	while (1) {
		unsigned long iflags;
		int resetneeded;

		mutex_lock(&bufferlimits_lock);
		spin_lock_irqsave(&oobss_lock, iflags);
		resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);

		if (resetneeded == 0)
			outofbufferspace_scheduled = 0;

		spin_unlock_irqrestore(&oobss_lock, iflags);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded == 0)
			return;

		_outofbufferspace();
	}
}
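/*
 * Descriptive note (not part of the original source): refresh_bufferusage()
 * re-accounts the bytes currently buffered by a connection (rcv_lock held)
 * to the four areas in the order ata -> speed -> init -> reserve, each
 * capped at the space committed to the connection, and updates the global
 * bufferusage_* counters accordingly. If the reserve area overflows, the
 * outofbufferspace worker is scheduled (at most once at a time).
 */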
static void refresh_bufferusage(struct conn *src_in_l)
{
	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	bufferusage_init -= src_in_l->source.in.usage_init;
	bufferusage_speed -= src_in_l->source.in.usage_speed;
	bufferusage_ata -= src_in_l->source.in.usage_ata;
	bufferusage_reserve -= src_in_l->source.in.usage_reserve;

	src_in_l->source.in.usage_ata = src_in_l->data_buf.totalsize;
	if (src_in_l->source.in.usage_ata > src_in_l->source.in.buffer_ata)
		src_in_l->source.in.usage_ata = src_in_l->source.in.buffer_ata;


	if (src_in_l->source.in.usage_ata == src_in_l->data_buf.totalsize)
		src_in_l->source.in.usage_speed = 0;
	else
		src_in_l->source.in.usage_speed = src_in_l->data_buf.totalsize -
				src_in_l->source.in.usage_ata;

	if (src_in_l->source.in.usage_speed > src_in_l->source.in.buffer_speed)
		src_in_l->source.in.usage_speed =
				src_in_l->source.in.buffer_speed;


	if ((src_in_l->source.in.usage_ata + src_in_l->source.in.usage_speed) ==
			src_in_l->data_buf.totalsize)
		src_in_l->source.in.usage_init = 0;
	else
		src_in_l->source.in.usage_init = src_in_l->data_buf.totalsize -
				src_in_l->source.in.usage_ata -
				src_in_l->source.in.usage_speed;

	if (src_in_l->source.in.usage_init > src_in_l->source.in.buffer_init)
		src_in_l->source.in.usage_init =
				src_in_l->source.in.buffer_init;


	if ((src_in_l->source.in.usage_ata + src_in_l->source.in.usage_speed +
			src_in_l->source.in.usage_init) ==
			src_in_l->data_buf.totalsize)
		src_in_l->source.in.usage_reserve = 0;
	else
		src_in_l->source.in.usage_reserve =
				src_in_l->data_buf.totalsize -
				src_in_l->source.in.usage_ata -
				src_in_l->source.in.usage_speed -
				src_in_l->source.in.usage_init;

	bufferusage_init += src_in_l->source.in.usage_init;
	bufferusage_speed += src_in_l->source.in.usage_speed;
	bufferusage_ata += src_in_l->source.in.usage_ata;
	bufferusage_reserve += src_in_l->source.in.usage_reserve;

	if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
		unsigned long iflags;
		spin_lock_irqsave(&oobss_lock, iflags);
		if (outofbufferspace_scheduled == 0) {
			schedule_work(&outofbufferspace_work);
			outofbufferspace_scheduled = 1;
		}
		spin_unlock_irqrestore(&oobss_lock, iflags);
	}
}
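/*
 * Descriptive note (not part of the original source): __get_window() reports
 * how much more data the peer may send on this connection: the space still
 * committed but unused across the init, speed and ata areas (0 as soon as
 * the reserve is in use), capped at MAX_ANNOUNCE_WINDOW and compressed into
 * a single byte by enc_log_64_11(); _get_window() below uses dec_log_64_11()
 * to track the sequence number limit this announcement permits.
 */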
static __u8 __get_window(struct conn *src_in_l)
{
	__u64 window = 0;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	if (src_in_l->source.in.usage_reserve != 0)
		return 0;

	BUG_ON(src_in_l->source.in.usage_init >
			src_in_l->source.in.buffer_init);
	BUG_ON(src_in_l->source.in.usage_speed >
			src_in_l->source.in.buffer_speed);
	BUG_ON(src_in_l->source.in.usage_ata > src_in_l->source.in.buffer_ata);

	window += src_in_l->source.in.buffer_init;
	window += src_in_l->source.in.buffer_speed;
	window += src_in_l->source.in.buffer_ata;

	window -= src_in_l->source.in.usage_init;
	window -= src_in_l->source.in.usage_speed;
	window -= src_in_l->source.in.usage_ata;

	if (window > MAX_ANNOUNCE_WINDOW)
		window = MAX_ANNOUNCE_WINDOW;

	return enc_log_64_11(window);
}
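/*
 * Descriptive note (not part of the original source): _get_window()
 * recalculates this connection's share of the init area (equal split across
 * num_conns) and of the speed area (proportional to its measured speed),
 * keeps the connection on buffer_conn_list (or on buffer_conn_tmp_list if
 * only bufferlimits_lock is held), refreshes the usage accounting and then
 * returns the encoded window. Callers must hold bufferlimits_lock;
 * listlocked says whether buffer_conn_list_lock could be taken as well.
 */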
#warning todo upper buffer limits
static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender,
		int listlocked)
{
	unsigned long iflags;

	unsigned long jiffies_tmp;

	__u8 window = 0;

	__s32 conns;
	__u64 bufferlimit_init;
	__u64 connlimit_init;

	__u64 totalspeed;
	__u64 bufferlimit_speed;
	__u64 connlimit_speed;

	mutex_lock(&(cn->rcv_lock));

	BUG_ON(expectedsender == 0 && cn->sourcetype != SOURCE_IN);

	if (unlikely(unlikely(cn->sourcetype != SOURCE_IN) ||
			unlikely(expectedsender != 0 && cn->source.in.nb !=
			expectedsender)))
		goto out;

	if (unlikely(atomic_read(&(cn->isreset)) != 0)) {
		if (listlocked && (cn->source.in.buffer_list.next != 0 ||
				cn->source.in.buffer_list.prev != 0)) {
			list_del(&(cn->source.in.buffer_list));
			cn->source.in.buffer_list.next = 0;
			cn->source.in.buffer_list.prev = 0;
			kref_put(&(cn->ref), free_conn);
		}
		goto out;
	}

	if (listlocked) {
		if (cn->source.in.buffer_list.next != 0 ||
				cn->source.in.buffer_list.prev != 0) {
			list_del(&(cn->source.in.buffer_list));
		} else {
			kref_get(&(cn->ref));
		}
		list_add_tail(&(cn->source.in.buffer_list),
				&buffer_conn_list);
	} else if (cn->source.in.buffer_list.next == 0 &&
			cn->source.in.buffer_list.prev == 0) {
		kref_get(&(cn->ref));
		list_add_tail(&(cn->source.in.buffer_list),
				&buffer_conn_tmp_list);
	}
	conns = atomic_read(&num_conns);
	BUG_ON(conns < 0);
	bufferlimit_init = desired_bufferusage(bufferassigned_init,
			bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
	connlimit_init = (bufferlimit_init + conns - 1) / conns;

	bufferassigned_init -= cn->source.in.buffer_init;
	if (((__u32) connlimit_init) != connlimit_init)
		cn->source.in.buffer_init = -1;
	else
		cn->source.in.buffer_init = (__u32) connlimit_init;
	bufferassigned_init += cn->source.in.buffer_init;

	spin_lock_irqsave(&st_lock, iflags);
	jiffies_tmp = jiffies;
	totalspeed = get_speed(&st, jiffies_tmp);
	bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
			bufferusage_speed, BUFFERASSIGN_SPEED,
			BUFFERSPACE_SPEED);
	connlimit_speed = multiply_div(bufferlimit_speed,
			get_speed(&(cn->source.in.st), jiffies_tmp),
			totalspeed);
	spin_unlock_irqrestore(&st_lock, iflags);

	bufferassigned_speed -= cn->source.in.buffer_speed;
	if (((__u32) connlimit_speed) != connlimit_speed)
		cn->source.in.buffer_speed = -1;
	else
		cn->source.in.buffer_speed = (__u32) connlimit_speed;
	bufferassigned_speed += cn->source.in.buffer_speed;

	refresh_bufferusage(cn);

	window = __get_window(cn);

	cn->source.in.window_seqnolimit_last = cn->source.in.next_seqno +
			dec_log_64_11(window);
	if (((__s32) (cn->source.in.window_seqnolimit_last -
			cn->source.in.window_seqnolimit_max)) > 0)
		cn->source.in.window_seqnolimit_max =
				cn->source.in.window_seqnolimit_last;

out:
	mutex_unlock(&(cn->rcv_lock));

	return window;
}
__u8 get_window(struct conn *cn, struct neighbor *expectedsender)
{
	struct conn *cn2;
	int listlocked;

	__u8 window;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);

	window = _get_window(cn, expectedsender, listlocked);

	if (listlocked) {
		/*
		 * refresh window of idle conns as well to keep global
		 * counters accurate
		 */
		cn2 = container_of(buffer_conn_list.next, struct conn,
				source.in.buffer_list);

		if (list_empty(&buffer_conn_list) == 0 && cn2 != cn)
			_get_window(cn2, 0, listlocked);

		if (list_empty(&buffer_conn_tmp_list) == 0) {
			cn2 = container_of(buffer_conn_tmp_list.next,
					struct conn, source.in.buffer_list);
			BUG_ON(cn2 == cn);
			_get_window(cn2, 0, listlocked);
		}

		mutex_unlock(&buffer_conn_list_lock);
	}

	mutex_unlock(&bufferlimits_lock);

	return window;
}
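/*
 * Descriptive note (not part of the original source): reset_bufferusage()
 * removes a connection's contribution from the global usage and assignment
 * counters and takes it off buffer_conn_list; it undoes the accounting
 * performed by _get_window() and refresh_bufferusage().
 */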
void reset_bufferusage(struct conn *cn)
{
	int listlocked;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);
	mutex_lock(&(cn->rcv_lock));

	if (cn->sourcetype != SOURCE_IN)
		goto out;

	bufferusage_init -= cn->source.in.usage_init;
	bufferusage_speed -= cn->source.in.usage_speed;
	bufferusage_ata -= cn->source.in.usage_ata;
	bufferusage_reserve -= cn->source.in.usage_reserve;

	bufferassigned_init -= cn->source.in.buffer_init;
	bufferassigned_speed -= cn->source.in.buffer_speed;
	bufferassigned_ata -= cn->source.in.buffer_ata;

	if (listlocked && (cn->source.in.buffer_list.next != 0 ||
			cn->source.in.buffer_list.prev != 0)) {
		list_del(&(cn->source.in.buffer_list));
		cn->source.in.buffer_list.next = 0;
		cn->source.in.buffer_list.prev = 0;
		kref_put(&(cn->ref), free_conn);
	}

out:
	mutex_unlock(&(cn->rcv_lock));
	if (listlocked)
		mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}
void refresh_speedstat(struct conn *src_in_l, __u32 written)
{
	unsigned long iflags;
	unsigned long jiffies_tmp;

	spin_lock_irqsave(&st_lock, iflags);

	jiffies_tmp = jiffies;

	if (src_in_l->source.in.st.jiffies_last_update != jiffies_tmp)
		get_speed(&(src_in_l->source.in.st), jiffies_tmp);
	if (src_in_l->source.in.st.bytes_curr + written < written)
		src_in_l->source.in.st.bytes_curr = -1;
	else
		src_in_l->source.in.st.bytes_curr += written;

	if (st.jiffies_last_update != jiffies_tmp)
		get_speed(&st, jiffies_tmp);
	if (st.bytes_curr + written < written)
		st.bytes_curr = -1;
	else
		st.bytes_curr += written;

	spin_unlock_irqrestore(&st_lock, iflags);
}
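/*
 * Descriptive note (not part of the original source): drain_ooo_queue()
 * delivers packets from the per-connection reorder queue for as long as the
 * head of the queue carries exactly the expected next_seqno, advancing
 * next_seqno by the packet length and decrementing the per-conn,
 * per-neighbor and global out-of-order counters for each delivered skb.
 */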
#warning todo overlapping seqno rcv
void drain_ooo_queue(struct conn *src_in_l)
{
	struct sk_buff *skb;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	skb = src_in_l->source.in.reorder_queue.next;

	while ((void *) skb != (void *) &(src_in_l->source.in.reorder_queue)) {
		struct skb_procstate *ps = skb_pstate(skb);
		int drop;

		if (src_in_l->source.in.next_seqno != ps->funcstate.rcv2.seqno)
			break;

#warning todo cont after drop == 1
		drop = receive_skb(src_in_l, skb);
		if (drop)
			break;

		skb_unlink(skb, &(src_in_l->source.in.reorder_queue));
		src_in_l->source.in.ooo_packets--;
		atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
		atomic_dec(&ooo_packets);

		src_in_l->source.in.next_seqno += skb->len;

		/* advance to the new queue head for the next iteration */
		skb = src_in_l->source.in.reorder_queue.next;
	}
}
static int _conn_rcv_ooo(struct conn *src_in_l, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct sk_buff_head *reorder_queue =
			&(src_in_l->source.in.reorder_queue);
	struct sk_buff *curr = reorder_queue->next;

	long ooo;

#warning todo limit amount of data, not packet count
	src_in_l->source.in.ooo_packets++;
	if (src_in_l->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
		goto drop_ooo3;

	ooo = atomic_inc_return(&(src_in_l->source.in.nb->ooo_packets));
	if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
		goto drop_ooo2;

	ooo = atomic_inc_return(&ooo_packets);
	if (ooo > MAX_TOTAL_OOO_PACKETS)
		goto drop_ooo1;


	while (1) {
		struct skb_procstate *ps2 = skb_pstate(curr);

		if ((void *) curr == (void *) reorder_queue) {
			skb_queue_tail(reorder_queue, skb);
			break;
		}

		if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
			skb_insert(curr, skb, reorder_queue);
			break;
		}

		curr = curr->next;
	}

	if (0) {
drop_ooo1:
		atomic_dec(&ooo_packets);
drop_ooo2:
		atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
drop_ooo3:
		src_in_l->source.in.ooo_packets--;

		return 1;
	}

	return 0;
}
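/*
 * Descriptive note (not part of the original source): _conn_rcv() is the
 * per-connection receive path. It pre-allocates an ack control message,
 * verifies that the packet really belongs to this conn_id, then either
 * appends in-order data (advancing next_seqno, draining the reorder queue
 * and sending a cumulative ack) or queues the packet out of order and sends
 * an ooo ack for the received range. Dropped packets are freed together
 * with the unused control message.
 */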
#warning todo check if in window
static void _conn_rcv(struct conn *src_in, struct sk_buff *skb, __u32 conn_id)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct control_msg_out *cm = alloc_control_msg(src_in->source.in.nb,
			ACM_PRIORITY_MED);

	int in_order;
	int drop = 1;
	int flush = 0;

	__u32 len = skb->len;

	if (unlikely(cm == 0)) {
		kfree_skb(skb);
		return;
	}

#warning todo check whether skb mac+iface correct
#warning todo receive even if no ack is possible and save source.in.next_seqno so that the ack can be sent later

	mutex_lock(&(src_in->rcv_lock));

	if (unlikely(unlikely(src_in->sourcetype != SOURCE_IN) ||
			unlikely(src_in->source.in.conn_id != conn_id)))
		goto out;

	in_order = (src_in->source.in.next_seqno == ps->funcstate.rcv2.seqno);
	if (in_order == 0) {
		drop = _conn_rcv_ooo(src_in, skb);
	} else {
		src_in->source.in.next_seqno += skb->len;
		drop = receive_skb(src_in, skb);
	}

	if (drop)
		goto out;

#warning todo send_ack_conn uses the conn without source/target check
	if (in_order == 0) {
		send_ack_conn_ooo(cm, src_in,
				src_in->reversedir->target.out.conn_id,
				ps->funcstate.rcv2.seqno, len);
	} else {
		flush = 1;
		drain_ooo_queue(src_in);
		send_ack_conn(cm, src_in,
				src_in->reversedir->target.out.conn_id,
				src_in->source.in.next_seqno);
	}

out:
	mutex_unlock(&(src_in->rcv_lock));

	if (unlikely(drop)) {
		kfree_skb(skb);
		free_control_msg(cm);
	} else if (flush) {
		flush_buf(src_in);
	}
}
static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
{
	struct conn *cn_src_in;
	struct skb_procstate *ps = skb_pstate(skb);

	ps->funcstate.rcv2.seqno = seqno;

	cn_src_in = get_conn(conn_id);

	if (unlikely(cn_src_in == 0)) {
		struct neighbor *nb = get_neigh_by_mac(skb);

		printk(KERN_DEBUG "unknown conn_id when receiving: %d\n",
				conn_id);
		kfree_skb(skb);
		if (likely(nb != 0)) {
			send_connid_unknown(nb, conn_id);
			kref_put(&(nb->ref), neighbor_free);
		}
		return;
	}
	_conn_rcv(cn_src_in, skb, conn_id);
	kref_put(&(cn_src_in->ref), free_conn);
}
void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
	char *dst;

	if (unlikely(skb == 0))
		return;

	dst = skb_put(skb, datalen);
	memcpy(dst, data, datalen);
	conn_rcv(skb, conn_id, seqno);
}
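/*
 * Descriptive note (not part of the original source): as parsed below, a
 * data packet starts with a 1-byte packet type (consumed in rcv()), followed
 * by a 4-byte big-endian connection id and a 4-byte big-endian sequence
 * number; the rest of the skb is connection payload. conn_id 0 is reserved
 * for kernel (control) packets, which are handed to kernel_packet().
 */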
static void rcv_data(struct sk_buff *skb)
{
	__u32 conn_id;
	__u32 seqno;

	char *connid_p = cor_pull_skb(skb, 4);
	char *seqno_p = cor_pull_skb(skb, 4);

	/* __u8 rand; */

	if (unlikely(connid_p == 0 || seqno_p == 0))
		goto drop;

	((char *)&conn_id)[0] = connid_p[0];
	((char *)&conn_id)[1] = connid_p[1];
	((char *)&conn_id)[2] = connid_p[2];
	((char *)&conn_id)[3] = connid_p[3];

	((char *)&seqno)[0] = seqno_p[0];
	((char *)&seqno)[1] = seqno_p[1];
	((char *)&seqno)[2] = seqno_p[2];
	((char *)&seqno)[3] = seqno_p[3];

	conn_id = be32_to_cpu(conn_id);
	seqno = be32_to_cpu(seqno);

	/* get_random_bytes(&rand, 1);

	if (rand < 64) {
		printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
				seqno_p[1], seqno_p[2], seqno_p[3]);
		goto drop;
	} */

	if (conn_id == 0) {
		struct neighbor *nb = get_neigh_by_mac(skb);
		if (unlikely(nb == 0))
			goto drop;
		kernel_packet(nb, skb, seqno);
		kref_put(&(nb->ref), neighbor_free);
	} else {
		conn_rcv(skb, conn_id, seqno);
	}

	if (0) {
drop:
		kfree_skb(skb);
	}
}
static void rcv(struct work_struct *work)
{
	struct sk_buff *skb = skb_from_pstate(container_of(work,
			struct skb_procstate, funcstate.rcv.work));

	__u8 packet_type;
	char *packet_type_p;

	atomic_dec(&packets_in_workqueue);

	packet_type_p = cor_pull_skb(skb, 1);

	if (unlikely(packet_type_p == 0))
		goto drop;

	packet_type = *packet_type_p;

	if (packet_type == PACKET_TYPE_ANNOUNCE) {
		rcv_announce(skb);
		return;
	}

	if (unlikely(packet_type != PACKET_TYPE_DATA))
		goto drop;

	rcv_data(skb);

	if (0) {
drop:
		kfree_skb(skb);
	}
}
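/*
 * Descriptive note (not part of the original source): queue_rcv_processing()
 * is the packet_type handler registered for ETH_P_COR. It defers all real
 * work to the cor_packet workqueue and drops packets once more than
 * MAX_PACKETS_IN_RCVQUEUE skbs are pending, using packets_in_workqueue as a
 * simple global backlog counter.
 */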
static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
		struct packet_type *pt, struct net_device *orig_dev)
{
	struct skb_procstate *ps = skb_pstate(skb);
	long queuelen;

	if (skb->pkt_type == PACKET_OTHERHOST)
		goto drop;

	BUG_ON(skb->next != 0);

	queuelen = atomic_inc_return(&packets_in_workqueue);

	BUG_ON(queuelen <= 0);

#warning todo limit per interface, inbound credits
	if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
		atomic_dec(&packets_in_workqueue);
		goto drop;
	}

	INIT_WORK(&(ps->funcstate.rcv.work), rcv);
	queue_work(packet_wq, &(ps->funcstate.rcv.work));
	return NET_RX_SUCCESS;

drop:
	kfree_skb(skb);
	return NET_RX_DROP;
}
static struct packet_type ptype_cor = {
	.type = htons(ETH_P_COR),
	.dev = 0,
	.func = queue_rcv_processing
};
int __init cor_rcv_init(void)
{
	bufferassigned_init = 0;
	bufferassigned_speed = 0;
	bufferassigned_ata = 0;

	bufferusage_init = 0;
	bufferusage_speed = 0;
	bufferusage_ata = 0;
	bufferusage_reserve = 0;

	memset(&st, 0, sizeof(struct speedtracker));

	BUG_ON(sizeof(struct skb_procstate) > 48);
	packet_wq = create_workqueue("cor_packet");
	if (unlikely(packet_wq == 0))
		return -ENOMEM;
	INIT_WORK(&outofbufferspace_work, outofbufferspace);
	outofbufferspace_scheduled = 0;

	dev_add_pack(&ptype_cor);
	return 0;
}

MODULE_LICENSE("GPL");