/*
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
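/*
 * The cor-specific declarations used below (struct conn, struct neighbor,
 * struct skb_procstate, multiply_div(), the BUFFERSPACE_, BUFFERASSIGN_ and
 * MAX_ limits, ...) come from the project header; the header name "cor.h"
 * is an assumption.
 */
#include "cor.h"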
atomic_t packets_in_workqueue = ATOMIC_INIT(0);

atomic_t ooo_packets = ATOMIC_INIT(0);

static struct workqueue_struct *packet_wq;

static struct workqueue_struct *outofbufferspace_wq;
struct work_struct outofbufferspace_work;
spinlock_t oobss_lock;
int outofbufferspace_scheduled;
/*
 * Buffering space is divided into 4 areas:
 *
 * buffer_init:
 * distributed equally among all conns; shrinks and grows immediately when
 * there are new connections or connections are reset
 *
 * buffer_speed:
 * distributed proportional to speed; shrinks and grows constantly
 *
 * buffer_ata:
 * used to make noise to make traffic analysis harder; may only shrink if data
 * is either sent on or "stuck" for a long time
 *
 * reserve:
 * reserve in case sudden shrinking causes some connections to contain more
 * old data than allowed by the first 3 buffers. If this area is full, the
 * out-of-memory conn-resetter is triggered.
 *
 * Each area commits a certain amount of space to each connection. This is the
 * maximum buffer space a connection is allowed to use. The space of a specific
 * connection is first accounted to buffer_ata. If the space it is allowed to
 * use there is exceeded, the rest is accounted to buffer_speed and then
 * buffer_init. The reserve area is used last. This should only happen if the
 * buffer space assigned by the first 3 areas shrinks suddenly. If the reserve
 * area is used up as well, connections are reset.
 */
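/*
 * Illustration (numbers invented for the example): if a conn has been
 * committed buffer_ata == 16k, buffer_speed == 32k and buffer_init == 64k
 * and currently buffers 100k, refresh_bufferusage() below accounts 16k to
 * ata, 32k to speed and 52k to init. With 120k buffered, 16k + 32k + 64k
 * would be used up and the remaining 8k would be accounted to the reserve
 * area, possibly triggering the out-of-bufferspace resetter.
 */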
DEFINE_MUTEX(buffer_conn_list_lock);
LIST_HEAD(buffer_conn_list);

/*
 * used to buffer inserts while the main list is locked; the entries are moved
 * to the main list after processing of the main list finishes
 */
LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */

DEFINE_MUTEX(bufferlimits_lock);
static __u64 bufferassigned_init;
static __u64 bufferassigned_speed;
static __u64 bufferassigned_ata;

static __u64 bufferusage_init;
static __u64 bufferusage_speed;
static __u64 bufferusage_ata;
static __u64 bufferusage_reserve;

spinlock_t st_lock;
struct speedtracker st;
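/*
 * bufferassigned_* is the sum of the buffer space currently committed to the
 * individual conns per area, bufferusage_* the sum of what they actually use;
 * both are protected by bufferlimits_lock. st/st_lock track the total receive
 * speed across all conns. desired_bufferusage() below derives, per area, how
 * much space should be handed out in total: it raises the assignment slowly
 * while the area is lightly used and cuts it quickly once usage approaches
 * the area's limit.
 */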
static __u64 desired_bufferusage(__u64 assigned, __u64 usage, __u64 assignlimit,
                __u64 usagelimit)
{
        __u64 ret;
        __u64 load;

        if (unlikely(assignlimit < usagelimit))
                assignlimit = usagelimit;

        if (multiply_div(usage, 9, 10) > usagelimit)
                return multiply_div(usagelimit, 9, 10);

        /* load == 128 corresponds to using 2/3 of the usage limit */
        load = multiply_div(usage, 192, usagelimit);

        /* slow limit increase, fast decrease */
        if (load == 128) {
                ret = assigned;
        } else if (load < 128) {
                if (load == 0)
                        return multiply_div(usagelimit, 9, 10);
                ret = multiply_div(assigned, 128, load);
        } else {
                ret = multiply_div(assigned, 128, load + load - 128);
        }

        if (ret > assignlimit)
                ret = assignlimit;

        return ret;
}
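/*
 * get_speed() keeps an exponentially weighted average of the bytes received
 * per jiffy, stored as a 16 bit fixed point value in st->speed (a return
 * value of 1 << 16 corresponds to roughly one byte per jiffy). bytes_curr
 * collects the bytes of the jiffy that has not elapsed yet; the decay is
 * applied once per elapsed jiffy.
 */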
static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
{
        if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
                        time_before(st->jiffies_last_update + HZ*10,
                        jiffies_tmp))) {
                /* timestamp unusable (in the future or very stale): restart */
                st->jiffies_last_update = jiffies_tmp;
                st->speed = 0;
                st->bytes_curr = 0;
                return 0;
        }

        for (;time_before(st->jiffies_last_update, jiffies_tmp);
                        st->jiffies_last_update++) {
                __u32 bytes_curr = 0;
                if ((st->jiffies_last_update + 1) == jiffies_tmp) {
                        bytes_curr = st->bytes_curr;
                        st->bytes_curr = 0;
                }
                st->speed = (st->speed * (HZ-1) + (((__u64)bytes_curr)<<16))/HZ;
        }

        if ((st->jiffies_last_update + 1) == jiffies_tmp) {
                st->jiffies_last_update++;
        }

        return (st->speed * (HZ*2 - 1) + (((__u64)st->bytes_curr) << 17))/HZ/2;
}
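/*
 * The two helpers below exist because the offender score used by the
 * out-of-bufferspace code is usage/speed^2: to compare two scores without
 * doing any division, usage1*speed2^2 and usage2*speed1^2 are computed with
 * a small arbitrary-precision multiplication (32 bit digits) and compared
 * digit by digit.
 */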
/*
 * val1[0], val2[0], res[0] ... least significant
 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
 */
static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
                unsigned int val2len, __u32 *res, int reslen)
{
        int digits = val1len + val2len;
        __u64 overflow = 0;
        int i;

        BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);

        memset(res, 0, reslen * sizeof(__u32));

        if (val1len == 0 || val2len == 0)
                return;

        for(i=0;i<digits;i++) {
                int idx1;
                res[i] = (__u32) overflow;
                overflow = overflow >> 32;
                for(idx1=0;idx1<val1len && idx1<=i;idx1++) {
                        int idx2 = i - idx1;
                        __u64 tmpres;

                        if (idx2 >= val2len)
                                continue;

                        tmpres = ((__u64) (val1[idx1])) *
                                        ((__u64) (val2[idx2]));
                        overflow += tmpres >> 32;
                        tmpres = (tmpres << 32) >> 32;
                        if (res[i] + ((__u32) tmpres) < res[i])
                                overflow++;
                        res[i] += (__u32) tmpres;
                }
        }

        BUG_ON(overflow != 0);
}
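/*
 * Example of the digit layout (little endian, 32 bit digits): the value
 * 0x1ffffffff is passed as val[0] = 0xffffffff, val[1] = 0x1, vallen = 2.
 * compare_scores() below squares the two 64 bit speeds into 4 digits and
 * multiplies each square with the other conn's 32 bit usage, giving 5 digit
 * (160 bit) products that cannot overflow.
 */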
/*
 * return value:
 * 1 == usage1/speed1 offends more
 * 0 == both offend the same
 * -1 == usage2/speed2 offends more
 */
static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
                __u64 speed2)
{
        int i;

        __u32 speed1_tmp[2];
        __u32 speed2_tmp[2];

        __u32 speed1squared[4];
        __u32 speed2squared[4];

        __u32 speed1squared_usage2[5];
        __u32 speed2squared_usage1[5];

        speed1_tmp[0] = (speed1 << 32) >> 32;
        speed1_tmp[1] = (speed1 >> 32);
        speed2_tmp[0] = (speed2 << 32) >> 32;
        speed2_tmp[1] = (speed2 >> 32);

        mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared, 4);
        mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);

        mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
        mul(speed2squared, 4, &usage1, 1, speed2squared_usage1, 5);

        /* compare the products starting at the most significant digit */
        for (i=4;i>=0;i--) {
                if (speed1squared_usage2[i] > speed2squared_usage1[i])
                        return -1;
                if (speed1squared_usage2[i] < speed2squared_usage1[i])
                        return 1;
        }

        return 0;
}
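/*
 * _outofbufferspace() walks buffer_conn_list and keeps the OOBS_SIZE worst
 * offenders, i.e. the conns with the most reserve usage relative to their
 * receive speed. As long as the reserve area stays overcommitted, those
 * conns are reset to free buffer space.
 */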
static void _outofbufferspace(void)
{
        int i;

        struct list_head *curr;
        struct conn *offendingconns[OOBS_SIZE];
        __u32 offendingusage[OOBS_SIZE];
        __u64 offendingspeed[OOBS_SIZE];

        memset(&offendingconns, 0, sizeof(offendingconns));

        mutex_lock(&buffer_conn_list_lock);

        curr = buffer_conn_list.next;
        while (curr != &buffer_conn_list) {
                unsigned long iflags;

                struct conn *conn = container_of(curr, struct conn,
                                source.in.buffer_list);

                __u32 usage;
                __u64 speed;

                curr = curr->next;

                mutex_lock(&(conn->rcv_lock));

                BUG_ON(conn->sourcetype != SOURCE_IN);

                usage = conn->source.in.usage_reserve;

                spin_lock_irqsave(&st_lock, iflags);
                speed = get_speed(&(conn->source.in.st), jiffies);
                spin_unlock_irqrestore(&st_lock, iflags);

                mutex_unlock(&(conn->rcv_lock));

                if (offendingconns[OOBS_SIZE-1] != 0 && compare_scores(
                                offendingusage[OOBS_SIZE-1],
                                offendingspeed[OOBS_SIZE-1],
                                usage, speed) >= 0)
                        continue;

                if (offendingconns[OOBS_SIZE-1] != 0)
                        kref_put(&(offendingconns[OOBS_SIZE-1]->ref),
                                        free_conn);

                kref_get(&(conn->ref));
                offendingconns[OOBS_SIZE-1] = conn;
                offendingusage[OOBS_SIZE-1] = usage;
                offendingspeed[OOBS_SIZE-1] = speed;

                for (i=OOBS_SIZE-2;i>=0;i--) {
                        struct conn *tmpconn;
                        __u32 usage_tmp;
                        __u64 speed_tmp;

                        if (offendingconns[i] != 0 && compare_scores(
                                        offendingusage[i], offendingspeed[i],
                                        offendingusage[i+1],
                                        offendingspeed[i+1]) >= 0)
                                break;

                        tmpconn = offendingconns[i];
                        usage_tmp = offendingusage[i];
                        speed_tmp = offendingspeed[i];

                        offendingconns[i] = offendingconns[i+1];
                        offendingusage[i] = offendingusage[i+1];
                        offendingspeed[i] = offendingspeed[i+1];

                        offendingconns[i+1] = tmpconn;
                        offendingusage[i+1] = usage_tmp;
                        offendingspeed[i+1] = speed_tmp;
                }
        }

        for (i=0;i<OOBS_SIZE;i++) {
                if (offendingconns[i] != 0)
                        kref_get(&(offendingconns[i]->ref));
        }

        mutex_unlock(&buffer_conn_list_lock);

        for (i=0;i<OOBS_SIZE;i++) {
                int resetneeded;

                if (offendingconns[i] == 0)
                        break;

                mutex_lock(&bufferlimits_lock);
                resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
                mutex_unlock(&bufferlimits_lock);

                if (resetneeded)
                        reset_conn(offendingconns[i]);
                kref_put(&(offendingconns[i]->ref), free_conn);
        }

        mutex_lock(&bufferlimits_lock);
        mutex_lock(&buffer_conn_list_lock);
        while(list_empty(&buffer_conn_tmp_list) == 0) {
                curr = buffer_conn_tmp_list.next;
                list_del(curr);
                list_add(curr, &buffer_conn_list);
        }
        mutex_unlock(&buffer_conn_list_lock);
        mutex_unlock(&bufferlimits_lock);
}
static void outofbufferspace(struct work_struct *work)
{
        while (1) {
                unsigned long iflags;
                int resetneeded;

                mutex_lock(&bufferlimits_lock);
                spin_lock_irqsave(&oobss_lock, iflags);
                resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);

                if (resetneeded == 0)
                        outofbufferspace_scheduled = 0;

                spin_unlock_irqrestore(&oobss_lock, iflags);
                mutex_unlock(&bufferlimits_lock);

                if (resetneeded == 0)
                        return;

                _outofbufferspace();
        }
}
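/*
 * Recomputes how much of rconn's buffered data (buf.totalsize) falls into
 * each of the 4 areas and updates the global bufferusage_* counters
 * accordingly; called with bufferlimits_lock and rconn->rcv_lock held.
 */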
static void refresh_bufferusage(struct conn *rconn)
{
        BUG_ON(rconn->sourcetype != SOURCE_IN);

        bufferusage_init -= rconn->source.in.usage_init;
        bufferusage_speed -= rconn->source.in.usage_speed;
        bufferusage_ata -= rconn->source.in.usage_ata;
        bufferusage_reserve -= rconn->source.in.usage_reserve;

        rconn->source.in.usage_ata = rconn->buf.totalsize;
        if (rconn->source.in.usage_ata > rconn->source.in.buffer_ata)
                rconn->source.in.usage_ata = rconn->source.in.buffer_ata;

        if (rconn->source.in.usage_ata == rconn->buf.totalsize)
                rconn->source.in.usage_speed = 0;
        else
                rconn->source.in.usage_speed = rconn->buf.totalsize -
                                rconn->source.in.usage_ata;

        if (rconn->source.in.usage_speed > rconn->source.in.buffer_speed)
                rconn->source.in.usage_speed = rconn->source.in.buffer_speed;

        if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed) ==
                        rconn->buf.totalsize)
                rconn->source.in.usage_init = 0;
        else
                rconn->source.in.usage_init = rconn->buf.totalsize -
                                rconn->source.in.usage_ata -
                                rconn->source.in.usage_speed;

        if (rconn->source.in.usage_init > rconn->source.in.buffer_init)
                rconn->source.in.usage_init = rconn->source.in.buffer_init;

        if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed +
                        rconn->source.in.usage_init) == rconn->buf.totalsize)
                rconn->source.in.usage_reserve = 0;
        else
                rconn->source.in.usage_reserve = rconn->buf.totalsize -
                                rconn->source.in.usage_ata -
                                rconn->source.in.usage_speed -
                                rconn->source.in.usage_init;

        bufferusage_init += rconn->source.in.usage_init;
        bufferusage_speed += rconn->source.in.usage_speed;
        bufferusage_ata += rconn->source.in.usage_ata;
        bufferusage_reserve += rconn->source.in.usage_reserve;

        if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
                unsigned long iflags;
                spin_lock_irqsave(&oobss_lock, iflags);
                if (outofbufferspace_scheduled == 0) {
                        INIT_WORK(&outofbufferspace_work, outofbufferspace);
                        queue_work(outofbufferspace_wq, &outofbufferspace_work);
                        outofbufferspace_scheduled = 1;
                }
                spin_unlock_irqrestore(&oobss_lock, iflags);
        }
}
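/*
 * The announced window is the space committed to the conn in the init, speed
 * and ata areas that is not used yet, capped at MAX_ANNOUNCE_WINDOW. While
 * the conn eats into the reserve area, the window is 0.
 */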
static __u32 __get_window(struct conn *rconn)
{
        __u64 window = 0;

        if (rconn->source.in.usage_reserve != 0)
                return 0;

        BUG_ON(rconn->source.in.usage_init > rconn->source.in.buffer_init);
        BUG_ON(rconn->source.in.usage_speed > rconn->source.in.buffer_speed);
        BUG_ON(rconn->source.in.usage_ata > rconn->source.in.buffer_ata);

        window += rconn->source.in.buffer_init;
        window += rconn->source.in.buffer_speed;
        window += rconn->source.in.buffer_ata;

        window -= rconn->source.in.usage_init;
        window -= rconn->source.in.usage_speed;
        window -= rconn->source.in.usage_ata;

        if (window > MAX_ANNOUNCE_WINDOW)
                window = MAX_ANNOUNCE_WINDOW;

        return (__u32) window;
}
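/*
 * _get_window() recalculates rconn's share of the init area (equal split
 * among all conns) and of the speed area (proportional to the conn's share
 * of the total receive speed), refreshes the usage accounting and returns
 * the resulting window. It also keeps the conn on buffer_conn_list (or on
 * buffer_conn_tmp_list if the main list could not be locked) so that idle
 * conns are revisited later. Called with bufferlimits_lock held.
 */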
static __u32 _get_window(struct conn *rconn, int listlocked)
{
        unsigned long iflags;

        unsigned long jiffies_tmp;

        __u32 window = 0;

        __u32 conns;
        __u64 bufferlimit_init;
        __u64 connlimit_init;

        __u64 totalspeed;
        __u64 bufferlimit_speed;
        __u64 connlimit_speed;

        mutex_lock(&(rconn->rcv_lock));

        BUG_ON(rconn->sourcetype != SOURCE_IN);

        if (atomic_read(&(rconn->isreset)) != 0) {
                if (listlocked && (rconn->source.in.buffer_list.next != 0 ||
                                rconn->source.in.buffer_list.prev != 0)) {
                        list_del(&(rconn->source.in.buffer_list));
                        rconn->source.in.buffer_list.next = 0;
                        rconn->source.in.buffer_list.prev = 0;
                        kref_put(&(rconn->ref), free_conn);
                }
                goto out;
        }

        if (listlocked) {
                if (rconn->source.in.buffer_list.next != 0 ||
                                rconn->source.in.buffer_list.prev != 0) {
                        list_del(&(rconn->source.in.buffer_list));
                } else {
                        kref_get(&(rconn->ref));
                }
                list_add_tail(&(rconn->source.in.buffer_list),
                                &buffer_conn_list);
        } else if (rconn->source.in.buffer_list.next == 0 &&
                        rconn->source.in.buffer_list.prev == 0) {
                kref_get(&(rconn->ref));
                list_add_tail(&(rconn->source.in.buffer_list),
                                &buffer_conn_tmp_list);
        }

        conns = atomic_read(&num_conns);
        bufferlimit_init = desired_bufferusage(bufferassigned_init,
                        bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
        connlimit_init = (bufferlimit_init + conns - 1) / conns;

        bufferassigned_init -= rconn->source.in.buffer_init;
        if (((__u32) connlimit_init) != connlimit_init)
                rconn->source.in.buffer_init = -1;
        else
                rconn->source.in.buffer_init = (__u32) connlimit_init;
        bufferassigned_init += rconn->source.in.buffer_init;

        spin_lock_irqsave(&st_lock, iflags);
        jiffies_tmp = jiffies;
        totalspeed = get_speed(&st, jiffies_tmp);
        bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
                        bufferusage_speed, BUFFERASSIGN_SPEED,
                        BUFFERSPACE_SPEED);
        connlimit_speed = multiply_div(bufferlimit_speed,
                        get_speed(&(rconn->source.in.st), jiffies_tmp),
                        totalspeed);
        spin_unlock_irqrestore(&st_lock, iflags);

        bufferassigned_speed -= rconn->source.in.buffer_speed;
        if (((__u32) connlimit_speed) != connlimit_speed)
                rconn->source.in.buffer_speed = -1;
        else
                rconn->source.in.buffer_speed = (__u32) connlimit_speed;
        bufferassigned_speed += rconn->source.in.buffer_speed;

        refresh_bufferusage(rconn);

        window = __get_window(rconn);

out:
        mutex_unlock(&(rconn->rcv_lock));

        return window;
}
/* do not hold rcv_lock while calling this */
__u32 get_window(struct conn *rconn)
{
        struct conn *rconn2;
        int listlocked;

        __u32 window;

        mutex_lock(&bufferlimits_lock);
        listlocked = mutex_trylock(&buffer_conn_list_lock);

        window = _get_window(rconn, listlocked);

        if (listlocked) {
                /*
                 * refresh window of idle conns as well to keep global
                 * counters accurate
                 */
                rconn2 = container_of(buffer_conn_list.next, struct conn,
                                source.in.buffer_list);

                if (list_empty(&buffer_conn_list) == 0 && rconn2 != rconn)
                        _get_window(rconn2, listlocked);

                if (list_empty(&buffer_conn_tmp_list) == 0) {
                        rconn2 = container_of(buffer_conn_tmp_list.next,
                                        struct conn, source.in.buffer_list);
                        BUG_ON(rconn2 == rconn);
                        _get_window(rconn2, listlocked);
                }

                mutex_unlock(&buffer_conn_list_lock);
        }

        mutex_unlock(&bufferlimits_lock);

        return window;
}
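/*
 * Removes the conn's contribution to the global usage/assignment counters
 * and takes it off the buffer conn list; a no-op for conns that are not
 * SOURCE_IN.
 */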
void reset_bufferusage(struct conn *conn)
{
        int listlocked;

        mutex_lock(&bufferlimits_lock);
        listlocked = mutex_trylock(&buffer_conn_list_lock);
        mutex_lock(&(conn->rcv_lock));

        if (conn->sourcetype != SOURCE_IN)
                goto out;

        bufferusage_init -= conn->source.in.usage_init;
        bufferusage_speed -= conn->source.in.usage_speed;
        bufferusage_ata -= conn->source.in.usage_ata;
        bufferusage_reserve -= conn->source.in.usage_reserve;

        bufferassigned_init -= conn->source.in.buffer_init;
        bufferassigned_speed -= conn->source.in.buffer_speed;
        bufferassigned_ata -= conn->source.in.buffer_ata;

        if (listlocked && (conn->source.in.buffer_list.next != 0 ||
                        conn->source.in.buffer_list.prev != 0)) {
                list_del(&(conn->source.in.buffer_list));
                conn->source.in.buffer_list.next = 0;
                conn->source.in.buffer_list.prev = 0;
                kref_put(&(conn->ref), free_conn);
        }

out:
        mutex_unlock(&(conn->rcv_lock));
        if (listlocked)
                mutex_unlock(&buffer_conn_list_lock);
        mutex_unlock(&bufferlimits_lock);
}
void refresh_speedstat(struct conn *rconn, __u32 written)
{
        unsigned long iflags;
        unsigned long jiffies_tmp;

        spin_lock_irqsave(&st_lock, iflags);

        jiffies_tmp = jiffies;

        if (rconn->source.in.st.jiffies_last_update != jiffies_tmp)
                get_speed(&(rconn->source.in.st), jiffies_tmp);
        if (rconn->source.in.st.bytes_curr + written < written)
                rconn->source.in.st.bytes_curr = -1;
        else
                rconn->source.in.st.bytes_curr += written;

        if (st.jiffies_last_update != jiffies_tmp)
                get_speed(&st, jiffies_tmp);
        if (st.bytes_curr + written < written)
                st.bytes_curr = -1;
        else
                st.bytes_curr += written;

        spin_unlock_irqrestore(&st_lock, iflags);
}
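/*
 * Feed buffered out-of-order packets to receive_skb() for as long as the
 * head of the reorder queue carries the next expected sequence number.
 * Called with rconn->rcv_lock held.
 */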
void drain_ooo_queue(struct conn *rconn)
{
        struct sk_buff *skb;

        BUG_ON(SOURCE_IN != rconn->sourcetype);

        skb = rconn->source.in.reorder_queue.next;

        while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) {
                struct skb_procstate *ps = skb_pstate(skb);
                int drop;

                if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno)
                        break;

                drop = receive_skb(rconn, skb);
                if (drop)
                        break;

                skb_unlink(skb, &(rconn->source.in.reorder_queue));
                rconn->source.in.ooo_packets--;
                atomic_dec(&(rconn->source.in.nb->ooo_packets));
                atomic_dec(&ooo_packets);

                rconn->source.in.next_seqno += skb->len;

                skb = rconn->source.in.reorder_queue.next;
        }
}
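/*
 * Queue an out-of-order packet for later delivery. Three limits bound the
 * number of buffered ooo packets: per conn (MAX_TOTAL_OOO_PER_CONN), per
 * neighbor (MAX_TOTAL_OOO_PER_NEIGH) and globally (MAX_TOTAL_OOO_PACKETS).
 * Returns nonzero if the packet was not queued and must be dropped.
 */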
static int _conn_rcv_ooo(struct conn *rconn, struct sk_buff *skb)
{
        struct skb_procstate *ps = skb_pstate(skb);
        struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue);
        struct sk_buff *curr = reorder_queue->next;

        int ooo;

        rconn->source.in.ooo_packets++;
        if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
                goto drop_ooo3;

        ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets));
        if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
                goto drop_ooo2;

        ooo = atomic_inc_return(&ooo_packets);
        if (ooo > MAX_TOTAL_OOO_PACKETS)
                goto drop_ooo1;

        while (1) {
                struct skb_procstate *ps2 = skb_pstate(curr);

                if ((void *) curr == (void *) reorder_queue) {
                        skb_queue_tail(reorder_queue, skb);
                        break;
                }

                if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
                        skb_insert(curr, skb, reorder_queue);
                        break;
                }

                curr = curr->next;
        }

        return 0;

drop_ooo1:
        atomic_dec(&ooo_packets);
drop_ooo2:
        atomic_dec(&(rconn->source.in.nb->ooo_packets));
drop_ooo3:
        rconn->source.in.ooo_packets--;

        return 1;
}
static void _conn_rcv(struct conn *rconn, struct sk_buff *skb)
{
        struct skb_procstate *ps = skb_pstate(skb);
        struct control_msg_out *cm = alloc_control_msg(rconn->source.in.nb,
                        ACM_PRIORITY_MED); /* priority argument assumed */

        int in_order;
        int drop;

        __u32 len = skb->len;

        BUG_ON(rconn->sourcetype != SOURCE_IN);

        if (unlikely(cm == 0)) {
                kfree_skb(skb);
                return;
        }

        mutex_lock(&(rconn->rcv_lock));

        in_order = (rconn->source.in.next_seqno == ps->funcstate.rcv2.seqno);

        if (in_order == 0) {
                drop = _conn_rcv_ooo(rconn, skb);
        } else {
                rconn->source.in.next_seqno += skb->len;
                drop = receive_skb(rconn, skb);
        }

        if (drop) {
                kfree_skb(skb);
                free_control_msg(cm);
                goto out;
        }

#warning todo balance credits

        if (in_order == 0) {
                send_ack_conn_ooo(cm, rconn,
                                rconn->reversedir->target.out.conn_id,
                                rconn->source.in.next_seqno,
                                ps->funcstate.rcv2.seqno, len);
        } else {
                drain_ooo_queue(rconn);
                send_ack_conn(cm, rconn, rconn->reversedir->target.out.conn_id,
                                rconn->source.in.next_seqno);
        }

out:
        mutex_unlock(&(rconn->rcv_lock));
}
static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
{
        struct conn *rconn;
        struct skb_procstate *ps = skb_pstate(skb);

        ps->funcstate.rcv2.seqno = seqno;

        rconn = get_conn(conn_id);

        if (unlikely(rconn == 0)) {
                printk(KERN_DEBUG "unknown conn_id when receiving: %d",
                                conn_id);
                kfree_skb(skb);
                return;
        }

        _conn_rcv(rconn, skb);
        kref_put(&(rconn->ref), free_conn);
}
void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
{
        struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
        char *dst;

        if (unlikely(skb == 0))
                return;

        dst = skb_put(skb, datalen);
        memcpy(dst, data, datalen);
        conn_rcv(skb, conn_id, seqno);
}
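/*
 * Wire format handled below: rcv() strips a 1 byte packet type first; for
 * PACKET_TYPE_DATA the payload then starts with a 4 byte connection id and
 * a 4 byte sequence number, both big endian.
 */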
static void rcv_data(struct sk_buff *skb)
{
        __u32 conn_id;
        __u32 seqno;

        char *connid_p = cor_pull_skb(skb, 4);
        char *seqno_p = cor_pull_skb(skb, 4);

        if (unlikely(connid_p == 0 || seqno_p == 0))
                goto drop;

        ((char *)&conn_id)[0] = connid_p[0];
        ((char *)&conn_id)[1] = connid_p[1];
        ((char *)&conn_id)[2] = connid_p[2];
        ((char *)&conn_id)[3] = connid_p[3];

        ((char *)&seqno)[0] = seqno_p[0];
        ((char *)&seqno)[1] = seqno_p[1];
        ((char *)&seqno)[2] = seqno_p[2];
        ((char *)&seqno)[3] = seqno_p[3];

        conn_id = be32_to_cpu(conn_id);
        seqno = be32_to_cpu(seqno);

        /* disabled random-drop debugging code:
        get_random_bytes(&rand, 1);
        ...
        printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
                        seqno_p[1], seqno_p[2], seqno_p[3]);
        ... */

        if (conn_id == 0) {
                /* conn_id 0 is assumed to mark kernel (control) packets */
                struct neighbor *nb = get_neigh_by_mac(skb);
                if (unlikely(nb == 0))
                        goto drop;
                kernel_packet(nb, skb, seqno);
                kref_put(&(nb->ref), neighbor_free);
        } else {
                conn_rcv(skb, conn_id, seqno);
        }

        return;

drop:
        kfree_skb(skb);
}
static void rcv(struct work_struct *work)
{
        struct sk_buff *skb = skb_from_pstate(container_of(work,
                        struct skb_procstate, funcstate.rcv.work));

        __u8 packet_type;
        char *packet_type_p;

        atomic_dec(&packets_in_workqueue);

        packet_type_p = cor_pull_skb(skb, 1);

        if (unlikely(packet_type_p == 0))
                goto drop;

        packet_type = *packet_type_p;

        if (packet_type == PACKET_TYPE_ANNOUNCE) {
                rcv_announce(skb); /* handler in the neighbor code; name assumed */
                return;
        }

        if (unlikely(packet_type != PACKET_TYPE_DATA))
                goto drop;

        rcv_data(skb);
        return;

drop:
        kfree_skb(skb);
}
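/*
 * Packet reception entry point registered via dev_add_pack(): the amount of
 * not yet processed packets is bounded by MAX_PACKETS_IN_RCVQUEUE
 * (packets_in_workqueue); everything else is deferred to the cor_packet
 * workqueue.
 */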
static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
                struct packet_type *pt, struct net_device *orig_dev)
{
        struct skb_procstate *ps = skb_pstate(skb);
        int queuelen;

        if (skb->pkt_type == PACKET_OTHERHOST)
                goto drop;

        BUG_ON(skb->next != 0);

        queuelen = atomic_inc_return(&packets_in_workqueue);

        BUG_ON(queuelen <= 0);

        if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
                atomic_dec(&packets_in_workqueue);
                goto drop;
        }

        INIT_WORK(&(ps->funcstate.rcv.work), rcv);
        queue_work(packet_wq, &(ps->funcstate.rcv.work));
        return NET_RX_SUCCESS;

drop:
        kfree_skb(skb);
        return NET_RX_DROP;
}
static struct packet_type ptype_cor = {
        .type = htons(ETH_P_COR),
        .func = queue_rcv_processing
};
int __init cor_rcv_init(void)
{
        bufferassigned_init = 0;
        bufferassigned_speed = 0;
        bufferassigned_ata = 0;

        bufferusage_init = 0;
        bufferusage_speed = 0;
        bufferusage_ata = 0;
        bufferusage_reserve = 0;

        spin_lock_init(&st_lock);
        memset(&st, 0, sizeof(struct speedtracker));

        BUG_ON(sizeof(struct skb_procstate) > 48);
        packet_wq = create_workqueue("cor_packet");
        outofbufferspace_wq = create_workqueue("cor_outofbufferspace");
        spin_lock_init(&oobss_lock);
        outofbufferspace_scheduled = 0;

        dev_add_pack(&ptype_cor);

        return 0;
}

MODULE_LICENSE("GPL");