neighbor reference counter
[cor_2_6_31.git] / net / cor / snd.c
blob67fc5c29a9be7ed753a389c3d2f1e603a40504e0
1 /*
2 * Connection oriented routing
3 * Copyright (C) 2007-2008 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/gfp.h>
22 #include <linux/jiffies.h>
23 #include <linux/slab.h>
25 #include "cor.h"
27 static struct htable retransmits;
29 static void free_skb(struct kref *ref)
31 struct skb_procstate *ps = container_of(ref, struct skb_procstate,
32 funcstate.retransmit_queue.ref);
34 struct sk_buff *skb = skb_from_pstate(ps);
35 kref_put(&(ps->funcstate.retransmit_queue.nb->ref), neighbor_free);
36 kfree_skb(skb);
39 struct retransmit_matchparam {
40 __u32 conn_id;
41 __u32 seqno;
42 struct neighbor *nb;
45 static __u32 rm_to_key(struct retransmit_matchparam *rm)
47 return rm->conn_id ^ rm->seqno;
50 /* static struct sk_buff * cor_dequeue(struct Qdisc *sch)
52 struct sk_buff *ret;
54 struct cor_sched_data *q = qdisc_priv(sch);
56 struct list_head *ln = q->conn_list.next;
57 struct conn *best = 0;
59 __u64 currcost_limit = 0;
60 __u64 currcost = 0;
62 spin_lock(&(q->lock));
64 if (!(skb_queue_empty(&(q->requeue_queue)))) {
65 ret = __skb_dequeue(&(q->requeue_queue));
66 goto out;
69 while (&(q->conn_list) != ln) {
70 __u32 max1, max2, maxcost;
71 struct conn *curr = (struct conn *)
72 (((char *) ln) - offsetof(struct conn,
73 target.out.queue_list));
75 BUG_ON(TARGET_OUT != curr->targettype);
76 max1 = (256 * ((__u64)curr->credits)) /
77 ((__u64)curr->bytes_queued + curr->avg_rate);
79 max2 = (256 * ((__u64)curr->credits +
80 curr->credit_sender - curr->credit_recp)) /
81 ((__u64)curr->bytes_queued + 2*curr->avg_rate);
83 maxcost = max((__u32) 0, min((max1), (max2)));
85 if (maxcost > currcost_limit) {
86 currcost = currcost_limit;
87 currcost_limit = maxcost;
88 best = curr;
91 ln = ln->next;
94 best->credits -= currcost;
96 ret = __skb_dequeue(&(best->target.out.queue));
98 if (skb_queue_empty(&(best->target.out.queue))) {
99 list_del(&(best->target.out.queue_list));
100 best->target.out.qdisc_active = 0;
103 out:
104 spin_unlock(&(q->lock));
106 if (likely(0 != ret)) {
107 sch->qstats.backlog -= ret->len;
108 sch->q.qlen--;
111 return ret;
114 static int cor_enqueue(struct sk_buff *skb, struct Qdisc *sch)
116 struct cor_sched_data *q = qdisc_priv(sch);
117 struct conn *rconn;
119 rconn = skb_pstate(skb)->rconn;
121 BUG_ON(TARGET_OUT != rconn->targettype);
123 spin_lock(&(rconn->target.out.qdisc_lock));
125 __skb_queue_tail(&(rconn->target.out.queue), skb);
127 if (unlikely(0 == rconn->target.out.qdisc_active)) {
128 spin_lock(&(q->lock));
129 list_add(&(rconn->target.out.queue_list), &(q->conn_list));
130 rconn->target.out.qdisc_active = 1;
131 spin_unlock(&(q->lock));
134 spin_unlock(&(rconn->target.out.qdisc_lock));
136 sch->bstats.bytes += skb->len;
137 sch->bstats.packets++;
138 sch->q.qlen++;
140 return NET_XMIT_SUCCESS;
141 } */
143 static void cor_xmit(struct sk_buff *skb, int atomic, int clone)
145 struct sk_buff *skb2;
147 BUG_ON(skb == 0);
149 if (clone) {
150 skb2 = skb_clone(skb, __GFP_DMA | (atomic ? GFP_ATOMIC : GFP_KERNEL));
152 if (skb2 == 0) {
153 printk(KERN_WARNING "cor_xmit: cannot clone skb, "
154 "allocation failure?");
155 return;
157 } else {
158 skb2 = skb;
161 dev_queue_xmit(skb2);
164 static void set_retrans_timeout(struct sk_buff *skb, struct neighbor *nb)
166 struct skb_procstate *ps = skb_pstate(skb);
167 ps->funcstate.retransmit_queue.timeout = jiffies + msecs_to_jiffies(
168 300 + ((__u32)atomic_read(&(nb->latency)))/1000);
171 void retransmit_timerfunc(unsigned long arg)
173 unsigned long iflags;
175 struct neighbor *nb = (struct neighbor *) arg;
176 struct sk_buff *skb = 0;
177 unsigned long timeout;
179 int nbstate;
180 int nbput = 0;
182 spin_lock_irqsave( &(nb->state_lock), iflags );
183 nbstate = nb->state;
184 spin_unlock_irqrestore( &(nb->state_lock), iflags );
186 while (1) {
187 spin_lock_irqsave( &(nb->retrans_lock), iflags );
188 skb = __skb_dequeue(&(nb->retrans_list));
190 if (0 == skb)
191 goto out;
193 if (nbstate == NEIGHBOR_STATE_KILLED) {
194 struct skb_procstate *ps = skb_pstate(skb);
195 struct retransmit_matchparam rm;
196 spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
198 rm.conn_id = ps->funcstate.retransmit_queue.conn_id;
199 rm.seqno = ps->funcstate.retransmit_queue.seqno;
200 rm.nb = ps->funcstate.retransmit_queue.nb;
202 htable_delete(&retransmits, rm_to_key(&rm), &rm,
203 free_skb);
204 kref_put(&(ps->funcstate.retransmit_queue.ref),
205 free_skb);
206 continue;
209 timeout = skb_pstate(skb)->funcstate.retransmit_queue.timeout;
211 if (time_after(timeout, jiffies)) {
212 __skb_queue_head(&(nb->retrans_list), skb);
213 goto modtimer;
216 set_retrans_timeout(skb, nb);
218 __skb_queue_tail(&(nb->retrans_list), skb);
220 spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
221 cor_xmit(skb, 1, 1);
224 modtimer:
225 mod_timer(&(nb->retrans_timer), timeout);
227 if (0) {
228 out:
229 nb->retrans_timer_running = 0;
230 nbput = 1;
233 spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
235 if (nbput)
236 kref_put(&(nb->ref), neighbor_free);
239 static struct sk_buff *create_packet(struct neighbor *nb, int size,
240 gfp_t alloc_flags, __u32 conn_id, __u32 seqno,
241 struct conn *target)
243 struct net_device *dev = nb->dev;
244 struct sk_buff *ret;
245 struct skb_procstate *ps;
246 char *dest;
248 ret = alloc_skb(size + 9 + LL_ALLOCATED_SPACE(dev), alloc_flags);
249 if (unlikely(0 == ret))
250 return 0;
252 ret->protocol = htons(ETH_P_COR);
253 ret->dev = dev;
255 skb_reserve(ret, LL_RESERVED_SPACE(dev));
256 if(unlikely(dev_hard_header(ret, dev, ETH_P_COR, nb->mac,
257 dev->dev_addr, ret->len) < 0))
258 return 0;
259 skb_reset_network_header(ret);
261 ps = skb_pstate(ret);
262 memset(&(ps->funcstate.retransmit_queue), 0,
263 sizeof(ps->funcstate.retransmit_queue));
264 ps->funcstate.retransmit_queue.conn_id = conn_id;
265 ps->funcstate.retransmit_queue.seqno = seqno;
266 ps->funcstate.retransmit_queue.nb = nb;
267 /* nb_ref is in schedule_retransmit */
269 dest = skb_put(ret, 9);
270 BUG_ON(0 == dest);
272 dest[0] = PACKET_TYPE_DATA;
273 dest += 1;
275 put_u32(dest, conn_id, 1);
276 dest += 4;
277 put_u32(dest, seqno, 1);
278 dest += 4;
280 return ret;
283 struct sk_buff *create_packet_conn(struct conn *target, int size,
284 gfp_t alloc_flags)
286 __u32 connid = target->target.out.conn_id;
287 __u32 seqno;
289 BUG_ON(target->targettype != TARGET_OUT);
291 seqno = target->target.out.seqno;
292 target->target.out.seqno += size;
294 return create_packet(target->target.out.nb, size, alloc_flags,
295 connid, seqno, target);
298 struct sk_buff *create_packet_kernel(struct neighbor *nb, int size,
299 gfp_t alloc_flags)
301 __u32 seqno = atomic_add_return(1, &(nb->kpacket_seqno));
302 return create_packet(nb, size, alloc_flags, 0, seqno, 0);
305 void send_conn_flushdata(struct conn *rconn, char *data, __u32 datalen)
307 __u32 seqno;
308 struct control_msg_out *cm = alloc_control_msg();
310 seqno = rconn->target.out.seqno;
311 rconn->target.out.seqno += datalen;
313 #warning todo retransmit/controlmsg == 0
315 send_conndata(cm, rconn->target.out.nb, rconn->target.out.conn_id,
316 seqno, data, data, datalen);
319 static void schedule_retransmit(struct sk_buff *skb, struct neighbor *nb)
321 unsigned long iflags;
323 struct skb_procstate *ps = skb_pstate(skb);
324 struct retransmit_matchparam rm;
325 int first;
327 rm.conn_id = ps->funcstate.retransmit_queue.conn_id;
328 rm.seqno = ps->funcstate.retransmit_queue.seqno;
329 rm.nb = nb;
331 set_retrans_timeout(skb, nb);
332 kref_init(&(ps->funcstate.retransmit_queue.ref));
333 htable_insert(&retransmits, (char *) skb, rm_to_key(&rm));
334 spin_lock_irqsave( &(nb->retrans_lock), iflags );
335 first = unlikely(skb_queue_empty(&(nb->retrans_list)));
336 __skb_queue_tail(&(nb->retrans_list), skb);
338 if (unlikely(unlikely(first) &&
339 unlikely(nb->retrans_timer_running == 0))) {
340 mod_timer(&(nb->retrans_timer),
341 ps->funcstate.retransmit_queue.timeout);
342 nb->retrans_timer_running = 1;
343 kref_get(&(nb->ref));
346 spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
348 kref_get(&(ps->funcstate.retransmit_queue.ref));
351 void send_packet(struct sk_buff *skb, struct neighbor *nb, int retransmit)
353 if (retransmit)
354 schedule_retransmit(skb, nb);
356 cor_xmit(skb, 0, retransmit);
359 void ack_received(struct neighbor *nb, __u32 conn_id, __u32 seqno)
361 unsigned long iflags;
363 struct sk_buff *skb = 0;
364 struct skb_procstate *ps;
365 struct retransmit_matchparam rm;
367 int first = 0;
369 int ret;
371 rm.conn_id = conn_id;
372 rm.seqno = seqno;
373 rm.nb = nb;
375 skb = (struct sk_buff *) htable_get(&retransmits, rm_to_key(&rm), &rm);
377 if (0 == skb) {
378 printk(KERN_ERR "bogus/duplicate ack received");
379 return;
382 ps = skb_pstate(skb);
384 ret = htable_delete(&retransmits, rm_to_key(&rm), &rm, free_skb);
385 if (ret) {
386 /* somebody else has already deleted it in the meantime */
387 return;
390 BUG_ON(ps->funcstate.retransmit_queue.nb != nb);
392 spin_lock_irqsave( &(nb->retrans_lock), iflags );
394 if (unlikely(nb->retrans_list.next == skb))
395 first = 1;
396 skb->next->prev = skb->prev;
397 skb->prev->next = skb->next;
399 spin_unlock_irqrestore( &(nb->retrans_lock), iflags );
401 kref_put(&(ps->funcstate.retransmit_queue.ref), free_skb);
404 void flush_out(struct conn *rconn)
406 int targetmss = mss(rconn->target.out.nb);
408 char *buf;
409 __u32 len;
411 if (unlikely(rconn->target.out.conn_id == 0))
412 return;
414 while (rconn->buf.read_remaining >= targetmss) {
415 struct sk_buff *newskb = create_packet_conn(rconn, targetmss,
416 GFP_ATOMIC);
418 char *dst = skb_put(newskb, targetmss);
419 databuf_pull(&(rconn->buf), dst, targetmss);
420 databuf_ackread(&(rconn->buf));
421 send_packet(newskb, rconn->target.out.nb, 1);
424 if (rconn->buf.read_remaining == 0)
425 return;
427 len = rconn->buf.read_remaining;
428 buf = kmalloc(len, GFP_KERNEL);
430 databuf_pull(&(rconn->buf), buf, len);
431 databuf_ackread(&(rconn->buf));
433 send_conn_flushdata(rconn, buf, len);
435 wake_sender(rconn);
438 static int matches_skb_connid_seqno(void *htentry, void *searcheditem)
440 struct sk_buff *skb = (struct sk_buff *) htentry;
441 struct skb_procstate *ps = skb_pstate(skb);
442 struct retransmit_matchparam *rm = (struct retransmit_matchparam *)
443 searcheditem;
445 return rm->conn_id == ps->funcstate.retransmit_queue.conn_id &&
446 rm->seqno == ps->funcstate.retransmit_queue.seqno &&
447 rm->nb == ps->funcstate.retransmit_queue.nb;
450 static inline __u32 retransmit_entryoffset(void)
452 return offsetof(struct sk_buff, cb) + offsetof(struct skb_procstate,
453 funcstate.retransmit_queue.htab_entry);
456 static inline __u32 retransmit_refoffset(void)
458 return offsetof(struct sk_buff, cb) + offsetof(struct skb_procstate,
459 funcstate.retransmit_queue.ref);
462 int __init cor_snd_init(void)
464 htable_init(&retransmits, matches_skb_connid_seqno,
465 retransmit_entryoffset(), retransmit_refoffset());
467 return 0;
470 MODULE_LICENSE("GPL");