/*
 * Connection oriented routing
 * Copyright (C) 2007-2011  Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
static atomic_t packets_in_workqueue = ATOMIC_INIT(0);

static atomic_t ooo_packets = ATOMIC_INIT(0);

static struct workqueue_struct *packet_wq;

static struct work_struct outofbufferspace_work;
DEFINE_SPINLOCK(oobss_lock);
static int outofbufferspace_scheduled;
/*
 * buffering space is divided in 4 areas:
 *
 * buffer_init:
 * distributed equally among all conns; shrinks and grows immediately when
 * there are new connections or connections are reset
 *
 * buffer_speed:
 * distributed proportional to speed; shrinks and grows constantly
 *
 * buffer_ata:
 * used to make noise to make traffic analysis harder; may only shrink if data
 * is either sent on or "stuck" for a long time
 *
 * buffer_reserve:
 * reserve for the case where sudden shrinking causes some connections to
 * contain more old data than allowed by the first 3 buffers. If this area is
 * full, the out-of-memory conn-resetter is triggered.
 *
 * Each area commits a certain amount of space to each connection. This is the
 * maximum buffer space a connection is allowed to use. The space of a
 * specific connection is first accounted to buffer_ata. If the allowed buffer
 * space is exceeded, the rest is accounted to buffer_speed and then to
 * buffer_init. The reserve area is used last; this should only happen if the
 * assigned buffer space of the first 3 areas shrinks suddenly. If the reserve
 * is also used up, connections will be reset.
 */
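/*
 * Illustrative sketch only (not part of the original module): how a
 * connection's total buffered bytes cascade through the four areas in the
 * order described above (ata -> speed -> init -> reserve). The limit
 * parameters are hypothetical stand-ins for the per-connection buffer_*
 * fields; the real accounting is done in refresh_bufferusage() below.
 */
static inline void account_usage_sketch(__u64 total, __u64 limit_ata,
                __u64 limit_speed, __u64 limit_init, __u64 *usage_ata,
                __u64 *usage_speed, __u64 *usage_init, __u64 *usage_reserve)
{
        /* fill buffer_ata first, up to its committed limit */
        *usage_ata = total > limit_ata ? limit_ata : total;
        total -= *usage_ata;

        /* overflow goes to buffer_speed, then buffer_init */
        *usage_speed = total > limit_speed ? limit_speed : total;
        total -= *usage_speed;

        *usage_init = total > limit_init ? limit_init : total;
        total -= *usage_init;

        /* whatever is left spills into the reserve; if the global reserve
         * overflows, the out-of-memory conn-resetter is scheduled */
        *usage_reserve = total;
}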
DEFINE_MUTEX(buffer_conn_list_lock);
LIST_HEAD(buffer_conn_list);

/*
 * used to buffer inserts while the main list is locked; entries are moved to
 * the main list after processing of the main list finishes
 */
LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */


DEFINE_MUTEX(bufferlimits_lock);
static __u64 bufferassigned_init;
static __u64 bufferassigned_speed;
static __u64 bufferassigned_ata;

static __u64 bufferusage_init;
static __u64 bufferusage_speed;
static __u64 bufferusage_ata;
static __u64 bufferusage_reserve;

DEFINE_SPINLOCK(st_lock);
static struct speedtracker st;
static __u64 desired_bufferusage(__u64 assigned, __u64 usage,
                __u64 assignlimit, __u64 usagelimit)
{
        if (unlikely(assignlimit < usagelimit))
                assignlimit = usagelimit;

        if (multiply_div(usage, 9, 10) > usagelimit)
                return multiply_div(usagelimit, 9, 10);

        load = multiply_div(usage, 192, usagelimit);

        /* slow limit increase, fast decrease */
        } else if (load < 128) {
                return multiply_div(usagelimit, 9, 10);

        ret = multiply_div(assigned, 128, load);

        ret = multiply_div(assigned, 128, load + load - 128);

        if (ret > assignlimit)
/* return 65536 == 1byte/HZ */
static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
{
        if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
                        time_before(st->jiffies_last_update + HZ*10,
                        jiffies_tmp))) {
                st->jiffies_last_update = jiffies_tmp;

        for (;time_before(st->jiffies_last_update, jiffies_tmp);
                        st->jiffies_last_update++) {
                st->speed = ( st->speed * (HZ*9 - 1) +
                                (((__u64)st->bytes_curr) << 16) ) /

        return ( st->speed * (HZ*9 - 1) + (((__u64)st->bytes_curr) << 17) ) /
/*
 * val1[0], val2[0], res[0] ... least significant
 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
 */
static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
                unsigned int val2len, __u32 *res, int reslen)
{
        int digits = val1len + val2len;
        __u64 overflow = 0;
        __u64 tmpres;
        int i, idx1, idx2;

        BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);

        memset(res, 0, reslen * sizeof(__u32));

        if (val1len == 0 || val2len == 0)
                return;

        for (i=0;i<digits;i++) {
                res[i] = (__u32) overflow;
                overflow = overflow >> 32;
                for (idx1=0;idx1<val1len && idx1<=i;idx1++) {
                        tmpres = ((__u64) (val1[idx1])) *
                                        ((__u64) (val2[idx2]));
                        overflow += tmpres >> 32;
                        tmpres = (tmpres << 32) >> 32;
                        if (res[i] + tmpres < res[i])

        BUG_ON(overflow != 0);
}
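/*
 * Minimal usage sketch for mul() (illustration only, hypothetical helper):
 * multiplying two 64-bit values by splitting them into 32-bit digits,
 * least significant digit first, exactly as compare_scores() does with the
 * speed values below.
 */
static inline void mul_u64_sketch(__u64 a, __u64 b, __u32 res[4])
{
        __u32 a_digits[2] = { (__u32) a, (__u32) (a >> 32) };
        __u32 b_digits[2] = { (__u32) b, (__u32) (b >> 32) };

        mul(a_digits, 2, b_digits, 2, res, 4);
}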
/*
 *  1 == usage1/speed1 offends more
 *  0 == both offend the same
 * -1 == usage2/speed2 offends more
 */
static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
                __u64 speed2)
{
        int i;

        __u32 speed1_tmp[2];
        __u32 speed2_tmp[2];

        __u32 speed1squared[4];
        __u32 speed2squared[4];

        __u32 speed1squared_usage2[5];
        __u32 speed2squared_usage1[5];

        speed1_tmp[0] = (speed1 << 32) >> 32;
        speed1_tmp[1] = (speed1 >> 32);
        speed2_tmp[0] = (speed2 << 32) >> 32;
        speed2_tmp[1] = (speed2 >> 32);

        mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared, 4);
        mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);

        mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
        mul(speed2squared, 4, &usage1, 1, speed2squared_usage1, 5);

                if (speed1squared_usage2[i] > speed2squared_usage1[i])

                if (speed1squared_usage2[i] < speed2squared_usage1[i])
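/*
 * Note added for clarity: the digit-wise comparison above avoids a division
 * by using the identity
 *   usage1/speed1^2 > usage2/speed2^2  <=>  usage1*speed2^2 > usage2*speed1^2,
 * so the two 160-bit products have to be compared starting at their most
 * significant digit.
 */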
static void _outofbufferspace(void)
{
        int i;
        struct list_head *curr;
        struct conn *offendingconns[OOBS_SIZE];
        __u32 offendingusage[OOBS_SIZE];
        __u64 offendingspeed[OOBS_SIZE];

        memset(&offendingconns, 0, sizeof(offendingconns));

        mutex_lock(&buffer_conn_list_lock);

        curr = buffer_conn_list.next;
        while (curr != &buffer_conn_list) {
                unsigned long iflags;
                __u32 usage;
                __u64 speed;

                struct conn *src_in_o = container_of(curr, struct conn,
                                source.in.buffer_list);

                mutex_lock(&(src_in_o->rcv_lock));

                BUG_ON(src_in_o->sourcetype != SOURCE_IN);

                usage = src_in_o->source.in.usage_reserve;

                spin_lock_irqsave(&st_lock, iflags);
                speed = get_speed(&(src_in_o->source.in.st), jiffies);
                spin_unlock_irqrestore(&st_lock, iflags);

                mutex_unlock(&(src_in_o->rcv_lock));

                if (offendingconns[OOBS_SIZE-1] != 0 && compare_scores(
                                offendingusage[OOBS_SIZE-1],
                                offendingspeed[OOBS_SIZE-1],

                offendingconns[OOBS_SIZE-1] = src_in_o;
                offendingusage[OOBS_SIZE-1] = usage;
                offendingspeed[OOBS_SIZE-1] = speed;

                for (i=OOBS_SIZE-2;i>=0;i--) {
                        struct conn *tmpconn;
                        __u32 usage_tmp;
                        __u64 speed_tmp;

                        if (offendingconns[i] != 0 && compare_scores(
                                        offendingusage[i], offendingspeed[i],
                                        offendingusage[i+1],
                                        offendingspeed[i+1]) >= 0)
                                break;

                        tmpconn = offendingconns[i];
                        usage_tmp = offendingusage[i];
                        speed_tmp = offendingspeed[i];

                        offendingconns[i] = offendingconns[i+1];
                        offendingusage[i] = offendingusage[i+1];
                        offendingspeed[i] = offendingspeed[i+1];

                        offendingconns[i+1] = tmpconn;
                        offendingusage[i+1] = usage_tmp;
                        offendingspeed[i+1] = speed_tmp;
                }
        }

        for (i=0;i<OOBS_SIZE;i++) {
                if (offendingconns[i] == 0)
                        break;
                kref_get(&(offendingconns[i]->ref));
        }

        mutex_unlock(&buffer_conn_list_lock);

        for (i=0;i<OOBS_SIZE;i++) {
                int resetneeded;

                if (offendingconns[i] == 0)
                        break;

                mutex_lock(&bufferlimits_lock);
                resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
                mutex_unlock(&bufferlimits_lock);

                if (resetneeded)
                        reset_conn(offendingconns[i]);
                kref_put(&(offendingconns[i]->ref), free_conn);
        }

        mutex_lock(&bufferlimits_lock);
        mutex_lock(&buffer_conn_list_lock);
        while (list_empty(&buffer_conn_tmp_list) == 0) {
                curr = buffer_conn_tmp_list.next;
                list_add(curr, &buffer_conn_list);
        }
        mutex_unlock(&buffer_conn_list_lock);
        mutex_unlock(&bufferlimits_lock);
}
static void outofbufferspace(struct work_struct *work)
{
        int resetneeded;
        unsigned long iflags;

        mutex_lock(&bufferlimits_lock);
        spin_lock_irqsave(&oobss_lock, iflags);
        resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);

        if (resetneeded == 0)
                outofbufferspace_scheduled = 0;

        spin_unlock_irqrestore(&oobss_lock, iflags);
        mutex_unlock(&bufferlimits_lock);

        if (resetneeded == 0)
static void refresh_bufferusage(struct conn *src_in_l)
{
        BUG_ON(src_in_l->sourcetype != SOURCE_IN);

        bufferusage_init -= src_in_l->source.in.usage_init;
        bufferusage_speed -= src_in_l->source.in.usage_speed;
        bufferusage_ata -= src_in_l->source.in.usage_ata;
        bufferusage_reserve -= src_in_l->source.in.usage_reserve;

        src_in_l->source.in.usage_ata = src_in_l->data_buf.totalsize;
        if (src_in_l->source.in.usage_ata > src_in_l->source.in.buffer_ata)
                src_in_l->source.in.usage_ata = src_in_l->source.in.buffer_ata;

        if (src_in_l->source.in.usage_ata == src_in_l->data_buf.totalsize)
                src_in_l->source.in.usage_speed = 0;
        else
                src_in_l->source.in.usage_speed = src_in_l->data_buf.totalsize -
                                src_in_l->source.in.usage_ata;

        if (src_in_l->source.in.usage_speed > src_in_l->source.in.buffer_speed)
                src_in_l->source.in.usage_speed =
                                src_in_l->source.in.buffer_speed;

        if ((src_in_l->source.in.usage_ata + src_in_l->source.in.usage_speed) ==
                        src_in_l->data_buf.totalsize)
                src_in_l->source.in.usage_init = 0;
        else
                src_in_l->source.in.usage_init = src_in_l->data_buf.totalsize -
                                src_in_l->source.in.usage_ata -
                                src_in_l->source.in.usage_speed;

        if (src_in_l->source.in.usage_init > src_in_l->source.in.buffer_init)
                src_in_l->source.in.usage_init =
                                src_in_l->source.in.buffer_init;

        if ((src_in_l->source.in.usage_ata + src_in_l->source.in.usage_speed +
                        src_in_l->source.in.usage_init) ==
                        src_in_l->data_buf.totalsize)
                src_in_l->source.in.usage_reserve = 0;
        else
                src_in_l->source.in.usage_reserve =
                                src_in_l->data_buf.totalsize -
                                src_in_l->source.in.usage_ata -
                                src_in_l->source.in.usage_speed -
                                src_in_l->source.in.usage_init;

        bufferusage_init += src_in_l->source.in.usage_init;
        bufferusage_speed += src_in_l->source.in.usage_speed;
        bufferusage_ata += src_in_l->source.in.usage_ata;
        bufferusage_reserve += src_in_l->source.in.usage_reserve;

        if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
                unsigned long iflags;
                spin_lock_irqsave(&oobss_lock, iflags);
                if (outofbufferspace_scheduled == 0) {
                        schedule_work(&outofbufferspace_work);
                        outofbufferspace_scheduled = 1;
                }
                spin_unlock_irqrestore(&oobss_lock, iflags);
        }
}
static __u8 __get_window(struct conn *src_in_l)
{
        BUG_ON(src_in_l->sourcetype != SOURCE_IN);

        if (src_in_l->source.in.usage_reserve != 0)
                return 0;

        BUG_ON(src_in_l->source.in.usage_init >
                        src_in_l->source.in.buffer_init);
        BUG_ON(src_in_l->source.in.usage_speed >
                        src_in_l->source.in.buffer_speed);
        BUG_ON(src_in_l->source.in.usage_ata > src_in_l->source.in.buffer_ata);

        window += src_in_l->source.in.buffer_init;
        window += src_in_l->source.in.buffer_speed;
        window += src_in_l->source.in.buffer_ata;

        window -= src_in_l->source.in.usage_init;
        window -= src_in_l->source.in.usage_speed;
        window -= src_in_l->source.in.usage_ata;

        if (window > MAX_ANNOUNCE_WINDOW)
                window = MAX_ANNOUNCE_WINDOW;

        return enc_log_64_11(window);
}
#warning todo upper buffer limits
static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender,
                __u32 expected_connid, int from_acksend, int listlocked)
{
        unsigned long iflags;
        __u8 window;
        unsigned long jiffies_tmp;

        __u32 conns;
        __u64 totalspeed;

        __u64 bufferlimit_init;
        __u64 connlimit_init;

        __u64 bufferlimit_speed;
        __u64 connlimit_speed;

        mutex_lock(&(cn->rcv_lock));

        BUG_ON(expectedsender == 0 && cn->sourcetype != SOURCE_IN);

        if (unlikely(unlikely(cn->sourcetype != SOURCE_IN) ||
                        unlikely(expectedsender != 0 && (cn->source.in.nb !=
                        expectedsender || cn->reversedir->target.out.conn_id !=
                        expected_connid))))

        if (unlikely(cn->isreset != 0)) {
                if (listlocked && (cn->source.in.buffer_list.next != 0 ||
                                cn->source.in.buffer_list.prev != 0)) {
                        list_del(&(cn->source.in.buffer_list));
                        cn->source.in.buffer_list.next = 0;
                        cn->source.in.buffer_list.prev = 0;
                        kref_put(&(cn->ref), free_conn);
                }
        }

        if (listlocked) {
                if (cn->source.in.buffer_list.next != 0 ||
                                cn->source.in.buffer_list.prev != 0) {
                        list_del(&(cn->source.in.buffer_list));
                } else {
                        kref_get(&(cn->ref));
                }
                list_add_tail(&(cn->source.in.buffer_list),
                                &buffer_conn_list);
        } else if (cn->source.in.buffer_list.next == 0 &&
                        cn->source.in.buffer_list.prev == 0) {
                kref_get(&(cn->ref));
                list_add_tail(&(cn->source.in.buffer_list),
                                &buffer_conn_tmp_list);
        }

        conns = atomic_read(&num_conns);

        bufferlimit_init = desired_bufferusage(bufferassigned_init,
                        bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
        connlimit_init = (bufferlimit_init + conns - 1) / conns;

        bufferassigned_init -= cn->source.in.buffer_init;
        if (((__u32) connlimit_init) != connlimit_init)
                cn->source.in.buffer_init = -1;
        else
                cn->source.in.buffer_init = (__u32) connlimit_init;
        bufferassigned_init += cn->source.in.buffer_init;

        spin_lock_irqsave(&st_lock, iflags);
        jiffies_tmp = jiffies;
        totalspeed = get_speed(&st, jiffies_tmp);
        bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
                        bufferusage_speed, BUFFERASSIGN_SPEED,
                        BUFFERSPACE_SPEED);
        connlimit_speed = multiply_div(bufferlimit_speed,
                        get_speed(&(cn->source.in.st), jiffies_tmp),
                        totalspeed);
        spin_unlock_irqrestore(&st_lock, iflags);

        bufferassigned_speed -= cn->source.in.buffer_speed;
        if (((__u32) connlimit_speed) != connlimit_speed)
                cn->source.in.buffer_speed = -1;
        else
                cn->source.in.buffer_speed = (__u32) connlimit_speed;
        bufferassigned_speed += cn->source.in.buffer_speed;

        refresh_bufferusage(cn);

        window = __get_window(cn);

        cn->source.in.window_seqnolimit = cn->source.in.next_seqno +
                        dec_log_64_11(window);

        if (from_acksend)
                cn->source.in.window_seqnolimit_remote =
                                cn->source.in.window_seqnolimit;
        else
                send_ack_conn_ifneeded(cn);

        mutex_unlock(&(cn->rcv_lock));

        return window;
}
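/*
 * The clamping idiom used in _get_window() above, factored out for
 * illustration (hypothetical helper, not part of the module): saturate a
 * 64-bit per-conn limit into a __u32 buffer_* field, where (__u32) -1 is
 * the all-ones maximum.
 */
static inline __u32 saturate_u32_sketch(__u64 val)
{
        if (((__u32) val) != val)
                return (__u32) -1;
        return (__u32) val;
}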
__u8 get_window(struct conn *cn, struct neighbor *expectedsender,
                __u32 expected_connid, int from_acksend)
{
        struct conn *cn2;
        int listlocked;
        __u8 window;

        mutex_lock(&bufferlimits_lock);
        listlocked = mutex_trylock(&buffer_conn_list_lock);

        window = _get_window(cn, expectedsender, expected_connid, from_acksend,
                        listlocked);

        if (listlocked) {
                /*
                 * refresh window of idle conns as well to keep global
                 * counters accurate
                 */
                cn2 = container_of(buffer_conn_list.next, struct conn,
                                source.in.buffer_list);

                if (list_empty(&buffer_conn_list) == 0 && cn2 != cn)
                        _get_window(cn2, 0, 0, 0, listlocked);

                if (list_empty(&buffer_conn_tmp_list) == 0) {
                        cn2 = container_of(buffer_conn_tmp_list.next,
                                        struct conn, source.in.buffer_list);
                        _get_window(cn2, 0, 0, 0, listlocked);
                }

                mutex_unlock(&buffer_conn_list_lock);
        }

        mutex_unlock(&bufferlimits_lock);

        return window;
}
void reset_bufferusage(struct conn *cn)
{
        mutex_lock(&bufferlimits_lock);
        listlocked = mutex_trylock(&buffer_conn_list_lock);
        mutex_lock(&(cn->rcv_lock));

        if (cn->sourcetype != SOURCE_IN)
                goto out;

        bufferusage_init -= cn->source.in.usage_init;
        bufferusage_speed -= cn->source.in.usage_speed;
        bufferusage_ata -= cn->source.in.usage_ata;
        bufferusage_reserve -= cn->source.in.usage_reserve;

        bufferassigned_init -= cn->source.in.buffer_init;
        bufferassigned_speed -= cn->source.in.buffer_speed;
        bufferassigned_ata -= cn->source.in.buffer_ata;

        if (listlocked && (cn->source.in.buffer_list.next != 0 ||
                        cn->source.in.buffer_list.prev != 0)) {
                list_del(&(cn->source.in.buffer_list));
                cn->source.in.buffer_list.next = 0;
                cn->source.in.buffer_list.prev = 0;
                kref_put(&(cn->ref), free_conn);
        }

out:
        mutex_unlock(&(cn->rcv_lock));
        if (listlocked)
                mutex_unlock(&buffer_conn_list_lock);
        mutex_unlock(&bufferlimits_lock);
}
void refresh_speedstat(struct conn *src_in_l, __u32 written)
{
        unsigned long iflags;
        unsigned long jiffies_tmp;

        spin_lock_irqsave(&st_lock, iflags);

        jiffies_tmp = jiffies;

        if (src_in_l->source.in.st.jiffies_last_update != jiffies_tmp)
                get_speed(&(src_in_l->source.in.st), jiffies_tmp);
        if (src_in_l->source.in.st.bytes_curr + written < written)
                src_in_l->source.in.st.bytes_curr = -1;
        else
                src_in_l->source.in.st.bytes_curr += written;

        if (st.jiffies_last_update != jiffies_tmp)
                get_speed(&st, jiffies_tmp);
        if (st.bytes_curr + written < written)
                st.bytes_curr = -1;
        else
                st.bytes_curr += written;

        spin_unlock_irqrestore(&st_lock, iflags);
}
void reset_ooo_queue(struct conn *src_in_l)
{
        struct sk_buff *skb;

        BUG_ON(src_in_l->sourcetype != SOURCE_IN);

        skb = src_in_l->source.in.reorder_queue.next;

        while ((void *) skb != (void *) &(src_in_l->source.in.reorder_queue)) {
                struct skb_procstate *ps = skb_pstate(skb);
                int drop;

                if (src_in_l->source.in.next_seqno != ps->funcstate.rcv2.seqno)
                        break;

                drop = receive_skb(src_in_l, skb);

                skb_unlink(skb, &(src_in_l->source.in.reorder_queue));
                src_in_l->source.in.ooo_packets--;
                atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
                atomic_dec(&ooo_packets);

                src_in_l->source.in.next_seqno += skb->len;
        }
}
#warning todo overlapping seqno rcv
void drain_ooo_queue(struct conn *src_in_l)
{
        BUG_ON(src_in_l->sourcetype != SOURCE_IN);

        while (skb_queue_empty(&(src_in_l->source.in.reorder_queue)) == 0) {
                struct sk_buff *skb = src_in_l->source.in.reorder_queue.next;
                skb_unlink(skb, &(src_in_l->source.in.reorder_queue));

                src_in_l->source.in.ooo_packets--;
                atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
                atomic_dec(&ooo_packets);
        }
}
static int _conn_rcv_ooo(struct conn *src_in_l, struct sk_buff *skb)
{
        struct skb_procstate *ps = skb_pstate(skb);
        struct sk_buff_head *reorder_queue =
                        &(src_in_l->source.in.reorder_queue);
        struct sk_buff *curr = reorder_queue->next;

#warning todo limit amount of data, not packet count
        src_in_l->source.in.ooo_packets++;
        if (src_in_l->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)

        ooo = atomic_inc_return(&(src_in_l->source.in.nb->ooo_packets));
        if (ooo > MAX_TOTAL_OOO_PER_NEIGH)

        ooo = atomic_inc_return(&ooo_packets);
        if (ooo > MAX_TOTAL_OOO_PACKETS)

                struct skb_procstate *ps2 = skb_pstate(curr);

                if ((void *) curr == (void *) reorder_queue) {
                        skb_queue_tail(reorder_queue, skb);
                }

                if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
                        skb_insert(curr, skb, reorder_queue);
                }

        atomic_dec(&ooo_packets);

        atomic_dec(&(src_in_l->source.in.nb->ooo_packets));

        src_in_l->source.in.ooo_packets--;
static void _conn_rcv(struct neighbor *nb, struct conn *src_in,
                struct sk_buff *skb, __u32 conn_id)
{
        struct skb_procstate *ps = skb_pstate(skb);
        int in_order;
        int drop;

        __u32 len = skb->len;

        mutex_lock(&(src_in->rcv_lock));

        if (unlikely(unlikely(src_in->isreset != 0) ||
                        unlikely(src_in->sourcetype != SOURCE_IN) ||
                        unlikely(src_in->source.in.conn_id != conn_id)))

        if (unlikely(is_from_nb(skb, src_in->source.in.nb) == 0))

        if (unlikely(src_in->source.in.nb != nb))

        set_last_act(src_in);

        if (((__s32) (ps->funcstate.rcv2.seqno + skb->len -
                        src_in->source.in.window_seqnolimit)) > 0)

        in_order = (src_in->source.in.next_seqno == ps->funcstate.rcv2.seqno);

        if (in_order == 0) {
                drop = _conn_rcv_ooo(src_in, skb);
        } else {
                src_in->source.in.next_seqno += skb->len;
                drop = receive_skb(src_in, skb);
        }

        if (in_order == 0) {
                struct control_msg_out *cm =
                                alloc_control_msg(src_in->source.in.nb,

                send_ack_conn_ooo(cm, src_in,
                                src_in->reversedir->target.out.conn_id,
                                ps->funcstate.rcv2.seqno, len);
        } else {
                drain_ooo_queue(src_in);
                src_in->source.in.inorder_ack_needed = 1;
        }

        send_ack_conn_ifneeded(src_in);

        mutex_unlock(&(src_in->rcv_lock));

        if (unlikely(drop)) {

        } else if (in_order == 1) {
static void conn_rcv(struct neighbor *nb, struct sk_buff *skb, __u32 conn_id,
                __u32 seqno)
{
        struct conn *cn_src_in;
        struct skb_procstate *ps = skb_pstate(skb);

        memset(ps, 0, sizeof(struct skb_procstate));

        ps->funcstate.rcv2.seqno = seqno;

        cn_src_in = get_conn(conn_id);

        if (unlikely(cn_src_in == 0)) {
                printk(KERN_DEBUG "unknown conn_id when receiving: %d",
                                conn_id);

                nb = get_neigh_by_mac(skb);

                if (likely(nb != 0)) {
                        send_connid_unknown(nb, conn_id);
                        kref_put(&(nb->ref), neighbor_free);
                }
                return;
        }

        _conn_rcv(nb, cn_src_in, skb, conn_id);
        kref_put(&(cn_src_in->ref), free_conn);
}
void conn_rcv_buildskb(struct neighbor *nb, char *data, __u32 datalen,
                __u32 conn_id, __u32 seqno)
{
        struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
        char *dst = skb_put(skb, datalen);
        memcpy(dst, data, datalen);
        conn_rcv(nb, skb, conn_id, seqno);
}
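/*
 * Wire layout of a data packet as implied by rcv() and rcv_data() below
 * (the field order and byte order are taken from the parsing code; the rest
 * of the framing is not shown here):
 *
 *   1 byte   packet type (PACKET_TYPE_DATA)
 *   4 bytes  conn_id, big endian
 *   4 bytes  seqno, big endian
 *   ...      payload
 */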
static void rcv_data(struct sk_buff *skb)
{
        __u32 conn_id;
        __u32 seqno;

        char *connid_p = cor_pull_skb(skb, 4);
        char *seqno_p = cor_pull_skb(skb, 4);

        ((char *)&conn_id)[0] = connid_p[0];
        ((char *)&conn_id)[1] = connid_p[1];
        ((char *)&conn_id)[2] = connid_p[2];
        ((char *)&conn_id)[3] = connid_p[3];

        ((char *)&seqno)[0] = seqno_p[0];
        ((char *)&seqno)[1] = seqno_p[1];
        ((char *)&seqno)[2] = seqno_p[2];
        ((char *)&seqno)[3] = seqno_p[3];

        conn_id = be32_to_cpu(conn_id);
        seqno = be32_to_cpu(seqno);

        /* get_random_bytes(&rand, 1);

        printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
                        seqno_p[1], seqno_p[2], seqno_p[3]); */

                struct neighbor *nb = get_neigh_by_mac(skb);
                if (unlikely(nb == 0))

                kernel_packet(nb, skb, seqno);
                kref_put(&(nb->ref), neighbor_free);

        conn_rcv(0, skb, conn_id, seqno);
static void rcv(struct work_struct *work)
{
        struct sk_buff *skb = skb_from_pstate(container_of(work,
                        struct skb_procstate, funcstate.rcv.work));

        atomic_dec(&packets_in_workqueue);

        packet_type_p = cor_pull_skb(skb, 1);

        if (unlikely(packet_type_p == 0))

        packet_type = *packet_type_p;

        if (packet_type == PACKET_TYPE_ANNOUNCE) {

        if (unlikely(packet_type != PACKET_TYPE_DATA))
static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
                struct packet_type *pt, struct net_device *orig_dev)
{
        struct skb_procstate *ps = skb_pstate(skb);

        if (skb->pkt_type == PACKET_OTHERHOST)

        BUG_ON(skb->next != 0);

        queuelen = atomic_inc_return(&packets_in_workqueue);

        BUG_ON(queuelen <= 0);

#warning todo limit per interface, inbound credits
        if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
                atomic_dec(&packets_in_workqueue);
        }

        INIT_WORK(&(ps->funcstate.rcv.work), rcv);
        queue_work(packet_wq, &(ps->funcstate.rcv.work));
        return NET_RX_SUCCESS;
static struct packet_type ptype_cor = {
        .type = htons(ETH_P_COR),
        .func = queue_rcv_processing
};
991 int __init
cor_rcv_init(void)
993 bufferassigned_init
= 0;
994 bufferassigned_speed
= 0;
995 bufferassigned_ata
= 0;
997 bufferusage_init
= 0;
998 bufferusage_speed
= 0;
1000 bufferusage_reserve
= 0;
1002 memset(&st
, 0, sizeof(struct speedtracker
));
1004 BUG_ON(sizeof(struct skb_procstate
) > 48);
1005 packet_wq
= create_workqueue("cor_packet");
1006 INIT_WORK(&outofbufferspace_work
, outofbufferspace
);
1007 outofbufferspace_scheduled
= 0;
1009 dev_add_pack(&ptype_cor
);
1013 MODULE_LICENSE("GPL");