credit system, reverse_connid_table insert bugfix
[cor_2_6_31.git] / net / cor / rcv.c
blob23ea91737d90c491cdd62eb2598583b51d87e5ba
1 /*
2 * Connection oriented routing
3 * Copyright (C) 2007-2008 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/module.h>
22 #include <linux/version.h>
23 #include <linux/kernel.h>
24 #include <linux/init.h>
25 #include <linux/in.h>
28 #include "cor.h"
/* Number of received packets currently queued on packet_wq (bounded by
 * MAX_PACKETS_IN_RCVQUEUE in queue_rcv_processing). */
atomic_t packets_in_workqueue = ATOMIC_INIT(0);

/* Global count of out-of-order packets buffered across all connections
 * (per-conn and per-neighbor counts are kept separately). */
atomic_t ooo_packets = ATOMIC_INIT(0);

/* Workqueue on which incoming packets are processed; see rcv(). */
static struct workqueue_struct *packet_wq;
36 void drain_ooo_queue(struct conn *rconn)
38 struct sk_buff *skb;
40 BUG_ON(SOURCE_IN != rconn->sourcetype);
42 skb = rconn->source.in.reorder_queue.next;
44 while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) {
45 struct skb_procstate *ps = skb_pstate(skb);
46 int drop;
48 if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno)
49 break;
51 drop = receive_skb(rconn, skb);
52 if (drop)
53 break;
55 skb_unlink(skb, &(rconn->source.in.reorder_queue));
56 rconn->source.in.ooo_packets--;
57 atomic_dec(&(rconn->source.in.nb->ooo_packets));
58 atomic_dec(&ooo_packets);
60 rconn->source.in.next_seqno += skb->len;
64 static int _conn_rcv_ooo(struct conn *rconn, struct sk_buff *skb)
66 struct skb_procstate *ps = skb_pstate(skb);
67 struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue);
68 struct sk_buff *curr = reorder_queue->next;
70 long ooo;
72 rconn->source.in.ooo_packets++;
73 if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
74 goto drop_ooo3;
76 ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets));
77 if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
78 goto drop_ooo2;
80 ooo = atomic_inc_return(&ooo_packets);
81 if (ooo > MAX_TOTAL_OOO_PACKETS)
82 goto drop_ooo1;
85 while (1) {
86 struct skb_procstate *ps2 = skb_pstate(curr);
88 if ((void *) curr == (void *) reorder_queue) {
89 skb_queue_tail(reorder_queue, skb);
90 break;
93 if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
94 skb_insert(curr, skb, reorder_queue);
95 break;
98 curr = curr->next;
101 if (0) {
102 drop_ooo1:
103 atomic_dec(&ooo_packets);
104 drop_ooo2:
105 atomic_dec(&(rconn->source.in.nb->ooo_packets));
106 drop_ooo3:
107 rconn->source.in.ooo_packets--;
109 return 1;
112 return 0;
115 static void _conn_rcv(struct conn *rconn, struct sk_buff *skb)
117 struct skb_procstate *ps = skb_pstate(skb);
118 struct control_msg_out *cm = alloc_control_msg(rconn->source.in.nb,
119 ACM_PRIORITY_MED);
121 int in_order;
122 int drop = 1;
124 __u32 len = skb->len;
126 BUG_ON(rconn->sourcetype != SOURCE_IN);
128 if (unlikely(cm == 0)) {
129 kfree_skb(skb);
130 return;
133 mutex_lock(&(rconn->rcv_lock));
135 in_order = (rconn->source.in.next_seqno == ps->funcstate.rcv2.seqno);
137 if (in_order == 0) {
138 drop = _conn_rcv_ooo(rconn, skb);
139 } else {
140 rconn->source.in.next_seqno += skb->len;
141 drop = receive_skb(rconn, skb);
144 if (drop) {
145 kfree_skb(skb);
146 free_control_msg(cm);
147 goto out;
150 #warning todo set window, balance credits
151 if (in_order == 0) {
152 send_ack_conn_ooo(cm, rconn->reversedir->target.out.conn_id,
153 rconn->source.in.next_seqno, enc_window(65536),
154 ps->funcstate.rcv2.seqno, len);
155 } else {
156 drain_ooo_queue(rconn);
157 send_ack_conn(cm, rconn->reversedir->target.out.conn_id,
158 rconn->source.in.next_seqno, enc_window(65536));
161 out:
162 mutex_unlock(&(rconn->rcv_lock));
165 static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
167 struct conn *rconn;
168 struct skb_procstate *ps = skb_pstate(skb);
170 ps->funcstate.rcv2.seqno = seqno;
172 rconn = get_conn(conn_id);
174 if (unlikely(rconn == 0)) {
175 printk(KERN_DEBUG "unknown conn_id when receiving: %d", conn_id);
176 kfree_skb(skb);
177 return;
179 _conn_rcv(rconn, skb);
180 kref_put(&(rconn->ref), free_conn);
183 void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
185 struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
186 char *dst = skb_put(skb, datalen);
187 memcpy(dst, data, datalen);
188 conn_rcv(skb, conn_id, seqno);
191 static void rcv_data(struct sk_buff *skb)
193 __u32 conn_id;
194 __u32 seqno;
196 char *connid_p = cor_pull_skb(skb, 4);
197 char *seqno_p = cor_pull_skb(skb, 4);
199 /* __u8 rand; */
201 ((char *)&conn_id)[0] = connid_p[0];
202 ((char *)&conn_id)[1] = connid_p[1];
203 ((char *)&conn_id)[2] = connid_p[2];
204 ((char *)&conn_id)[3] = connid_p[3];
206 ((char *)&seqno)[0] = seqno_p[0];
207 ((char *)&seqno)[1] = seqno_p[1];
208 ((char *)&seqno)[2] = seqno_p[2];
209 ((char *)&seqno)[3] = seqno_p[3];
211 conn_id = be32_to_cpu(conn_id);
212 seqno = be32_to_cpu(seqno);
214 /* get_random_bytes(&rand, 1);
216 if (rand < 64) {
217 printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
218 seqno_p[1], seqno_p[2], seqno_p[3]);
219 goto drop;
220 } */
222 if (conn_id == 0) {
223 struct neighbor *nb = get_neigh_by_mac(skb);
224 if (unlikely(nb == 0))
225 goto drop;
226 kernel_packet(nb, skb, seqno);
227 kref_put(&(nb->ref), neighbor_free);
228 } else {
229 conn_rcv(skb, conn_id, seqno);
232 if (0) {
233 drop:
234 kfree_skb(skb);
238 static void rcv(struct work_struct *work)
240 struct sk_buff *skb = skb_from_pstate(container_of(work,
241 struct skb_procstate, funcstate.rcv.work));
243 __u8 packet_type;
244 char *packet_type_p;
246 atomic_dec(&packets_in_workqueue);
248 packet_type_p = cor_pull_skb(skb, 1);
250 if (unlikely(packet_type_p == 0))
251 goto drop;
253 packet_type = *packet_type_p;
255 if (packet_type == PACKET_TYPE_ANNOUNCE) {
256 rcv_announce(skb);
257 return;
260 if (unlikely(packet_type != PACKET_TYPE_DATA))
261 goto drop;
263 rcv_data(skb);
265 if (0) {
266 drop:
267 kfree_skb(skb);
271 static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
272 struct packet_type *pt, struct net_device *orig_dev)
274 struct skb_procstate *ps = skb_pstate(skb);
275 long queuelen;
277 if (skb->pkt_type == PACKET_OTHERHOST)
278 goto drop;
280 BUG_ON(skb->next != 0);
282 queuelen = atomic_inc_return(&packets_in_workqueue);
284 BUG_ON(queuelen <= 0);
286 if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
287 atomic_dec(&packets_in_workqueue);
288 goto drop;
291 INIT_WORK(&(ps->funcstate.rcv.work), rcv);
292 queue_work(packet_wq, &(ps->funcstate.rcv.work));
293 return NET_RX_SUCCESS;
295 drop:
296 kfree_skb(skb);
297 return NET_RX_DROP;
/* Protocol hook registered via dev_add_pack() in cor_rcv_init(). */
static struct packet_type ptype_cor = {
	.type = htons(ETH_P_COR),
	.dev = 0, /* match frames from any device */
	.func = queue_rcv_processing
};
306 int __init cor_rcv_init(void)
308 BUG_ON(sizeof(struct skb_procstate) > 48);
309 packet_wq = create_workqueue("cor_packet");
310 dev_add_pack(&ptype_cor);
311 return 0;
314 MODULE_LICENSE("GPL");