/*
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
atomic_t packets_in_workqueue = ATOMIC_INIT(0);

atomic_t ooo_packets = ATOMIC_INIT(0);

static struct workqueue_struct *packet_wq;

static struct workqueue_struct *outofbufferspace_wq;
struct work_struct outofbufferspace_work;
spinlock_t oobss_lock;
int outofbufferspace_scheduled;
/*
 * Buffering space is divided into 4 areas:
 *
 * buffer_init:
 * distributed equally among all conns; shrinks and grows immediately when
 * there are new connections or connections are reset
 *
 * buffer_speed:
 * distributed proportionally to speed; shrinks and grows constantly
 *
 * buffer_ata:
 * used to make noise to make traffic analysis harder; may only shrink if
 * data is either sent on or "stuck" for a long time
 *
 * buffer_reserve:
 * reserve for cases where sudden shrinking causes some connections to
 * contain more old data than allowed by the first 3 buffers. If this area
 * is full, the out-of-memory conn-resetter is triggered.
 *
 * Each area commits a certain amount of space to each connection. This is
 * the maximum buffer space a connection is allowed to use. The space of a
 * specific connection is first accounted to buffer_ata. If the buffer space
 * it is allowed to use there is exceeded, the rest is accounted to
 * buffer_speed and then to buffer_init. The reserve area is used last. That
 * should only happen if the assigned buffer space of the first 3 areas
 * shrinks suddenly. If the reserve is also used up, connections are reset.
 */
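/*
 * Illustrative example (not part of the original source): if a connection
 * is committed buffer_ata = 16k, buffer_speed = 32k and buffer_init = 64k
 * and currently holds 100k of buffered data, refresh_bufferusage() below
 * accounts 16k to usage_ata, 32k to usage_speed, 52k to usage_init and
 * nothing to the reserve. With 120k buffered, usage_init would be capped at
 * 64k and the remaining 8k would be accounted to usage_reserve.
 */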
DEFINE_MUTEX(buffer_conn_list_lock);
LIST_HEAD(buffer_conn_list);

/*
 * used to buffer inserts while the main list is locked; entries are moved
 * to the main list after processing of the main list finishes
 */
LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */

DEFINE_MUTEX(bufferlimits_lock);

static __u64 bufferassigned_init;
static __u64 bufferassigned_speed;
static __u64 bufferassigned_ata;
static __u64 bufferassigned_reserve;

static __u64 bufferusage_init;
static __u64 bufferusage_speed;
static __u64 bufferusage_ata;
static __u64 bufferusage_reserve;

spinlock_t st_lock;
struct speedtracker st;
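/*
 * desired_bufferusage(): computes the new total buffer assignment for one of
 * the areas from the current assignment, the current usage and the area's
 * limits. "load" is the usage as a fraction of the usage limit, scaled so
 * that 128 means about two thirds of the limit; the assignment is raised
 * slowly below that point and lowered quickly above it.
 */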
static __u64 desired_bufferusage(__u64 assigned, __u64 usage, __u64 assignlimit,
		__u64 usagelimit)
{
	__u64 ret;
	__u64 load;

	if (unlikely(assignlimit < usagelimit))
		assignlimit = usagelimit;

	if (multiply_div(usage, 9, 10) > usagelimit)
		return multiply_div(usagelimit, 9, 10);

	load = multiply_div(usage, 192, usagelimit);

	/* slow limit increase, fast decrease */
	if (load == 128) {
		ret = assigned;
	} else if (load < 128) {
		if (load == 0)
			return multiply_div(usagelimit, 9, 10);

		ret = multiply_div(assigned, 128, load);
	} else {
		ret = multiply_div(assigned, 128, load + load - 128);
	}

	if (ret > assignlimit)
		ret = assignlimit;

	return ret;
}
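/*
 * get_speed(): advances the speedtracker to jiffies_tmp, folding the bytes
 * counted in each elapsed jiffy into an exponentially weighted moving
 * average (weight 1/HZ per jiffy). The speed is kept in fixed-point form,
 * bytes per jiffy shifted left by 16. A tracker whose timestamp lies in the
 * future or is more than 10 seconds old is resynchronized first. All
 * callers take st_lock around this.
 */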
static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
{
	if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
			time_before(st->jiffies_last_update + HZ*10,
			jiffies_tmp))) {
		st->jiffies_last_update = jiffies_tmp;
		st->speed = 0;
		st->bytes_curr = 0;
	}

	for (;time_before(st->jiffies_last_update, jiffies_tmp);
			st->jiffies_last_update++) {
		__u32 bytes_curr = 0;
		if ((st->jiffies_last_update + 1) == jiffies_tmp) {
			bytes_curr = st->bytes_curr;
			st->bytes_curr = 0;
		}
		st->speed = (st->speed * (HZ-1) + (((__u64)bytes_curr)<<16))/HZ;
	}

	if ((st->jiffies_last_update + 1) == jiffies_tmp) {
		st->jiffies_last_update++;
	}

	return (st->speed * (HZ*2 - 1) + (((__u64)st->bytes_curr) << 17))/HZ/2;
}
/*
 * val1[0], val2[0], res[0] ... least significant
 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
 */
static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
		unsigned int val2len, __u32 *res, int reslen)
{
	int digits = val1len + val2len;
	__u64 overflow = 0;
	int i;

	BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);

	/* reslen counts __u32 digits, not bytes */
	memset(res, 0, reslen * sizeof(__u32));

	if (val1len == 0 || val2len == 0)
		return;

	for(i=0;i<digits;i++) {
		int idx1;
		res[i] = (__u32) overflow;
		overflow = overflow >> 32;
		for(idx1=0;idx1<val1len && idx1<=i;idx1++) {
			unsigned int idx2 = i - idx1;
			__u64 tmpres;

			if (idx2 >= val2len)
				continue;

			tmpres = ((__u64) (val1[idx1])) *
					((__u64) (val2[idx2]));
			overflow += tmpres >> 32;
			tmpres = (tmpres << 32) >> 32;
			/* detect carry out of the low 32 bits */
			if (((__u32) (res[i] + tmpres)) < res[i])
				overflow++;
			res[i] += tmpres;
		}
	}

	BUG_ON(overflow != 0);
}
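/*
 * Illustrative example (not part of the original source): mul() treats its
 * operands as little-endian arrays of 32-bit digits, so the 64-bit value
 * 0x0000000500000002 corresponds to the two-element array {0x00000002,
 * 0x00000005}. compare_scores() below splits speed1 and speed2 into exactly
 * this layout before squaring them with mul().
 */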
/*
 *  1 == usage1/speed1 offends more
 *  0 == both offend the same
 * -1 == usage2/speed2 offends more
 */
static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
		__u64 speed2)
{
	int i;

	__u32 speed1_tmp[2];
	__u32 speed2_tmp[2];

	__u32 speed1squared[4];
	__u32 speed2squared[4];

	__u32 speed1squared_usage2[5];
	__u32 speed2squared_usage1[5];

	speed1_tmp[0] = (speed1 << 32) >> 32;
	speed1_tmp[1] = (speed1 >> 32);
	speed2_tmp[0] = (speed2 << 32) >> 32;
	speed2_tmp[1] = (speed2 >> 32);

	mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared, 4);
	mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);

	mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
	mul(speed2squared, 4, &usage1, 1, speed2squared_usage1, 5);

	for (i=4;i>=0;i--) {
		if (speed1squared_usage2[i] > speed2squared_usage1[i])
			return 1;
		if (speed1squared_usage2[i] < speed2squared_usage1[i])
			return -1;
	}

	return 0;
}
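/*
 * _outofbufferspace(): walks buffer_conn_list and keeps the
 * OUTOFBUFFERSPACE_OFFENDERS connections that rank worst in compare_scores()
 * of their reserve usage and speed. These offenders are then reset one at a
 * time for as long as (bufferusage_reserve*4)/3 still exceeds
 * BUFFERSPACE_RESERVE.
 */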
#define OUTOFBUFFERSPACE_OFFENDERS 10
static void _outofbufferspace(void)
{
	int i;

	struct list_head *curr;
	struct conn *offendingconns[OUTOFBUFFERSPACE_OFFENDERS];
	__u32 offendingusage[OUTOFBUFFERSPACE_OFFENDERS];
	__u64 offendingspeed[OUTOFBUFFERSPACE_OFFENDERS];

	memset(&offendingconns, 0, sizeof(offendingconns));

	mutex_lock(&buffer_conn_list_lock);

	curr = buffer_conn_list.next;
	while (curr != &buffer_conn_list) {
		unsigned long iflags;
		struct conn *conn = container_of(curr, struct conn,
				source.in.buffer_list);
		__u32 usage;
		__u64 speed;

		curr = curr->next;

		BUG_ON(conn->sourcetype != SOURCE_IN);

		usage = atomic_read(&(conn->source.in.usage_reserve));

		spin_lock_irqsave(&st_lock, iflags);
		speed = get_speed(&(conn->source.in.st), jiffies);
		spin_unlock_irqrestore(&st_lock, iflags);

		if (offendingconns[OUTOFBUFFERSPACE_OFFENDERS-1] != 0 &&
				compare_scores(
				offendingusage[OUTOFBUFFERSPACE_OFFENDERS-1],
				offendingspeed[OUTOFBUFFERSPACE_OFFENDERS-1],
				usage, speed) >= 0)
			continue;

		offendingconns[OUTOFBUFFERSPACE_OFFENDERS-1] = conn;
		offendingusage[OUTOFBUFFERSPACE_OFFENDERS-1] = usage;
		offendingspeed[OUTOFBUFFERSPACE_OFFENDERS-1] = speed;

		for (i=OUTOFBUFFERSPACE_OFFENDERS-2;i>=0;i--) {
			struct conn *tmpconn;
			__u32 usage_tmp;
			__u64 speed_tmp;

			if (offendingconns[i] != 0 && compare_scores(
					offendingusage[i], offendingspeed[i],
					offendingusage[i+1],
					offendingspeed[i+1]) >= 0)
				break;

			tmpconn = offendingconns[i];
			usage_tmp = offendingusage[i];
			speed_tmp = offendingspeed[i];

			offendingconns[i] = offendingconns[i+1];
			offendingusage[i] = offendingusage[i+1];
			offendingspeed[i] = offendingspeed[i+1];

			offendingconns[i+1] = tmpconn;
			offendingusage[i+1] = usage_tmp;
			offendingspeed[i+1] = speed_tmp;
		}
	}

	for (i=0;i<OUTOFBUFFERSPACE_OFFENDERS;i++) {
		/* fewer than OUTOFBUFFERSPACE_OFFENDERS conns may exist */
		if (offendingconns[i] == 0)
			break;
		kref_get(&(offendingconns[i]->ref));
	}

	mutex_unlock(&buffer_conn_list_lock);

	for (i=0;i<OUTOFBUFFERSPACE_OFFENDERS;i++) {
		int resetneeded;

		if (offendingconns[i] == 0)
			break;

		mutex_lock(&bufferlimits_lock);
		resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded == 0)
			break;

		reset_conn(offendingconns[i]);
		kref_put(&(offendingconns[i]->ref), free_conn);
	}

	mutex_lock(&bufferlimits_lock);
	mutex_lock(&buffer_conn_list_lock);
	while (list_empty(&buffer_conn_tmp_list) == 0) {
		curr = buffer_conn_tmp_list.next;
		list_del(curr);
		list_add(curr, &buffer_conn_list);
	}
	mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}
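/*
 * outofbufferspace(): work item scheduled by refresh_bufferusage(). It
 * re-checks bufferusage_reserve under bufferlimits_lock and oobss_lock,
 * clears outofbufferspace_scheduled and stops once the reserve is no longer
 * exceeded, and otherwise keeps resetting offenders via _outofbufferspace().
 */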
static void outofbufferspace(struct work_struct *work)
{
	while (1) {
		unsigned long iflags;
		int resetneeded;

		mutex_lock(&bufferlimits_lock);
		spin_lock_irqsave(&oobss_lock, iflags);
		resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);

		if (resetneeded == 0)
			outofbufferspace_scheduled = 0;

		spin_unlock_irqrestore(&oobss_lock, iflags);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded == 0)
			return;

		_outofbufferspace();
	}
}
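/*
 * refresh_bufferusage(): redistributes rconn->buf.totalsize over usage_ata,
 * usage_speed, usage_init and usage_reserve in that order, capping each part
 * at the corresponding buffer_* commitment, and updates the global
 * bufferusage_* counters. If the global reserve usage exceeds
 * BUFFERSPACE_RESERVE, the out-of-buffer-space worker is scheduled. Called
 * with bufferlimits_lock and rconn->rcv_lock held.
 */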
static void refresh_bufferusage(struct conn *rconn)
{
	BUG_ON(rconn->sourcetype != SOURCE_IN);

	bufferusage_init -= rconn->source.in.usage_init;
	bufferusage_speed -= rconn->source.in.usage_speed;
	bufferusage_ata -= rconn->source.in.usage_ata;
	bufferusage_reserve -= atomic_read(&(rconn->source.in.usage_reserve));

	rconn->source.in.usage_ata = rconn->buf.totalsize;
	if (rconn->source.in.usage_ata > rconn->source.in.buffer_ata)
		rconn->source.in.usage_ata = rconn->source.in.buffer_ata;

	if (rconn->source.in.usage_ata == rconn->buf.totalsize)
		rconn->source.in.usage_speed = 0;
	else
		rconn->source.in.usage_speed = rconn->buf.totalsize -
				rconn->source.in.usage_ata;

	if (rconn->source.in.usage_speed > rconn->source.in.buffer_speed)
		rconn->source.in.usage_speed = rconn->source.in.buffer_speed;

	if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed) ==
			rconn->buf.totalsize)
		rconn->source.in.usage_init = 0;
	else
		rconn->source.in.usage_init = rconn->buf.totalsize -
				rconn->source.in.usage_ata -
				rconn->source.in.usage_speed;

	if (rconn->source.in.usage_init > rconn->source.in.buffer_init)
		rconn->source.in.usage_init = rconn->source.in.buffer_init;

	if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed +
			rconn->source.in.usage_init) == rconn->buf.totalsize)
		atomic_set(&(rconn->source.in.usage_reserve), 0);
	else
		atomic_set(&(rconn->source.in.usage_reserve),
				rconn->buf.totalsize -
				rconn->source.in.usage_ata -
				rconn->source.in.usage_speed -
				rconn->source.in.usage_init);

	bufferusage_init += rconn->source.in.usage_init;
	bufferusage_speed += rconn->source.in.usage_speed;
	bufferusage_ata += rconn->source.in.usage_ata;
	bufferusage_reserve += atomic_read(&(rconn->source.in.usage_reserve));

	if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
		unsigned long iflags;
		spin_lock_irqsave(&oobss_lock, iflags);
		if (outofbufferspace_scheduled == 0) {
			INIT_WORK(&outofbufferspace_work, outofbufferspace);
			queue_work(outofbufferspace_wq, &outofbufferspace_work);
			outofbufferspace_scheduled = 1;
		}
		spin_unlock_irqrestore(&oobss_lock, iflags);
	}
}
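/*
 * __get_window(): the window advertised for rconn is the committed space
 * (buffer_init + buffer_speed + buffer_ata) minus the space already in use,
 * capped at MAX_ANNOUNCE_WINDOW. While any data is accounted to the reserve
 * area, no window is advertised at all.
 */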
static __u32 __get_window(struct conn *rconn)
{
	__u64 window = 0;

	if (atomic_read(&(rconn->source.in.usage_reserve)) != 0)
		return 0;

	BUG_ON(rconn->source.in.usage_init > rconn->source.in.buffer_init);
	BUG_ON(rconn->source.in.usage_speed > rconn->source.in.buffer_speed);
	BUG_ON(rconn->source.in.usage_ata > rconn->source.in.buffer_ata);

	window += rconn->source.in.buffer_init;
	window += rconn->source.in.buffer_speed;
	window += rconn->source.in.buffer_ata;

	window -= rconn->source.in.usage_init;
	window -= rconn->source.in.usage_speed;
	window -= rconn->source.in.usage_ata;

	if (window > MAX_ANNOUNCE_WINDOW)
		window = MAX_ANNOUNCE_WINDOW;

	return window;
}
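/*
 * _get_window(): recomputes rconn's share of the init area (split equally
 * among all conns) and of the speed area (proportional to the connection's
 * share of the total speed) from the desired_bufferusage() targets, keeps
 * the connection on buffer_conn_list (or on buffer_conn_tmp_list while the
 * main list could not be locked), refreshes the usage accounting and
 * returns the resulting announce window.
 */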
static __u32 _get_window(struct conn *rconn, int listlocked)
{
	unsigned long iflags;
	unsigned long jiffies_tmp;

	__u32 window = 0;

	__u32 conns;
	__u64 totalspeed;

	__u64 bufferlimit_init;
	__u64 connlimit_init;

	__u64 bufferlimit_speed;
	__u64 connlimit_speed;

	mutex_lock(&(rconn->rcv_lock));

	BUG_ON(rconn->sourcetype != SOURCE_IN);

	if (atomic_read(&(rconn->isreset)) != 0) {
		if (listlocked && (rconn->source.in.buffer_list.next != 0 ||
				rconn->source.in.buffer_list.prev != 0)) {
			list_del(&(rconn->source.in.buffer_list));
			rconn->source.in.buffer_list.next = 0;
			rconn->source.in.buffer_list.prev = 0;
			kref_put(&(rconn->ref), free_conn);
		}
		goto out;
	}

	if (listlocked) {
		if (rconn->source.in.buffer_list.next != 0 &&
				rconn->source.in.buffer_list.prev != 0) {
			list_del(&(rconn->source.in.buffer_list));
		} else {
			kref_get(&(rconn->ref));
		}
		list_add_tail(&(rconn->source.in.buffer_list),
				&buffer_conn_list);
	} else if (rconn->source.in.buffer_list.next != 0 ||
			rconn->source.in.buffer_list.prev != 0) {
		kref_get(&(rconn->ref));
		list_add_tail(&(rconn->source.in.buffer_list),
				&buffer_conn_tmp_list);
	}

	conns = atomic_read(&num_conns);
	bufferlimit_init = desired_bufferusage(bufferassigned_init,
			bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
	connlimit_init = (bufferlimit_init + conns - 1) / conns;

	bufferassigned_init -= rconn->source.in.buffer_init;
	if (((__u32) connlimit_init) != connlimit_init)
		rconn->source.in.buffer_init = -1;
	else
		rconn->source.in.buffer_init = (__u32) connlimit_init;
	bufferassigned_init += rconn->source.in.buffer_init;

	spin_lock_irqsave(&st_lock, iflags);
	jiffies_tmp = jiffies;
	totalspeed = get_speed(&st, jiffies_tmp);
	bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
			bufferusage_speed, BUFFERASSIGN_SPEED,
			BUFFERSPACE_SPEED);
	connlimit_speed = multiply_div(bufferlimit_speed,
			get_speed(&(rconn->source.in.st), jiffies_tmp),
			totalspeed);
	spin_unlock_irqrestore(&st_lock, iflags);

	bufferassigned_speed -= rconn->source.in.buffer_speed;
	if (((__u32) connlimit_speed) != connlimit_speed)
		rconn->source.in.buffer_speed = -1;
	else
		rconn->source.in.buffer_speed = (__u32) connlimit_speed;
	bufferassigned_speed += rconn->source.in.buffer_speed;

	refresh_bufferusage(rconn);

	window = __get_window(rconn);

out:
	mutex_unlock(&(rconn->rcv_lock));

	return window;
}
/* do not hold rcv_lock while calling this */
__u32 get_window(struct conn *rconn)
{
	struct conn *rconn2;
	int listlocked;

	__u32 window;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);

	window = _get_window(rconn, listlocked);

	if (listlocked) {
		/*
		 * refresh the window of idle conns as well to keep the global
		 * counters accurate
		 */

		rconn2 = container_of(buffer_conn_list.next, struct conn,
				source.in.buffer_list);

		if (list_empty(&buffer_conn_list) == 0 && rconn2 != rconn)
			_get_window(rconn2, listlocked);

		if (list_empty(&buffer_conn_tmp_list) == 0) {
			rconn2 = container_of(buffer_conn_tmp_list.next,
					struct conn, source.in.buffer_list);
			BUG_ON(rconn2 == rconn);
			_get_window(rconn2, listlocked);
		}

		mutex_unlock(&buffer_conn_list_lock);
	}

	mutex_unlock(&bufferlimits_lock);

	return window;
}
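/*
 * refresh_speedstat(): credits `written` bytes to the per-connection
 * speedtracker and to the global one for the current jiffy, saturating the
 * per-jiffy byte counters instead of letting them wrap.
 */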
void refresh_speedstat(struct conn *rconn, __u32 written)
{
	unsigned long iflags;
	unsigned long jiffies_tmp;

	spin_lock_irqsave(&st_lock, iflags);

	jiffies_tmp = jiffies;

	if (rconn->source.in.st.jiffies_last_update != jiffies_tmp)
		get_speed(&(rconn->source.in.st), jiffies_tmp);
	if (rconn->source.in.st.bytes_curr + written < written)
		rconn->source.in.st.bytes_curr = -1;
	else
		rconn->source.in.st.bytes_curr += written;

	if (st.jiffies_last_update != jiffies_tmp)
		get_speed(&st, jiffies_tmp);
	if (st.bytes_curr + written < written)
		st.bytes_curr = -1;
	else
		st.bytes_curr += written;

	spin_unlock_irqrestore(&st_lock, iflags);
}
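/*
 * drain_ooo_queue(): delivers packets from the reorder queue to
 * receive_skb() for as long as the packet at its head carries exactly the
 * next expected seqno, advancing next_seqno and releasing the out-of-order
 * accounting for each delivered packet.
 */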
void drain_ooo_queue(struct conn *rconn)
{
	struct sk_buff *skb;

	BUG_ON(SOURCE_IN != rconn->sourcetype);

	skb = rconn->source.in.reorder_queue.next;

	while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) {
		struct skb_procstate *ps = skb_pstate(skb);
		int drop;

		if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno)
			break;

		drop = receive_skb(rconn, skb);
		if (drop)
			break;

		skb_unlink(skb, &(rconn->source.in.reorder_queue));
		rconn->source.in.ooo_packets--;
		atomic_dec(&(rconn->source.in.nb->ooo_packets));
		atomic_dec(&ooo_packets);

		rconn->source.in.next_seqno += skb->len;

		skb = rconn->source.in.reorder_queue.next;
	}
}
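/*
 * _conn_rcv_ooo(): inserts an out-of-order packet into the reorder queue,
 * keyed by seqno, while enforcing the per-connection, per-neighbor and
 * global limits (MAX_TOTAL_OOO_PER_CONN, MAX_TOTAL_OOO_PER_NEIGH,
 * MAX_TOTAL_OOO_PACKETS). Returns 0 if the packet was queued and 1 if it
 * has to be dropped.
 */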
static int _conn_rcv_ooo(struct conn *rconn, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue);
	struct sk_buff *curr = reorder_queue->next;

	int ooo;

	rconn->source.in.ooo_packets++;
	if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
		goto drop;

	ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets));
	if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
		goto drop_nb;

	ooo = atomic_inc_return(&ooo_packets);
	if (ooo > MAX_TOTAL_OOO_PACKETS)
		goto drop_global;

	while (1) {
		struct skb_procstate *ps2 = skb_pstate(curr);

		if ((void *) curr == (void *) reorder_queue) {
			skb_queue_tail(reorder_queue, skb);
			break;
		}

		if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
			skb_insert(curr, skb, reorder_queue);
			break;
		}

		curr = curr->next;
	}

	return 0;

drop_global:
	atomic_dec(&ooo_packets);
drop_nb:
	atomic_dec(&(rconn->source.in.nb->ooo_packets));
drop:
	rconn->source.in.ooo_packets--;
	return 1;
}
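/*
 * _conn_rcv(): accepts one data packet for rconn. In-order packets go to
 * receive_skb() directly; out-of-order packets go through _conn_rcv_ooo().
 * Unless the packet had to be dropped, an ack (send_ack_conn() or
 * send_ack_conn_ooo()) is generated from the control message allocated at
 * the top of the function.
 */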
static void _conn_rcv(struct conn *rconn, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct control_msg_out *cm = alloc_control_msg(rconn->source.in.nb,
			/* second argument elided in this excerpt */);

	int in_order;
	int drop;

	__u32 len = skb->len;

	BUG_ON(rconn->sourcetype != SOURCE_IN);

	if (unlikely(cm == 0)) {
		kfree_skb(skb);
		return;
	}

	mutex_lock(&(rconn->rcv_lock));

	in_order = (rconn->source.in.next_seqno == ps->funcstate.rcv2.seqno);

	if (in_order == 0) {
		drop = _conn_rcv_ooo(rconn, skb);
	} else {
		rconn->source.in.next_seqno += skb->len;
		drop = receive_skb(rconn, skb);
	}

	if (drop) {
		kfree_skb(skb);
		free_control_msg(cm);
	} else if (in_order == 0) {
#warning todo balance credits
		send_ack_conn_ooo(cm, rconn,
				rconn->reversedir->target.out.conn_id,
				rconn->source.in.next_seqno,
				ps->funcstate.rcv2.seqno, len);
	} else {
		drain_ooo_queue(rconn);
		send_ack_conn(cm, rconn, rconn->reversedir->target.out.conn_id,
				rconn->source.in.next_seqno);
	}

	mutex_unlock(&(rconn->rcv_lock));
}
static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
{
	struct conn *rconn;
	struct skb_procstate *ps = skb_pstate(skb);

	ps->funcstate.rcv2.seqno = seqno;

	rconn = get_conn(conn_id);

	if (unlikely(rconn == 0)) {
		printk(KERN_DEBUG "unknown conn_id when receiving: %d",
				conn_id);
		kfree_skb(skb);
		return;
	}

	_conn_rcv(rconn, skb);
	kref_put(&(rconn->ref), free_conn);
}
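/*
 * conn_rcv_buildskb(): copies a raw data buffer into a freshly allocated
 * skb and feeds it through conn_rcv().
 */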
void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
	char *dst = skb_put(skb, datalen);
	memcpy(dst, data, datalen);
	conn_rcv(skb, conn_id, seqno);
}
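/*
 * rcv_data(): pulls the 4-byte connection id and sequence number (network
 * byte order) off the packet and hands the skb either to kernel_packet()
 * for the sending neighbor or to conn_rcv() for connection data.
 */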
static void rcv_data(struct sk_buff *skb)
{
	__u32 conn_id;
	__u32 seqno;

	char *connid_p = cor_pull_skb(skb, 4);
	char *seqno_p = cor_pull_skb(skb, 4);

	if (unlikely(connid_p == 0 || seqno_p == 0))
		goto drop;

	((char *)&conn_id)[0] = connid_p[0];
	((char *)&conn_id)[1] = connid_p[1];
	((char *)&conn_id)[2] = connid_p[2];
	((char *)&conn_id)[3] = connid_p[3];

	((char *)&seqno)[0] = seqno_p[0];
	((char *)&seqno)[1] = seqno_p[1];
	((char *)&seqno)[2] = seqno_p[2];
	((char *)&seqno)[3] = seqno_p[3];

	conn_id = be32_to_cpu(conn_id);
	seqno = be32_to_cpu(seqno);

	/* get_random_bytes(&rand, 1);
	...
	printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
			seqno_p[1], seqno_p[2], seqno_p[3]);
	... */

	if (conn_id == 0) {
		struct neighbor *nb = get_neigh_by_mac(skb);
		if (unlikely(nb == 0))
			goto drop;
		kernel_packet(nb, skb, seqno);
		kref_put(&(nb->ref), neighbor_free);
	} else {
		conn_rcv(skb, conn_id, seqno);
	}

	return;

drop:
	kfree_skb(skb);
}
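/*
 * rcv(): work item executed for every queued frame. It reads the packet
 * type byte and dispatches announce and data packets; anything else is
 * dropped.
 */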
static void rcv(struct work_struct *work)
{
	struct sk_buff *skb = skb_from_pstate(container_of(work,
			struct skb_procstate, funcstate.rcv.work));

	__u8 packet_type;
	char *packet_type_p;

	atomic_dec(&packets_in_workqueue);

	packet_type_p = cor_pull_skb(skb, 1);

	if (unlikely(packet_type_p == 0))
		goto drop;

	packet_type = *packet_type_p;

	if (packet_type == PACKET_TYPE_ANNOUNCE) {
		/* ... (announce handling elided in this excerpt) */
		return;
	}

	if (unlikely(packet_type != PACKET_TYPE_DATA))
		goto drop;

	rcv_data(skb);

	return;

drop:
	kfree_skb(skb);
}
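/*
 * queue_rcv_processing(): the packet_type receive handler for ETH_P_COR
 * frames. It only bounds the backlog (MAX_PACKETS_IN_RCVQUEUE) and defers
 * all real work to the packet_wq workqueue.
 */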
static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
		struct packet_type *pt, struct net_device *orig_dev)
{
	struct skb_procstate *ps = skb_pstate(skb);
	int queuelen;

	if (skb->pkt_type == PACKET_OTHERHOST)
		goto drop;

	BUG_ON(skb->next != 0);

	queuelen = atomic_inc_return(&packets_in_workqueue);

	BUG_ON(queuelen <= 0);

	if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
		atomic_dec(&packets_in_workqueue);
		goto drop;
	}

	INIT_WORK(&(ps->funcstate.rcv.work), rcv);
	queue_work(packet_wq, &(ps->funcstate.rcv.work));
	return NET_RX_SUCCESS;

drop:
	kfree_skb(skb);
	return NET_RX_DROP;
}
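/*
 * All frames with ethertype ETH_P_COR are fed into queue_rcv_processing()
 * via this packet_type, which is registered in cor_rcv_init() below.
 */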
static struct packet_type ptype_cor = {
	.type = htons(ETH_P_COR),
	.func = queue_rcv_processing
};
int __init cor_rcv_init(void)
{
	bufferassigned_init = 0;
	bufferassigned_speed = 0;
	bufferassigned_ata = 0;
	bufferassigned_reserve = 0;

	bufferusage_init = 0;
	bufferusage_speed = 0;
	bufferusage_ata = 0;
	bufferusage_reserve = 0;

	spin_lock_init(&st_lock);
	memset(&st, 0, sizeof(struct speedtracker));

	BUG_ON(sizeof(struct skb_procstate) > 48);
	packet_wq = create_workqueue("cor_packet");
	outofbufferspace_wq = create_workqueue("cor_outofbufferspace");
	spin_lock_init(&oobss_lock);
	outofbufferspace_scheduled = 0;

	dev_add_pack(&ptype_cor);

	return 0;
}

MODULE_LICENSE("GPL");