/*
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>

#include "cor.h" /* project-local declarations (struct conn, struct neighbor, ...) */
static atomic_t packets_in_workqueue = ATOMIC_INIT(0);

static atomic_t ooo_packets = ATOMIC_INIT(0);

static struct workqueue_struct *packet_wq;

static struct work_struct outofbufferspace_work;
DEFINE_SPINLOCK(oobss_lock);
static int outofbufferspace_scheduled;
/*
 * Buffering space is divided into 4 areas:
 *
 * buffer_init:
 * distributed equally among all conns; shrinks and grows immediately when
 * there are new connections or connections are reset
 *
 * buffer_speed:
 * distributed proportionally to speed; shrinks and grows constantly
 *
 * buffer_ata:
 * used to make noise to make traffic analysis harder; may only shrink if data
 * is either sent on or "stuck" for a long time
 *
 * buffer_reserve:
 * reserve for the case that sudden shrinking causes some connections to
 * contain more old data than allowed by the first 3 buffers. If this area is
 * full, the out-of-memory conn-resetter is triggered.
 *
 * Each area commits a certain amount of space to each connection. This is the
 * maximum buffer space a connection is allowed to use. The space of a specific
 * connection is first accounted to buffer_ata. If the allowed buffer space is
 * exceeded, the rest is accounted to buffer_speed and then to buffer_init.
 * The reserve area is used last. This should only happen if the assigned
 * buffer space of the first 3 areas shrinks suddenly. If this area is also
 * used up, connections will be reset.
 */
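
/*
 * Worked example of the accounting order above (made-up numbers, for
 * illustration only): a conn with buffer_ata == 4096, buffer_speed == 8192
 * and buffer_init == 16384 that currently buffers 30000 bytes is accounted
 * as usage_ata == 4096, usage_speed == 8192, usage_init == 16384 and the
 * remaining 1328 bytes as usage_reserve. refresh_bufferusage() below
 * performs exactly this split.
 */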
DEFINE_MUTEX(buffer_conn_list_lock);
LIST_HEAD(buffer_conn_list);

/*
 * used to buffer inserts while the main list is locked; entries are moved to
 * the main list after processing of the main list finishes
 */
LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */

DEFINE_MUTEX(bufferlimits_lock);
static __u64 bufferassigned_init;
static __u64 bufferassigned_speed;
static __u64 bufferassigned_ata;

static __u64 bufferusage_init;
static __u64 bufferusage_speed;
static __u64 bufferusage_ata;
static __u64 bufferusage_reserve;

DEFINE_SPINLOCK(st_lock);
static struct speedtracker st;
static __u64 desired_bufferusage(__u64 assigned, __u64 usage, __u64 assignlimit,
		__u64 usagelimit)
{
	__u64 ret;
	__u64 load;

	if (unlikely(assignlimit < usagelimit))
		assignlimit = usagelimit;

	if (multiply_div(usage, 9, 10) > usagelimit)
		return multiply_div(usagelimit, 9, 10);

	load = multiply_div(usage, 192, usagelimit);

	/* slow limit increase, fast decrease */
	if (/* ... */) {
		/* ... */
	} else if (load < 128) {
		/* ... */
		return multiply_div(usagelimit, 9, 10);
	} else if (/* ... */) {
		ret = multiply_div(assigned, 128, load);
	} else {
		ret = multiply_div(assigned, 128, load + load - 128);
	}

	if (ret > assignlimit)
		return assignlimit;

	return ret;
}
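
/*
 * Note on the load metric used above: load == multiply_div(usage, 192,
 * usagelimit) rescales the current usage so that 192 corresponds to the
 * usage limit and 128 to 2/3 of it. For example, usage == usagelimit/2 gives
 * load == 96, while usage equal to usagelimit gives load == 192; loads above
 * 128 shrink the returned limit roughly proportionally via
 * multiply_div(assigned, 128, load).
 */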
#warning todo changing speed too fast
static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
{
	if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
			time_before(st->jiffies_last_update + HZ*10,
			jiffies_tmp))) {
		st->jiffies_last_update = jiffies_tmp;
		/* ... */
	}

	for (;time_before(st->jiffies_last_update, jiffies_tmp);
			st->jiffies_last_update++) {
		__u32 bytes_curr = 0;
		if ((st->jiffies_last_update + 1) == jiffies_tmp) {
			bytes_curr = st->bytes_curr;
			st->bytes_curr = 0;
		}
		st->speed = (st->speed * (HZ-1) + (((__u64)bytes_curr)<<16))/HZ;
	}

	if ((st->jiffies_last_update + 1) == jiffies_tmp) {
		st->jiffies_last_update++;
	}

	return (st->speed * (HZ*2 - 1) + (((__u64)st->bytes_curr) << 17))/HZ/2;
}
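
/*
 * Note: st->speed is an exponentially weighted moving average of the byte
 * count per jiffy, kept in 16.16 fixed point. Each completed jiffy is folded
 * in with weight 1/HZ: speed = (speed * (HZ-1) + (bytes << 16)) / HZ. The
 * return value additionally mixes in the bytes of the still-running jiffy.
 */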
/*
 * val1[0], val2[0], res[0] ... least significant
 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
 */
static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
		unsigned int val2len, __u32 *res, int reslen)
{
	int digits = val1len + val2len;
	__u64 overflow = 0;
	__u64 tmpres;
	int i;
	int idx1;
	int idx2;

	BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);

	memset(res, 0, reslen * sizeof(__u32));

	if (val1len == 0 || val2len == 0)
		return;

	for (i=0;i<digits;i++) {
		res[i] = (__u32) overflow;
		overflow = overflow >> 32;
		for (idx1=0;idx1<val1len && idx1<=i;idx1++) {
			idx2 = i - idx1;
			if (idx2 >= val2len)
				continue;

			tmpres = ((__u64) (val1[idx1])) *
					((__u64) (val2[idx2]));
			overflow += tmpres >> 32;
			tmpres = (tmpres << 32) >> 32;
			/* detect carry of the 32-bit addition */
			if (res[i] + ((__u32) tmpres) < res[i])
				overflow++;
			res[i] += tmpres;
		}
	}

	BUG_ON(overflow != 0);
}
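
/*
 * Usage sketch (illustrative, mirrors how compare_scores() below uses mul()):
 * a 64-bit value is split into two 32-bit limbs, least significant first,
 * and squared into a 4-limb result:
 *
 *	__u32 limbs[2] = { (__u32) (v & 0xffffffffULL), (__u32) (v >> 32) };
 *	__u32 sq[4];
 *	mul(limbs, 2, limbs, 2, sq, 4);
 */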
/*
 * return value:
 * 1 == usage1/speed1 offends more
 * 0 == both offend the same
 * -1 == usage2/speed2 offends more
 */
static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
		__u64 speed2)
{
	int i;

	__u32 speed1_tmp[2];
	__u32 speed2_tmp[2];

	__u32 speed1squared[4];
	__u32 speed2squared[4];

	__u32 speed1squared_usage2[5];
	__u32 speed2squared_usage1[5];

	speed1_tmp[0] = (speed1 << 32) >> 32;
	speed1_tmp[1] = (speed1 >> 32);
	speed2_tmp[0] = (speed2 << 32) >> 32;
	speed2_tmp[1] = (speed2 >> 32);

	mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared, 4);
	mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);

	mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
	mul(speed2squared, 4, &usage1, 1, speed2squared_usage1, 5);

	for (i=4;i>=0;i--) {
		if (speed1squared_usage2[i] > speed2squared_usage1[i])
			return -1;
		if (speed1squared_usage2[i] < speed2squared_usage1[i])
			return 1;
	}

	return 0;
}
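
/*
 * Note: the "score" compared here is effectively usage/speed^2; the division
 * is avoided by cross-multiplying with mul() above, since
 * usage1/speed1^2 > usage2/speed2^2 exactly when
 * usage1 * speed2^2 > usage2 * speed1^2. The 160-bit products are compared
 * limb by limb, most significant limb first.
 */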
static void _outofbufferspace(void)
{
	int i;

	struct list_head *curr;
	struct conn *offendingconns[OOBS_SIZE];
	__u32 offendingusage[OOBS_SIZE];
	__u64 offendingspeed[OOBS_SIZE];

	memset(&offendingconns, 0, sizeof(offendingconns));

	mutex_lock(&buffer_conn_list_lock);

	curr = buffer_conn_list.next;
	while (curr != &buffer_conn_list) {
		unsigned long iflags;
		__u32 usage;
		__u64 speed;

		struct conn *src_in_o = container_of(curr, struct conn,
				source.in.buffer_list);

		curr = curr->next;

		mutex_lock(&(src_in_o->rcv_lock));

		BUG_ON(src_in_o->sourcetype != SOURCE_IN);

		usage = src_in_o->source.in.usage_reserve;

		spin_lock_irqsave(&st_lock, iflags);
		speed = get_speed(&(src_in_o->source.in.st), jiffies);
		spin_unlock_irqrestore(&st_lock, iflags);

		mutex_unlock(&(src_in_o->rcv_lock));

		if (offendingconns[OOBS_SIZE-1] != 0 &&
				compare_scores(
				offendingusage[OOBS_SIZE-1],
				offendingspeed[OOBS_SIZE-1],
				usage, speed) >= 0)
			continue;

		if (offendingconns[OOBS_SIZE-1] != 0)
			kref_put(&(offendingconns[OOBS_SIZE-1]->ref),
					free_conn);

		kref_get(&(src_in_o->ref));
		offendingconns[OOBS_SIZE-1] = src_in_o;
		offendingusage[OOBS_SIZE-1] = usage;
		offendingspeed[OOBS_SIZE-1] = speed;

		for (i=OOBS_SIZE-2;i>=0;i--) {
			struct conn *tmpconn;
			__u32 usage_tmp;
			__u64 speed_tmp;

			if (offendingconns[i] != 0 && compare_scores(
					offendingusage[i], offendingspeed[i],
					offendingusage[i+1],
					offendingspeed[i+1]) >= 0)
				break;

			tmpconn = offendingconns[i];
			usage_tmp = offendingusage[i];
			speed_tmp = offendingspeed[i];

			offendingconns[i] = offendingconns[i+1];
			offendingusage[i] = offendingusage[i+1];
			offendingspeed[i] = offendingspeed[i+1];

			offendingconns[i+1] = tmpconn;
			offendingusage[i+1] = usage_tmp;
			offendingspeed[i+1] = speed_tmp;
		}
	}

	for (i=0;i<OOBS_SIZE;i++) {
		if (offendingconns[i] == 0)
			break;
		kref_get(&(offendingconns[i]->ref));
	}

	mutex_unlock(&buffer_conn_list_lock);

	for (i=0;i<OOBS_SIZE;i++) {
		int resetneeded;

		if (offendingconns[i] == 0)
			break;

		mutex_lock(&bufferlimits_lock);
		resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded)
			reset_conn(offendingconns[i]);
		kref_put(&(offendingconns[i]->ref), free_conn);
	}

	mutex_lock(&bufferlimits_lock);
	mutex_lock(&buffer_conn_list_lock);
	while (list_empty(&buffer_conn_tmp_list) == 0) {
		curr = buffer_conn_tmp_list.next;
		list_del(curr);
		list_add(curr, &buffer_conn_list);
	}
	mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}
static void outofbufferspace(struct work_struct *work)
{
	int resetneeded;
	unsigned long iflags;

	mutex_lock(&bufferlimits_lock);
	spin_lock_irqsave(&oobss_lock, iflags);
	resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);

	if (resetneeded == 0)
		outofbufferspace_scheduled = 0;

	spin_unlock_irqrestore(&oobss_lock, iflags);
	mutex_unlock(&bufferlimits_lock);

	if (resetneeded == 0)
		return;

	_outofbufferspace();
}
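
/*
 * Note on the scheduling handshake: refresh_bufferusage() below queues
 * outofbufferspace_work only while outofbufferspace_scheduled is 0; the flag
 * is cleared here (under oobss_lock) once bufferusage_reserve has dropped
 * back below BUFFERSPACE_RESERVE, and while it is set the work is not queued
 * again.
 */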
static void refresh_bufferusage(struct conn *src_in_l)
{
	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	bufferusage_init -= src_in_l->source.in.usage_init;
	bufferusage_speed -= src_in_l->source.in.usage_speed;
	bufferusage_ata -= src_in_l->source.in.usage_ata;
	bufferusage_reserve -= src_in_l->source.in.usage_reserve;

	src_in_l->source.in.usage_ata = src_in_l->data_buf.totalsize;
	if (src_in_l->source.in.usage_ata > src_in_l->source.in.buffer_ata)
		src_in_l->source.in.usage_ata = src_in_l->source.in.buffer_ata;

	if (src_in_l->source.in.usage_ata == src_in_l->data_buf.totalsize)
		src_in_l->source.in.usage_speed = 0;
	else
		src_in_l->source.in.usage_speed = src_in_l->data_buf.totalsize -
				src_in_l->source.in.usage_ata;

	if (src_in_l->source.in.usage_speed > src_in_l->source.in.buffer_speed)
		src_in_l->source.in.usage_speed =
				src_in_l->source.in.buffer_speed;

	if ((src_in_l->source.in.usage_ata + src_in_l->source.in.usage_speed) ==
			src_in_l->data_buf.totalsize)
		src_in_l->source.in.usage_init = 0;
	else
		src_in_l->source.in.usage_init = src_in_l->data_buf.totalsize -
				src_in_l->source.in.usage_ata -
				src_in_l->source.in.usage_speed;

	if (src_in_l->source.in.usage_init > src_in_l->source.in.buffer_init)
		src_in_l->source.in.usage_init =
				src_in_l->source.in.buffer_init;

	if ((src_in_l->source.in.usage_ata + src_in_l->source.in.usage_speed +
			src_in_l->source.in.usage_init) ==
			src_in_l->data_buf.totalsize)
		src_in_l->source.in.usage_reserve = 0;
	else
		src_in_l->source.in.usage_reserve =
				src_in_l->data_buf.totalsize -
				src_in_l->source.in.usage_ata -
				src_in_l->source.in.usage_speed -
				src_in_l->source.in.usage_init;

	bufferusage_init += src_in_l->source.in.usage_init;
	bufferusage_speed += src_in_l->source.in.usage_speed;
	bufferusage_ata += src_in_l->source.in.usage_ata;
	bufferusage_reserve += src_in_l->source.in.usage_reserve;

	if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
		unsigned long iflags;
		spin_lock_irqsave(&oobss_lock, iflags);
		if (outofbufferspace_scheduled == 0) {
			schedule_work(&outofbufferspace_work);
			outofbufferspace_scheduled = 1;
		}
		spin_unlock_irqrestore(&oobss_lock, iflags);
	}
}
static __u8 __get_window(struct conn *src_in_l)
{
	__u64 window = 0;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	if (src_in_l->source.in.usage_reserve != 0)
		return 0;

	BUG_ON(src_in_l->source.in.usage_init >
			src_in_l->source.in.buffer_init);
	BUG_ON(src_in_l->source.in.usage_speed >
			src_in_l->source.in.buffer_speed);
	BUG_ON(src_in_l->source.in.usage_ata > src_in_l->source.in.buffer_ata);

	window += src_in_l->source.in.buffer_init;
	window += src_in_l->source.in.buffer_speed;
	window += src_in_l->source.in.buffer_ata;

	window -= src_in_l->source.in.usage_init;
	window -= src_in_l->source.in.usage_speed;
	window -= src_in_l->source.in.usage_ata;

	if (window > MAX_ANNOUNCE_WINDOW)
		window = MAX_ANNOUNCE_WINDOW;

	return enc_log_64_11(window);
}
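
/*
 * Note: the window announced to the sending side is the committed but unused
 * part of the three non-reserve areas, capped at MAX_ANNOUNCE_WINDOW and
 * transmitted in the compact logarithmic encoding of enc_log_64_11();
 * _get_window() below converts it back with dec_log_64_11() to advance the
 * seqno limit.
 */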
#warning todo upper buffer limits
static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender,
		int listlocked)
{
	unsigned long iflags;
	unsigned long jiffies_tmp;

	__u8 window = 0;

	__u32 conns;
	__u64 totalspeed;

	__u64 bufferlimit_init;
	__u64 connlimit_init;

	__u64 bufferlimit_speed;
	__u64 connlimit_speed;

	mutex_lock(&(cn->rcv_lock));

	BUG_ON(expectedsender == 0 && cn->sourcetype != SOURCE_IN);

	if (unlikely(unlikely(cn->sourcetype != SOURCE_IN) ||
			unlikely(expectedsender != 0 && cn->source.in.nb !=
			expectedsender)))
		goto out;

	if (unlikely(atomic_read(&(cn->isreset)) != 0)) {
		if (listlocked && (cn->source.in.buffer_list.next != 0 ||
				cn->source.in.buffer_list.prev != 0)) {
			list_del(&(cn->source.in.buffer_list));
			cn->source.in.buffer_list.next = 0;
			cn->source.in.buffer_list.prev = 0;
			kref_put(&(cn->ref), free_conn);
		}
		goto out;
	}

	if (listlocked) {
		if (cn->source.in.buffer_list.next != 0 ||
				cn->source.in.buffer_list.prev != 0) {
			list_del(&(cn->source.in.buffer_list));
		} else {
			kref_get(&(cn->ref));
		}
		list_add_tail(&(cn->source.in.buffer_list),
				&buffer_conn_list);
	} else if (cn->source.in.buffer_list.next == 0 &&
			cn->source.in.buffer_list.prev == 0) {
		kref_get(&(cn->ref));
		list_add_tail(&(cn->source.in.buffer_list),
				&buffer_conn_tmp_list);
	}

	conns = atomic_read(&num_conns);

	bufferlimit_init = desired_bufferusage(bufferassigned_init,
			bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
	connlimit_init = (bufferlimit_init + conns - 1) / conns;

	bufferassigned_init -= cn->source.in.buffer_init;
	if (((__u32) connlimit_init) != connlimit_init)
		cn->source.in.buffer_init = -1;
	else
		cn->source.in.buffer_init = (__u32) connlimit_init;
	bufferassigned_init += cn->source.in.buffer_init;

	spin_lock_irqsave(&st_lock, iflags);
	jiffies_tmp = jiffies;
	totalspeed = get_speed(&st, jiffies_tmp);
	bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
			bufferusage_speed, BUFFERASSIGN_SPEED,
			BUFFERSPACE_SPEED);
	connlimit_speed = multiply_div(bufferlimit_speed,
			get_speed(&(cn->source.in.st), jiffies_tmp),
			totalspeed);
	spin_unlock_irqrestore(&st_lock, iflags);

	bufferassigned_speed -= cn->source.in.buffer_speed;
	if (((__u32) connlimit_speed) != connlimit_speed)
		cn->source.in.buffer_speed = -1;
	else
		cn->source.in.buffer_speed = (__u32) connlimit_speed;
	bufferassigned_speed += cn->source.in.buffer_speed;

	refresh_bufferusage(cn);

	window = __get_window(cn);

	cn->source.in.window_seqnolimit_last = cn->source.in.next_seqno +
			dec_log_64_11(window);
	if (((__s32) (cn->source.in.window_seqnolimit_last -
			cn->source.in.window_seqnolimit_max)) > 0)
		cn->source.in.window_seqnolimit_max =
				cn->source.in.window_seqnolimit_last;

out:
	mutex_unlock(&(cn->rcv_lock));

	return window;
}
__u8 get_window(struct conn *cn, struct neighbor *expectedsender)
{
	struct conn *cn2;
	int listlocked;
	__u8 window;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);

	window = _get_window(cn, expectedsender, listlocked);

	if (listlocked) {
		/*
		 * refresh window of idle conns as well to keep global counters
		 * up to date
		 */
		cn2 = container_of(buffer_conn_list.next, struct conn,
				source.in.buffer_list);

		if (list_empty(&buffer_conn_list) == 0 && cn2 != cn)
			_get_window(cn2, 0, listlocked);

		if (list_empty(&buffer_conn_tmp_list) == 0) {
			cn2 = container_of(buffer_conn_tmp_list.next,
					struct conn, source.in.buffer_list);
			_get_window(cn2, 0, listlocked);
		}

		mutex_unlock(&buffer_conn_list_lock);
	}

	mutex_unlock(&bufferlimits_lock);

	return window;
}
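
/*
 * Design note (as far as visible here): get_window() only trylocks
 * buffer_conn_list_lock, so it never blocks on the list while holding
 * bufferlimits_lock. When the trylock fails, _get_window() parks the conn on
 * buffer_conn_tmp_list instead; _outofbufferspace() later splices those
 * entries back into buffer_conn_list.
 */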
void reset_bufferusage(struct conn *cn)
{
	int listlocked;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);
	mutex_lock(&(cn->rcv_lock));

	if (cn->sourcetype != SOURCE_IN)
		goto out;

	bufferusage_init -= cn->source.in.usage_init;
	bufferusage_speed -= cn->source.in.usage_speed;
	bufferusage_ata -= cn->source.in.usage_ata;
	bufferusage_reserve -= cn->source.in.usage_reserve;

	bufferassigned_init -= cn->source.in.buffer_init;
	bufferassigned_speed -= cn->source.in.buffer_speed;
	bufferassigned_ata -= cn->source.in.buffer_ata;

	if (listlocked && (cn->source.in.buffer_list.next != 0 ||
			cn->source.in.buffer_list.prev != 0)) {
		list_del(&(cn->source.in.buffer_list));
		cn->source.in.buffer_list.next = 0;
		cn->source.in.buffer_list.prev = 0;
		kref_put(&(cn->ref), free_conn);
	}

out:
	mutex_unlock(&(cn->rcv_lock));

	if (listlocked)
		mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}
void refresh_speedstat(struct conn *src_in_l, __u32 written)
{
	unsigned long iflags;
	unsigned long jiffies_tmp;

	spin_lock_irqsave(&st_lock, iflags);

	jiffies_tmp = jiffies;

	/* saturate the per-jiffy byte counters instead of wrapping */
	if (src_in_l->source.in.st.jiffies_last_update != jiffies_tmp)
		get_speed(&(src_in_l->source.in.st), jiffies_tmp);
	if (src_in_l->source.in.st.bytes_curr + written < written)
		src_in_l->source.in.st.bytes_curr = -1;
	else
		src_in_l->source.in.st.bytes_curr += written;

	if (st.jiffies_last_update != jiffies_tmp)
		get_speed(&st, jiffies_tmp);
	if (st.bytes_curr + written < written)
		st.bytes_curr = -1;
	else
		st.bytes_curr += written;

	spin_unlock_irqrestore(&st_lock, iflags);
}
#warning todo overlapping seqno rcv
void drain_ooo_queue(struct conn *src_in_l)
{
	struct sk_buff *skb;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	skb = src_in_l->source.in.reorder_queue.next;

	while ((void *) skb != (void *) &(src_in_l->source.in.reorder_queue)) {
		struct skb_procstate *ps = skb_pstate(skb);
		int drop;

		if (src_in_l->source.in.next_seqno != ps->funcstate.rcv2.seqno)
			break;

#warning todo cont after drop == 1
		drop = receive_skb(src_in_l, skb);
		if (drop)
			break;

		skb_unlink(skb, &(src_in_l->source.in.reorder_queue));
		src_in_l->source.in.ooo_packets--;
		atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
		atomic_dec(&ooo_packets);

		src_in_l->source.in.next_seqno += skb->len;

		skb = src_in_l->source.in.reorder_queue.next;
	}
}
static int _conn_rcv_ooo(struct conn *src_in_l, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct sk_buff_head *reorder_queue =
			&(src_in_l->source.in.reorder_queue);
	struct sk_buff *curr = reorder_queue->next;

	int ooo;

#warning todo limit amount of data, not packet count
	src_in_l->source.in.ooo_packets++;
	if (src_in_l->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
		goto drop_conn;

	ooo = atomic_inc_return(&(src_in_l->source.in.nb->ooo_packets));
	if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
		goto drop_nb;

	ooo = atomic_inc_return(&ooo_packets);
	if (ooo > MAX_TOTAL_OOO_PACKETS)
		goto drop_global;

	while (1) {
		struct skb_procstate *ps2 = skb_pstate(curr);

		if ((void *) curr == (void *) reorder_queue) {
			skb_queue_tail(reorder_queue, skb);
			break;
		}

		if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
			skb_insert(curr, skb, reorder_queue);
			break;
		}

		curr = curr->next;
	}

	return 0;

drop_global:
	atomic_dec(&ooo_packets);
drop_nb:
	atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
drop_conn:
	src_in_l->source.in.ooo_packets--;

	return 1;
}
#warning todo check if in window
static void _conn_rcv(struct conn *src_in, struct sk_buff *skb, __u32 conn_id)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct control_msg_out *cm = alloc_control_msg(src_in->source.in.nb,
			/* ... */);

	int in_order;
	int drop = 1;

	__u32 len = skb->len;

	if (unlikely(cm == 0)) {
		kfree_skb(skb);
		return;
	}

#warning todo check whether skb mac+iface correct
#warning todo receive even if no ack is possible and save source.in.next_seqno so that the ack can be sent later
	mutex_lock(&(src_in->rcv_lock));

	if (unlikely(unlikely(src_in->sourcetype != SOURCE_IN) ||
			unlikely(src_in->source.in.conn_id != conn_id)))
		goto out;

	in_order = (src_in->source.in.next_seqno == ps->funcstate.rcv2.seqno);
	if (in_order == 0) {
		drop = _conn_rcv_ooo(src_in, skb);
	} else {
		src_in->source.in.next_seqno += skb->len;
		drop = receive_skb(src_in, skb);
	}

	if (drop == 0) {
#warning todo send_ack_conn uses the conn without source/target check
		if (in_order == 0) {
			send_ack_conn_ooo(cm, src_in,
					src_in->reversedir->target.out.conn_id,
					ps->funcstate.rcv2.seqno, len);
		} else {
			drain_ooo_queue(src_in);
			send_ack_conn(cm, src_in,
					src_in->reversedir->target.out.conn_id,
					src_in->source.in.next_seqno);
		}
	}

out:
	mutex_unlock(&(src_in->rcv_lock));

	if (unlikely(drop)) {
		kfree_skb(skb);
		free_control_msg(cm);
	}
}
static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
{
	struct conn *cn_src_in;
	struct skb_procstate *ps = skb_pstate(skb);

	ps->funcstate.rcv2.seqno = seqno;

	cn_src_in = get_conn(conn_id);

	if (unlikely(cn_src_in == 0)) {
		struct neighbor *nb = get_neigh_by_mac(skb);

		printk(KERN_DEBUG "unknown conn_id when receiving: %u\n",
				conn_id);

		if (likely(nb != 0)) {
			send_connid_unknown(nb, conn_id);
			kref_put(&(nb->ref), neighbor_free);
		}
		kfree_skb(skb);
		return;
	}

	_conn_rcv(cn_src_in, skb, conn_id);
	kref_put(&(cn_src_in->ref), free_conn);
}
void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
	char *dst;

	if (unlikely(skb == 0))
		return;

	dst = skb_put(skb, datalen);
	memcpy(dst, data, datalen);
	conn_rcv(skb, conn_id, seqno);
}
static void rcv_data(struct sk_buff *skb)
{
	__u32 conn_id;
	__u32 seqno;

	char *connid_p = cor_pull_skb(skb, 4);
	char *seqno_p = cor_pull_skb(skb, 4);

	((char *)&conn_id)[0] = connid_p[0];
	((char *)&conn_id)[1] = connid_p[1];
	((char *)&conn_id)[2] = connid_p[2];
	((char *)&conn_id)[3] = connid_p[3];

	((char *)&seqno)[0] = seqno_p[0];
	((char *)&seqno)[1] = seqno_p[1];
	((char *)&seqno)[2] = seqno_p[2];
	((char *)&seqno)[3] = seqno_p[3];

	conn_id = be32_to_cpu(conn_id);
	seqno = be32_to_cpu(seqno);

	/* get_random_bytes(&rand, 1);
	...
	printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
			seqno_p[1], seqno_p[2], seqno_p[3]);
	... */

	if (conn_id == 0) {
		/* conn_id 0 carries neighbor-level ("kernel") packets */
		struct neighbor *nb = get_neigh_by_mac(skb);
		if (unlikely(nb == 0))
			goto drop;
		kernel_packet(nb, skb, seqno);
		kref_put(&(nb->ref), neighbor_free);
	} else {
		conn_rcv(skb, conn_id, seqno);
	}

	return;

drop:
	kfree_skb(skb);
}
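
/*
 * Note: conn_id and seqno are copied byte by byte above because
 * cor_pull_skb() may return an unaligned pointer; the be32_to_cpu() calls
 * then convert from the big-endian wire format. With <asm/unaligned.h> the
 * same could be written as conn_id = get_unaligned_be32(connid_p);
 */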
static void rcv(struct work_struct *work)
{
	struct sk_buff *skb = skb_from_pstate(container_of(work,
			struct skb_procstate, funcstate.rcv.work));

	__u8 packet_type;
	char *packet_type_p;

	atomic_dec(&packets_in_workqueue);

	packet_type_p = cor_pull_skb(skb, 1);

	if (unlikely(packet_type_p == 0))
		goto drop;

	packet_type = *packet_type_p;

	if (packet_type == PACKET_TYPE_ANNOUNCE) {
		/* ... */
		return;
	}

	if (unlikely(packet_type != PACKET_TYPE_DATA))
		goto drop;

	rcv_data(skb);

	return;

drop:
	kfree_skb(skb);
}
static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
		struct packet_type *pt, struct net_device *orig_dev)
{
	struct skb_procstate *ps = skb_pstate(skb);
	int queuelen;

	if (skb->pkt_type == PACKET_OTHERHOST)
		goto drop;

	BUG_ON(skb->next != 0);

	queuelen = atomic_inc_return(&packets_in_workqueue);

	BUG_ON(queuelen <= 0);

#warning todo limit per interface, inbound credits
	if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
		atomic_dec(&packets_in_workqueue);
		goto drop;
	}

	INIT_WORK(&(ps->funcstate.rcv.work), rcv);
	queue_work(packet_wq, &(ps->funcstate.rcv.work));
	return NET_RX_SUCCESS;

drop:
	kfree_skb(skb);
	return NET_RX_DROP;
}
static struct packet_type ptype_cor = {
	.type = htons(ETH_P_COR),
	.func = queue_rcv_processing
};
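
/*
 * Note: registering ptype_cor via dev_add_pack() in cor_rcv_init() below
 * makes the network core hand every received frame with ethertype ETH_P_COR
 * to queue_rcv_processing(), which defers the actual parsing to the
 * "cor_packet" workqueue via rcv().
 */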
int __init cor_rcv_init(void)
{
	bufferassigned_init = 0;
	bufferassigned_speed = 0;
	bufferassigned_ata = 0;

	bufferusage_init = 0;
	bufferusage_speed = 0;
	bufferusage_ata = 0;
	bufferusage_reserve = 0;

	memset(&st, 0, sizeof(struct speedtracker));

	BUG_ON(sizeof(struct skb_procstate) > 48);
	packet_wq = create_workqueue("cor_packet");
	if (unlikely(packet_wq == 0))
		return -ENOMEM;
	INIT_WORK(&outofbufferspace_work, outofbufferspace);
	outofbufferspace_scheduled = 0;

	dev_add_pack(&ptype_cor);

	return 0;
}

MODULE_LICENSE("GPL");