buffering bugfixes
[cor_2_6_31.git] / net / cor / rcv.c
blobb21454761c7bba8c0aaa8c4cca3fae694c6a7b8e
1 /*
2 * Connection oriented routing
3 * Copyright (C) 2007-2010 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/module.h>
22 #include <linux/version.h>
23 #include <linux/kernel.h>
24 #include <linux/init.h>
25 #include <linux/in.h>
28 #include "cor.h"
30 atomic_t packets_in_workqueue = ATOMIC_INIT(0);
32 atomic_t ooo_packets = ATOMIC_INIT(0);
34 static struct workqueue_struct *packet_wq;
36 static struct workqueue_struct *outofbufferspace_wq;
37 struct work_struct outofbufferspace_work;
38 spinlock_t oobss_lock;
39 int outofbufferspace_scheduled;
41 /**
42 * buffering space is divided in 4 areas:
44 * buffer_init:
45 * distributed equally among all conns; shrinks and grows immediately when there
46 * are new connections or connections are reset
48 * buffer_speed:
49 * distributed proportional to speed; shrinks and grows constantly
51 * buffer_ata:
52 * used to make noise to make traffic analysis harder; may only shrink if data
53 * is either send on or "stuck" for a long time
55 * buffer_reserve:
56 * reserve in case where sudden shrinking causes some connections to contain
57 * more old data than allowed by the first 3 buffers. If this area is full, the
58 * out-of-memory conn-resetter is triggered
60 * Each area commits a certain amount of space to each connection. This is the
61 * maximum buffer space a connection is allowed to use. The space of a specific
62 * connection is first accounted to buffer_ata. If the buffer space allowed to
63 * use is exceeded, the rest is accounted to buffer_speed and then buffer_init.
64 * The reserve area will be used last. This should only be the case, if the
65 * assigned buffer space of the first 3 areas shrink suddenly. If this area is
66 * also used up, connections will be reset.
69 DEFINE_MUTEX(buffer_conn_list_lock);
70 LIST_HEAD(buffer_conn_list);
72 /**
73 * used to buffer inserts when main list is locked, moved to main list, after
74 * processing of main list finishes
76 LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */
79 DEFINE_MUTEX(bufferlimits_lock);
81 static __u64 bufferassigned_init;
82 static __u64 bufferassigned_speed;
83 static __u64 bufferassigned_ata;
84 static __u64 bufferassigned_reserve;
86 static __u64 bufferusage_init;
87 static __u64 bufferusage_speed;
88 static __u64 bufferusage_ata;
89 static __u64 bufferusage_reserve;
91 spinlock_t st_lock;
92 struct speedtracker st;
94 static __u64 desired_bufferusage(__u64 assigned, __u64 usage, __u64 assignlimit,
95 __u64 usagelimit)
97 __u64 ret;
98 __u64 load;
100 if (unlikely(assignlimit < usagelimit))
101 assignlimit = usagelimit;
103 if (multiply_div(usage, 9, 10) > usagelimit)
104 return multiply_div(usagelimit, 9, 10);
106 load = multiply_div(usage, 192, usagelimit);
108 /* slow limit increase, fast decrease */
109 if (load == 128) {
110 ret = assigned;
111 } else if (load < 128) {
112 if (load == 0)
113 return multiply_div(usagelimit, 9, 10);
115 if (load < 96)
116 load = 96;
117 ret = multiply_div(assigned, 128, load);
118 } else {
119 ret = multiply_div(assigned, 128, load + load - 128);
122 if (ret > assignlimit)
123 return assignlimit;
124 return ret;
127 static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
129 if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
130 time_before(st->jiffies_last_update + HZ*10,
131 jiffies_tmp))) {
132 st->jiffies_last_update = jiffies_tmp;
133 st->speed = 0;
134 st->bytes_curr = 0;
135 return 0;
138 for (;time_before(st->jiffies_last_update, jiffies_tmp);
139 st->jiffies_last_update++) {
140 __u32 bytes_curr = 0;
141 if ((st->jiffies_last_update + 1) == jiffies_tmp) {
142 bytes_curr = st->bytes_curr;
143 st->bytes_curr = 0;
145 st->speed = (st->speed * (HZ-1) + (((__u64)bytes_curr)<<16))/HZ;
148 if ((st->jiffies_last_update + 1) == jiffies_tmp) {
149 st->jiffies_last_update++;
152 return (st->speed * (HZ*2 - 1) + (((__u64)st->bytes_curr) << 17))/HZ/2;
156 * val1[0], val2[2], res[0] ... least significant
157 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
159 static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
160 unsigned int val2len, __u32 *res, int reslen)
162 int digits = val1len + val2len;
163 __u64 overflow = 0;
164 int i;
166 BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);
168 memset(res, 0, reslen);
170 if (val1len == 0 || val2len == 0)
171 return;
173 for(i=0;i<digits;i++) {
174 int idx1;
175 res[i] = (__u32) overflow;
176 overflow = overflow >> 32;
177 for(idx1=0;idx1<val1len && idx1<=i;idx1++) {
178 int idx2 = i - idx1;
179 __u64 tmpres;
181 if (idx2 >= val2len)
182 continue;
184 tmpres = ((__u64) (val1[idx1])) *
185 ((__u64) (val2[idx2]));
186 overflow += tmpres >> 32;
187 tmpres = (tmpres << 32) >> 32;
188 if (res[i] + tmpres < res[i])
189 overflow++;
190 res[i] += tmpres;
193 BUG_ON(overflow != 0);
197 * return values:
198 * 1 == usage1/speed1 offends more
199 * 0 == both offend the same
200 * -1 == usage2/speed2 offends more
202 static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
203 __u64 speed2)
205 int i;
207 __u32 speed1squared[4];
208 __u32 speed2squared[4];
210 __u32 speed1squared_usage2[5];
211 __u32 speed2squared_usage1[5];
214 __u32 speed1_tmp[2];
215 __u32 speed2_tmp[2];
217 speed1_tmp[0] = (speed1 << 32) >> 32;
218 speed1_tmp[1] = (speed1 >> 32);
219 speed2_tmp[0] = (speed2 << 32) >> 32;
220 speed2_tmp[1] = (speed2 << 32);
222 mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared,4);
223 mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);
225 mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
226 mul(speed2squared, 4, &usage2, 1, speed2squared_usage1, 5);
228 for(i=4;i>=0;i++) {
229 if (speed1squared_usage2[i] > speed2squared_usage1[i])
230 return -1;
231 if (speed1squared_usage2[i] < speed2squared_usage1[i])
232 return 1;
235 return 0;
238 #define OUTOFBUFFERSPACE_OFFENDERS 10
239 static void _outofbufferspace(void)
241 int i;
243 struct list_head *curr;
244 struct conn *offendingconns[OUTOFBUFFERSPACE_OFFENDERS];
245 __u32 offendingusage[OUTOFBUFFERSPACE_OFFENDERS];
246 __u64 offendingspeed[OUTOFBUFFERSPACE_OFFENDERS];
248 memset(&offendingconns, 0, sizeof(offendingconns));
250 mutex_lock(&buffer_conn_list_lock);
252 curr = buffer_conn_list.next;
253 while (curr != &buffer_conn_list) {
254 unsigned long iflags;
256 struct conn *conn = container_of(curr, struct conn,
257 source.in.buffer_list);
259 __u32 usage;
260 __u64 speed;
262 int i;
264 BUG_ON(conn->sourcetype != SOURCE_IN);
266 usage = atomic_read(&(conn->source.in.usage_reserve));
268 spin_lock_irqsave(&st_lock, iflags);
269 speed = get_speed(&(conn->source.in.st), jiffies);
270 spin_unlock_irqrestore(&st_lock, iflags);
272 if (offendingconns[OUTOFBUFFERSPACE_OFFENDERS-1] != 0 &&
273 compare_scores(
274 offendingusage[OUTOFBUFFERSPACE_OFFENDERS-1],
275 offendingspeed[OUTOFBUFFERSPACE_OFFENDERS-1],
276 usage, speed) >= 0)
277 continue;
279 offendingconns[OUTOFBUFFERSPACE_OFFENDERS-1] = conn;
280 offendingusage[OUTOFBUFFERSPACE_OFFENDERS-1] = usage;
281 offendingspeed[OUTOFBUFFERSPACE_OFFENDERS-1] = speed;
283 for (i=OUTOFBUFFERSPACE_OFFENDERS-2;i>=0;i++) {
284 struct conn *tmpconn;
285 __u32 usage_tmp;
286 __u64 speed_tmp;
288 if (offendingconns[i] != 0 && compare_scores(
289 offendingusage[i], offendingspeed[i],
290 offendingusage[i+1],
291 offendingspeed[i+1]) >= 0)
292 break;
294 tmpconn = offendingconns[i];
295 usage_tmp = offendingusage[i];
296 speed_tmp = offendingspeed[i];
298 offendingconns[i] = offendingconns[i+1];
299 offendingusage[i] = offendingusage[i+1];
300 offendingspeed[i] = offendingspeed[i+1];
302 offendingconns[i+1] = tmpconn;
303 offendingusage[i+1] = usage_tmp;
304 offendingspeed[i+1] = speed_tmp;
309 for (i=0;i<OUTOFBUFFERSPACE_OFFENDERS;i++) {
310 kref_get(&(offendingconns[i]->ref));
313 mutex_unlock(&buffer_conn_list_lock);
315 for (i=0;i<OUTOFBUFFERSPACE_OFFENDERS;i++) {
316 int resetneeded;
317 mutex_lock(&bufferlimits_lock);
318 resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
319 mutex_unlock(&bufferlimits_lock);
321 if (resetneeded == 0)
322 break;
324 reset_conn(offendingconns[i]);
325 kref_put(&(offendingconns[i]->ref), free_conn);
328 mutex_lock(&bufferlimits_lock);
329 mutex_lock(&buffer_conn_list_lock);
330 while(list_empty(&buffer_conn_tmp_list) == 0) {
331 curr = buffer_conn_tmp_list.next;
332 list_del(curr);
333 list_add(curr, &buffer_conn_list);
335 mutex_unlock(&buffer_conn_list_lock);
336 mutex_unlock(&bufferlimits_lock);
339 static void outofbufferspace(struct work_struct *work)
341 while (1) {
342 unsigned long iflags;
343 int resetneeded;
345 mutex_lock(&bufferlimits_lock);
346 spin_lock_irqsave(&oobss_lock, iflags);
347 resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);
349 if (resetneeded == 0)
350 outofbufferspace_scheduled = 0;
352 spin_unlock_irqrestore(&oobss_lock, iflags);
353 mutex_unlock(&bufferlimits_lock);
355 if (resetneeded == 0)
356 return;
358 _outofbufferspace();
362 static void refresh_bufferusage(struct conn *rconn)
364 BUG_ON(rconn->sourcetype != SOURCE_IN);
366 bufferusage_init -= rconn->source.in.usage_init;
367 bufferusage_speed -= rconn->source.in.usage_speed;
368 bufferusage_ata -= rconn->source.in.usage_ata;
369 bufferusage_reserve -= atomic_read(&(rconn->source.in.usage_reserve));
371 rconn->source.in.usage_ata = rconn->buf.totalsize;
372 if (rconn->source.in.usage_ata > rconn->source.in.buffer_ata)
373 rconn->source.in.usage_ata = rconn->source.in.buffer_ata;
376 if (rconn->source.in.usage_ata == rconn->buf.totalsize)
377 rconn->source.in.usage_speed = 0;
378 else
379 rconn->source.in.usage_speed = rconn->buf.totalsize -
380 rconn->source.in.usage_ata;
382 if (rconn->source.in.usage_speed > rconn->source.in.buffer_speed)
383 rconn->source.in.usage_speed = rconn->source.in.buffer_speed;
386 if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed) ==
387 rconn->buf.totalsize)
388 rconn->source.in.usage_init = 0;
389 else
390 rconn->source.in.usage_init = rconn->buf.totalsize -
391 rconn->source.in.usage_ata -
392 rconn->source.in.usage_speed;
394 if (rconn->source.in.usage_init > rconn->source.in.buffer_init)
395 rconn->source.in.usage_init = rconn->source.in.buffer_init;
398 if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed +
399 rconn->source.in.usage_init) == rconn->buf.totalsize)
400 atomic_set(&(rconn->source.in.usage_reserve), 0);
401 else
402 atomic_set(&(rconn->source.in.usage_reserve),
403 rconn->buf.totalsize -
404 rconn->source.in.usage_ata -
405 rconn->source.in.usage_speed -
406 rconn->source.in.usage_init);
408 bufferusage_init += rconn->source.in.usage_init;
409 bufferusage_speed += rconn->source.in.usage_speed;
410 bufferusage_ata += rconn->source.in.usage_ata;
411 bufferusage_reserve += atomic_read(&(rconn->source.in.usage_reserve));
413 if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
414 unsigned long iflags;
415 spin_lock_irqsave(&oobss_lock, iflags);
416 if (outofbufferspace_scheduled == 0) {
417 INIT_WORK(&outofbufferspace_work, outofbufferspace);
418 queue_work(outofbufferspace_wq, &outofbufferspace_work);
421 spin_unlock_irqrestore(&oobss_lock, iflags);
425 static __u32 __get_window(struct conn *rconn)
427 __u64 window = 0;
429 if (atomic_read(&(rconn->source.in.usage_reserve)) != 0)
430 return 0;
432 BUG_ON(rconn->source.in.usage_init > rconn->source.in.buffer_init);
433 BUG_ON(rconn->source.in.usage_speed > rconn->source.in.buffer_speed);
434 BUG_ON(rconn->source.in.usage_ata > rconn->source.in.buffer_ata);
436 window += rconn->source.in.buffer_init;
437 window += rconn->source.in.buffer_speed;
438 window += rconn->source.in.buffer_ata;
440 window -= rconn->source.in.usage_init;
441 window -= rconn->source.in.usage_speed;
442 window -= rconn->source.in.usage_ata;
444 if (window > MAX_ANNOUNCE_WINDOW)
445 window = MAX_ANNOUNCE_WINDOW;
447 return window;
450 static __u32 _get_window(struct conn *rconn, int listlocked)
452 unsigned long iflags;
454 unsigned long jiffies_tmp;
456 __u32 window = 0;
458 __u32 conns;
459 __u64 bufferlimit_init;
460 __u64 connlimit_init;
462 __u64 totalspeed;
463 __u64 bufferlimit_speed;
464 __u64 connlimit_speed;
466 mutex_lock(&(rconn->rcv_lock));
468 BUG_ON(rconn->sourcetype != SOURCE_IN);
470 if (atomic_read(&(rconn->isreset)) != 0) {
471 if (listlocked && (rconn->source.in.buffer_list.next != 0 ||
472 rconn->source.in.buffer_list.prev != 0))
473 list_del(&(rconn->source.in.buffer_list));
474 rconn->source.in.buffer_list.next = 0;
475 rconn->source.in.buffer_list.prev = 0;
476 kref_put(&(rconn->ref), free_conn);
477 goto out;
480 if (listlocked){
481 if (rconn->source.in.buffer_list.next != 0 &&
482 rconn->source.in.buffer_list.prev != 0) {
483 list_del(&(rconn->source.in.buffer_list));
484 } else {
485 kref_get(&(rconn->ref));
487 list_add_tail(&(rconn->source.in.buffer_list),
488 &buffer_conn_list);
489 } else if (rconn->source.in.buffer_list.next != 0 ||
490 rconn->source.in.buffer_list.prev != 0) {
491 kref_get(&(rconn->ref));
492 list_add_tail(&(rconn->source.in.buffer_list),
493 &buffer_conn_tmp_list);
497 conns = atomic_read(&num_conns);
498 bufferlimit_init = desired_bufferusage(bufferassigned_init,
499 bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
500 connlimit_init = (bufferlimit_init + conns - 1) / conns;
502 bufferassigned_init -= rconn->source.in.buffer_init;
503 if (((__u32) connlimit_init) != connlimit_init)
504 rconn->source.in.buffer_init = -1;
505 else
506 rconn->source.in.buffer_init = (__u32) connlimit_init;
507 bufferassigned_init += rconn->source.in.buffer_init;
510 spin_lock_irqsave(&st_lock, iflags);
511 jiffies_tmp = jiffies;
512 totalspeed = get_speed(&st, jiffies_tmp);
513 bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
514 bufferusage_speed, BUFFERASSIGN_SPEED,
515 BUFFERSPACE_SPEED);
516 connlimit_speed = multiply_div(bufferlimit_speed,
517 get_speed(&(rconn->source.in.st), jiffies_tmp),
518 totalspeed);
519 spin_unlock_irqrestore(&st_lock, iflags);
521 bufferassigned_speed -= rconn->source.in.buffer_speed;
522 if (((__u32) connlimit_speed) != connlimit_speed)
523 rconn->source.in.buffer_speed = -1;
524 else
525 rconn->source.in.buffer_speed = (__u32) connlimit_speed;
526 bufferassigned_speed += rconn->source.in.buffer_speed;
528 refresh_bufferusage(rconn);
530 window = __get_window(rconn);
531 out:
532 mutex_unlock(&(rconn->rcv_lock));
534 return window;
537 /* do not hold rcv_lock while calling this */
538 __u32 get_window(struct conn *rconn)
540 struct conn *rconn2;
541 int listlocked;
543 __u32 window;
545 mutex_lock(&bufferlimits_lock);
546 listlocked = mutex_trylock(&buffer_conn_list_lock);
548 window = _get_window(rconn, listlocked);
550 if (listlocked){
552 * refresh window of idle conns as well to keep global counters
553 * accurate
556 rconn2 = container_of(buffer_conn_list.next, struct conn,
557 source.in.buffer_list);
559 if (list_empty(&buffer_conn_list) == 0 && rconn2 != rconn)
560 _get_window(rconn2, listlocked);
563 if (list_empty(&buffer_conn_tmp_list) == 0) {
564 rconn2 = container_of(buffer_conn_tmp_list.next,
565 struct conn, source.in.buffer_list);
566 BUG_ON(rconn2 == rconn);
567 _get_window(rconn2, listlocked);
570 mutex_unlock(&buffer_conn_list_lock);
573 mutex_unlock(&bufferlimits_lock);
575 return window;
578 void refresh_speedstat(struct conn *rconn, __u32 written)
580 unsigned long iflags;
581 unsigned long jiffies_tmp;
583 spin_lock_irqsave(&st_lock, iflags);
585 jiffies_tmp = jiffies;
587 if (rconn->source.in.st.jiffies_last_update != jiffies_tmp)
588 get_speed(&(rconn->source.in.st), jiffies_tmp);
589 if (rconn->source.in.st.bytes_curr + written < written)
590 rconn->source.in.st.bytes_curr = -1;
591 rconn->source.in.st.bytes_curr += written;
593 if (st.jiffies_last_update != jiffies_tmp)
594 get_speed(&st, jiffies_tmp);
595 if (st.bytes_curr + written < written)
596 st.bytes_curr = -1;
597 st.bytes_curr += written;
599 spin_unlock_irqrestore(&st_lock, iflags);
602 void drain_ooo_queue(struct conn *rconn)
604 struct sk_buff *skb;
606 BUG_ON(SOURCE_IN != rconn->sourcetype);
608 skb = rconn->source.in.reorder_queue.next;
610 while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) {
611 struct skb_procstate *ps = skb_pstate(skb);
612 int drop;
614 if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno)
615 break;
617 drop = receive_skb(rconn, skb);
618 if (drop)
619 break;
621 skb_unlink(skb, &(rconn->source.in.reorder_queue));
622 rconn->source.in.ooo_packets--;
623 atomic_dec(&(rconn->source.in.nb->ooo_packets));
624 atomic_dec(&ooo_packets);
626 rconn->source.in.next_seqno += skb->len;
630 static int _conn_rcv_ooo(struct conn *rconn, struct sk_buff *skb)
632 struct skb_procstate *ps = skb_pstate(skb);
633 struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue);
634 struct sk_buff *curr = reorder_queue->next;
636 long ooo;
638 rconn->source.in.ooo_packets++;
639 if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
640 goto drop_ooo3;
642 ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets));
643 if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
644 goto drop_ooo2;
646 ooo = atomic_inc_return(&ooo_packets);
647 if (ooo > MAX_TOTAL_OOO_PACKETS)
648 goto drop_ooo1;
651 while (1) {
652 struct skb_procstate *ps2 = skb_pstate(curr);
654 if ((void *) curr == (void *) reorder_queue) {
655 skb_queue_tail(reorder_queue, skb);
656 break;
659 if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
660 skb_insert(curr, skb, reorder_queue);
661 break;
664 curr = curr->next;
667 if (0) {
668 drop_ooo1:
669 atomic_dec(&ooo_packets);
670 drop_ooo2:
671 atomic_dec(&(rconn->source.in.nb->ooo_packets));
672 drop_ooo3:
673 rconn->source.in.ooo_packets--;
675 return 1;
678 return 0;
681 static void _conn_rcv(struct conn *rconn, struct sk_buff *skb)
683 struct skb_procstate *ps = skb_pstate(skb);
684 struct control_msg_out *cm = alloc_control_msg(rconn->source.in.nb,
685 ACM_PRIORITY_MED);
687 int in_order;
688 int drop = 1;
690 __u32 len = skb->len;
692 BUG_ON(rconn->sourcetype != SOURCE_IN);
694 if (unlikely(cm == 0)) {
695 kfree_skb(skb);
696 return;
699 mutex_lock(&(rconn->rcv_lock));
701 in_order = (rconn->source.in.next_seqno == ps->funcstate.rcv2.seqno);
703 if (in_order == 0) {
704 drop = _conn_rcv_ooo(rconn, skb);
705 } else {
706 rconn->source.in.next_seqno += skb->len;
707 drop = receive_skb(rconn, skb);
710 if (drop) {
711 kfree_skb(skb);
712 free_control_msg(cm);
713 goto out;
716 #warning todo balance credits
717 if (in_order == 0) {
718 send_ack_conn_ooo(cm, rconn,
719 rconn->reversedir->target.out.conn_id,
720 rconn->source.in.next_seqno,
721 ps->funcstate.rcv2.seqno, len);
722 } else {
723 drain_ooo_queue(rconn);
724 send_ack_conn(cm, rconn, rconn->reversedir->target.out.conn_id,
725 rconn->source.in.next_seqno);
728 out:
729 mutex_unlock(&(rconn->rcv_lock));
732 static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
734 struct conn *rconn;
735 struct skb_procstate *ps = skb_pstate(skb);
737 ps->funcstate.rcv2.seqno = seqno;
739 rconn = get_conn(conn_id);
741 if (unlikely(rconn == 0)) {
742 printk(KERN_DEBUG "unknown conn_id when receiving: %d", conn_id);
743 kfree_skb(skb);
744 return;
746 _conn_rcv(rconn, skb);
747 kref_put(&(rconn->ref), free_conn);
750 void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
752 struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
753 char *dst = skb_put(skb, datalen);
754 memcpy(dst, data, datalen);
755 conn_rcv(skb, conn_id, seqno);
758 static void rcv_data(struct sk_buff *skb)
760 __u32 conn_id;
761 __u32 seqno;
763 char *connid_p = cor_pull_skb(skb, 4);
764 char *seqno_p = cor_pull_skb(skb, 4);
766 /* __u8 rand; */
768 ((char *)&conn_id)[0] = connid_p[0];
769 ((char *)&conn_id)[1] = connid_p[1];
770 ((char *)&conn_id)[2] = connid_p[2];
771 ((char *)&conn_id)[3] = connid_p[3];
773 ((char *)&seqno)[0] = seqno_p[0];
774 ((char *)&seqno)[1] = seqno_p[1];
775 ((char *)&seqno)[2] = seqno_p[2];
776 ((char *)&seqno)[3] = seqno_p[3];
778 conn_id = be32_to_cpu(conn_id);
779 seqno = be32_to_cpu(seqno);
781 /* get_random_bytes(&rand, 1);
783 if (rand < 64) {
784 printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
785 seqno_p[1], seqno_p[2], seqno_p[3]);
786 goto drop;
787 } */
789 if (conn_id == 0) {
790 struct neighbor *nb = get_neigh_by_mac(skb);
791 if (unlikely(nb == 0))
792 goto drop;
793 kernel_packet(nb, skb, seqno);
794 kref_put(&(nb->ref), neighbor_free);
795 } else {
796 conn_rcv(skb, conn_id, seqno);
799 if (0) {
800 drop:
801 kfree_skb(skb);
805 static void rcv(struct work_struct *work)
807 struct sk_buff *skb = skb_from_pstate(container_of(work,
808 struct skb_procstate, funcstate.rcv.work));
810 __u8 packet_type;
811 char *packet_type_p;
813 atomic_dec(&packets_in_workqueue);
815 packet_type_p = cor_pull_skb(skb, 1);
817 if (unlikely(packet_type_p == 0))
818 goto drop;
820 packet_type = *packet_type_p;
822 if (packet_type == PACKET_TYPE_ANNOUNCE) {
823 rcv_announce(skb);
824 return;
827 if (unlikely(packet_type != PACKET_TYPE_DATA))
828 goto drop;
830 rcv_data(skb);
832 if (0) {
833 drop:
834 kfree_skb(skb);
838 static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
839 struct packet_type *pt, struct net_device *orig_dev)
841 struct skb_procstate *ps = skb_pstate(skb);
842 long queuelen;
844 if (skb->pkt_type == PACKET_OTHERHOST)
845 goto drop;
847 BUG_ON(skb->next != 0);
849 queuelen = atomic_inc_return(&packets_in_workqueue);
851 BUG_ON(queuelen <= 0);
853 if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
854 atomic_dec(&packets_in_workqueue);
855 goto drop;
858 INIT_WORK(&(ps->funcstate.rcv.work), rcv);
859 queue_work(packet_wq, &(ps->funcstate.rcv.work));
860 return NET_RX_SUCCESS;
862 drop:
863 kfree_skb(skb);
864 return NET_RX_DROP;
867 static struct packet_type ptype_cor = {
868 .type = htons(ETH_P_COR),
869 .dev = 0,
870 .func = queue_rcv_processing
873 int __init cor_rcv_init(void)
875 bufferassigned_init = 0;
876 bufferassigned_speed = 0;
877 bufferassigned_ata = 0;
878 bufferassigned_reserve = 0;
880 bufferusage_init = 0;
881 bufferusage_speed = 0;
882 bufferusage_ata = 0;
883 bufferusage_reserve = 0;
885 spin_lock_init(&st_lock);
886 memset(&st, 0, sizeof(struct speedtracker));
888 BUG_ON(sizeof(struct skb_procstate) > 48);
889 packet_wq = create_workqueue("cor_packet");
890 outofbufferspace_wq = create_workqueue("cor_outofbufferspace");
891 spin_lock_init(&oobss_lock);
892 outofbufferspace_scheduled = 0;
894 dev_add_pack(&ptype_cor);
895 return 0;
898 MODULE_LICENSE("GPL");