rcv reorder queue bugfix
[cor_2_6_31.git] / net / cor / rcv.c
blobacfd4602886415f7ff5bca729cdd01abccde34af
1 /*
2 * Connection oriented routing
3 * Copyright (C) 2007-2008 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #ifndef _KERNEL_
22 #define _KERNEL_
23 #endif
25 #ifndef MODULE
26 #define MODULE
27 #endif
29 #include <linux/module.h>
30 #include <linux/version.h>
31 #include <linux/kernel.h>
32 #include <linux/init.h>
33 #include <linux/in.h>
36 #include "cor.h"
38 atomic_t packets_in_workqueue = ATOMIC_INIT(0);
40 atomic_t ooo_packets = ATOMIC_INIT(0);
42 static struct workqueue_struct *packet_wq;
44 void drain_ooo_queue(struct conn *rconn)
46 struct sk_buff *skb;
48 BUG_ON(SOURCE_IN != rconn->sourcetype);
50 skb = rconn->source.in.reorder_queue.next;
52 while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) {
53 struct skb_procstate *ps = skb_pstate(skb);
54 int drop;
56 BUG_ON(rconn != ps->rconn);
58 if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno)
59 break;
61 drop = receive_skb(rconn, skb);
62 if (drop)
63 break;
65 skb_unlink(skb, &(rconn->source.in.reorder_queue));
66 rconn->source.in.ooo_packets--;
67 atomic_dec(&(rconn->source.in.nb->ooo_packets));
68 atomic_dec(&ooo_packets);
70 rconn->source.in.next_seqno += skb->len;
75 static int _conn_rcv_ooo(struct sk_buff *skb)
77 struct skb_procstate *ps = skb_pstate(skb);
78 struct conn *rconn = ps->rconn;
79 struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue);
80 struct sk_buff *curr = reorder_queue->next;
82 long ooo;
84 rconn->source.in.ooo_packets++;
85 if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
86 goto drop_ooo3;
88 ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets));
89 if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
90 goto drop_ooo2;
92 ooo = atomic_inc_return(&ooo_packets);
93 if (ooo > MAX_TOTAL_OOO_PACKETS)
94 goto drop_ooo1;
97 while (1) {
98 struct skb_procstate *ps2 = skb_pstate(curr);
100 if ((void *) curr == (void *) reorder_queue) {
101 skb_queue_tail(reorder_queue, skb);
102 break;
105 BUG_ON(rconn != ps2->rconn);
107 if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
108 skb_insert(curr, skb, reorder_queue);
109 break;
113 if (0) {
114 drop_ooo1:
115 atomic_dec(&ooo_packets);
116 drop_ooo2:
117 atomic_dec(&(rconn->source.in.nb->ooo_packets));
118 drop_ooo3:
119 rconn->source.in.ooo_packets--;
121 return 1;
124 return 0;
127 static void _conn_rcv(struct sk_buff *skb)
129 struct skb_procstate *ps = skb_pstate(skb);
130 struct conn *rconn = ps->rconn;
131 struct control_msg_out *cm = alloc_control_msg();
133 int in_order;
134 int drop = 1;
136 BUG_ON(rconn->sourcetype != SOURCE_IN);
138 if (unlikely(cm == 0)) {
139 kfree_skb(skb);
140 goto out;
143 mutex_lock(&(rconn->rcv_lock));
145 in_order = rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno;
147 if (in_order == 0) {
148 drop = _conn_rcv_ooo(skb);
149 } else {
150 rconn->source.in.next_seqno += skb->len;
151 drop = receive_skb(rconn, skb);
154 if (drop) {
155 kfree_skb(skb);
156 free_control_msg(cm);
157 } else {
158 send_ack(cm, rconn->source.in.nb, rconn->source.in.conn_id,
159 ps->funcstate.rcv2.seqno);
162 if (in_order)
163 drain_ooo_queue(rconn);
165 mutex_unlock(&(rconn->rcv_lock));
166 out:
167 ref_counter_decr(&(rconn->refs));
170 static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
172 struct skb_procstate *ps = skb_pstate(skb);
174 ps->funcstate.rcv2.conn_id = conn_id;
175 ps->funcstate.rcv2.seqno = seqno;
177 ps->rconn = get_conn(ps->funcstate.rcv2.conn_id);
178 _conn_rcv(skb);
181 void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
183 struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
184 char *dst = skb_put(skb, datalen);
185 memcpy(dst, data, datalen);
186 conn_rcv(skb, conn_id, seqno);
189 static void rcv_data(struct sk_buff *skb)
191 __u32 conn_id;
192 __u32 seqno;
194 char *connid_p = cor_pull_skb(skb, 4);
195 char *seqno_p = cor_pull_skb(skb, 4);
197 ((char *)&conn_id)[0] = connid_p[0];
198 ((char *)&conn_id)[1] = connid_p[1];
199 ((char *)&conn_id)[2] = connid_p[2];
200 ((char *)&conn_id)[3] = connid_p[3];
202 ((char *)&seqno)[0] = seqno_p[0];
203 ((char *)&seqno)[1] = seqno_p[1];
204 ((char *)&seqno)[2] = seqno_p[2];
205 ((char *)&seqno)[3] = seqno_p[3];
207 conn_id = be32_to_cpu(conn_id);
208 seqno = be32_to_cpu(seqno);
210 if (conn_id == 0) {
211 struct neighbor *nb = get_neigh_by_mac(skb);
212 printk(KERN_ERR "rcv kern");
213 if (unlikely(nb == 0))
214 goto drop;
215 kernel_packet(nb, skb, seqno);
216 ref_counter_decr(&(nb->refs));
217 } else {
218 printk(KERN_ERR "rcv conn");
219 conn_rcv(skb, conn_id, seqno);
222 if (0) {
223 drop:
224 printk(KERN_ERR "drop");
225 kfree_skb(skb);
229 static void rcv(struct work_struct *work)
231 struct sk_buff *skb = skb_from_pstate(container_of(work,
232 struct skb_procstate, funcstate.rcv.work));
234 __u8 packet_type;
235 char *packet_type_p;
237 atomic_dec(&packets_in_workqueue);
239 packet_type_p = cor_pull_skb(skb, 1);
241 if (packet_type_p == 0)
242 goto drop;
244 packet_type = *packet_type_p;
246 if (packet_type == PACKET_TYPE_ANNOUNCE) {
247 rcv_announce(skb);
248 return;
251 if (packet_type != PACKET_TYPE_DATA)
252 goto drop;
254 rcv_data(skb);
256 if (0) {
257 drop:
258 kfree_skb(skb);
262 static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
263 struct packet_type *pt, struct net_device *orig_dev)
265 struct skb_procstate *ps = skb_pstate(skb);
266 long queuelen;
268 BUG_ON(skb->next != 0);
270 queuelen = atomic_inc_return(&packets_in_workqueue);
272 BUG_ON(queuelen <= 0);
274 if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
275 atomic_dec(&packets_in_workqueue);
276 kfree_skb(skb);
277 return NET_RX_DROP;
280 INIT_WORK(&(ps->funcstate.rcv.work), rcv);
281 queue_work(packet_wq, &(ps->funcstate.rcv.work));
282 return NET_RX_SUCCESS;
285 static struct packet_type ptype_cor = {
286 .type = htons(ETH_P_COR),
287 .dev = 0,
288 .func = queue_rcv_processing
291 int __init cor_rcv_init(void)
293 BUG_ON(sizeof(struct skb_procstate) > 48);
294 packet_wq = create_workqueue("cor_packet");
295 dev_add_pack(&ptype_cor);
296 return 0;
299 MODULE_LICENSE("GPL");