2 * Connection oriented routing
3 * Copyright (C) 2007-2008 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 #include <linux/module.h>
22 #include <linux/version.h>
23 #include <linux/kernel.h>
24 #include <linux/init.h>
/*
 * File-level state of the receive path.
 *
 * packets_in_workqueue: incremented in queue_rcv_processing() and
 *	decremented again in rcv(), or right away when the value exceeds
 *	MAX_PACKETS_IN_RCVQUEUE.
 * ooo_packets: global counter of out-of-order skbs; incremented and compared
 *	against MAX_TOTAL_OOO_PACKETS in _conn_rcv_ooo(), decremented in
 *	drain_ooo_queue().
 * packet_wq: workqueue created in cor_rcv_init(); queue_rcv_processing()
 *	queues rcv() work items on it.
 */
30 atomic_t packets_in_workqueue
= ATOMIC_INIT(0);
32 atomic_t ooo_packets
= ATOMIC_INIT(0);
34 static struct workqueue_struct
*packet_wq
;
/*
 * Walk rconn's reorder queue starting at its first element and pass queued
 * skbs to receive_skb(), unlinking them and undoing their out-of-order
 * accounting.
 *
 * Only part of this function is visible in this excerpt: the local
 * declarations, the statements guarded by the seqno test and by `drop`, and
 * the step that moves on to the next skb are missing, so the exact control
 * flow cannot be confirmed from here.
 *
 * _conn_rcv() calls this between mutex_lock() and mutex_unlock() of
 * rconn->rcv_lock -- presumably the lock must be held; TODO confirm.
 */
36 void drain_ooo_queue(struct conn
*rconn
)
/* Everything below uses rconn->source.in, so insist on a SOURCE_IN conn. */
40 BUG_ON(SOURCE_IN
!= rconn
->sourcetype
);
/* Start with the first element of the reorder queue. */
42 skb
= rconn
->source
.in
.reorder_queue
.next
;
/*
 * The queue head doubles as the end-of-list marker: stop once skb is the
 * address of the sk_buff_head itself.
 */
44 while ((void *) skb
!= (void *) &(rconn
->source
.in
.reorder_queue
)) {
45 struct skb_procstate
*ps
= skb_pstate(skb
);
/*
 * Does this skb carry the next expected sequence number? The statement
 * guarded by this test is not visible here (presumably it stops draining at
 * the first gap -- TODO confirm).
 */
48 if (rconn
->source
.in
.next_seqno
!= ps
->funcstate
.rcv2
.seqno
)
51 drop
= receive_skb(rconn
, skb
);
/*
 * Take the skb off the reorder queue and decrement the three out-of-order
 * counters: per-conn, per-neighbor and global.
 */
55 skb_unlink(skb
, &(rconn
->source
.in
.reorder_queue
));
56 rconn
->source
.in
.ooo_packets
--;
57 atomic_dec(&(rconn
->source
.in
.nb
->ooo_packets
));
58 atomic_dec(&ooo_packets
);
/*
 * Advance the expected sequence number by the length of the skb.
 * NOTE(review): skb->len is read here after receive_skb() and skb_unlink(),
 * whereas _conn_rcv() advances next_seqno before calling receive_skb().
 * Confirm that skb is still valid at this point.
 */
60 rconn
->source
.in
.next_seqno
+= skb
->len
;
/*
 * Handle an skb that did not arrive in order: account for it in the per-conn,
 * per-neighbor and global out-of-order counters, then place it into rconn's
 * reorder queue.
 *
 * Returns an int that _conn_rcv() stores in `drop`; the return statements
 * themselves, the statements guarded by the limit checks and the step that
 * advances `curr` are not visible in this excerpt.
 */
64 static int _conn_rcv_ooo(struct conn
*rconn
, struct sk_buff
*skb
)
66 struct skb_procstate
*ps
= skb_pstate(skb
);
67 struct sk_buff_head
*reorder_queue
= &(rconn
->source
.in
.reorder_queue
);
/* curr starts at the first element of the reorder queue. */
68 struct sk_buff
*curr
= reorder_queue
->next
;
/*
 * Count the skb against each limit in turn: per-conn first, then
 * per-neighbor, then global. What happens when a limit is exceeded is not
 * visible here (presumably a jump to the matching undo step at the end --
 * TODO confirm).
 */
72 rconn
->source
.in
.ooo_packets
++;
73 if (rconn
->source
.in
.ooo_packets
> MAX_TOTAL_OOO_PER_CONN
)
76 ooo
= atomic_inc_return(&(rconn
->source
.in
.nb
->ooo_packets
));
77 if (ooo
> MAX_TOTAL_OOO_PER_NEIGH
)
80 ooo
= atomic_inc_return(&ooo_packets
);
81 if (ooo
> MAX_TOTAL_OOO_PACKETS
)
/*
 * ps2 is the per-skb state of the queue element being examined.
 * NOTE(review): it is computed from curr before the test below that curr is
 * the queue head; in that case curr is not a real skb and ps2 must not be
 * dereferenced.
 */
86 struct skb_procstate
*ps2
= skb_pstate(curr
);
/* Reached the queue head, i.e. the end of the list: append at the tail. */
88 if ((void *) curr
== (void *) reorder_queue
) {
89 skb_queue_tail(reorder_queue
, skb
);
/*
 * Otherwise compare the new skb's seqno with curr's and, if it is greater,
 * insert it via skb_insert() relative to curr.
 * NOTE(review): this is a plain `>` comparison; conn_rcv() passes the seqno
 * as a __u32, so if sequence numbers can wrap the ordering across a wrap
 * would be wrong. Also double-check the direction of the test against where
 * skb_insert() places the new skb.
 */
93 if (ps
->funcstate
.rcv2
.seqno
> ps2
->funcstate
.rcv2
.seqno
) {
94 skb_insert(curr
, skb
, reorder_queue
);
/*
 * Undo the counter increments in reverse order: global, per-neighbor,
 * per-conn. The labels separating these steps are not visible here.
 */
103 atomic_dec(&ooo_packets
);
105 atomic_dec(&(rconn
->source
.in
.nb
->ooo_packets
));
107 rconn
->source
.in
.ooo_packets
--;
/*
 * Process one received data skb for rconn: allocate a control message for
 * the acknowledgement, then, under rconn->rcv_lock, either deliver the skb
 * through receive_skb() or hand it to _conn_rcv_ooo(), and finally send an
 * ack.
 *
 * The branches that choose between the in-order and the out-of-order path
 * (and between the two kinds of ack) are not visible in this excerpt.
 */
115 static void _conn_rcv(struct conn
*rconn
, struct sk_buff
*skb
)
117 struct skb_procstate
*ps
= skb_pstate(skb
);
/*
 * Control message used for the ack; the remaining arguments of this call are
 * on lines missing from this excerpt.
 */
118 struct control_msg_out
*cm
= alloc_control_msg(rconn
->source
.in
.nb
,
/*
 * Remember the length up front; it is passed to send_ack_conn_ooo() below
 * (presumably because skb may no longer be usable by then -- TODO confirm).
 */
124 __u32 len
= skb
->len
;
126 BUG_ON(rconn
->sourcetype
!= SOURCE_IN
);
/* Control message allocation failed; the handling is not visible here. */
128 if (unlikely(cm
== 0)) {
133 mutex_lock(&(rconn
->rcv_lock
));
/* In order means the skb carries exactly the next expected seqno. */
135 in_order
= (rconn
->source
.in
.next_seqno
== ps
->funcstate
.rcv2
.seqno
);
/* Out-of-order path: try to park the skb in the reorder queue. */
138 drop
= _conn_rcv_ooo(rconn
, skb
);
/*
 * In-order path: advance the expected seqno by the skb length, then deliver
 * the skb.
 */
140 rconn
->source
.in
.next_seqno
+= skb
->len
;
141 drop
= receive_skb(rconn
, skb
);
/*
 * The control message is released without sending an ack on this path; the
 * guarding condition is not visible (presumably `drop` -- TODO confirm).
 */
146 free_control_msg(cm
);
/* Both acks below advertise a fixed window of enc_window(65536). */
150 #warning todo set window, balance credits
/*
 * Out-of-order ack: carries the next expected seqno, the window, and the
 * seqno and length of the skb that was just handled.
 */
152 send_ack_conn_ooo(cm
, rconn
->reversedir
->target
.out
.conn_id
,
153 rconn
->source
.in
.next_seqno
, enc_window(65536),
154 ps
->funcstate
.rcv2
.seqno
, len
);
/*
 * In-order ack: first drain the reorder queue so that next_seqno reflects
 * everything that could be delivered, then acknowledge up to next_seqno.
 */
156 drain_ooo_queue(rconn
);
157 send_ack_conn(cm
, rconn
->reversedir
->target
.out
.conn_id
,
158 rconn
->source
.in
.next_seqno
, enc_window(65536));
162 mutex_unlock(&(rconn
->rcv_lock
));
/*
 * Receive a data skb addressed to connection conn_id: store seqno in the
 * skb's per-packet state, look the connection up and pass both on to
 * _conn_rcv().
 *
 * The declaration of rconn and the remainder of the unknown-connection
 * branch are not visible in this excerpt.
 */
165 static void conn_rcv(struct sk_buff
*skb
, __u32 conn_id
, __u32 seqno
)
168 struct skb_procstate
*ps
= skb_pstate(skb
);
/* Later stages read the sequence number from the per-skb state. */
170 ps
->funcstate
.rcv2
.seqno
= seqno
;
/* The reference obtained here is released through kref_put() below. */
172 rconn
= get_conn(conn_id
);
/*
 * No connection with this id.
 * NOTE(review): conn_id is a __u32 but is printed with %d, and the message
 * has no trailing newline.
 */
174 if (unlikely(rconn
== 0)) {
175 printk(KERN_DEBUG
"unknown conn_id when receiving: %d", conn_id
);
179 _conn_rcv(rconn
, skb
);
180 kref_put(&(rconn
->ref
), free_conn
);
183 void conn_rcv_buildskb(char *data
, __u32 datalen
, __u32 conn_id
, __u32 seqno
)
185 struct sk_buff
*skb
= alloc_skb(datalen
, GFP_KERNEL
);
186 char *dst
= skb_put(skb
, datalen
);
187 memcpy(dst
, data
, datalen
);
188 conn_rcv(skb
, conn_id
, seqno
);
/*
 * Parse the header of a data packet and dispatch it.
 *
 * Two 4-byte fields are pulled from the front of the skb: the connection id
 * and the sequence number. Both are copied byte by byte into local __u32
 * style variables and converted with be32_to_cpu(). The packet then goes
 * either to kernel_packet() together with the sending neighbor, or to
 * conn_rcv(); the condition that selects between the two is not visible in
 * this excerpt, nor are the declarations of conn_id and seqno.
 *
 * Further down there is a commented-out debugging snippet (random drop with
 * a printk) whose closing delimiter is not visible in this excerpt.
 */
191 static void rcv_data(struct sk_buff
*skb
)
/*
 * NOTE(review): no check of these two pointers against 0 is visible here,
 * although rcv() does check the result of cor_pull_skb(); confirm that the
 * missing lines validate them before the dereferences below.
 */
196 char *connid_p
= cor_pull_skb(skb
, 4);
197 char *seqno_p
= cor_pull_skb(skb
, 4);
/* Byte-wise copy, so the source pointers need no particular alignment. */
201 ((char *)&conn_id
)[0] = connid_p
[0];
202 ((char *)&conn_id
)[1] = connid_p
[1];
203 ((char *)&conn_id
)[2] = connid_p
[2];
204 ((char *)&conn_id
)[3] = connid_p
[3];
206 ((char *)&seqno
)[0] = seqno_p
[0];
207 ((char *)&seqno
)[1] = seqno_p
[1];
208 ((char *)&seqno
)[2] = seqno_p
[2];
209 ((char *)&seqno
)[3] = seqno_p
[3];
/* Both fields are big-endian on the wire. */
211 conn_id
= be32_to_cpu(conn_id
);
212 seqno
= be32_to_cpu(seqno
);
214 /* get_random_bytes(&rand, 1);
217 printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
218 seqno_p[1], seqno_p[2], seqno_p[3]);
223 struct neighbor
*nb
= get_neigh_by_mac(skb
);
224 if (unlikely(nb
== 0))
226 kernel_packet(nb
, skb
, seqno
);
227 kref_put(&(nb
->ref
), neighbor_free
);
229 conn_rcv(skb
, conn_id
, seqno
);
// Work function queued by queue_rcv_processing() on packet_wq.
//
// Recovers the skb from the work_struct embedded in its skb_procstate,
// decrements packets_in_workqueue, pulls the 1-byte packet type from the
// front of the skb and tests it against PACKET_TYPE_ANNOUNCE and
// PACKET_TYPE_DATA. The local declarations and the statements inside these
// branches are not visible in this excerpt.
238 static void rcv(struct work_struct
*work
)
// work is the funcstate.rcv.work member of a struct skb_procstate;
// container_of() gets back to that state and skb_from_pstate() to the skb.
240 struct sk_buff
*skb
= skb_from_pstate(container_of(work
,
241 struct skb_procstate
, funcstate
.rcv
.work
));
// Matches the atomic_inc_return() in queue_rcv_processing().
246 atomic_dec(&packets_in_workqueue
);
248 packet_type_p
= cor_pull_skb(skb
, 1);
// cor_pull_skb() returned 0 (presumably the skb is too short -- TODO
// confirm); the handling is not visible here.
250 if (unlikely(packet_type_p
== 0))
253 packet_type
= *packet_type_p
;
255 if (packet_type
== PACKET_TYPE_ANNOUNCE
) {
// Any type other than announce or data takes this branch.
260 if (unlikely(packet_type
!= PACKET_TYPE_DATA
))
// Receive handler installed as ptype_cor.func: defers processing of an
// incoming skb to the packet_wq workqueue, where rcv() picks it up.
//
// Returns NET_RX_SUCCESS once the work item has been queued. The return
// values and cleanup of the other paths (PACKET_OTHERHOST, queue limit
// exceeded) are not visible in this excerpt. dev, pt and orig_dev are not
// used in the visible lines.
271 static int queue_rcv_processing(struct sk_buff
*skb
, struct net_device
*dev
,
272 struct packet_type
*pt
, struct net_device
*orig_dev
)
274 struct skb_procstate
*ps
= skb_pstate(skb
);
// Frames with pkt_type PACKET_OTHERHOST get special treatment
// (presumably they are dropped -- TODO confirm).
277 if (skb
->pkt_type
== PACKET_OTHERHOST
)
280 BUG_ON(skb
->next
!= 0);
// Bound the number of skbs waiting in the workqueue: count this one first,
// and take the count back if the limit is exceeded.
282 queuelen
= atomic_inc_return(&packets_in_workqueue
);
284 BUG_ON(queuelen
<= 0);
286 if (queuelen
> MAX_PACKETS_IN_RCVQUEUE
) {
287 atomic_dec(&packets_in_workqueue
);
// The work_struct lives in the skb's own skb_procstate, so rcv() can get
// back to the skb from the work pointer.
291 INIT_WORK(&(ps
->funcstate
.rcv
.work
), rcv
);
292 queue_work(packet_wq
, &(ps
->funcstate
.rcv
.work
));
293 return NET_RX_SUCCESS
;
// Packet handler description for frames of type ETH_P_COR: they are handed
// to queue_rcv_processing(). Registered in cor_rcv_init() via dev_add_pack().
300 static struct packet_type ptype_cor
= {
// Stored via htons(), i.e. in network byte order.
301 .type
= htons(ETH_P_COR
),
303 .func
= queue_rcv_processing
// Module init step for the receive path: creates the "cor_packet" workqueue
// and registers the ETH_P_COR packet handler. The return statement is not
// visible in this excerpt.
306 int __init
cor_rcv_init(void)
// struct skb_procstate must fit into 48 bytes (presumably because it is kept
// in the skb control buffer -- TODO confirm).
308 BUG_ON(sizeof(struct skb_procstate
) > 48);
// NOTE(review): the result of create_workqueue() is not checked before the
// handler is registered; queue_rcv_processing() passes packet_wq straight
// to queue_work().
309 packet_wq
= create_workqueue("cor_packet");
310 dev_add_pack(&ptype_cor
);
// Declares the module license as GPL, in line with the notice at the top of
// this file.
314 MODULE_LICENSE("GPL");